From a5b15d644dd858e9a07748ca2c3bb3633118dacb Mon Sep 17 00:00:00 2001 From: Sean Date: Fri, 10 Sep 2021 11:39:50 +0100 Subject: [PATCH] Misc improvements to `scripts/s3_media_upload` and fix CI (#59) scripts/s3_media_upload: * Mark `scripts/s3_media_upload` as executable * Fix `scripts/s3_media_upload` shebang to respect virtual environments * Format `scripts/s3_media_upload` * Remove unused imports from `scripts/s3_media_upload` * Include `scripts/s3_media_upload` in CI checks * Refactor `s3_media_upload`'s `run_check_delete` to use `to_path` instead of duplicating code CI: * Fix branch names in CI config --- .github/workflows/ci.yml | 4 +- scripts/s3_media_upload | 218 +++++++++++++++++---------------------- tox.ini | 6 +- 3 files changed, 102 insertions(+), 126 deletions(-) mode change 100644 => 100755 scripts/s3_media_upload diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1c74200..efcb3a1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,9 +2,9 @@ name: CI on: push: - branches: [ master ] + branches: [ main ] pull_request: - branches: [ master ] + branches: [ main ] workflow_dispatch: diff --git a/scripts/s3_media_upload b/scripts/s3_media_upload old mode 100644 new mode 100755 index 928b406..39c0bbc --- a/scripts/s3_media_upload +++ b/scripts/s3_media_upload @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python import argparse import datetime import os @@ -10,7 +10,6 @@ import humanize import psycopg2 import tqdm import yaml -import sys # Schema for our sqlite database cache SCHEMA = """ @@ -29,6 +28,7 @@ SCHEMA = """ progress = True + def parse_duration(string): """Parse a string into a duration supports suffix of d, m or y. """ @@ -50,119 +50,18 @@ def parse_duration(string): elif suffix == "y": then = now - datetime.timedelta(days=365 * number) else: - raise argparse.ArgumentTypeError( - "duration must end in 'd', 'm' or 'y'" - ) + raise argparse.ArgumentTypeError("duration must end in 'd', 'm' or 'y'") return then -def run_update_db(postgres_conn, sqlite_conn, before_date): - """Entry point for update-db command - """ - - local_sql = """ - SELECT '', media_id, media_id - FROM local_media_repository - WHERE - COALESCE(last_access_ts, created_ts) < %s - AND url_cache IS NULL - """ - - remote_sql = """ - SELECT media_origin, media_id, filesystem_id - FROM remote_media_cache - WHERE - COALESCE(last_access_ts, created_ts) < %s - """ - - last_access_ts = int(before_date.timestamp() * 1000) - - print( - "Syncing files that haven't been accessed since:", - before_date.isoformat(" "), - ) - - update_count = 0 - with sqlite_conn: - sqlite_cur = sqlite_conn.cursor() - with postgres_conn.cursor() as pg_curs: - for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")): - pg_curs.execute(sql, (last_access_ts,)) - - for (origin, media_id, filesystem_id) in pg_curs: - sqlite_cur.execute( - """ - INSERT OR IGNORE INTO media - (origin, media_id, filesystem_id, type, known_deleted) - VALUES (?, ?, ?, ?, ?) 
- """, - (origin, media_id, filesystem_id, mtype, False), - ) - update_count += sqlite_cur.rowcount - - print("Synced", update_count, "new rows") - - postgres_conn.close() - - -def run_check_delete(sqlite_conn, base_path): - """Entry point for check-deleted command - """ - deleted = [] - if progress: - it = tqdm.tqdm( - get_not_deleted(sqlite_conn), - unit="files", - total=get_not_deleted_count(sqlite_conn) - ) - else: - it = get_not_deleted(sqlite_conn) - print("Checking on ", get_not_deleted_count(sqlite_conn), " undeleted files") - - for origin, media_id, filesystem_id, m_type in it: - if m_type == "local": - file_path = os.path.join( - base_path, - "local_content", - filesystem_id[:2], - filesystem_id[2:4], - filesystem_id[4:], - ) - elif m_type == "remote": - file_path = os.path.join( - base_path, - "remote_content", - origin, - filesystem_id[:2], - filesystem_id[2:4], - filesystem_id[4:], - ) - else: - raise Exception("Unexpected media type %r", m_type) - - if not os.path.exists(file_path): - deleted.append((origin, media_id)) - - with sqlite_conn: - sqlite_conn.executemany( - """ - UPDATE media SET known_deleted = ? - WHERE origin = ? AND media_id = ? - """, - ((True, o, m) for o, m in deleted), - ) - - print("Updated", len(deleted), "as deleted") - - def mark_as_deleted(sqlite_conn, origin, media_id): with sqlite_conn: sqlite_conn.execute( """ UPDATE media SET known_deleted = ? WHERE origin = ? AND media_id = ? - """, + """, (True, origin, media_id), ) @@ -176,9 +75,9 @@ def get_not_deleted_count(sqlite_conn): """ SELECT COALESCE(count(*), 0) FROM media WHERE NOT known_deleted - """ + """ ) - count, = cur.fetchone() + (count,) = cur.fetchone() return count @@ -191,7 +90,7 @@ def get_not_deleted(sqlite_conn): """ SELECT origin, media_id, filesystem_id, type FROM media WHERE NOT known_deleted - """ + """ ) return cur @@ -201,10 +100,7 @@ def to_path(origin, filesystem_id, m_type): """ if m_type == "local": file_path = os.path.join( - "local_content", - filesystem_id[:2], - filesystem_id[2:4], - filesystem_id[4:], + "local_content", filesystem_id[:2], filesystem_id[2:4], filesystem_id[4:], ) elif m_type == "remote": file_path = os.path.join( @@ -241,9 +137,87 @@ def run_write(sqlite_conn, output_file): print(file_path, file=output_file) -def run_upload( - s3, bucket, sqlite_conn, base_path, should_delete, storage_class -): +def run_update_db(postgres_conn, sqlite_conn, before_date): + """Entry point for update-db command + """ + + local_sql = """ + SELECT '', media_id, media_id + FROM local_media_repository + WHERE + COALESCE(last_access_ts, created_ts) < %s + AND url_cache IS NULL + """ + + remote_sql = """ + SELECT media_origin, media_id, filesystem_id + FROM remote_media_cache + WHERE + COALESCE(last_access_ts, created_ts) < %s + """ + + last_access_ts = int(before_date.timestamp() * 1000) + + print( + "Syncing files that haven't been accessed since:", before_date.isoformat(" "), + ) + + update_count = 0 + with sqlite_conn: + sqlite_cur = sqlite_conn.cursor() + with postgres_conn.cursor() as pg_curs: + for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")): + pg_curs.execute(sql, (last_access_ts,)) + + for (origin, media_id, filesystem_id) in pg_curs: + sqlite_cur.execute( + """ + INSERT OR IGNORE INTO media + (origin, media_id, filesystem_id, type, known_deleted) + VALUES (?, ?, ?, ?, ?) 
+ """, + (origin, media_id, filesystem_id, mtype, False), + ) + update_count += sqlite_cur.rowcount + + print("Synced", update_count, "new rows") + + postgres_conn.close() + + +def run_check_delete(sqlite_conn, base_path): + """Entry point for check-deleted command + """ + deleted = [] + if progress: + it = tqdm.tqdm( + get_not_deleted(sqlite_conn), + unit="files", + total=get_not_deleted_count(sqlite_conn), + ) + else: + it = get_not_deleted(sqlite_conn) + print("Checking on ", get_not_deleted_count(sqlite_conn), " undeleted files") + + for origin, media_id, filesystem_id, m_type in it: + rel_file_path = to_path(origin, filesystem_id, m_type) + file_path = os.path.join(base_path, rel_file_path) + if not os.path.exists(file_path): + deleted.append((origin, media_id)) + + with sqlite_conn: + sqlite_conn.executemany( + """ + UPDATE media SET known_deleted = ? + WHERE origin = ? AND media_id = ? + """, + ((True, o, m) for o, m in deleted), + ) + + print("Updated", len(deleted), "as deleted") + + +def run_upload(s3, bucket, sqlite_conn, base_path, should_delete, storage_class): """Entry point for upload command """ total = get_not_deleted_count(sqlite_conn) @@ -340,7 +314,12 @@ def get_postgres_conn(parser): def main(): parser = argparse.ArgumentParser(prog="s3_media_upload") - parser.add_argument("--no-progress", help="do not show progress bars", action="store_true", dest="no_progress") + parser.add_argument( + "--no-progress", + help="do not show progress bars", + action="store_true", + dest="no_progress", + ) subparsers = parser.add_subparsers(help="command to run", dest="cmd") update_db_parser = subparsers.add_parser( @@ -356,8 +335,7 @@ def main(): deleted_parser = subparsers.add_parser( "check-deleted", - help="Check whether files in the local cache still exist under given" - " path", + help="Check whether files in the local cache still exist under given path", ) deleted_parser.add_argument( "base_path", help="Base path of the media store directory" @@ -421,9 +399,7 @@ def main(): ) upload_parser.add_argument( - "--endpoint-url", - help="S3 endpoint url to use", - default=None + "--endpoint-url", help="S3 endpoint url to use", default=None ) args = parser.parse_args() diff --git a/tox.ini b/tox.ini index 2734b0f..a45d06b 100644 --- a/tox.ini +++ b/tox.ini @@ -51,10 +51,10 @@ deps = # We pin so that our tests don't start failing on new releases of black. black==19.10b0 commands = - python -m black --check --diff . - /bin/sh -c "flake8 s3_storage_provider.py setup.py" + python -m black --check --diff . scripts/s3_media_upload + /bin/sh -c "flake8 s3_storage_provider.py setup.py scripts/s3_media_upload" [testenv:check_isort] skip_install = True deps = isort -commands = /bin/sh -c "isort -c -sp setup.cfg -rc s3_storage_provider.py setup.py " +commands = /bin/sh -c "isort -c -sp setup.cfg -rc s3_storage_provider.py setup.py scripts/s3_media_upload"
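
For review, a minimal standalone sketch of the path scheme behind the `run_check_delete` refactor, runnable on its own: `to_path` already includes the `local_content`/`remote_content` prefix, so joining its result onto `base_path` reproduces the paths the removed inline code built. The media IDs and origin below are made up, and `ValueError` is substituted for the script's bare `Exception`.

import os


def to_path(origin, filesystem_id, m_type):
    """Relative path of a media file, mirroring the helper in this patch."""
    if m_type == "local":
        return os.path.join(
            "local_content", filesystem_id[:2], filesystem_id[2:4], filesystem_id[4:]
        )
    elif m_type == "remote":
        return os.path.join(
            "remote_content",
            origin,
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    raise ValueError("Unexpected media type %r" % (m_type,))


# Hypothetical inputs, purely for illustration:
print(to_path("", "abcdefghijkl", "local"))
# -> local_content/ab/cd/efghijkl
print(to_path("matrix.example.org", "abcdefghijkl", "remote"))
# -> remote_content/matrix.example.org/ab/cd/efghijkl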