#!/usr/bin/python

import argparse
import datetime
import os
import sqlite3

import boto3
import botocore
import humanize
import psycopg2
import tqdm
import yaml

# Schema for our sqlite database cache
SCHEMA = """
CREATE TABLE IF NOT EXISTS media (
    origin TEXT NOT NULL,  -- empty string if local media
    media_id TEXT NOT NULL,
    filesystem_id TEXT NOT NULL,
    -- Type is "local" or "remote"
    type TEXT NOT NULL,
    known_deleted BOOLEAN NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS media_id_idx ON media(origin, media_id);
CREATE INDEX IF NOT EXISTS deleted_idx ON media(known_deleted);
"""

progress = True


def parse_duration(string):
    """Parse a duration string into a cutoff datetime.

    Accepts an integer followed by a 'd' (days), 'm' (months) or 'y'
    (years) suffix, and returns the datetime that far in the past.
    """
    suffix = string[-1]
    number = string[:-1]

    try:
        number = int(number)
    except ValueError:
        raise argparse.ArgumentTypeError(
            "duration must be an integer followed by a 'd', 'm' or 'y' suffix"
        )

    now = datetime.datetime.now()
    if suffix == "d":
        then = now - datetime.timedelta(days=number)
    elif suffix == "m":
        then = now - datetime.timedelta(days=30 * number)
    elif suffix == "y":
        then = now - datetime.timedelta(days=365 * number)
    else:
        raise argparse.ArgumentTypeError("duration must end in 'd', 'm' or 'y'")

    return then


def run_update_db(postgres_conn, sqlite_conn, before_date):
    """Entry point for the update-db command"""
    local_sql = """
        SELECT '', media_id, media_id
        FROM local_media_repository
        WHERE COALESCE(last_access_ts, created_ts) < %s
            AND url_cache IS NULL
    """

    remote_sql = """
        SELECT media_origin, media_id, filesystem_id
        FROM remote_media_cache
        WHERE COALESCE(last_access_ts, created_ts) < %s
    """

    last_access_ts = int(before_date.timestamp() * 1000)

    print(
        "Syncing files that haven't been accessed since:",
        before_date.isoformat(" "),
    )

    update_count = 0

    with sqlite_conn:
        sqlite_cur = sqlite_conn.cursor()

        with postgres_conn.cursor() as pg_curs:
            for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
                pg_curs.execute(sql, (last_access_ts,))
                for origin, media_id, filesystem_id in pg_curs:
                    sqlite_cur.execute(
                        """
                        INSERT OR IGNORE INTO media
                        (origin, media_id, filesystem_id, type, known_deleted)
                        VALUES (?, ?, ?, ?, ?)
                        """,
                        (origin, media_id, filesystem_id, mtype, False),
                    )
                    update_count += sqlite_cur.rowcount

    print("Synced", update_count, "new rows")

    postgres_conn.close()


def run_check_delete(sqlite_conn, base_path):
    """Entry point for the check-deleted command"""
    deleted = []

    if progress:
        it = tqdm.tqdm(
            get_not_deleted(sqlite_conn),
            unit="files",
            total=get_not_deleted_count(sqlite_conn),
        )
    else:
        it = get_not_deleted(sqlite_conn)
        print("Checking on", get_not_deleted_count(sqlite_conn), "undeleted files")

    for origin, media_id, filesystem_id, m_type in it:
        if m_type == "local":
            file_path = os.path.join(
                base_path,
                "local_content",
                filesystem_id[:2],
                filesystem_id[2:4],
                filesystem_id[4:],
            )
        elif m_type == "remote":
            file_path = os.path.join(
                base_path,
                "remote_content",
                origin,
                filesystem_id[:2],
                filesystem_id[2:4],
                filesystem_id[4:],
            )
        else:
            raise Exception("Unexpected media type %r" % (m_type,))

        if not os.path.exists(file_path):
            deleted.append((origin, media_id))

    with sqlite_conn:
        sqlite_conn.executemany(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            ((True, o, m) for o, m in deleted),
        )

    print("Updated", len(deleted), "as deleted")


def mark_as_deleted(sqlite_conn, origin, media_id):
    """Mark a single cached row as deleted from the local media store."""
    with sqlite_conn:
        sqlite_conn.execute(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            (True, origin, media_id),
        )
""", (True, origin, media_id), ) def get_not_deleted_count(sqlite_conn): """Get count of all rows in our cache that we don't think have been deleted """ cur = sqlite_conn.cursor() cur.execute( """ SELECT COALESCE(count(*), 0) FROM media WHERE NOT known_deleted """ ) count, = cur.fetchone() return count def get_not_deleted(sqlite_conn): """Get all rows in our cache that we don't think have been deleted """ cur = sqlite_conn.cursor() cur.execute( """ SELECT origin, media_id, filesystem_id, type FROM media WHERE NOT known_deleted """ ) return cur def to_path(origin, filesystem_id, m_type): """Get a relative path to the given media """ if m_type == "local": file_path = os.path.join( "local_content", filesystem_id[:2], filesystem_id[2:4], filesystem_id[4:], ) elif m_type == "remote": file_path = os.path.join( "remote_content", origin, filesystem_id[:2], filesystem_id[2:4], filesystem_id[4:], ) else: raise Exception("Unexpected media type %r", m_type) return file_path def check_file_in_s3(s3, bucket, key): """Check the file exists in S3 (though it could be different) """ try: s3.head_object(Bucket=bucket, Key=key) except botocore.exceptions.ClientError as e: if int(e.response["Error"]["Code"]) == 404: return False raise return True def run_write(sqlite_conn, output_file): """Entry point for write command """ for origin, _, filesystem_id, m_type in get_not_deleted(sqlite_conn): file_path = to_path(origin, filesystem_id, m_type) print(file_path, file=output_file) def run_upload( s3, bucket, sqlite_conn, base_path, should_delete, storage_class ): """Entry point for upload command """ total = get_not_deleted_count(sqlite_conn) uploaded = 0 uploaded_bytes = 0 deleted = 0 deleted_bytes = 0 # This is a progress bar if progress: it = tqdm.tqdm(get_not_deleted(sqlite_conn), unit="files", total=total) else: print("Uploading ", total, " files") it = get_not_deleted(sqlite_conn) for origin, media_id, filesystem_id, m_type in it: rel_file_path = to_path(origin, filesystem_id, m_type) local_path = os.path.join(base_path, rel_file_path) path_exists = os.path.exists(local_path) if not path_exists: mark_as_deleted(sqlite_conn, origin, media_id) continue if not check_file_in_s3(s3, bucket, rel_file_path): try: s3.upload_file( local_path, bucket, rel_file_path, ExtraArgs={"StorageClass": storage_class}, ) except Exception as e: print("Failed to upload file %s: %s", local_path, e) continue uploaded += 1 uploaded_bytes += os.path.getsize(local_path) if should_delete: size = os.path.getsize(local_path) os.remove(local_path) try: # This may have lead to an empty directory, so lets remove all # that are empty os.removedirs(os.path.dirname(local_path)) except Exception: # The directory might not be empty, or maybe we don't have # permission. Either way doesn't really matter. pass mark_as_deleted(sqlite_conn, origin, media_id) deleted += 1 deleted_bytes += size print("Uploaded", uploaded, "files out of", total) print("Uploaded", humanize.naturalsize(uploaded_bytes, gnu=True)) print("Deleted", deleted, "files") print("Deleted", humanize.naturalsize(deleted_bytes, gnu=True)) def get_sqlite_conn(parser): """Attempt to get a sqlite connection to cache.db, or exit. """ try: sqlite_conn = sqlite3.connect("cache.db") sqlite_conn.executescript(SCHEMA) except sqlite3.Error as e: parser.error("Could not open 'cache.db' as sqlite DB: %s" % (e,)) return sqlite_conn def get_postgres_conn(parser): """Attempt to get a postgres connection based on database.yaml, or exit. 
""" try: database_yaml = yaml.safe_load(open("database.yaml")) except FileNotFoundError: parser.error("Could not find database.yaml") except yaml.YAMLError as e: parser.error("database.yaml is not valid yaml: %s" % (e,)) try: postgres_conn = psycopg2.connect(**database_yaml) except psycopg2.Error as e: parser.error("Could not connect to postgres database: %s" % (e,)) return postgres_conn def main(): parser = argparse.ArgumentParser(prog="s3_media_upload") parser.add_argument("--no-progress", help="do not show progress bars", action="store_true", dest="no_progress") subparsers = parser.add_subparsers(help="command to run", dest="cmd") update_db_parser = subparsers.add_parser( "update-db", help="Syncs rows from database to local cache" ) update_db_parser.add_argument( "duration", type=parse_duration, help="Fetch rows that haven't been accessed in the duration given," " accepts duration of the form of e.g. 1m (one month). Valid suffixes" " are d, m or y. NOTE: Currently does not remove entries from the cache", ) deleted_parser = subparsers.add_parser( "check-deleted", help="Check whether files in the local cache still exist under given" " path", ) deleted_parser.add_argument( "base_path", help="Base path of the media store directory" ) update_parser = subparsers.add_parser( "update", help="Updates local cache. Equivalent to running update-db and" " check-deleted", ) update_parser.add_argument( "base_path", help="Base path of the media store directory" ) update_parser.add_argument( "duration", type=parse_duration, help="Fetch rows that haven't been accessed in the duration given," " accepts duration of the form of e.g. 1m (one month). Valid suffixes" " are d, m or y. NOTE: Currently does not remove entries from the cache", ) write_parser = subparsers.add_parser( "write", help="Outputs all files in local cache that we may not have deleted," " check-deleted should be run first to update cache.", ) write_parser.add_argument( "out", type=argparse.FileType("w", encoding="UTF-8"), default="-", nargs="?", help="File to output list to, or '-' for stdout", ) upload_parser = subparsers.add_parser( "upload", help="Uploads media to s3 based on local cache" ) upload_parser.add_argument( "base_path", help="Base path of the media store directory" ) upload_parser.add_argument("bucket", help="S3 bucket to upload to") upload_parser.add_argument( "--storage-class", help="S3 storage class to use", nargs="?", choices=[ "STANDARD", "REDUCED_REDUNDANCY", "STANDARD_IA", "ONEZONE_IA", ], default="STANDARD", ) upload_parser.add_argument( "--delete", action="store_const", const=True, help="Deletes local copy from media store on succesful upload", ) upload_parser.add_argument( "--endpoint-url", help="S3 endpoint url to use", default=None ) args = parser.parse_args() if args.no_progress: global progress progress = False if args.cmd == "write": sqlite_conn = get_sqlite_conn(parser) run_write(sqlite_conn, args.out) return if args.cmd == "update-db": sqlite_conn = get_sqlite_conn(parser) postgres_conn = get_postgres_conn(parser) run_update_db(postgres_conn, sqlite_conn, args.duration) return if args.cmd == "check-deleted": sqlite_conn = get_sqlite_conn(parser) run_check_delete(sqlite_conn, args.base_path) return if args.cmd == "update": sqlite_conn = get_sqlite_conn(parser) postgres_conn = get_postgres_conn(parser) run_update_db(postgres_conn, sqlite_conn, args.duration) run_check_delete(sqlite_conn, args.base_path) return if args.cmd == "upload": sqlite_conn = get_sqlite_conn(parser) s3 = boto3.client("s3", 
        run_upload(
            s3,
            args.bucket,
            sqlite_conn,
            args.base_path,
            should_delete=args.delete,
            storage_class=args.storage_class,
        )
        return

    parser.error("A valid subcommand must be specified")


if __name__ == "__main__":
    main()
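
# Example end-to-end run, matching the subcommands defined above (bucket name
# and media store path are placeholders; invoke by however the script is
# installed, e.g. "python s3_media_upload.py"):
#
#     s3_media_upload update /var/lib/synapse/media_store 2m
#     s3_media_upload upload /var/lib/synapse/media_store my-media-bucket \
#         --storage-class STANDARD_IA --delete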