diff --git a/s3_storage_provider.py b/s3_storage_provider.py index e0ab67e..f5b9308 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -33,8 +33,12 @@ logger = logging.getLogger("synapse.s3") # The list of valid AWS storage class names -_VALID_STORAGE_CLASSES = ('STANDARD', 'REDUCED_REDUNDANCY', - 'STANDARD_IA', 'INTELLIGENT_TIERING') +_VALID_STORAGE_CLASSES = ( + "STANDARD", + "REDUCED_REDUNDANCY", + "STANDARD_IA", + "INTELLIGENT_TIERING", +) # Chunk size to use when reading from s3 connection in bytes READ_CHUNK_SIZE = 16 * 1024 @@ -70,15 +74,13 @@ class S3StorageProviderBackend(StorageProvider): def _store_file(): session = boto3.session.Session() - session.resource('s3', **self.api_kwargs).Bucket(self.bucket).upload_file( + session.resource("s3", **self.api_kwargs).Bucket(self.bucket).upload_file( Filename=os.path.join(self.cache_directory, path), Key=path, ExtraArgs={"StorageClass": self.storage_class}, ) - return make_deferred_yieldable( - reactor.callInThread(_store_file) - ) + return make_deferred_yieldable(reactor.callInThread(_store_file)) def fetch(self, path, file_info): """See StorageProvider.fetch""" @@ -146,12 +148,12 @@ class _S3DownloadThread(threading.Thread): if not hasattr(local_data, "b3_session"): local_data.b3_session = boto3.session.Session() session = local_data.b3_session - s3 = session.client('s3', **self.api_kwargs) + s3 = session.client("s3", **self.api_kwargs) try: resp = s3.get_object(Bucket=self.bucket, Key=self.key) except botocore.exceptions.ClientError as e: - if e.response['Error']['Code'] in ("404", "NoSuchKey",): + if e.response["Error"]["Code"] in ("404", "NoSuchKey",): reactor.callFromThread(self.deferred.callback, None) return @@ -160,7 +162,7 @@ class _S3DownloadThread(threading.Thread): producer = _S3Responder() reactor.callFromThread(self.deferred.callback, producer) - _stream_to_producer(reactor, producer, resp["Body"], timeout=90.) + _stream_to_producer(reactor, producer, resp["Body"], timeout=90.0) def _stream_to_producer(reactor, producer, body, status=None, timeout=None): @@ -219,6 +221,7 @@ def _stream_to_producer(reactor, producer, body, status=None, timeout=None): class _S3Responder(Responder): """A Responder for S3. Created by _S3DownloadThread """ + def __init__(self): # Triggered by responder when more data has been requested (or # stop_event has been triggered) diff --git a/setup.py b/setup.py index a7255c5..a880b1e 100644 --- a/setup.py +++ b/setup.py @@ -3,18 +3,18 @@ from setuptools import setup __version__ = "1.0" setup( - name='synapse-s3-storage-provider', + name="synapse-s3-storage-provider", version=__version__, zip_safe=False, url="https://github.com/matrix-org/synapse-s3-storage-provider", - py_modules=['s3_storage_provider'], - scripts=['scripts/s3_media_upload'], + py_modules=["s3_storage_provider"], + scripts=["scripts/s3_media_upload"], install_requires=[ - 'boto3>=1.9.23<2.0', - 'botocore>=1.12.23<2.0', - 'humanize>=0.5.1<0.6', - 'psycopg2>=2.7.5<3.0', - 'PyYAML>=3.13<4.0', - 'tqdm>=4.26.0<5.0' + "boto3>=1.9.23<2.0", + "botocore>=1.12.23<2.0", + "humanize>=0.5.1<0.6", + "psycopg2>=2.7.5<3.0", + "PyYAML>=3.13<4.0", + "tqdm>=4.26.0<5.0", ], ) diff --git a/test_s3.py b/test_s3.py index b7a1f61..5328ad0 100644 --- a/test_s3.py +++ b/test_s3.py @@ -19,7 +19,8 @@ from twisted.test.proto_helpers import MemoryReactorClock from twisted.trial import unittest import sys -is_py2 = sys.version[0] == '2' + +is_py2 = sys.version[0] == "2" if is_py2: from Queue import Queue else: @@ -29,9 +30,7 @@ from threading import Event, Thread from mock import Mock -from s3_storage_provider import ( - _stream_to_producer, _S3Responder, _ProducerStatus, -) +from s3_storage_provider import _stream_to_producer, _S3Responder, _ProducerStatus class StreamingProducerTestCase(unittest.TestCase): @@ -52,10 +51,7 @@ class StreamingProducerTestCase(unittest.TestCase): self.thread = Thread( target=_stream_to_producer, args=(self.reactor, self.producer, self.body), - kwargs={ - "status": self.producer_status, - "timeout": 1.0, - }, + kwargs={"status": self.producer_status, "timeout": 1.0}, ) self.thread.daemon = True self.thread.start() @@ -94,12 +90,12 @@ class StreamingProducerTestCase(unittest.TestCase): self.producer.pauseProducing() self.body.write(" string") self.wait_for_thread() - self.producer_status.wait_until_paused(10.) + self.producer_status.wait_until_paused(10.0) self.assertEqual("test string", self.written) # If we write again we remain paused and nothing gets written self.body.write(" second") - self.producer_status.wait_until_paused(10.) + self.producer_status.wait_until_paused(10.0) self.assertEqual("test string", self.written) # If we call resumeProducing the buffered data gets read and written. @@ -165,6 +161,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock): class Channel(object): """Simple channel to mimic a thread safe file like object """ + def __init__(self): self._queue = Queue() diff --git a/tox.ini b/tox.ini index 21ade6c..ad5cc67 100644 --- a/tox.ini +++ b/tox.ini @@ -46,7 +46,11 @@ skip_install = True basepython = python3.6 deps = flake8 -commands = /bin/sh -c "flake8 s3_storage_provider.py setup.py {env:PEP8SUFFIX:}" + # We pin so that our tests don't start failing on new releases of black. + black==19.10b0 +commands = + python -m black --check --diff . + /bin/sh -c "flake8 s3_storage_provider.py setup.py {env:PEP8SUFFIX:}" [testenv:check_isort] skip_install = True