Mirror of https://github.com/matrix-org/synapse-s3-storage-provider.git (synced 2024-10-23 07:29:40 +00:00)
Misc improvements to scripts/s3_media_upload and fix CI (#59)

scripts/s3_media_upload:
* Mark `scripts/s3_media_upload` as executable
* Fix the `scripts/s3_media_upload` shebang to respect virtual environments
* Format `scripts/s3_media_upload`
* Remove unused imports from `scripts/s3_media_upload`
* Include `scripts/s3_media_upload` in CI checks
* Refactor `s3_media_upload`'s `run_check_delete` to use `to_path` instead of duplicating code

CI:
* Fix branch names in CI config
parent 3c3fafd6a2
commit a5b15d644d
3 changed files with 102 additions and 126 deletions
.github/workflows/ci.yml (vendored): 4 changes
@@ -2,9 +2,9 @@ name: CI
 
 on:
   push:
-    branches: [ master ]
+    branches: [ main ]
   pull_request:
-    branches: [ master ]
+    branches: [ main ]
 
   workflow_dispatch:
 
scripts/s3_media_upload: 212 changes (Normal file → Executable file)
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 import argparse
 import datetime
 import os
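Why the shebang change matters: `/usr/bin/python` pins the script to one system interpreter, whereas `#!/usr/bin/env python` resolves `python` through `$PATH`, so an activated virtualenv (which prepends its `bin/` to `PATH`) supplies its own interpreter and its installed copies of psycopg2, tqdm, etc. A minimal sketch of that lookup, using `shutil.which` to mimic env's PATH search (illustrative only, not part of the commit):

#!/usr/bin/env python
# Illustrative sketch: show which interpreter a `#!/usr/bin/env python`
# shebang would select from the current PATH.
import shutil
import sys

# shutil.which performs the same PATH search that /usr/bin/env does.
print("env would run:", shutil.which("python"))
print("currently running:", sys.executable)
# Inside an activated virtualenv both paths point into the venv's bin/.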
@@ -10,7 +10,6 @@ import humanize
 import psycopg2
 import tqdm
 import yaml
-import sys
 
 # Schema for our sqlite database cache
 SCHEMA = """
@@ -29,6 +28,7 @@ SCHEMA = """
 
 progress = True
 
+
 def parse_duration(string):
     """Parse a string into a duration supports suffix of d, m or y.
     """
@@ -50,112 +50,11 @@ def parse_duration(string):
     elif suffix == "y":
         then = now - datetime.timedelta(days=365 * number)
     else:
-        raise argparse.ArgumentTypeError(
-            "duration must end in 'd', 'm' or 'y'"
-        )
+        raise argparse.ArgumentTypeError("duration must end in 'd', 'm' or 'y'")
 
     return then
 
 
-def run_update_db(postgres_conn, sqlite_conn, before_date):
-    """Entry point for update-db command
-    """
-
-    local_sql = """
-        SELECT '', media_id, media_id
-        FROM local_media_repository
-        WHERE
-            COALESCE(last_access_ts, created_ts) < %s
-            AND url_cache IS NULL
-    """
-
-    remote_sql = """
-        SELECT media_origin, media_id, filesystem_id
-        FROM remote_media_cache
-        WHERE
-            COALESCE(last_access_ts, created_ts) < %s
-    """
-
-    last_access_ts = int(before_date.timestamp() * 1000)
-
-    print(
-        "Syncing files that haven't been accessed since:",
-        before_date.isoformat(" "),
-    )
-
-    update_count = 0
-    with sqlite_conn:
-        sqlite_cur = sqlite_conn.cursor()
-        with postgres_conn.cursor() as pg_curs:
-            for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
-                pg_curs.execute(sql, (last_access_ts,))
-
-                for (origin, media_id, filesystem_id) in pg_curs:
-                    sqlite_cur.execute(
-                        """
-                        INSERT OR IGNORE INTO media
-                        (origin, media_id, filesystem_id, type, known_deleted)
-                        VALUES (?, ?, ?, ?, ?)
-                        """,
-                        (origin, media_id, filesystem_id, mtype, False),
-                    )
-                    update_count += sqlite_cur.rowcount
-
-    print("Synced", update_count, "new rows")
-
-    postgres_conn.close()
-
-
-def run_check_delete(sqlite_conn, base_path):
-    """Entry point for check-deleted command
-    """
-    deleted = []
-    if progress:
-        it = tqdm.tqdm(
-            get_not_deleted(sqlite_conn),
-            unit="files",
-            total=get_not_deleted_count(sqlite_conn)
-        )
-    else:
-        it = get_not_deleted(sqlite_conn)
-        print("Checking on ", get_not_deleted_count(sqlite_conn), " undeleted files")
-
-    for origin, media_id, filesystem_id, m_type in it:
-        if m_type == "local":
-            file_path = os.path.join(
-                base_path,
-                "local_content",
-                filesystem_id[:2],
-                filesystem_id[2:4],
-                filesystem_id[4:],
-            )
-        elif m_type == "remote":
-            file_path = os.path.join(
-                base_path,
-                "remote_content",
-                origin,
-                filesystem_id[:2],
-                filesystem_id[2:4],
-                filesystem_id[4:],
-            )
-        else:
-            raise Exception("Unexpected media type %r", m_type)
-
-        if not os.path.exists(file_path):
-            deleted.append((origin, media_id))
-
-    with sqlite_conn:
-        sqlite_conn.executemany(
-            """
-            UPDATE media SET known_deleted = ?
-            WHERE origin = ? AND media_id = ?
-            """,
-            ((True, o, m) for o, m in deleted),
-        )
-
-    print("Updated", len(deleted), "as deleted")
-
-
 def mark_as_deleted(sqlite_conn, origin, media_id):
     with sqlite_conn:
         sqlite_conn.execute(
@@ -178,7 +77,7 @@ def get_not_deleted_count(sqlite_conn):
         WHERE NOT known_deleted
         """
     )
-    count, = cur.fetchone()
+    (count,) = cur.fetchone()
     return count
 
 
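On the `(count,) = cur.fetchone()` change: black rewrites the bare `count, = ...` single-element unpacking with explicit parentheses; the two spellings are equivalent. A quick standalone check (illustrative, not from the commit):

# Both spellings unpack a one-element tuple the same way; black prefers the
# parenthesised form because a bare trailing comma is easy to misread as a
# plain assignment.
row = (42,)  # shape of cursor.fetchone() for a single-column COUNT query

count, = row
(count2,) = row
assert count == count2 == 42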
@@ -201,10 +100,7 @@ def to_path(origin, filesystem_id, m_type):
     """
     if m_type == "local":
         file_path = os.path.join(
-            "local_content",
-            filesystem_id[:2],
-            filesystem_id[2:4],
-            filesystem_id[4:],
+            "local_content", filesystem_id[:2], filesystem_id[2:4], filesystem_id[4:],
         )
     elif m_type == "remote":
         file_path = os.path.join(
@@ -241,9 +137,87 @@ def run_write(sqlite_conn, output_file):
         print(file_path, file=output_file)
 
 
-def run_upload(
-    s3, bucket, sqlite_conn, base_path, should_delete, storage_class
-):
+def run_update_db(postgres_conn, sqlite_conn, before_date):
+    """Entry point for update-db command
+    """
+
+    local_sql = """
+        SELECT '', media_id, media_id
+        FROM local_media_repository
+        WHERE
+            COALESCE(last_access_ts, created_ts) < %s
+            AND url_cache IS NULL
+    """
+
+    remote_sql = """
+        SELECT media_origin, media_id, filesystem_id
+        FROM remote_media_cache
+        WHERE
+            COALESCE(last_access_ts, created_ts) < %s
+    """
+
+    last_access_ts = int(before_date.timestamp() * 1000)
+
+    print(
+        "Syncing files that haven't been accessed since:", before_date.isoformat(" "),
+    )
+
+    update_count = 0
+    with sqlite_conn:
+        sqlite_cur = sqlite_conn.cursor()
+        with postgres_conn.cursor() as pg_curs:
+            for sql, mtype in ((local_sql, "local"), (remote_sql, "remote")):
+                pg_curs.execute(sql, (last_access_ts,))
+
+                for (origin, media_id, filesystem_id) in pg_curs:
+                    sqlite_cur.execute(
+                        """
+                        INSERT OR IGNORE INTO media
+                        (origin, media_id, filesystem_id, type, known_deleted)
+                        VALUES (?, ?, ?, ?, ?)
+                        """,
+                        (origin, media_id, filesystem_id, mtype, False),
+                    )
+                    update_count += sqlite_cur.rowcount
+
+    print("Synced", update_count, "new rows")
+
+    postgres_conn.close()
+
+
+def run_check_delete(sqlite_conn, base_path):
+    """Entry point for check-deleted command
+    """
+    deleted = []
+    if progress:
+        it = tqdm.tqdm(
+            get_not_deleted(sqlite_conn),
+            unit="files",
+            total=get_not_deleted_count(sqlite_conn),
+        )
+    else:
+        it = get_not_deleted(sqlite_conn)
+        print("Checking on ", get_not_deleted_count(sqlite_conn), " undeleted files")
+
+    for origin, media_id, filesystem_id, m_type in it:
+        rel_file_path = to_path(origin, filesystem_id, m_type)
+        file_path = os.path.join(base_path, rel_file_path)
+        if not os.path.exists(file_path):
+            deleted.append((origin, media_id))
+
+    with sqlite_conn:
+        sqlite_conn.executemany(
+            """
+            UPDATE media SET known_deleted = ?
+            WHERE origin = ? AND media_id = ?
+            """,
+            ((True, o, m) for o, m in deleted),
+        )
+
+    print("Updated", len(deleted), "as deleted")
+
+
+def run_upload(s3, bucket, sqlite_conn, base_path, should_delete, storage_class):
     """Entry point for upload command
     """
     total = get_not_deleted_count(sqlite_conn)
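The substantive change in this hunk, beyond black moving the two functions, is that `run_check_delete` no longer rebuilds the sharded `local_content`/`remote_content` layout inline: it calls the existing `to_path` helper and joins the result onto `base_path`. A rough sketch of the layout logic being reused (hypothetical media ids; the real helper is defined earlier in the script):

import os

def to_path(origin, filesystem_id, m_type):
    # Sketch of the helper: media paths are sharded into two-character
    # directories derived from the filesystem id.
    if m_type == "local":
        return os.path.join(
            "local_content", filesystem_id[:2], filesystem_id[2:4], filesystem_id[4:]
        )
    elif m_type == "remote":
        return os.path.join(
            "remote_content",
            origin,
            filesystem_id[:2],
            filesystem_id[2:4],
            filesystem_id[4:],
        )
    raise Exception("Unexpected media type %r" % (m_type,))

# Hypothetical ids, for illustration only:
print(to_path("", "abcdefghij", "local"))             # local_content/ab/cd/efghij
print(to_path("matrix.org", "abcdefghij", "remote"))  # remote_content/matrix.org/ab/cd/efghij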
@@ -340,7 +314,12 @@ def get_postgres_conn(parser):
 
 def main():
     parser = argparse.ArgumentParser(prog="s3_media_upload")
-    parser.add_argument("--no-progress", help="do not show progress bars", action="store_true", dest="no_progress")
+    parser.add_argument(
+        "--no-progress",
+        help="do not show progress bars",
+        action="store_true",
+        dest="no_progress",
+    )
     subparsers = parser.add_subparsers(help="command to run", dest="cmd")
 
     update_db_parser = subparsers.add_parser(
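The `--no-progress` flag itself is unchanged here, only reflowed by black across several lines. For reference, a minimal sketch of how such a `store_true` flag behaves (standard argparse semantics, not code from the commit):

import argparse

parser = argparse.ArgumentParser(prog="s3_media_upload")
parser.add_argument(
    "--no-progress",
    help="do not show progress bars",
    action="store_true",
    dest="no_progress",
)

# store_true flags default to False and flip to True when passed:
assert parser.parse_args([]).no_progress is False
assert parser.parse_args(["--no-progress"]).no_progress is True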
@@ -356,8 +335,7 @@ def main():
 
     deleted_parser = subparsers.add_parser(
         "check-deleted",
-        help="Check whether files in the local cache still exist under given"
-        " path",
+        help="Check whether files in the local cache still exist under given path",
     )
     deleted_parser.add_argument(
         "base_path", help="Base path of the media store directory"
@@ -421,9 +399,7 @@ def main():
     )
 
     upload_parser.add_argument(
-        "--endpoint-url",
-        help="S3 endpoint url to use",
-        default=None
+        "--endpoint-url", help="S3 endpoint url to use", default=None
    )
 
     args = parser.parse_args()
tox.ini: 6 changes
@@ -51,10 +51,10 @@ deps =
     # We pin so that our tests don't start failing on new releases of black.
     black==19.10b0
 commands =
-    python -m black --check --diff .
-    /bin/sh -c "flake8 s3_storage_provider.py setup.py"
+    python -m black --check --diff . scripts/s3_media_upload
+    /bin/sh -c "flake8 s3_storage_provider.py setup.py scripts/s3_media_upload"
 
 [testenv:check_isort]
 skip_install = True
 deps = isort
-commands = /bin/sh -c "isort -c -sp setup.cfg -rc s3_storage_provider.py setup.py "
+commands = /bin/sh -c "isort -c -sp setup.cfg -rc s3_storage_provider.py setup.py scripts/s3_media_upload"
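Why the extensionless script must be listed explicitly: when black (and flake8) recurse over a directory they only collect `*.py`/`*.pyi` files, so `scripts/s3_media_upload` would be skipped unless named on the command line. A small sketch of that discovery rule (my approximation of the behaviour, not the tools' actual implementation):

from pathlib import Path

def discovered_by_recursion(root):
    # Approximation: recursive discovery keeps only .py/.pyi files, which
    # is why an extensionless script never shows up in it.
    return [p for p in Path(root).rglob("*") if p.suffix in (".py", ".pyi")]

# scripts/s3_media_upload has no extension, so it is absent from this list;
# passing the path explicitly is what forces the tools to check it.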