Add a s3_media_upload script

Michael Kaye 2019-06-03 15:06:06 +01:00
parent 197709f1f0
commit fcfc95d0e6
3 changed files with 486 additions and 1 deletion

README.md

@@ -32,3 +32,32 @@ media_storage_providers:
This module uses `boto3`, and so the credentials should be specified as
described [here](https://boto3.readthedocs.io/en/latest/guide/configuration.html#guide-configuration).

Regular cleanup job
-------------------

There is additionally an `s3_media_upload` script which can be used in a regular
job to upload content to s3 and then delete it from local disk. This script can
be used in combination with the storage provider configuration above, so that
media is pulled from s3 on demand while uploads happen asynchronously.

Once the package is installed, the script should be run somewhat like the
following. We suggest using tmux or screen, as the upload can take a long time on
larger servers.

More options are available in the command help.

```
> cd s3_media_upload
# cache.db will be created if absent. database.yaml is required to contain PG credentials
> ls
cache.db database.yaml
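# database.yaml is parsed and passed directly to psycopg2.connect(), so it
# should contain the connection keyword arguments, for example (illustrative
# values, not part of this repo):
#   user: synapse
#   password: secret
#   dbname: synapse
#   host: localhost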
# Update cache from /path/to/media/store looking for files not used within 2 months
> s3_media_upload update /path/to/media/store 2m
Syncing files that haven't been accessed since: 2018-10-18 11:06:21.520602
Synced 0 new rows
100%|█████████████████████████████████████████████████████████████| 1074/1074 [00:33<00:00, 25.97files/s]
Updated 0 as deleted
> s3_media_upload upload /path/to/media/store matrix_s3_bucket_name --storage-class STANDARD_IA --delete
# prepare to wait a long time
```
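
As a sketch of what the "regular job" could look like, the `update` and `upload`
steps can be chained from cron. The script expects `cache.db` and `database.yaml`
in its working directory, hence the `cd`; the schedule and the
`/home/synapse/s3_media_upload` directory below are illustrative assumptions,
while the media path and bucket name are the same placeholders as in the example
above.

```
# illustrative crontab entry: sync the cache, then upload and delete, once a week
0 2 * * 6  cd /home/synapse/s3_media_upload && s3_media_upload update /path/to/media/store 2m && s3_media_upload upload /path/to/media/store matrix_s3_bucket_name --storage-class STANDARD_IA --delete
```

For very large media stores an interactive run under tmux or screen, as suggested
above, may be easier to monitor than an unattended job.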

scripts/s3_media_upload Normal file

@@ -0,0 +1,450 @@
#!/usr/bin/python

import argparse
import datetime
import os
import sqlite3

import boto3
import botocore
import humanize
import psycopg2
import tqdm
import yaml

# Schema for our sqlite database cache
SCHEMA = """
CREATE TABLE IF NOT EXISTS media (
    origin TEXT NOT NULL,  -- empty string if local media
    media_id TEXT NOT NULL,
    filesystem_id TEXT NOT NULL,
    -- Type is "local" or "remote"
    type TEXT NOT NULL,
    known_deleted BOOLEAN NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS media_id_idx ON media(origin, media_id);
CREATE INDEX IF NOT EXISTS deleted_idx ON media(known_deleted);
"""


def parse_duration(string):
    """Parse a string into a duration. Supports suffixes of d, m or y.
    """
    suffix = string[-1]
    number = string[:-1]

    try:
        number = int(number)
    except ValueError:
        raise argparse.ArgumentTypeError(
            "duration must be an integer followed by a 'd', 'm' or 'y' suffix"
        )

    now = datetime.datetime.now()
    if suffix == "d":
        then = now - datetime.timedelta(days=number)
    elif suffix == "m":
        then = now - datetime.timedelta(days=30 * number)
    elif suffix == "y":
        then = now - datetime.timedelta(days=365 * number)
    else:
        raise argparse.ArgumentTypeError(
            "duration must end in 'd', 'm' or 'y'"
        )

    return then


def run_update_db(postgres_conn, sqlite_conn, before_date):
    """Entry point for update-db command
    """
    local_sql = """
    SELECT '', media_id, media_id
    FROM local_media_repository
    WHERE
        last_access_ts < %s
        AND url_cache IS NULL
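        -- skip media that was downloaded as part of URL previews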
"""
remote_sql = """
SELECT media_origin, media_id, filesystem_id
FROM remote_media_cache
WHERE
last_access_ts < %s
"""
    last_access_ts = int(before_date.timestamp() * 1000)

    print(
        "Syncing files that haven't been accessed since:",
        before_date.isoformat(" "),
    )

    update_count = 0

    with sqlite_conn:
        sqlite_cur = sqlite_conn.cursor()
        with postgres_conn.cursor() as pg_curs:
            for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
                pg_curs.execute(sql, (last_access_ts,))
                for (origin, media_id, filesystem_id) in pg_curs:
                    sqlite_cur.execute(
                        """
                        INSERT OR IGNORE INTO media
                        (origin, media_id, filesystem_id, type, known_deleted)
                        VALUES (?, ?, ?, ?, ?)
                        """,
                        (origin, media_id, filesystem_id, mtype, False),
                    )
                    update_count += sqlite_cur.rowcount

    print("Synced", update_count, "new rows")

    postgres_conn.close()


def run_check_delete(sqlite_conn, base_path):
    """Entry point for check-deleted command
    """
    deleted = []
    for origin, media_id, filesystem_id, m_type in tqdm.tqdm(
        get_not_deleted(sqlite_conn),
        unit="files",
        total=get_not_deleted_count(sqlite_conn),
    ):
        if m_type == "local":
            file_path = os.path.join(
                base_path,
                "local_content",
                filesystem_id[:2],
                filesystem_id[2:4],
                filesystem_id[4:],
            )
        elif m_type == "remote":
            file_path = os.path.join(
                base_path,
                "remote_content",
                origin,
                filesystem_id[:2],
                filesystem_id[2:4],
                filesystem_id[4:],
            )
        else:
            raise Exception("Unexpected media type %r" % (m_type,))

        if not os.path.exists(file_path):
            deleted.append((origin, media_id))

    with sqlite_conn:
        sqlite_conn.executemany(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            ((True, o, m) for o, m in deleted),
        )

    print("Updated", len(deleted), "as deleted")


def mark_as_deleted(sqlite_conn, origin, media_id):
    with sqlite_conn:
        sqlite_conn.execute(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            (True, origin, media_id),
        )


def get_not_deleted_count(sqlite_conn):
    """Get count of all rows in our cache that we don't think have been deleted
    """
    cur = sqlite_conn.cursor()
    cur.execute(
        """
        SELECT COALESCE(count(*), 0) FROM media
        WHERE NOT known_deleted
        """
    )

    count, = cur.fetchone()
    return count


def get_not_deleted(sqlite_conn):
    """Get all rows in our cache that we don't think have been deleted
    """
    cur = sqlite_conn.cursor()
    cur.execute(
        """
        SELECT origin, media_id, filesystem_id, type FROM media
        WHERE NOT known_deleted
        """
    )
    return cur


def to_path(origin, filesystem_id, m_type):
    """Get a relative path to the given media
    """
    if m_type == "local":
        file_path = os.path.join(
            "local_content",
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    elif m_type == "remote":
        file_path = os.path.join(
            "remote_content",
            origin,
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    else:
        raise Exception("Unexpected media type %r" % (m_type,))

    return file_path


def check_file_in_s3(s3, bucket, key):
    """Check the file exists in S3 (though it could be different)
    """
    try:
        s3.head_object(Bucket=bucket, Key=key)
    except botocore.exceptions.ClientError as e:
        if int(e.response["Error"]["Code"]) == 404:
            return False
        raise

    return True


def run_write(sqlite_conn, output_file):
    """Entry point for write command
    """
    for origin, _, filesystem_id, m_type in get_not_deleted(sqlite_conn):
        file_path = to_path(origin, filesystem_id, m_type)
        print(file_path, file=output_file)


def run_upload(
    s3, bucket, sqlite_conn, base_path, should_delete, storage_class
):
    """Entry point for upload command
    """
    total = get_not_deleted_count(sqlite_conn)

    uploaded = 0
    uploaded_bytes = 0
    deleted = 0
    deleted_bytes = 0

    # This is a progress bar
    it = tqdm.tqdm(get_not_deleted(sqlite_conn), unit="files", total=total)
    for origin, media_id, filesystem_id, m_type in it:
        rel_file_path = to_path(origin, filesystem_id, m_type)
        local_path = os.path.join(base_path, rel_file_path)

        path_exists = os.path.exists(local_path)

        if not path_exists:
            mark_as_deleted(sqlite_conn, origin, media_id)
            continue

        if not check_file_in_s3(s3, bucket, rel_file_path):
            try:
                s3.upload_file(
                    local_path,
                    bucket,
                    rel_file_path,
                    ExtraArgs={"StorageClass": storage_class},
                )
            except Exception as e:
                print("Failed to upload file %s: %s" % (local_path, e))
                continue

            uploaded += 1
            uploaded_bytes += os.path.getsize(local_path)

        if should_delete:
            size = os.path.getsize(local_path)
            os.remove(local_path)

            try:
                # This may have led to an empty directory, so let's remove all
                # that are empty
                os.removedirs(os.path.dirname(local_path))
            except Exception:
                # The directory might not be empty, or maybe we don't have
                # permission. Either way doesn't really matter.
                pass

            mark_as_deleted(sqlite_conn, origin, media_id)

            deleted += 1
            deleted_bytes += size

    print("Uploaded", uploaded, "files out of", total)
    print("Uploaded", humanize.naturalsize(uploaded_bytes, gnu=True))
    print("Deleted", deleted, "files")
    print("Deleted", humanize.naturalsize(deleted_bytes, gnu=True))


def get_sqlite_conn(parser):
    """Attempt to get a sqlite connection to cache.db, or exit.
    """
    try:
        sqlite_conn = sqlite3.connect("cache.db")
        sqlite_conn.executescript(SCHEMA)
    except sqlite3.Error as e:
        parser.error("Could not open 'cache.db' as sqlite DB: %s" % (e,))

    return sqlite_conn


def get_postgres_conn(parser):
    """Attempt to get a postgres connection based on database.yaml, or exit.
    """
    try:
        database_yaml = yaml.safe_load(open("database.yaml"))
    except FileNotFoundError:
        parser.error("Could not find database.yaml")
    except yaml.YAMLError as e:
        parser.error("database.yaml is not valid yaml: %s" % (e,))
    try:
        postgres_conn = psycopg2.connect(**database_yaml)
    except psycopg2.Error as e:
        parser.error("Could not connect to postgres database: %s" % (e,))

    return postgres_conn


def main():
    parser = argparse.ArgumentParser(prog="s3_media_upload")
    subparsers = parser.add_subparsers(help="command to run", dest="cmd")

    update_db_parser = subparsers.add_parser(
        "update-db", help="Syncs rows from database to local cache"
    )
    update_db_parser.add_argument(
        "duration",
        type=parse_duration,
        help="Fetch rows that haven't been accessed in the duration given,"
        " accepts duration of the form of e.g. 1m (one month). Valid suffixes"
        " are d, m or y. NOTE: Currently does not remove entries from the cache",
    )

    deleted_parser = subparsers.add_parser(
        "check-deleted",
        help="Check whether files in the local cache still exist under given"
        " path",
    )
    deleted_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )

    update_parser = subparsers.add_parser(
        "update",
        help="Updates local cache. Equivalent to running update-db and"
        " check-deleted",
    )
    update_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )
    update_parser.add_argument(
        "duration",
        type=parse_duration,
        help="Fetch rows that haven't been accessed in the duration given,"
        " accepts duration of the form of e.g. 1m (one month). Valid suffixes"
        " are d, m or y. NOTE: Currently does not remove entries from the cache",
    )

    write_parser = subparsers.add_parser(
        "write",
        help="Outputs all files in local cache that we may not have deleted,"
        " check-deleted should be run first to update cache.",
    )
    write_parser.add_argument(
        "out",
        type=argparse.FileType("w", encoding="UTF-8"),
        default="-",
        nargs="?",
        help="File to output list to, or '-' for stdout",
    )

    upload_parser = subparsers.add_parser(
        "upload", help="Uploads media to s3 based on local cache"
    )
    upload_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )
    upload_parser.add_argument("bucket", help="S3 bucket to upload to")
    upload_parser.add_argument(
        "--storage-class",
        help="S3 storage class to use",
        nargs="?",
        choices=[
            "STANDARD",
            "REDUCED_REDUNDANCY",
            "STANDARD_IA",
            "ONEZONE_IA",
        ],
        default="STANDARD",
    )
    upload_parser.add_argument(
        "--delete",
        action="store_const",
        const=True,
        help="Deletes local copy from media store on successful upload",
    )

    args = parser.parse_args()

    if args.cmd == "write":
        sqlite_conn = get_sqlite_conn(parser)
        run_write(sqlite_conn, args.out)
        return

    if args.cmd == "update-db":
        sqlite_conn = get_sqlite_conn(parser)
        postgres_conn = get_postgres_conn(parser)
        run_update_db(postgres_conn, sqlite_conn, args.duration)
        return

    if args.cmd == "check-deleted":
        sqlite_conn = get_sqlite_conn(parser)
        run_check_delete(sqlite_conn, args.base_path)
        return

    if args.cmd == "update":
        sqlite_conn = get_sqlite_conn(parser)
        postgres_conn = get_postgres_conn(parser)
        run_update_db(postgres_conn, sqlite_conn, args.duration)
        run_check_delete(sqlite_conn, args.base_path)
        return

    if args.cmd == "upload":
        sqlite_conn = get_sqlite_conn(parser)
        s3 = boto3.client("s3")
        run_upload(
            s3,
            args.bucket,
            sqlite_conn,
            args.base_path,
            should_delete=args.delete,
            storage_class=args.storage_class,
        )
        return

    parser.error("Valid subcommand must be specified")


if __name__ == "__main__":
    main()

setup.py

@@ -8,7 +8,13 @@ setup(
    zip_safe=False,
    url="https://github.com/matrix-org/synapse-s3-storage-provider",
    py_modules=['s3_storage_provider'],
    scripts=['scripts/s3_media_upload'],
    install_requires=[
        'boto3>=1.9.23<2.0',
        'botocore>=1.12.23<2.0',
        'humanize>=0.5.1<0.6',
        'psycopg2-binary>=2.7.5<3.0',
        'PyYAML>=3.13<4.0',
        'tqdm>=4.26.0<5.0'
    ],
)