From 005f85791735473ffd8924ea9669047bc396c102 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 23 Aug 2018 11:04:52 +0100 Subject: [PATCH] Factor out generic streaming from s3 specific --- s3_storage_provider.py | 126 ++++++++++++++++++++++++++--------------- 1 file changed, 81 insertions(+), 45 deletions(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index 8ff65c8..5330045 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -34,6 +34,9 @@ logger = logging.getLogger("synapse.s3") # The list of valid AWS storage class names _VALID_STORAGE_CLASSES = ('STANDARD', 'REDUCED_REDUNDANCY', 'STANDARD_IA') +# Chunk size to use when reading from s3 connection in bytes +READ_CHUNK_SIZE = 16 * 1024 + class S3StorageProviderBackend(StorageProvider): """ @@ -97,14 +100,8 @@ class _S3DownloadThread(threading.Thread): deferred (Deferred[_S3Responder|None]): If file exists resolved with an _S3Responder instance, if it doesn't exist then resolves with None. - - Attributes: - READ_CHUNK_SIZE (int): The chunk size in bytes used when downloading - file. """ - READ_CHUNK_SIZE = 16 * 1024 - def __init__(self, bucket, key, deferred): super(_S3DownloadThread, self).__init__(name="s3-download") self.bucket = bucket @@ -125,56 +122,73 @@ class _S3DownloadThread(threading.Thread): reactor.callFromThread(self.deferred.errback, Failure()) return - # Triggered by responder when more data has been requested (or - # stop_event has been triggered) - wakeup_event = threading.Event() - # Trigered by responder when we should abort the download. - stop_event = threading.Event() - - producer = _S3Responder(wakeup_event, stop_event) + producer = _S3Responder() reactor.callFromThread(self.deferred.callback, producer) + _stream_to_producer(self.deferred, resp["Body"], timeout=90.) - try: - body = resp["Body"] - while not stop_event.is_set(): - # We wait for the producer to signal that the consumer wants - # more data (or we should abort) - wakeup_event.wait() +def _stream_to_producer(reactor, producer, body, status=None, timeout=None): + """Streams a file like object to the producer. - # Check if we were woken up so that we abort the download - if stop_event.is_set(): - return + Correctly handles producer being paused/resumed/stopped. - chunk = body.read(self.READ_CHUNK_SIZE) - if not chunk: - return + Args: + reactor + producer (_S3Responder): Producer object to stream results to + body (file like): The object to read from + status (_ProducerStatus|None): Used to track whether we're currently + paused or not. Used for testing + timeout (float|None): Timeout in seconds to wait for consume to resume + after being paused + """ - reactor.callFromThread(producer._write, chunk) + # Set when we should be producing, cleared when we are paused + wakeup_event = producer.wakeup_event - except Exception: - reactor.callFromThread(producer._error, Failure()) - return - finally: - reactor.callFromThread(producer._finish) - if body: - body.close() + # Set if we should stop producing forever + stop_event = producer.stop_event + + if not status: + status = _ProducerStatus() + + try: + while not stop_event.is_set(): + # We wait for the producer to signal that the consumer wants + # more data (or we should abort) + if not wakeup_event.is_set(): + status.set_paused(True) + ret = wakeup_event.wait(timeout) + if not ret: + raise Exception("Timed out waiting to resume") + status.set_paused(False) + + # Check if we were woken up so that we abort the download + if stop_event.is_set(): + return + + chunk = body.read(READ_CHUNK_SIZE) + if not chunk: + return + + reactor.callFromThread(producer._write, chunk) + + except Exception: + reactor.callFromThread(producer._error, Failure()) + finally: + reactor.callFromThread(producer._finish) + if body: + body.close() class _S3Responder(Responder): """A Responder for S3. Created by _S3DownloadThread - - Args: - wakeup_event (threading.Event): Used to signal to _S3DownloadThread - that consumer is ready for more data (or that we've triggered - stop_event). - stop_event (threading.Event): Used to signal to _S3DownloadThread that - it should stop producing. `wakeup_event` must also be set if - `stop_event` is used. """ - def __init__(self, wakeup_event, stop_event): - self.wakeup_event = wakeup_event - self.stop_event = stop_event + def __init__(self): + # Triggered by responder when more data has been requested (or + # stop_event has been triggered) + self.wakeup_event = threading.Event() + # Trigered by responder when we should abort the download. + self.stop_event = threading.Event() # The consumer we're registered to self.consumer = None @@ -214,7 +228,8 @@ class _S3Responder(Responder): # The consumer wants no more data ever, signal _S3DownloadThread self.stop_event.set() self.wakeup_event.set() - self.deferred.errback(Exception("Consumer ask to stop producing")) + if not self.deferred.called: + self.deferred.errback(Exception("Consumer ask to stop producing")) def _write(self, chunk): """Writes the chunk of data to consumer. Called by _S3DownloadThread. @@ -242,3 +257,24 @@ class _S3Responder(Responder): if not self.deferred.called: self.deferred.callback(None) + + +class _ProducerStatus(object): + """Used to track whether the s3 download thread is currently paused + waiting for consumer to resume. Used for testing. + """ + + def __init__(self): + self.is_paused = threading.Event() + self.is_paused.clear() + + def wait_until_paused(self, timeout=None): + is_paused = self.is_paused.wait(timeout) + if not is_paused: + raise Exception("Timed out waiting") + + def set_paused(self, paused): + if paused: + self.is_paused.set() + else: + self.is_paused.clear()