synapse-s3-storage-provider/scripts/s3_media_upload

#!/usr/bin/env python3

import argparse
import datetime
import os
import sqlite3

import boto3
import botocore
import humanize
import psycopg2
import tqdm
import yaml
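
# Typical workflow, for illustration only (the media path, duration and bucket
# name below are placeholders, not values defined by this script):
#
#   ./s3_media_upload update /path/to/media_store 2m
#   ./s3_media_upload upload /path/to/media_store my-media-bucket --delete
#
# Both commands read and write cache.db in the current working directory
# (created on first run); the update/update-db commands additionally need a
# database.yaml with postgres connection parameters.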

# Schema for our sqlite database cache
SCHEMA = """
    CREATE TABLE IF NOT EXISTS media (
        origin TEXT NOT NULL,  -- empty string if local media
        media_id TEXT NOT NULL,
        filesystem_id TEXT NOT NULL,
        -- Type is "local" or "remote"
        type TEXT NOT NULL,
        known_deleted BOOLEAN NOT NULL
    );

    CREATE UNIQUE INDEX IF NOT EXISTS media_id_idx ON media(origin, media_id);
    CREATE INDEX IF NOT EXISTS deleted_idx ON media(known_deleted);
"""


def parse_duration(string):
    """Parse a duration string into a datetime that far in the past.

    Accepts an integer followed by a 'd', 'm' or 'y' suffix.
    """
    suffix = string[-1]
    number = string[:-1]
    try:
        number = int(number)
    except ValueError:
        raise argparse.ArgumentTypeError(
            "duration must be an integer followed by a 'd', 'm' or 'y' suffix"
        )

    now = datetime.datetime.now()
    if suffix == "d":
        then = now - datetime.timedelta(days=number)
    elif suffix == "m":
        then = now - datetime.timedelta(days=30 * number)
    elif suffix == "y":
        then = now - datetime.timedelta(days=365 * number)
    else:
        raise argparse.ArgumentTypeError("duration must end in 'd', 'm' or 'y'")

    return then
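
# For example, parse_duration("2m") returns a datetime roughly 60 days before
# now and parse_duration("1y") roughly 365 days before now; months and years
# are approximated as 30 and 365 days respectively.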


def run_update_db(postgres_conn, sqlite_conn, before_date):
    """Entry point for update-db command
    """
    local_sql = """
    SELECT '', media_id, media_id
    FROM local_media_repository
    WHERE
        COALESCE(last_access_ts, created_ts) < %s
        AND url_cache IS NULL
    """

    remote_sql = """
    SELECT media_origin, media_id, filesystem_id
    FROM remote_media_cache
    WHERE
        COALESCE(last_access_ts, created_ts) < %s
"""
last_access_ts = int(before_date.timestamp() * 1000)
print(
"Syncing files that haven't been accessed since:",
before_date.isoformat(" "),
)
update_count = 0
with sqlite_conn:
sqlite_cur = sqlite_conn.cursor()
with postgres_conn.cursor() as pg_curs:
for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
pg_curs.execute(sql, (last_access_ts,))
for (origin, media_id, filesystem_id) in pg_curs:
sqlite_cur.execute(
"""
INSERT OR IGNORE INTO media
(origin, media_id, filesystem_id, type, known_deleted)
VALUES (?, ?, ?, ?, ?)
""",
(origin, media_id, filesystem_id, mtype, False),
)
update_count += sqlite_cur.rowcount
print("Synced", update_count, "new rows")
postgres_conn.close()


def run_check_delete(sqlite_conn, base_path):
    """Entry point for check-deleted command
    """
    deleted = []
    for origin, media_id, filesystem_id, m_type in tqdm.tqdm(
        get_not_deleted(sqlite_conn),
        unit="files",
        total=get_not_deleted_count(sqlite_conn),
    ):
        if m_type == "local":
            file_path = os.path.join(
                base_path,
                "local_content",
                filesystem_id[:2],
                filesystem_id[2:4],
                filesystem_id[4:],
            )
        elif m_type == "remote":
            file_path = os.path.join(
                base_path,
                "remote_content",
                origin,
                filesystem_id[:2],
                filesystem_id[2:4],
                filesystem_id[4:],
            )
        else:
            raise Exception("Unexpected media type %r" % (m_type,))

        if not os.path.exists(file_path):
            deleted.append((origin, media_id))

    with sqlite_conn:
        sqlite_conn.executemany(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            ((True, o, m) for o, m in deleted),
        )

    print("Updated", len(deleted), "as deleted")


def mark_as_deleted(sqlite_conn, origin, media_id):
    with sqlite_conn:
        sqlite_conn.execute(
            """
            UPDATE media SET known_deleted = ?
            WHERE origin = ? AND media_id = ?
            """,
            (True, origin, media_id),
        )


def get_not_deleted_count(sqlite_conn):
    """Get count of all rows in our cache that we don't think have been deleted
    """
    cur = sqlite_conn.cursor()
    cur.execute(
        """
        SELECT COALESCE(count(*), 0) FROM media
        WHERE NOT known_deleted
        """
    )
    (count,) = cur.fetchone()
    return count


def get_not_deleted(sqlite_conn):
    """Get all rows in our cache that we don't think have been deleted
    """
    cur = sqlite_conn.cursor()
    cur.execute(
        """
        SELECT origin, media_id, filesystem_id, type FROM media
        WHERE NOT known_deleted
        """
    )
    return cur


def to_path(origin, filesystem_id, m_type):
    """Get a relative path to the given media
    """
    if m_type == "local":
        file_path = os.path.join(
            "local_content",
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    elif m_type == "remote":
        file_path = os.path.join(
            "remote_content",
            origin,
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    else:
        raise Exception("Unexpected media type %r" % (m_type,))

    return file_path
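
# For illustration (on a POSIX filesystem): a local file with filesystem_id
# "abcdefghijk" maps to "local_content/ab/cd/efghijk", and a remote file from
# "matrix.example.org" with the same filesystem_id maps to
# "remote_content/matrix.example.org/ab/cd/efghijk".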


def check_file_in_s3(s3, bucket, key):
    """Check the file exists in S3 (though it could be different)
    """
    try:
        s3.head_object(Bucket=bucket, Key=key)
    except botocore.exceptions.ClientError as e:
        if int(e.response["Error"]["Code"]) == 404:
            return False
        raise

    return True


def run_write(sqlite_conn, output_file):
    """Entry point for write command
    """
    for origin, _, filesystem_id, m_type in get_not_deleted(sqlite_conn):
        file_path = to_path(origin, filesystem_id, m_type)
        print(file_path, file=output_file)


def run_upload(
    s3, bucket, sqlite_conn, base_path, should_delete, storage_class
):
    """Entry point for upload command
    """
    total = get_not_deleted_count(sqlite_conn)

    uploaded = 0
    uploaded_bytes = 0
    deleted = 0
    deleted_bytes = 0

    # This is a progress bar
    it = tqdm.tqdm(get_not_deleted(sqlite_conn), unit="files", total=total)
    for origin, media_id, filesystem_id, m_type in it:
        rel_file_path = to_path(origin, filesystem_id, m_type)
        local_path = os.path.join(base_path, rel_file_path)

        path_exists = os.path.exists(local_path)
        if not path_exists:
            mark_as_deleted(sqlite_conn, origin, media_id)
            continue

        if not check_file_in_s3(s3, bucket, rel_file_path):
            try:
                s3.upload_file(
                    local_path,
                    bucket,
                    rel_file_path,
                    ExtraArgs={"StorageClass": storage_class},
                )
            except Exception as e:
                print("Failed to upload file %s: %s" % (local_path, e))
                continue

            uploaded += 1
            uploaded_bytes += os.path.getsize(local_path)

        if should_delete:
            size = os.path.getsize(local_path)
            os.remove(local_path)

            try:
                # This may have led to an empty directory, so let's remove all
                # directories that are now empty
                os.removedirs(os.path.dirname(local_path))
            except Exception:
                # The directory might not be empty, or maybe we don't have
                # permission. Either way, it doesn't really matter.
                pass

            mark_as_deleted(sqlite_conn, origin, media_id)

            deleted += 1
            deleted_bytes += size

    print("Uploaded", uploaded, "files out of", total)
    print("Uploaded", humanize.naturalsize(uploaded_bytes, gnu=True))
    print("Deleted", deleted, "files")
    print("Deleted", humanize.naturalsize(deleted_bytes, gnu=True))


def get_sqlite_conn(parser):
    """Attempt to get a sqlite connection to cache.db, or exit.
    """
    try:
        sqlite_conn = sqlite3.connect("cache.db")
        sqlite_conn.executescript(SCHEMA)
    except sqlite3.Error as e:
        parser.error("Could not open 'cache.db' as sqlite DB: %s" % (e,))

    return sqlite_conn


def get_postgres_conn(parser):
    """Attempt to get a postgres connection based on database.yaml, or exit.
    """
    try:
        with open("database.yaml") as f:
            database_yaml = yaml.safe_load(f)
    except FileNotFoundError:
        parser.error("Could not find database.yaml")
    except yaml.YAMLError as e:
        parser.error("database.yaml is not valid yaml: %s" % (e,))

    try:
        postgres_conn = psycopg2.connect(**database_yaml)
    except psycopg2.Error as e:
        parser.error("Could not connect to postgres database: %s" % (e,))

    return postgres_conn
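
# The keys in database.yaml are passed straight to psycopg2.connect(), so a
# minimal file might look like the following (the values are placeholders, not
# defaults shipped with this script):
#
#   user: synapse
#   password: secret
#   dbname: synapse
#   host: localhost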


def main():
    parser = argparse.ArgumentParser(prog="s3_media_upload")

    subparsers = parser.add_subparsers(help="command to run", dest="cmd")

    update_db_parser = subparsers.add_parser(
        "update-db", help="Syncs rows from database to local cache"
    )
    update_db_parser.add_argument(
        "duration",
        type=parse_duration,
        help="Fetch rows that haven't been accessed in the given duration;"
        " accepts durations of the form e.g. 1m (one month). Valid suffixes"
        " are d, m or y. NOTE: Currently does not remove entries from the cache",
    )

    deleted_parser = subparsers.add_parser(
        "check-deleted",
        help="Check whether files in the local cache still exist under the"
        " given path",
    )
    deleted_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )

    update_parser = subparsers.add_parser(
        "update",
        help="Updates local cache. Equivalent to running update-db and"
        " check-deleted",
    )
    update_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )
    update_parser.add_argument(
        "duration",
        type=parse_duration,
        help="Fetch rows that haven't been accessed in the given duration;"
        " accepts durations of the form e.g. 1m (one month). Valid suffixes"
        " are d, m or y. NOTE: Currently does not remove entries from the cache",
    )

    write_parser = subparsers.add_parser(
        "write",
        help="Outputs all files in the local cache that we think have not been"
        " deleted; check-deleted should be run first to update the cache.",
    )
    write_parser.add_argument(
        "out",
        type=argparse.FileType("w", encoding="UTF-8"),
        default="-",
        nargs="?",
        help="File to output list to, or '-' for stdout",
    )

    upload_parser = subparsers.add_parser(
        "upload", help="Uploads media to S3 based on the local cache"
    )
    upload_parser.add_argument(
        "base_path", help="Base path of the media store directory"
    )
    upload_parser.add_argument("bucket", help="S3 bucket to upload to")
    upload_parser.add_argument(
        "--storage-class",
        help="S3 storage class to use",
        nargs="?",
        choices=[
            "STANDARD",
            "REDUCED_REDUNDANCY",
            "STANDARD_IA",
            "ONEZONE_IA",
        ],
        default="STANDARD",
    )
    upload_parser.add_argument(
        "--delete",
        action="store_true",
        help="Deletes the local copy from the media store on successful upload",
    )
    upload_parser.add_argument(
        "--endpoint-url", help="S3 endpoint url to use", default=None
    )

    args = parser.parse_args()

    if args.cmd == "write":
        sqlite_conn = get_sqlite_conn(parser)
        run_write(sqlite_conn, args.out)
        return

    if args.cmd == "update-db":
        sqlite_conn = get_sqlite_conn(parser)
        postgres_conn = get_postgres_conn(parser)
        run_update_db(postgres_conn, sqlite_conn, args.duration)
        return

    if args.cmd == "check-deleted":
        sqlite_conn = get_sqlite_conn(parser)
        run_check_delete(sqlite_conn, args.base_path)
        return

    if args.cmd == "update":
        sqlite_conn = get_sqlite_conn(parser)
        postgres_conn = get_postgres_conn(parser)
        run_update_db(postgres_conn, sqlite_conn, args.duration)
        run_check_delete(sqlite_conn, args.base_path)
        return

    if args.cmd == "upload":
        sqlite_conn = get_sqlite_conn(parser)
        s3 = boto3.client("s3", endpoint_url=args.endpoint_url)
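        # The client above picks up AWS credentials and region via boto3's
        # usual lookup chain (environment variables, ~/.aws/credentials,
        # instance metadata, etc.); this script does not take them as
        # arguments.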
        run_upload(
            s3,
            args.bucket,
            sqlite_conn,
            args.base_path,
            should_delete=args.delete,
            storage_class=args.storage_class,
        )
        return

    parser.error("Valid subcommand must be specified")


if __name__ == "__main__":
    main()