From 231426db5f399d7b93494a19bf82c7bdc2b80ec7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Feb 2018 13:50:21 +0000 Subject: [PATCH] Reorder file --- s3_storage_provider.py | 152 ++++++++++++++++++++--------------------- 1 file changed, 76 insertions(+), 76 deletions(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index f430b3a..0c1180b 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -68,6 +68,82 @@ class S3StorageProviderBackend(StorageProvider): return config["bucket"] +class _S3DownloadThread(threading.Thread): + """Attempts to download a file from S3. + + Args: + bucket (str): The S3 bucket which may have the file + key (str): The key of the file + deferred (Deferred[_S3Responder|None]): If files exists + resolved with an _S3Responder instance, if it doesn't + exist then resolves with None. + + Attributes: + READ_CHUNK_SIZE (int): The chunk size in bytes used when downloading + file. + """ + + READ_CHUNK_SIZE = 16 * 1024 + + def __init__(self, bucket, key, deferred): + super(_S3DownloadThread, self).__init__(name="s3-download") + self.bucket = bucket + self.key = key + self.deferred = deferred + + def run(self): + session = boto3.session.Session() + s3 = session.client('s3') + + try: + resp = s3.get_object(Bucket=self.bucket, Key=self.key) + except botocore.exceptions.ClientError as e: + if e.response['Error']['Code'] == "404": + reactor.callFromThread(self.deferred.callback, None) + return + + reactor.callFromThread(self.deferred.errback, Failure()) + return + + # Triggered by responder when more data has been requested (or + # stop_event has been triggered) + wakeup_event = threading.Event() + # Trigered by responder when we should abort the download. + stop_event = threading.Event() + + producer = _S3Responder(wakeup_event, stop_event) + reactor.callFromThread(self.deferred.callback, producer) + + try: + body = resp["Body"] + + while not stop_event.is_set(): + # We wait for the producer to signal that the consumer wants + # more data (or we should abort) + wakeup_event.wait() + + # Check if we were woken up so that we abort the download + if stop_event.is_set(): + return + + chunk = body.read(self.READ_CHUNK_SIZE) + if not chunk: + return + + # We clear the wakeup_event flag just before we write the data + # to producer. + wakeup_event.clear() + reactor.callFromThread(producer._write, chunk) + + except Exception: + reactor.callFromThread(producer._error, Failure()) + return + finally: + reactor.callFromThread(producer._finish) + if body: + body.close() + + class _S3Responder(Responder): """A Responder for S3. Created by _S3DownloadThread @@ -143,79 +219,3 @@ class _S3Responder(Responder): if not self.deferred.called: self.deferred.callback(None) - - -class _S3DownloadThread(threading.Thread): - """Attempts to download a file from S3. - - Args: - bucket (str): The S3 bucket which may have the file - key (str): The key of the file - deferred (Deferred[_S3Responder|None]): If files exists - resolved with an _S3Responder instance, if it doesn't - exist then resolves with None. - - Attributes: - READ_CHUNK_SIZE (int): The chunk size in bytes used when downloading - file. - """ - - READ_CHUNK_SIZE = 16 * 1024 - - def __init__(self, bucket, key, deferred): - super(_S3DownloadThread, self).__init__(name="s3-download") - self.bucket = bucket - self.key = key - self.deferred = deferred - - def run(self): - session = boto3.session.Session() - s3 = session.client('s3') - - try: - resp = s3.get_object(Bucket=self.bucket, Key=self.key) - except botocore.exceptions.ClientError as e: - if e.response['Error']['Code'] == "404": - reactor.callFromThread(self.deferred.callback, None) - return - - reactor.callFromThread(self.deferred.errback, Failure()) - return - - # Triggered by responder when more data has been requested (or - # stop_event has been triggered) - wakeup_event = threading.Event() - # Trigered by responder when we should abort the download. - stop_event = threading.Event() - - producer = _S3Responder(wakeup_event, stop_event) - reactor.callFromThread(self.deferred.callback, producer) - - try: - body = resp["Body"] - - while not stop_event.is_set(): - # We wait for the producer to signal that the consumer wants - # more data (or we should abort) - wakeup_event.wait() - - # Check if we were woken up so that we abort the download - if stop_event.is_set(): - return - - chunk = body.read(self.READ_CHUNK_SIZE) - if not chunk: - return - - # We clear the wakeup_event flag just before we write the data - # to producer. - wakeup_event.clear() - reactor.callFromThread(producer._write, chunk) - - except Exception: - reactor.callFromThread(producer._error, Failure()) - return - finally: - reactor.callFromThread(producer._finish) - if body: - body.close()