#!/usr/bin/python

import argparse
import datetime
import os
import sqlite3
import sys

import boto3
import botocore
import humanize
import psycopg2
import tqdm
import yaml

# Schema for our sqlite database cache
SCHEMA = """
    CREATE TABLE IF NOT EXISTS media (
        origin TEXT NOT NULL,  -- empty string if local media
        media_id TEXT NOT NULL,
        filesystem_id TEXT NOT NULL,
        -- Type is "local" or "remote"
        type TEXT NOT NULL,
        known_deleted BOOLEAN NOT NULL
    );

    CREATE UNIQUE INDEX IF NOT EXISTS media_id_idx ON media(origin, media_id);
    CREATE INDEX IF NOT EXISTS deleted_idx ON media(known_deleted);
"""
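
# The UNIQUE index on (origin, media_id) is what lets run_update_db use
# "INSERT OR IGNORE": re-running update-db over an overlapping window simply
# skips rows that are already in the cache.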

progress = True


def parse_duration(string):
    """Parse a string into a duration; supports suffixes of d, m or y."""
    suffix = string[-1]
    number = string[:-1]

    try:
        number = int(number)
    except ValueError:
        raise argparse.ArgumentTypeError(
            "duration must be an integer followed by a 'd', 'm' or 'y' suffix"
        )

    now = datetime.datetime.now()
    if suffix == "d":
        then = now - datetime.timedelta(days=number)
    elif suffix == "m":
        then = now - datetime.timedelta(days=30 * number)
    elif suffix == "y":
        then = now - datetime.timedelta(days=365 * number)
    else:
        raise argparse.ArgumentTypeError("duration must end in 'd', 'm' or 'y'")

    return then
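
# For example, parse_duration("2m") returns the datetime roughly 60 days
# before now; "m" and "y" are approximated as 30 and 365 days respectively.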


def run_update_db(postgres_conn, sqlite_conn, before_date):
    """Entry point for update-db command"""

    local_sql = """
        SELECT '', media_id, media_id
        FROM local_media_repository
        WHERE
            COALESCE(last_access_ts, created_ts) < %s
            AND url_cache IS NULL
    """

    remote_sql = """
        SELECT media_origin, media_id, filesystem_id
        FROM remote_media_cache
        WHERE
            COALESCE(last_access_ts, created_ts) < %s
    """
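
    # Synapse stores last_access_ts and created_ts in milliseconds since the
    # epoch, hence the * 1000.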
    last_access_ts = int(before_date.timestamp() * 1000)

    print(
        "Syncing files that haven't been accessed since:",
        before_date.isoformat(" "),
    )

    update_count = 0
    with sqlite_conn:
        sqlite_cur = sqlite_conn.cursor()
        with postgres_conn.cursor() as pg_curs:
            for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
                pg_curs.execute(sql, (last_access_ts,))

                for (origin, media_id, filesystem_id) in pg_curs:
                    sqlite_cur.execute(
                        """
                        INSERT OR IGNORE INTO media
                        (origin, media_id, filesystem_id, type, known_deleted)
                        VALUES (?, ?, ?, ?, ?)
                        """,
                        (origin, media_id, filesystem_id, mtype, False),
                    )
                    update_count += sqlite_cur.rowcount

    print("Synced", update_count, "new rows")

    postgres_conn.close()


def run_check_delete(sqlite_conn, base_path):
    """Entry point for check-deleted command"""
    deleted = []
    if progress:
        it = tqdm.tqdm(
            get_not_deleted(sqlite_conn),
            unit="files",
            total=get_not_deleted_count(sqlite_conn),
        )
    else:
        it = get_not_deleted(sqlite_conn)
        print("Checking on", get_not_deleted_count(sqlite_conn), "undeleted files")

    for origin, media_id, filesystem_id, m_type in it:
        file_path = os.path.join(base_path, to_path(origin, filesystem_id, m_type))

        if not os.path.exists(file_path):
            deleted.append((origin, media_id))

    with sqlite_conn:
        sqlite_conn.executemany(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            ((True, o, m) for o, m in deleted),
        )

    print("Updated", len(deleted), "files as deleted")


def mark_as_deleted(sqlite_conn, origin, media_id):
    """Mark a single piece of media as deleted in the cache"""
    with sqlite_conn:
        sqlite_conn.execute(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            (True, origin, media_id),
        )


def get_not_deleted_count(sqlite_conn):
    """Get count of all rows in our cache that we don't think have been deleted"""
    cur = sqlite_conn.cursor()

    cur.execute(
        """
        SELECT COALESCE(count(*), 0) FROM media
        WHERE NOT known_deleted
        """
    )
    (count,) = cur.fetchone()
    return count


def get_not_deleted(sqlite_conn):
    """Get all rows in our cache that we don't think have been deleted"""
    cur = sqlite_conn.cursor()

    cur.execute(
        """
        SELECT origin, media_id, filesystem_id, type FROM media
        WHERE NOT known_deleted
        """
    )
    return cur


def to_path(origin, filesystem_id, m_type):
    """Get a relative path to the given media"""
    if m_type == "local":
        file_path = os.path.join(
            "local_content",
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    elif m_type == "remote":
        file_path = os.path.join(
            "remote_content",
            origin,
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    else:
        raise Exception("Unexpected media type %r" % (m_type,))

    return file_path
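
# For illustration: to_path("", "abcdefgh1234", "local") yields
# "local_content/ab/cd/efgh1234", mirroring how Synapse shards media files
# across two levels of directories.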


def check_file_in_s3(s3, bucket, key):
    """Check the file exists in S3 (though it could be different)"""
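    # head_object only fetches the object's metadata, so this is a cheap
    # existence check; a missing key surfaces as a ClientError with a 404
    # error code.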
    try:
        s3.head_object(Bucket=bucket, Key=key)
    except botocore.exceptions.ClientError as e:
        if int(e.response["Error"]["Code"]) == 404:
            return False
        raise

    return True


def run_write(sqlite_conn, output_file):
    """Entry point for write command"""
    for origin, _, filesystem_id, m_type in get_not_deleted(sqlite_conn):
        file_path = to_path(origin, filesystem_id, m_type)
        print(file_path, file=output_file)


def run_upload(s3, bucket, sqlite_conn, base_path, should_delete, storage_class):
    """Entry point for upload command"""
    total = get_not_deleted_count(sqlite_conn)

    uploaded = 0
    uploaded_bytes = 0
    deleted = 0
    deleted_bytes = 0

    # Wrap the iterator in a progress bar unless --no-progress was given
    if progress:
        it = tqdm.tqdm(get_not_deleted(sqlite_conn), unit="files", total=total)
    else:
        print("Uploading", total, "files")
        it = get_not_deleted(sqlite_conn)
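
    # For each cached file: a missing local copy means it was already deleted,
    # an existing S3 key means we can skip the upload, and the local copy is
    # only removed once the object is known to exist remotely.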
    for origin, media_id, filesystem_id, m_type in it:
        rel_file_path = to_path(origin, filesystem_id, m_type)

        local_path = os.path.join(base_path, rel_file_path)
        if not os.path.exists(local_path):
            mark_as_deleted(sqlite_conn, origin, media_id)
            continue

        if not check_file_in_s3(s3, bucket, rel_file_path):
            try:
                s3.upload_file(
                    local_path,
                    bucket,
                    rel_file_path,
                    ExtraArgs={"StorageClass": storage_class},
                )
            except Exception as e:
                print("Failed to upload file %s: %s" % (local_path, e))
                continue

            uploaded += 1
            uploaded_bytes += os.path.getsize(local_path)

        if should_delete:
            size = os.path.getsize(local_path)
            os.remove(local_path)

            try:
                # This may have left an empty directory behind, so remove
                # any directories that are now empty
                os.removedirs(os.path.dirname(local_path))
            except Exception:
                # The directory might not be empty, or maybe we don't have
                # permission. Either way, it doesn't really matter.
                pass

            mark_as_deleted(sqlite_conn, origin, media_id)

            deleted += 1
            deleted_bytes += size

    print("Uploaded", uploaded, "files out of", total)
    print("Uploaded", humanize.naturalsize(uploaded_bytes, gnu=True))
    print("Deleted", deleted, "files")
    print("Deleted", humanize.naturalsize(deleted_bytes, gnu=True))


def get_sqlite_conn(parser):
    """Attempt to get a sqlite connection to cache.db, or exit."""
    try:
        sqlite_conn = sqlite3.connect("cache.db")
        sqlite_conn.executescript(SCHEMA)
    except sqlite3.Error as e:
        parser.error("Could not open 'cache.db' as sqlite DB: %s" % (e,))

    return sqlite_conn


def get_postgres_conn(parser):
    """Attempt to get a postgres connection based on database.yaml, or exit."""
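    # database.yaml holds keyword arguments for psycopg2.connect(); a minimal
    # example (values here are purely illustrative) might look like:
    #
    #     user: synapse
    #     password: secret
    #     dbname: synapse
    #     host: localhost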
    try:
        with open("database.yaml") as f:
            database_yaml = yaml.safe_load(f)
    except FileNotFoundError:
        parser.error("Could not find database.yaml")
    except yaml.YAMLError as e:
        parser.error("database.yaml is not valid yaml: %s" % (e,))

    try:
        postgres_conn = psycopg2.connect(**database_yaml)
    except psycopg2.Error as e:
        parser.error("Could not connect to postgres database: %s" % (e,))

    return postgres_conn


def main():
    parser = argparse.ArgumentParser(prog="s3_media_upload")
    parser.add_argument(
        "--no-progress",
        help="do not show progress bars",
        action="store_true",
        dest="no_progress",
    )
    subparsers = parser.add_subparsers(help="command to run", dest="cmd")

    update_db_parser = subparsers.add_parser(
        "update-db", help="Syncs rows from database to local cache"
    )
    update_db_parser.add_argument(
        "duration",
        type=parse_duration,
        help="Fetch rows that haven't been accessed in the duration given,"
        " accepts duration of the form of e.g. 1m (one month). Valid suffixes"
        " are d, m or y. NOTE: Currently does not remove entries from the cache",
    )

    deleted_parser = subparsers.add_parser(
        "check-deleted",
        help="Check whether files in the local cache still exist under given"
        " path",
    )
    deleted_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )

    update_parser = subparsers.add_parser(
        "update",
        help="Updates local cache. Equivalent to running update-db and"
        " check-deleted",
    )
    update_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )
    update_parser.add_argument(
        "duration",
        type=parse_duration,
        help="Fetch rows that haven't been accessed in the duration given,"
        " accepts duration of the form of e.g. 1m (one month). Valid suffixes"
        " are d, m or y. NOTE: Currently does not remove entries from the cache",
    )

    write_parser = subparsers.add_parser(
        "write",
        help="Outputs all files in local cache that we may not have deleted,"
        " check-deleted should be run first to update cache.",
    )
    write_parser.add_argument(
        "out",
        type=argparse.FileType("w", encoding="UTF-8"),
        default="-",
        nargs="?",
        help="File to output list to, or '-' for stdout",
    )

    upload_parser = subparsers.add_parser(
        "upload", help="Uploads media to s3 based on local cache"
    )
    upload_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )
    upload_parser.add_argument("bucket", help="S3 bucket to upload to")
    upload_parser.add_argument(
        "--storage-class",
        help="S3 storage class to use",
        nargs="?",
        choices=[
            "STANDARD",
            "REDUCED_REDUNDANCY",
            "STANDARD_IA",
            "ONEZONE_IA",
            "INTELLIGENT_TIERING",
        ],
        default="STANDARD",
    )
    upload_parser.add_argument(
        "--delete",
        action="store_const",
        const=True,
        help="Deletes local copy from media store on successful upload",
    )
    upload_parser.add_argument(
        "--endpoint-url",
        help="S3 endpoint url to use",
        default=None,
    )

    args = parser.parse_args()
    if args.no_progress:
        global progress
        progress = False

    if args.cmd == "write":
        sqlite_conn = get_sqlite_conn(parser)
        run_write(sqlite_conn, args.out)
        return

    if args.cmd == "update-db":
        sqlite_conn = get_sqlite_conn(parser)
        postgres_conn = get_postgres_conn(parser)
        run_update_db(postgres_conn, sqlite_conn, args.duration)
        return

    if args.cmd == "check-deleted":
        sqlite_conn = get_sqlite_conn(parser)
        run_check_delete(sqlite_conn, args.base_path)
        return

    if args.cmd == "update":
        sqlite_conn = get_sqlite_conn(parser)
        postgres_conn = get_postgres_conn(parser)
        run_update_db(postgres_conn, sqlite_conn, args.duration)
        run_check_delete(sqlite_conn, args.base_path)
        return

    if args.cmd == "upload":
        sqlite_conn = get_sqlite_conn(parser)
        s3 = boto3.client("s3", endpoint_url=args.endpoint_url)
        run_upload(
            s3,
            args.bucket,
            sqlite_conn,
            args.base_path,
            should_delete=args.delete,
            storage_class=args.storage_class,
        )
        return

    parser.error("A valid subcommand must be specified")
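
# A typical workflow (paths and bucket name are illustrative):
#
#   s3_media_upload update /var/lib/synapse/media_store 2m
#   s3_media_upload upload /var/lib/synapse/media_store my-media-bucket --delete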


if __name__ == "__main__":
    main()