From 8926b4e4178edcda1a32fdb39bd36cef1a1a9d40 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 13 Jan 2022 19:14:52 +0100 Subject: [PATCH] Add support for Synapse running on sqlite3 database in `scripts/s3_media_upload` (#71) Signed-off-by: Alex Auvolat --- scripts/s3_media_upload | 74 +++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 22 deletions(-) diff --git a/scripts/s3_media_upload b/scripts/s3_media_upload index 99ce56d..6be77e7 100755 --- a/scripts/s3_media_upload +++ b/scripts/s3_media_upload @@ -193,7 +193,7 @@ def run_write(sqlite_conn, output_file): print(thumbnail_path, file=output_file) -def run_update_db(postgres_conn, sqlite_conn, before_date): +def run_update_db(synapse_db_conn, sqlite_conn, before_date): """Entry point for update-db command """ @@ -219,26 +219,48 @@ def run_update_db(postgres_conn, sqlite_conn, before_date): ) update_count = 0 + with sqlite_conn: sqlite_cur = sqlite_conn.cursor() - with postgres_conn.cursor() as pg_curs: - for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")): - pg_curs.execute(sql, (last_access_ts,)) - for (origin, media_id, filesystem_id) in pg_curs: - sqlite_cur.execute( - """ - INSERT OR IGNORE INTO media - (origin, media_id, filesystem_id, type, known_deleted) - VALUES (?, ?, ?, ?, ?) - """, - (origin, media_id, filesystem_id, mtype, False), + if isinstance(synapse_db_conn, sqlite3.Connection): + synapse_db_curs = synapse_db_conn.cursor() + for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")): + synapse_db_curs.execute(sql.replace("%s", "?"), (last_access_ts,)) + update_count += update_db_process_rows( + mtype, sqlite_cur, synapse_db_curs + ) + + else: + with synapse_db_conn.cursor() as synapse_db_curs: + for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")): + synapse_db_curs.execute(sql, (last_access_ts,)) + update_count += update_db_process_rows( + mtype, sqlite_cur, synapse_db_curs ) - update_count += sqlite_cur.rowcount print("Synced", update_count, "new rows") - postgres_conn.close() + synapse_db_conn.close() + + +def update_db_process_rows(mtype, sqlite_cur, synapse_db_curs): + """Process rows extracted from Synapse's database and insert them in cache + """ + update_count = 0 + + for (origin, media_id, filesystem_id) in synapse_db_curs: + sqlite_cur.execute( + """ + INSERT OR IGNORE INTO media + (origin, media_id, filesystem_id, type, known_deleted) + VALUES (?, ?, ?, ?, ?) + """, + (origin, media_id, filesystem_id, mtype, False), + ) + update_count += sqlite_cur.rowcount + + return update_count def run_check_delete(sqlite_conn, base_path): @@ -368,8 +390,9 @@ def get_sqlite_conn(parser): return sqlite_conn -def get_postgres_conn(parser): - """Attempt to get a postgres connection based on database.yaml, or exit. +def get_synapse_db_conn(parser): + """Attempt to get a connection based on database.yaml to Synapse's + database, or exit. """ try: database_yaml = yaml.safe_load(open("database.yaml")) @@ -379,11 +402,18 @@ def get_postgres_conn(parser): parser.error("database.yaml is not valid yaml: %s" % (e,)) try: - postgres_conn = psycopg2.connect(**database_yaml) + if "sqlite" in database_yaml: + synapse_db_conn = sqlite3.connect(**database_yaml["sqlite"]) + elif "postgres" in database_yaml: + synapse_db_conn = psycopg2.connect(**database_yaml["postgres"]) + else: + synapse_db_conn = psycopg2.connect(**database_yaml) + except sqlite3.OperationalError as e: + parser.error("Could not connect to sqlite3 database: %s" % (e,)) except psycopg2.Error as e: parser.error("Could not connect to postgres database: %s" % (e,)) - return postgres_conn + return synapse_db_conn def main(): @@ -488,8 +518,8 @@ def main(): if args.cmd == "update-db": sqlite_conn = get_sqlite_conn(parser) - postgres_conn = get_postgres_conn(parser) - run_update_db(postgres_conn, sqlite_conn, args.duration) + synapse_db_conn = get_synapse_db_conn(parser) + run_update_db(synapse_db_conn, sqlite_conn, args.duration) return if args.cmd == "check-deleted": @@ -499,8 +529,8 @@ def main(): if args.cmd == "update": sqlite_conn = get_sqlite_conn(parser) - postgres_conn = get_postgres_conn(parser) - run_update_db(postgres_conn, sqlite_conn, args.duration) + synapse_db_conn = get_synapse_db_conn(parser) + run_update_db(synapse_db_conn, sqlite_conn, args.duration) run_check_delete(sqlite_conn, args.base_path) return