mirror of
https://github.com/matrix-org/synapse-s3-storage-provider.git
synced 2024-10-23 07:29:40 +00:00
Use a threadpool for downloading things from S3 (#30)
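This is a good thing because each thread needs its own S3 client, and creating S3 clients is relatively expensive: with a pool, threads (and the clients they create) can be reused across downloads instead of being set up from scratch for every request.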
This commit is contained in:
parent
410c4382ab
commit
17a65cf4f7
2 changed files with 39 additions and 21 deletions
|
@ -27,6 +27,11 @@ media_storage_providers:
|
||||||
endpoint_url: <S3_LIKE_SERVICE_ENDPOINT_URL>
|
endpoint_url: <S3_LIKE_SERVICE_ENDPOINT_URL>
|
||||||
access_key_id: <S3_ACCESS_KEY_ID>
|
access_key_id: <S3_ACCESS_KEY_ID>
|
||||||
secret_access_key: <S3_SECRET_ACCESS_KEY>
|
secret_access_key: <S3_SECRET_ACCESS_KEY>
|
||||||
|
|
||||||
|
# also optional. The maximum number of concurrent threads which will be used
|
||||||
|
# to connect to S3. Each thread manages a single connection. Default is 40.
|
||||||
|
#
|
||||||
|
#threadpool_size: 20
|
||||||
```
|
```
|
||||||
|
|
||||||
This module uses `boto3`, and so the credentials should be specified as
|
This module uses `boto3`, and so the credentials should be specified as
|
||||||
|
|
|
@ -24,10 +24,11 @@ import botocore
|
||||||
|
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
|
from twisted.python.threadpool import ThreadPool
|
||||||
|
|
||||||
from synapse.rest.media.v1._base import Responder
|
from synapse.rest.media.v1._base import Responder
|
||||||
from synapse.rest.media.v1.storage_provider import StorageProvider
|
from synapse.rest.media.v1.storage_provider import StorageProvider
|
||||||
from synapse.util.logcontext import make_deferred_yieldable
|
from synapse.util.logcontext import LoggingContext, make_deferred_yieldable
|
||||||
|
|
||||||
logger = logging.getLogger("synapse.s3")
|
logger = logging.getLogger("synapse.s3")
|
||||||
|
|
||||||
|
@ -69,6 +70,12 @@ class S3StorageProviderBackend(StorageProvider):
|
||||||
if "secret_access_key" in config:
|
if "secret_access_key" in config:
|
||||||
self.api_kwargs["aws_secret_access_key"] = config["secret_access_key"]
|
self.api_kwargs["aws_secret_access_key"] = config["secret_access_key"]
|
||||||
|
|
||||||
|
threadpool_size = config.get("threadpool_size", 40)
|
||||||
|
self._download_pool = ThreadPool(
|
||||||
|
name="s3-download-pool", maxthreads=threadpool_size
|
||||||
|
)
|
||||||
|
self._download_pool.start()
|
||||||
|
|
||||||
def store_file(self, path, file_info):
|
def store_file(self, path, file_info):
|
||||||
"""See StorageProvider.store_file"""
|
"""See StorageProvider.store_file"""
|
||||||
|
|
||||||
|
@ -80,12 +87,18 @@ class S3StorageProviderBackend(StorageProvider):
|
||||||
ExtraArgs={"StorageClass": self.storage_class},
|
ExtraArgs={"StorageClass": self.storage_class},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# XXX: reactor.callInThread doesn't return anything, so I don't think this does
|
||||||
|
# what the author intended.
|
||||||
return make_deferred_yieldable(reactor.callInThread(_store_file))
|
return make_deferred_yieldable(reactor.callInThread(_store_file))
|
||||||
|
|
||||||
def fetch(self, path, file_info):
|
def fetch(self, path, file_info):
|
||||||
"""See StorageProvider.fetch"""
|
"""See StorageProvider.fetch"""
|
||||||
|
logcontext = LoggingContext.current_context()
|
||||||
|
|
||||||
d = defer.Deferred()
|
d = defer.Deferred()
|
||||||
_S3DownloadThread(self.bucket, self.api_kwargs, path, d).start()
|
self._download_pool.callInThread(
|
||||||
|
s3_download_task, self.bucket, self.api_kwargs, path, d, logcontext
|
||||||
|
)
|
||||||
return make_deferred_yieldable(d)
|
return make_deferred_yieldable(d)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -123,7 +136,7 @@ class S3StorageProviderBackend(StorageProvider):
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
class _S3DownloadThread(threading.Thread):
|
def s3_download_task(bucket, api_kwargs, key, deferred, parent_logcontext):
|
||||||
"""Attempts to download a file from S3.
|
"""Attempts to download a file from S3.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -134,34 +147,33 @@ class _S3DownloadThread(threading.Thread):
|
||||||
deferred (Deferred[_S3Responder|None]): If file exists
|
deferred (Deferred[_S3Responder|None]): If file exists
|
||||||
resolved with an _S3Responder instance, if it doesn't
|
resolved with an _S3Responder instance, if it doesn't
|
||||||
exist then resolves with None.
|
exist then resolves with None.
|
||||||
|
parent_logcontext (LoggingContext): the logcontext to report logs and metrics
|
||||||
|
against.
|
||||||
"""
|
"""
|
||||||
|
with LoggingContext(parent_context=parent_logcontext):
|
||||||
|
logger.info("Fetching %s from S3", key)
|
||||||
|
|
||||||
def __init__(self, bucket, api_kwargs, key, deferred):
|
|
||||||
super(_S3DownloadThread, self).__init__(name="s3-download")
|
|
||||||
self.bucket = bucket
|
|
||||||
self.api_kwargs = api_kwargs
|
|
||||||
self.key = key
|
|
||||||
self.deferred = deferred
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
local_data = threading.local()
|
local_data = threading.local()
|
||||||
if not hasattr(local_data, "b3_session"):
|
|
||||||
local_data.b3_session = boto3.session.Session()
|
|
||||||
session = local_data.b3_session
|
|
||||||
s3 = session.client("s3", **self.api_kwargs)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
resp = s3.get_object(Bucket=self.bucket, Key=self.key)
|
s3 = local_data.b3_client
|
||||||
|
except AttributeError:
|
||||||
|
b3_session = boto3.session.Session()
|
||||||
|
local_data.b3_client = s3 = b3_session.client("s3", **api_kwargs)
|
||||||
|
|
||||||
|
try:
|
||||||
|
resp = s3.get_object(Bucket=bucket, Key=key)
|
||||||
except botocore.exceptions.ClientError as e:
|
except botocore.exceptions.ClientError as e:
|
||||||
if e.response["Error"]["Code"] in ("404", "NoSuchKey",):
|
if e.response["Error"]["Code"] in ("404", "NoSuchKey",):
|
||||||
reactor.callFromThread(self.deferred.callback, None)
|
logger.info("Media %s not found in S3", key)
|
||||||
|
reactor.callFromThread(deferred.callback, None)
|
||||||
return
|
return
|
||||||
|
|
||||||
reactor.callFromThread(self.deferred.errback, Failure())
|
reactor.callFromThread(deferred.errback, Failure())
|
||||||
return
|
return
|
||||||
|
|
||||||
producer = _S3Responder()
|
producer = _S3Responder()
|
||||||
reactor.callFromThread(self.deferred.callback, producer)
|
reactor.callFromThread(deferred.callback, producer)
|
||||||
_stream_to_producer(reactor, producer, resp["Body"], timeout=90.0)
|
_stream_to_producer(reactor, producer, resp["Body"], timeout=90.0)
|
||||||
|
|
||||||
|
|
||||||
|
@ -244,7 +256,7 @@ class _S3Responder(Responder):
|
||||||
# get a pauseProducing or stopProducing
|
# get a pauseProducing or stopProducing
|
||||||
consumer.registerProducer(self, True)
|
consumer.registerProducer(self, True)
|
||||||
self.wakeup_event.set()
|
self.wakeup_event.set()
|
||||||
return self.deferred
|
return make_deferred_yieldable(self.deferred)
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
self.stop_event.set()
|
self.stop_event.set()
|
||||||
|
@ -268,6 +280,7 @@ class _S3Responder(Responder):
|
||||||
self.stop_event.set()
|
self.stop_event.set()
|
||||||
self.wakeup_event.set()
|
self.wakeup_event.set()
|
||||||
if not self.deferred.called:
|
if not self.deferred.called:
|
||||||
|
with LoggingContext():
|
||||||
self.deferred.errback(Exception("Consumer ask to stop producing"))
|
self.deferred.errback(Exception("Consumer ask to stop producing"))
|
||||||
|
|
||||||
def _write(self, chunk):
|
def _write(self, chunk):
|
||||||
|
|
Loading…
Reference in a new issue