From fcfc95d0e624c341d5ef321d14e774aebcb5ce89 Mon Sep 17 00:00:00 2001
From: Michael Kaye <1917473+michaelkaye@users.noreply.github.com>
Date: Mon, 3 Jun 2019 15:06:06 +0100
Subject: [PATCH] Add a s3_media_upload script

---
 README.md               |  29 +++
 scripts/s3_media_upload | 450 ++++++++++++++++++++++++++++++++++++++++
 setup.py                |   8 +-
 3 files changed, 486 insertions(+), 1 deletion(-)
 create mode 100644 scripts/s3_media_upload

diff --git a/README.md b/README.md
index 37c41e9..7ea8793 100644
--- a/README.md
+++ b/README.md
@@ -32,3 +32,32 @@ media_storage_providers:
 
 This module uses `boto3`, and so the credentials should be specified as
 described [here](https://boto3.readthedocs.io/en/latest/guide/configuration.html#guide-configuration).
+
+Regular cleanup job
+-------------------
+
+The package also provides a `s3_media_upload` script, which can be run as a regular
+job to upload media to S3 and then delete it from the local disk. It complements the
+storage provider configuration above, which serves media from S3 while this script uploads it asynchronously.
+
+Once the package is installed, the script can be run somewhat like the following.
+We suggest running it under `tmux` or `screen`, as it can take a long time on
+larger servers.
+
+More options are available in the command help.
+
+```
+> cd s3_media_upload
+# cache.db will be created if absent. database.yaml must contain the PostgreSQL credentials
+> ls
+cache.db  database.yaml
+# Update the cache from /path/to/media/store, looking for files not accessed in the last 2 months
+> s3_media_upload update /path/to/media/store 2m
+Syncing files that haven't been accessed since: 2018-10-18 11:06:21.520602
+Synced 0 new rows
+100%|█████████████████████████████████████████████████████████████| 1074/1074 [00:33<00:00, 25.97files/s]
+Updated 0 as deleted
+
+> s3_media_upload upload /path/to/media/store matrix_s3_bucket_name --storage-class STANDARD_IA --delete
+# prepare to wait a long time
+```
diff --git a/scripts/s3_media_upload b/scripts/s3_media_upload
new file mode 100644
index 0000000..b8e08d8
--- /dev/null
+++ b/scripts/s3_media_upload
@@ -0,0 +1,450 @@
+#!/usr/bin/python
+import argparse
+import datetime
+import os
+import sqlite3
+
+import boto3
+import botocore
+import humanize
+import psycopg2
+import tqdm
+import yaml
+
+# Schema for our sqlite database cache
+SCHEMA = """
+    CREATE TABLE IF NOT EXISTS media (
+        origin TEXT NOT NULL,  -- empty string if local media
+        media_id TEXT NOT NULL,
+        filesystem_id TEXT NOT NULL,
+        -- Type is "local" or "remote"
+        type TEXT NOT NULL,
+        known_deleted BOOLEAN NOT NULL
+    );
+
+    CREATE UNIQUE INDEX IF NOT EXISTS media_id_idx ON media(origin, media_id);
+    CREATE INDEX IF NOT EXISTS deleted_idx ON media(known_deleted);
+"""
+
+
+def parse_duration(string):
+    """Parse a string into a duration. Supports suffixes of d, m or y.
+    """
+    suffix = string[-1]
+    number = string[:-1]
+
+    try:
+        number = int(number)
+    except ValueError:
+        raise argparse.ArgumentTypeError(
+            "duration must be an integer followed by a 'd', 'm' or 'y' suffix"
+        )
+
+    now = datetime.datetime.now()
+    if suffix == "d":
+        then = now - datetime.timedelta(days=number)
+    elif suffix == "m":
+        then = now - datetime.timedelta(days=30 * number)
+    elif suffix == "y":
+        then = now - datetime.timedelta(days=365 * number)
+    else:
+        raise argparse.ArgumentTypeError(
+            "duration must end in 'd', 'm' or 'y'"
+        )
+
+    return then
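+
+
+# Illustrative examples of the duration argument accepted by parse_duration()
+# above (values are approximate, as months and years are treated as 30 and 365
+# days respectively):
+#     "7d" -> 7 days before now
+#     "2m" -> 60 days before now
+#     "1y" -> 365 days before now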
+
+
+def run_update_db(postgres_conn, sqlite_conn, before_date):
+    """Entry point for update-db command
+    """
+
+    local_sql = """
+    SELECT '', media_id, media_id
+    FROM local_media_repository
+    WHERE
+        last_access_ts < %s
+        AND url_cache IS NULL
+    """
+
+    remote_sql = """
+    SELECT media_origin, media_id, filesystem_id
+    FROM remote_media_cache
+    WHERE
+        last_access_ts < %s
+    """
+
+    last_access_ts = int(before_date.timestamp() * 1000)
+
+    print(
+        "Syncing files that haven't been accessed since:",
+        before_date.isoformat(" "),
+    )
+
+    update_count = 0
+    with sqlite_conn:
+        sqlite_cur = sqlite_conn.cursor()
+        with postgres_conn.cursor() as pg_curs:
+            for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
+                pg_curs.execute(sql, (last_access_ts,))
+
+                for (origin, media_id, filesystem_id) in pg_curs:
+                    sqlite_cur.execute(
+                        """
+                        INSERT OR IGNORE INTO media
+                        (origin, media_id, filesystem_id, type, known_deleted)
+                        VALUES (?, ?, ?, ?, ?)
+                        """,
+                        (origin, media_id, filesystem_id, mtype, False),
+                    )
+                    update_count += sqlite_cur.rowcount
+
+    print("Synced", update_count, "new rows")
+
+    postgres_conn.close()
+
+
+def run_check_delete(sqlite_conn, base_path):
+    """Entry point for check-deleted command
+    """
+    deleted = []
+    for origin, media_id, filesystem_id, m_type in tqdm.tqdm(
+        get_not_deleted(sqlite_conn),
+        unit="files",
+        total=get_not_deleted_count(sqlite_conn),
+    ):
+        if m_type == "local":
+            file_path = os.path.join(
+                base_path,
+                "local_content",
+                filesystem_id[:2],
+                filesystem_id[2:4],
+                filesystem_id[4:],
+            )
+        elif m_type == "remote":
+            file_path = os.path.join(
+                base_path,
+                "remote_content",
+                origin,
+                filesystem_id[:2],
+                filesystem_id[2:4],
+                filesystem_id[4:],
+            )
+        else:
+            raise Exception("Unexpected media type %r" % (m_type,))
+
+        if not os.path.exists(file_path):
+            deleted.append((origin, media_id))
+
+    with sqlite_conn:
+        sqlite_conn.executemany(
+            """
+            UPDATE media SET known_deleted = ?
+            WHERE origin = ? AND media_id = ?
+            """,
+            ((True, o, m) for o, m in deleted),
+        )
+
+    print("Updated", len(deleted), "as deleted")
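+
+
+# For reference, run_check_delete() above and to_path() below both assume the
+# media store layout used by Synapse: a file with filesystem_id "aabbcccc..."
+# lives at local_content/aa/bb/cccc... for local media, and at
+# remote_content/<origin>/aa/bb/cccc... for remote media (illustrative id only).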
+
+
+def mark_as_deleted(sqlite_conn, origin, media_id):
+    with sqlite_conn:
+        sqlite_conn.execute(
+            """
+            UPDATE media SET known_deleted = ?
+            WHERE origin = ? AND media_id = ?
+            """,
+            (True, origin, media_id),
+        )
+
+
+def get_not_deleted_count(sqlite_conn):
+    """Get count of all rows in our cache that we don't think have been deleted
+    """
+    cur = sqlite_conn.cursor()
+
+    cur.execute(
+        """
+        SELECT COALESCE(count(*), 0) FROM media
+        WHERE NOT known_deleted
+        """
+    )
+    count, = cur.fetchone()
+    return count
+
+
+def get_not_deleted(sqlite_conn):
+    """Get all rows in our cache that we don't think have been deleted
+    """
+    cur = sqlite_conn.cursor()
+
+    cur.execute(
+        """
+        SELECT origin, media_id, filesystem_id, type FROM media
+        WHERE NOT known_deleted
+        """
+    )
+    return cur
+
+
+def to_path(origin, filesystem_id, m_type):
+    """Get a relative path to the given media
+    """
+    if m_type == "local":
+        file_path = os.path.join(
+            "local_content",
+            filesystem_id[:2],
+            filesystem_id[2:4],
+            filesystem_id[4:],
+        )
+    elif m_type == "remote":
+        file_path = os.path.join(
+            "remote_content",
+            origin,
+            filesystem_id[:2],
+            filesystem_id[2:4],
+            filesystem_id[4:],
+        )
+    else:
+        raise Exception("Unexpected media type %r" % (m_type,))
+
+    return file_path
+
+
+def check_file_in_s3(s3, bucket, key):
+    """Check that the file exists in S3 (though its contents may differ from the local copy)
+    """
+    try:
+        s3.head_object(Bucket=bucket, Key=key)
+    except botocore.exceptions.ClientError as e:
+        if int(e.response["Error"]["Code"]) == 404:
+            return False
+        raise
+
+    return True
+
+
+def run_write(sqlite_conn, output_file):
+    """Entry point for write command
+    """
+    for origin, _, filesystem_id, m_type in get_not_deleted(sqlite_conn):
+        file_path = to_path(origin, filesystem_id, m_type)
+        print(file_path, file=output_file)
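+
+
+# Note: run_upload() below reuses the relative media path as the S3 object key,
+# so a local file at <base_path>/local_content/aa/bb/cccc... would be uploaded
+# to s3://<bucket>/local_content/aa/bb/cccc... (illustrative path only).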
+
+
+def run_upload(
+    s3, bucket, sqlite_conn, base_path, should_delete, storage_class
+):
+    """Entry point for upload command
+    """
+    total = get_not_deleted_count(sqlite_conn)
+
+    uploaded = 0
+    uploaded_bytes = 0
+    deleted = 0
+    deleted_bytes = 0
+
+    # This is a progress bar
+    it = tqdm.tqdm(get_not_deleted(sqlite_conn), unit="files", total=total)
+    for origin, media_id, filesystem_id, m_type in it:
+        rel_file_path = to_path(origin, filesystem_id, m_type)
+
+        local_path = os.path.join(base_path, rel_file_path)
+        path_exists = os.path.exists(local_path)
+        if not path_exists:
+            mark_as_deleted(sqlite_conn, origin, media_id)
+            continue
+
+        if not check_file_in_s3(s3, bucket, rel_file_path):
+            try:
+                s3.upload_file(
+                    local_path,
+                    bucket,
+                    rel_file_path,
+                    ExtraArgs={"StorageClass": storage_class},
+                )
+            except Exception as e:
+                print("Failed to upload file %s: %s" % (local_path, e))
+                continue
+
+            uploaded += 1
+            uploaded_bytes += os.path.getsize(local_path)
+
+        if should_delete:
+            size = os.path.getsize(local_path)
+            os.remove(local_path)
+
+            try:
+                # This may have left an empty directory behind, so remove any
+                # parent directories that are now empty
+                os.removedirs(os.path.dirname(local_path))
+            except Exception:
+                # The directory might not be empty, or maybe we don't have
+                # permission. Either way it doesn't really matter.
+                pass
+
+            mark_as_deleted(sqlite_conn, origin, media_id)
+
+            deleted += 1
+            deleted_bytes += size
+
+    print("Uploaded", uploaded, "files out of", total)
+    print("Uploaded", humanize.naturalsize(uploaded_bytes, gnu=True))
+    print("Deleted", deleted, "files")
+    print("Deleted", humanize.naturalsize(deleted_bytes, gnu=True))
+
+
+def get_sqlite_conn(parser):
+    """Attempt to get a sqlite connection to cache.db, or exit.
+    """
+    try:
+        sqlite_conn = sqlite3.connect("cache.db")
+        sqlite_conn.executescript(SCHEMA)
+    except sqlite3.Error as e:
+        parser.error("Could not open 'cache.db' as sqlite DB: %s" % (e,))
+
+    return sqlite_conn
+
+
+def get_postgres_conn(parser):
+    """Attempt to get a postgres connection based on database.yaml, or exit.
+    """
+    try:
+        database_yaml = yaml.safe_load(open("database.yaml"))
+    except FileNotFoundError:
+        parser.error("Could not find database.yaml")
+    except yaml.YAMLError as e:
+        parser.error("database.yaml is not valid yaml: %s" % (e,))
+
+    try:
+        postgres_conn = psycopg2.connect(**database_yaml)
+    except psycopg2.Error as e:
+        parser.error("Could not connect to postgres database: %s" % (e,))
+
+    return postgres_conn
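+
+
+# database.yaml (read by get_postgres_conn above) is passed straight to
+# psycopg2.connect(), so it should contain a mapping of connection keyword
+# arguments, for example (illustrative values only):
+#
+#     user: synapse
+#     password: secret
+#     dbname: synapse
+#     host: localhost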
+
+
+def main():
+    parser = argparse.ArgumentParser(prog="s3_media_upload")
+    subparsers = parser.add_subparsers(help="command to run", dest="cmd")
+
+    update_db_parser = subparsers.add_parser(
+        "update-db", help="Syncs rows from the database to the local cache"
+    )
+    update_db_parser.add_argument(
+        "duration",
+        type=parse_duration,
+        help="Fetch rows that haven't been accessed within the given duration,"
+        " e.g. 1m (one month). Valid suffixes are d, m or y."
+        " NOTE: currently does not remove entries from the cache",
+    )
+
+    deleted_parser = subparsers.add_parser(
+        "check-deleted",
+        help="Check whether files in the local cache still exist under the given"
+        " path",
+    )
+    deleted_parser.add_argument(
+        "base_path", help="Base path of the media store directory"
+    )
+
+    update_parser = subparsers.add_parser(
+        "update",
+        help="Updates the local cache. Equivalent to running update-db and"
+        " check-deleted",
+    )
+    update_parser.add_argument(
+        "base_path", help="Base path of the media store directory"
+    )
+    update_parser.add_argument(
+        "duration",
+        type=parse_duration,
+        help="Fetch rows that haven't been accessed within the given duration,"
+        " e.g. 1m (one month). Valid suffixes are d, m or y."
+        " NOTE: currently does not remove entries from the cache",
+    )
+
+    write_parser = subparsers.add_parser(
+        "write",
+        help="Outputs all files in the local cache that we may not have deleted;"
+        " check-deleted should be run first to update the cache.",
+    )
+    write_parser.add_argument(
+        "out",
+        type=argparse.FileType("w", encoding="UTF-8"),
+        default="-",
+        nargs="?",
+        help="File to output the list to, or '-' for stdout",
+    )
+
+    upload_parser = subparsers.add_parser(
+        "upload", help="Uploads media to s3 based on the local cache"
+    )
+    upload_parser.add_argument(
+        "base_path", help="Base path of the media store directory"
+    )
+    upload_parser.add_argument("bucket", help="S3 bucket to upload to")
+    upload_parser.add_argument(
+        "--storage-class",
+        help="S3 storage class to use",
+        nargs="?",
+        choices=[
+            "STANDARD",
+            "REDUCED_REDUNDANCY",
+            "STANDARD_IA",
+            "ONEZONE_IA",
+        ],
+        default="STANDARD",
+    )
+
+    upload_parser.add_argument(
+        "--delete",
+        action="store_const",
+        const=True,
+        help="Deletes the local copy from the media store on successful upload",
+    )
+
+    args = parser.parse_args()
+
+    if args.cmd == "write":
+        sqlite_conn = get_sqlite_conn(parser)
+        run_write(sqlite_conn, args.out)
+        return
+
+    if args.cmd == "update-db":
+        sqlite_conn = get_sqlite_conn(parser)
+        postgres_conn = get_postgres_conn(parser)
+        run_update_db(postgres_conn, sqlite_conn, args.duration)
+        return
+
+    if args.cmd == "check-deleted":
+        sqlite_conn = get_sqlite_conn(parser)
+        run_check_delete(sqlite_conn, args.base_path)
+        return
+
+    if args.cmd == "update":
+        sqlite_conn = get_sqlite_conn(parser)
+        postgres_conn = get_postgres_conn(parser)
+        run_update_db(postgres_conn, sqlite_conn, args.duration)
+        run_check_delete(sqlite_conn, args.base_path)
+        return
+
+    if args.cmd == "upload":
+        sqlite_conn = get_sqlite_conn(parser)
+        s3 = boto3.client("s3")
+        run_upload(
+            s3,
+            args.bucket,
+            sqlite_conn,
+            args.base_path,
+            should_delete=args.delete,
+            storage_class=args.storage_class,
+        )
+        return
+
+    parser.error("A valid subcommand must be specified")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/setup.py b/setup.py
index 9b8e8e9..1ac4a0d 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,13 @@ setup(
     zip_safe=False,
     url="https://github.com/matrix-org/synapse-s3-storage-provider",
     py_modules=['s3_storage_provider'],
+    scripts=['scripts/s3_media_upload'],
     install_requires=[
-        'boto3'
+        'boto3>=1.9.23,<2.0',
+        'botocore>=1.12.23,<2.0',
+        'humanize>=0.5.1,<0.6',
+        'psycopg2-binary>=2.7.5,<3.0',
+        'PyYAML>=3.13,<4.0',
+        'tqdm>=4.26.0,<5.0'
     ],
 )