From 17a65cf4f7f16a876f9b0ee3fe19b90d76310bc5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 27 Jan 2020 16:56:44 +0000 Subject: [PATCH] Use a threadpool for downloading things from S3 (#30) This is a good thing because we need to create a new S3 client for each thread, and creating S3 clients is relatively expensive. --- README.md | 5 ++++ s3_storage_provider.py | 55 ++++++++++++++++++++++++++---------------- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index d08147c..17cd462 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,11 @@ media_storage_providers: endpoint_url: access_key_id: secret_access_key: + + # also optional. The maximum number of concurrent threads which will be used + # to connect to S3. Each thread manages a single connection. Default is 40. + # + #threadpool_size: 20 ``` This module uses `boto3`, and so the credentials should be specified as diff --git a/s3_storage_provider.py b/s3_storage_provider.py index f5b9308..a6951a1 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -24,10 +24,11 @@ import botocore from twisted.internet import defer, reactor from twisted.python.failure import Failure +from twisted.python.threadpool import ThreadPool from synapse.rest.media.v1._base import Responder from synapse.rest.media.v1.storage_provider import StorageProvider -from synapse.util.logcontext import make_deferred_yieldable +from synapse.util.logcontext import LoggingContext, make_deferred_yieldable logger = logging.getLogger("synapse.s3") @@ -69,6 +70,12 @@ class S3StorageProviderBackend(StorageProvider): if "secret_access_key" in config: self.api_kwargs["aws_secret_access_key"] = config["secret_access_key"] + threadpool_size = config.get("threadpool_size", 40) + self._download_pool = ThreadPool( + name="s3-download-pool", maxthreads=threadpool_size + ) + self._download_pool.start() + def store_file(self, path, file_info): """See StorageProvider.store_file""" @@ -80,12 +87,18 @@ class S3StorageProviderBackend(StorageProvider): ExtraArgs={"StorageClass": self.storage_class}, ) + # XXX: reactor.callInThread doesn't return anything, so I don't think this does + # what the author intended. return make_deferred_yieldable(reactor.callInThread(_store_file)) def fetch(self, path, file_info): """See StorageProvider.fetch""" + logcontext = LoggingContext.current_context() + d = defer.Deferred() - _S3DownloadThread(self.bucket, self.api_kwargs, path, d).start() + self._download_pool.callInThread( + s3_download_task, self.bucket, self.api_kwargs, path, d, logcontext + ) return make_deferred_yieldable(d) @staticmethod @@ -123,7 +136,7 @@ class S3StorageProviderBackend(StorageProvider): return result -class _S3DownloadThread(threading.Thread): +def s3_download_task(bucket, api_kwargs, key, deferred, parent_logcontext): """Attempts to download a file from S3. Args: @@ -134,34 +147,33 @@ class _S3DownloadThread(threading.Thread): deferred (Deferred[_S3Responder|None]): If file exists resolved with an _S3Responder instance, if it doesn't exist then resolves with None. + parent_logcontext (LoggingContext): the logcontext to report logs and metrics + against. """ + with LoggingContext(parent_context=parent_logcontext): + logger.info("Fetching %s from S3", key) - def __init__(self, bucket, api_kwargs, key, deferred): - super(_S3DownloadThread, self).__init__(name="s3-download") - self.bucket = bucket - self.api_kwargs = api_kwargs - self.key = key - self.deferred = deferred - - def run(self): local_data = threading.local() - if not hasattr(local_data, "b3_session"): - local_data.b3_session = boto3.session.Session() - session = local_data.b3_session - s3 = session.client("s3", **self.api_kwargs) try: - resp = s3.get_object(Bucket=self.bucket, Key=self.key) + s3 = local_data.b3_client + except AttributeError: + b3_session = boto3.session.Session() + local_data.b3_client = s3 = b3_session.client("s3", **api_kwargs) + + try: + resp = s3.get_object(Bucket=bucket, Key=key) except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] in ("404", "NoSuchKey",): - reactor.callFromThread(self.deferred.callback, None) + logger.info("Media %s not found in S3", key) + reactor.callFromThread(deferred.callback, None) return - reactor.callFromThread(self.deferred.errback, Failure()) + reactor.callFromThread(deferred.errback, Failure()) return producer = _S3Responder() - reactor.callFromThread(self.deferred.callback, producer) + reactor.callFromThread(deferred.callback, producer) _stream_to_producer(reactor, producer, resp["Body"], timeout=90.0) @@ -244,7 +256,7 @@ class _S3Responder(Responder): # get a pauseProducing or stopProducing consumer.registerProducer(self, True) self.wakeup_event.set() - return self.deferred + return make_deferred_yieldable(self.deferred) def __exit__(self, exc_type, exc_val, exc_tb): self.stop_event.set() @@ -268,7 +280,8 @@ class _S3Responder(Responder): self.stop_event.set() self.wakeup_event.set() if not self.deferred.called: - self.deferred.errback(Exception("Consumer ask to stop producing")) + with LoggingContext(): + self.deferred.errback(Exception("Consumer ask to stop producing")) def _write(self, chunk): """Writes the chunk of data to consumer. Called by _S3DownloadThread.