mirror of https://github.com/matrix-org/synapse-s3-storage-provider.git
synced 2024-10-23 07:29:40 +00:00
Merge pull request #18 from matrix-org/michaelkaye/add_cleanup_script

Add a s3_media_upload script

commit ad284721a0

3 changed files with 486 additions and 1 deletion
README.md (29 changed lines)

@@ -32,3 +32,32 @@ media_storage_providers:
This module uses `boto3`, and so the credentials should be specified as
described [here](https://boto3.readthedocs.io/en/latest/guide/configuration.html#guide-configuration).
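
For example, the credentials can be supplied through the standard AWS
environment variables (a minimal sketch; the values shown are the placeholder
credentials from the AWS documentation):

```
> export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
> export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
> export AWS_DEFAULT_REGION=eu-west-1
```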

Regular cleanup job
-------------------

There is additionally an `s3_media_upload` script which can be used in a
regular job to upload content to S3 and then delete it from the local disk.
This script can be used in combination with configuration for the storage
provider to pull media from S3, but upload it asynchronously.

Once the package is installed, the script should be run somewhat like the
example session further below. We suggest using tmux or screen, as the runs
can take a long time on larger servers.

More options are available in the command help.
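
The script provides five subcommands, summarised here from its argparse help:

```
update-db      sync rows from the Synapse database to the local cache
check-deleted  check whether files in the local cache still exist on disk
update         equivalent to running update-db then check-deleted
write          output all cached files that may not have been deleted
upload         upload media to S3 based on the local cache
```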

```
> cd s3_media_upload
# cache.db will be created if absent. database.yaml is required to contain PG credentials
> ls
cache.db database.yaml
# Update cache from /path/to/media/store looking for files not used within 2 months
> s3_media_upload update /path/to/media/store 2m
Syncing files that haven't been accessed since: 2018-10-18 11:06:21.520602
Synced 0 new rows
100%|█████████████████████████████████████████████████████████████| 1074/1074 [00:33<00:00, 25.97files/s]
Updated 0 as deleted

> s3_media_upload upload /path/to/media/store matrix_s3_bucket_name --storage-class STANDARD_IA --delete
# prepare to wait a long time
```
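
The contents of `database.yaml` are passed directly as keyword arguments to
`psycopg2.connect`, so a minimal file might look like the following (the
values are placeholders; any valid `psycopg2.connect` parameter may be used):

```
user: synapse_user
password: itsasecret
dbname: synapse
host: localhost
```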
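
For a regular job, the `update` and `upload` steps can be chained from cron.
A hypothetical crontab entry, assuming the script is on the `PATH` and that
`/home/synapse/media_upload` is the working directory holding `cache.db` and
`database.yaml`:

```
0 3 * * * cd /home/synapse/media_upload && s3_media_upload update /path/to/media/store 2m && s3_media_upload upload /path/to/media/store matrix_s3_bucket_name --storage-class STANDARD_IA --delete
```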

scripts/s3_media_upload (450 lines, new file)

@@ -0,0 +1,450 @@
#!/usr/bin/env python3

import argparse
import datetime
import os
import sqlite3

import boto3
import botocore
import humanize
import psycopg2
import tqdm
import yaml

# Schema for our sqlite database cache
SCHEMA = """
    CREATE TABLE IF NOT EXISTS media (
        origin TEXT NOT NULL,  -- empty string if local media
        media_id TEXT NOT NULL,
        filesystem_id TEXT NOT NULL,
        -- Type is "local" or "remote"
        type TEXT NOT NULL,
        known_deleted BOOLEAN NOT NULL
    );

    CREATE UNIQUE INDEX IF NOT EXISTS media_id_idx ON media(origin, media_id);
    CREATE INDEX IF NOT EXISTS deleted_idx ON media(known_deleted);
"""


def parse_duration(string):
    """Parse a duration string with a d, m or y suffix into the datetime
    that far in the past (e.g. "2m" is roughly two months ago).
    """
    suffix = string[-1]
    number = string[:-1]

    try:
        number = int(number)
    except ValueError:
        raise argparse.ArgumentTypeError(
            "duration must be an integer followed by a 'd', 'm' or 'y' suffix"
        )

    now = datetime.datetime.now()
    if suffix == "d":
        then = now - datetime.timedelta(days=number)
    elif suffix == "m":
        # A month is approximated as 30 days
        then = now - datetime.timedelta(days=30 * number)
    elif suffix == "y":
        # A year is approximated as 365 days
        then = now - datetime.timedelta(days=365 * number)
    else:
        raise argparse.ArgumentTypeError("duration must end in 'd', 'm' or 'y'")

    return then
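
# Examples (hypothetical, assuming the current date is 2018-12-18):
#   parse_duration("7d") -> datetime(2018, 12, 11, ...)  (7 days earlier)
#   parse_duration("2m") -> datetime(2018, 10, 19, ...)  (2 * 30 days earlier)
#   parse_duration("1y") -> datetime(2017, 12, 18, ...)  (365 days earlier)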


def run_update_db(postgres_conn, sqlite_conn, before_date):
    """Entry point for the update-db command."""

    local_sql = """
        SELECT '', media_id, media_id
        FROM local_media_repository
        WHERE
            last_access_ts < %s
            AND url_cache IS NULL
    """

    remote_sql = """
        SELECT media_origin, media_id, filesystem_id
        FROM remote_media_cache
        WHERE
            last_access_ts < %s
    """

    # Synapse stores last_access_ts in milliseconds since the epoch
    last_access_ts = int(before_date.timestamp() * 1000)

    print(
        "Syncing files that haven't been accessed since:",
        before_date.isoformat(" "),
    )

    update_count = 0
    with sqlite_conn:
        sqlite_cur = sqlite_conn.cursor()
        with postgres_conn.cursor() as pg_curs:
            for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
                pg_curs.execute(sql, (last_access_ts,))

                for (origin, media_id, filesystem_id) in pg_curs:
                    sqlite_cur.execute(
                        """
                        INSERT OR IGNORE INTO media
                        (origin, media_id, filesystem_id, type, known_deleted)
                        VALUES (?, ?, ?, ?, ?)
                        """,
                        (origin, media_id, filesystem_id, mtype, False),
                    )
                    update_count += sqlite_cur.rowcount

    print("Synced", update_count, "new rows")

    postgres_conn.close()


def run_check_delete(sqlite_conn, base_path):
    """Entry point for the check-deleted command."""
    deleted = []
    for origin, media_id, filesystem_id, m_type in tqdm.tqdm(
        get_not_deleted(sqlite_conn),
        unit="files",
        total=get_not_deleted_count(sqlite_conn),
    ):
        if m_type == "local":
            file_path = os.path.join(
                base_path,
                "local_content",
                filesystem_id[:2],
                filesystem_id[2:4],
                filesystem_id[4:],
            )
        elif m_type == "remote":
            file_path = os.path.join(
                base_path,
                "remote_content",
                origin,
                filesystem_id[:2],
                filesystem_id[2:4],
                filesystem_id[4:],
            )
        else:
            raise Exception("Unexpected media type %r" % (m_type,))

        if not os.path.exists(file_path):
            deleted.append((origin, media_id))

    with sqlite_conn:
        sqlite_conn.executemany(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            ((True, o, m) for o, m in deleted),
        )

    print("Updated", len(deleted), "as deleted")


def mark_as_deleted(sqlite_conn, origin, media_id):
    """Mark a single media row in the cache as deleted from local disk."""
    with sqlite_conn:
        sqlite_conn.execute(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            (True, origin, media_id),
        )


def get_not_deleted_count(sqlite_conn):
    """Get a count of the rows in our cache that we don't think have been
    deleted.
    """
    cur = sqlite_conn.cursor()

    cur.execute(
        """
        SELECT COALESCE(count(*), 0) FROM media
        WHERE NOT known_deleted
        """
    )
    count, = cur.fetchone()
    return count


def get_not_deleted(sqlite_conn):
    """Get all rows in our cache that we don't think have been deleted."""
    cur = sqlite_conn.cursor()

    cur.execute(
        """
        SELECT origin, media_id, filesystem_id, type FROM media
        WHERE NOT known_deleted
        """
    )
    return cur


def to_path(origin, filesystem_id, m_type):
    """Get a relative path to the given media."""
    if m_type == "local":
        file_path = os.path.join(
            "local_content",
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    elif m_type == "remote":
        file_path = os.path.join(
            "remote_content",
            origin,
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    else:
        raise Exception("Unexpected media type %r" % (m_type,))

    return file_path
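
# Examples (hypothetical media IDs):
#   to_path("", "abcdefghijk", "local")
#       -> "local_content/ab/cd/efghijk"
#   to_path("example.org", "abcdefghijk", "remote")
#       -> "remote_content/example.org/ab/cd/efghijk"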


def check_file_in_s3(s3, bucket, key):
    """Check that a file exists in S3 under the given key.

    Only existence is checked; the object's contents could still differ
    from the local file.
    """
    try:
        s3.head_object(Bucket=bucket, Key=key)
    except botocore.exceptions.ClientError as e:
        if int(e.response["Error"]["Code"]) == 404:
            return False
        raise

    return True


def run_write(sqlite_conn, output_file):
    """Entry point for the write command."""
    for origin, _, filesystem_id, m_type in get_not_deleted(sqlite_conn):
        file_path = to_path(origin, filesystem_id, m_type)
        print(file_path, file=output_file)


def run_upload(s3, bucket, sqlite_conn, base_path, should_delete, storage_class):
    """Entry point for the upload command."""
    total = get_not_deleted_count(sqlite_conn)

    uploaded = 0
    uploaded_bytes = 0
    deleted = 0
    deleted_bytes = 0

    # This is a progress bar
    it = tqdm.tqdm(get_not_deleted(sqlite_conn), unit="files", total=total)
    for origin, media_id, filesystem_id, m_type in it:
        rel_file_path = to_path(origin, filesystem_id, m_type)

        local_path = os.path.join(base_path, rel_file_path)
        if not os.path.exists(local_path):
            # The file has already gone from the local store; just record
            # that fact in the cache.
            mark_as_deleted(sqlite_conn, origin, media_id)
            continue

        if not check_file_in_s3(s3, bucket, rel_file_path):
            try:
                s3.upload_file(
                    local_path,
                    bucket,
                    rel_file_path,
                    ExtraArgs={"StorageClass": storage_class},
                )
            except Exception as e:
                print("Failed to upload file %s: %s" % (local_path, e))
                continue

            uploaded += 1
            uploaded_bytes += os.path.getsize(local_path)

        if should_delete:
            size = os.path.getsize(local_path)
            os.remove(local_path)

            try:
                # This may have left an empty directory behind, so remove
                # any directories that are now empty
                os.removedirs(os.path.dirname(local_path))
            except Exception:
                # The directory might not be empty, or maybe we don't have
                # permission. Either way it doesn't really matter.
                pass

            mark_as_deleted(sqlite_conn, origin, media_id)

            deleted += 1
            deleted_bytes += size

    print("Uploaded", uploaded, "files out of", total)
    print("Uploaded", humanize.naturalsize(uploaded_bytes, gnu=True))
    print("Deleted", deleted, "files")
    print("Deleted", humanize.naturalsize(deleted_bytes, gnu=True))


def get_sqlite_conn(parser):
    """Attempt to get a sqlite connection to cache.db, or exit."""
    try:
        sqlite_conn = sqlite3.connect("cache.db")
        sqlite_conn.executescript(SCHEMA)
    except sqlite3.Error as e:
        parser.error("Could not open 'cache.db' as sqlite DB: %s" % (e,))

    return sqlite_conn


def get_postgres_conn(parser):
    """Attempt to get a postgres connection based on database.yaml, or exit."""
    try:
        with open("database.yaml") as f:
            database_yaml = yaml.safe_load(f)
    except FileNotFoundError:
        parser.error("Could not find database.yaml")
    except yaml.YAMLError as e:
        parser.error("database.yaml is not valid yaml: %s" % (e,))

    try:
        # The YAML mapping is passed straight through as connection
        # parameters (user, password, dbname, host, ...)
        postgres_conn = psycopg2.connect(**database_yaml)
    except psycopg2.Error as e:
        parser.error("Could not connect to postgres database: %s" % (e,))

    return postgres_conn


def main():
    parser = argparse.ArgumentParser(prog="s3_media_upload")
    subparsers = parser.add_subparsers(help="command to run", dest="cmd")

    update_db_parser = subparsers.add_parser(
        "update-db", help="Syncs rows from database to local cache"
    )
    update_db_parser.add_argument(
        "duration",
        type=parse_duration,
        help="Fetch rows that haven't been accessed in the given duration,"
        " e.g. 1m (one month). Valid suffixes are d, m or y."
        " NOTE: Currently does not remove entries from the cache",
    )

    deleted_parser = subparsers.add_parser(
        "check-deleted",
        help="Check whether files in the local cache still exist under given"
        " path",
    )
    deleted_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )

    update_parser = subparsers.add_parser(
        "update",
        help="Updates local cache. Equivalent to running update-db and"
        " check-deleted",
    )
    update_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )
    update_parser.add_argument(
        "duration",
        type=parse_duration,
        help="Fetch rows that haven't been accessed in the given duration,"
        " e.g. 1m (one month). Valid suffixes are d, m or y."
        " NOTE: Currently does not remove entries from the cache",
    )

    write_parser = subparsers.add_parser(
        "write",
        help="Outputs all files in local cache that we may not have deleted;"
        " check-deleted should be run first to update the cache.",
    )
    write_parser.add_argument(
        "out",
        type=argparse.FileType("w", encoding="UTF-8"),
        default="-",
        nargs="?",
        help="File to output the list to, or '-' for stdout",
    )

    upload_parser = subparsers.add_parser(
        "upload", help="Uploads media to s3 based on local cache"
    )
    upload_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )
    upload_parser.add_argument("bucket", help="S3 bucket to upload to")
    upload_parser.add_argument(
        "--storage-class",
        help="S3 storage class to use",
        nargs="?",
        choices=[
            "STANDARD",
            "REDUCED_REDUNDANCY",
            "STANDARD_IA",
            "ONEZONE_IA",
        ],
        default="STANDARD",
    )

    upload_parser.add_argument(
        "--delete",
        action="store_const",
        const=True,
        help="Deletes the local copy from the media store on successful upload",
    )

    args = parser.parse_args()

    if args.cmd == "write":
        sqlite_conn = get_sqlite_conn(parser)
        run_write(sqlite_conn, args.out)
        return

    if args.cmd == "update-db":
        sqlite_conn = get_sqlite_conn(parser)
        postgres_conn = get_postgres_conn(parser)
        run_update_db(postgres_conn, sqlite_conn, args.duration)
        return

    if args.cmd == "check-deleted":
        sqlite_conn = get_sqlite_conn(parser)
        run_check_delete(sqlite_conn, args.base_path)
        return

    if args.cmd == "update":
        sqlite_conn = get_sqlite_conn(parser)
        postgres_conn = get_postgres_conn(parser)
        run_update_db(postgres_conn, sqlite_conn, args.duration)
        run_check_delete(sqlite_conn, args.base_path)
        return

    if args.cmd == "upload":
        sqlite_conn = get_sqlite_conn(parser)
        s3 = boto3.client("s3")
        run_upload(
            s3,
            args.bucket,
            sqlite_conn,
            args.base_path,
            should_delete=args.delete,
            storage_class=args.storage_class,
        )
        return

    parser.error("A valid subcommand must be specified")


if __name__ == "__main__":
    main()

setup.py (8 changed lines)

@@ -8,7 +8,13 @@ setup(
     zip_safe=False,
     url="https://github.com/matrix-org/synapse-s3-storage-provider",
     py_modules=['s3_storage_provider'],
+    scripts=['scripts/s3_media_upload'],
     install_requires=[
-        'boto3'
+        'boto3>=1.9.23,<2.0',
+        'botocore>=1.12.23,<2.0',
+        'humanize>=0.5.1,<0.6',
+        'psycopg2-binary>=2.7.5,<3.0',
+        'PyYAML>=3.13,<4.0',
+        'tqdm>=4.26.0,<5.0',
     ],
 )
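
With the new `scripts` entry, installing the package places `s3_media_upload`
on the `PATH`. For example, from a checkout of the repository (a hypothetical
session):

```
> pip install .
> s3_media_upload --help
```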