mirror of
https://github.com/matrix-org/synapse-s3-storage-provider.git
synced 2024-10-23 07:29:40 +00:00
Add support for Synapse running on sqlite3 database in scripts/s3_media_upload
(#71)
Signed-off-by: Alex Auvolat <alex@adnab.me>
This commit is contained in:
parent
f821cff55e
commit
8926b4e417
1 changed files with 52 additions and 22 deletions
|
@ -193,7 +193,7 @@ def run_write(sqlite_conn, output_file):
|
||||||
print(thumbnail_path, file=output_file)
|
print(thumbnail_path, file=output_file)
|
||||||
|
|
||||||
|
|
||||||
def run_update_db(postgres_conn, sqlite_conn, before_date):
|
def run_update_db(synapse_db_conn, sqlite_conn, before_date):
|
||||||
"""Entry point for update-db command
|
"""Entry point for update-db command
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -219,13 +219,37 @@ def run_update_db(postgres_conn, sqlite_conn, before_date):
|
||||||
)
|
)
|
||||||
|
|
||||||
update_count = 0
|
update_count = 0
|
||||||
|
|
||||||
with sqlite_conn:
|
with sqlite_conn:
|
||||||
sqlite_cur = sqlite_conn.cursor()
|
sqlite_cur = sqlite_conn.cursor()
|
||||||
with postgres_conn.cursor() as pg_curs:
|
|
||||||
for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
|
|
||||||
pg_curs.execute(sql, (last_access_ts,))
|
|
||||||
|
|
||||||
for (origin, media_id, filesystem_id) in pg_curs:
|
if isinstance(synapse_db_conn, sqlite3.Connection):
|
||||||
|
synapse_db_curs = synapse_db_conn.cursor()
|
||||||
|
for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
|
||||||
|
synapse_db_curs.execute(sql.replace("%s", "?"), (last_access_ts,))
|
||||||
|
update_count += update_db_process_rows(
|
||||||
|
mtype, sqlite_cur, synapse_db_curs
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
with synapse_db_conn.cursor() as synapse_db_curs:
|
||||||
|
for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
|
||||||
|
synapse_db_curs.execute(sql, (last_access_ts,))
|
||||||
|
update_count += update_db_process_rows(
|
||||||
|
mtype, sqlite_cur, synapse_db_curs
|
||||||
|
)
|
||||||
|
|
||||||
|
print("Synced", update_count, "new rows")
|
||||||
|
|
||||||
|
synapse_db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def update_db_process_rows(mtype, sqlite_cur, synapse_db_curs):
|
||||||
|
"""Process rows extracted from Synapse's database and insert them in cache
|
||||||
|
"""
|
||||||
|
update_count = 0
|
||||||
|
|
||||||
|
for (origin, media_id, filesystem_id) in synapse_db_curs:
|
||||||
sqlite_cur.execute(
|
sqlite_cur.execute(
|
||||||
"""
|
"""
|
||||||
INSERT OR IGNORE INTO media
|
INSERT OR IGNORE INTO media
|
||||||
|
@ -236,9 +260,7 @@ def run_update_db(postgres_conn, sqlite_conn, before_date):
|
||||||
)
|
)
|
||||||
update_count += sqlite_cur.rowcount
|
update_count += sqlite_cur.rowcount
|
||||||
|
|
||||||
print("Synced", update_count, "new rows")
|
return update_count
|
||||||
|
|
||||||
postgres_conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
def run_check_delete(sqlite_conn, base_path):
|
def run_check_delete(sqlite_conn, base_path):
|
||||||
|
@ -368,8 +390,9 @@ def get_sqlite_conn(parser):
|
||||||
return sqlite_conn
|
return sqlite_conn
|
||||||
|
|
||||||
|
|
||||||
def get_postgres_conn(parser):
|
def get_synapse_db_conn(parser):
|
||||||
"""Attempt to get a postgres connection based on database.yaml, or exit.
|
"""Attempt to get a connection based on database.yaml to Synapse's
|
||||||
|
database, or exit.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
database_yaml = yaml.safe_load(open("database.yaml"))
|
database_yaml = yaml.safe_load(open("database.yaml"))
|
||||||
|
@ -379,11 +402,18 @@ def get_postgres_conn(parser):
|
||||||
parser.error("database.yaml is not valid yaml: %s" % (e,))
|
parser.error("database.yaml is not valid yaml: %s" % (e,))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
postgres_conn = psycopg2.connect(**database_yaml)
|
if "sqlite" in database_yaml:
|
||||||
|
synapse_db_conn = sqlite3.connect(**database_yaml["sqlite"])
|
||||||
|
elif "postgres" in database_yaml:
|
||||||
|
synapse_db_conn = psycopg2.connect(**database_yaml["postgres"])
|
||||||
|
else:
|
||||||
|
synapse_db_conn = psycopg2.connect(**database_yaml)
|
||||||
|
except sqlite3.OperationalError as e:
|
||||||
|
parser.error("Could not connect to sqlite3 database: %s" % (e,))
|
||||||
except psycopg2.Error as e:
|
except psycopg2.Error as e:
|
||||||
parser.error("Could not connect to postgres database: %s" % (e,))
|
parser.error("Could not connect to postgres database: %s" % (e,))
|
||||||
|
|
||||||
return postgres_conn
|
return synapse_db_conn
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
@ -488,8 +518,8 @@ def main():
|
||||||
|
|
||||||
if args.cmd == "update-db":
|
if args.cmd == "update-db":
|
||||||
sqlite_conn = get_sqlite_conn(parser)
|
sqlite_conn = get_sqlite_conn(parser)
|
||||||
postgres_conn = get_postgres_conn(parser)
|
synapse_db_conn = get_synapse_db_conn(parser)
|
||||||
run_update_db(postgres_conn, sqlite_conn, args.duration)
|
run_update_db(synapse_db_conn, sqlite_conn, args.duration)
|
||||||
return
|
return
|
||||||
|
|
||||||
if args.cmd == "check-deleted":
|
if args.cmd == "check-deleted":
|
||||||
|
@ -499,8 +529,8 @@ def main():
|
||||||
|
|
||||||
if args.cmd == "update":
|
if args.cmd == "update":
|
||||||
sqlite_conn = get_sqlite_conn(parser)
|
sqlite_conn = get_sqlite_conn(parser)
|
||||||
postgres_conn = get_postgres_conn(parser)
|
synapse_db_conn = get_synapse_db_conn(parser)
|
||||||
run_update_db(postgres_conn, sqlite_conn, args.duration)
|
run_update_db(synapse_db_conn, sqlite_conn, args.duration)
|
||||||
run_check_delete(sqlite_conn, args.base_path)
|
run_check_delete(sqlite_conn, args.base_path)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue