Reorder file

This commit is contained in:
Erik Johnston 2018-02-12 13:50:21 +00:00
parent c1239fbea3
commit 231426db5f

View file

@ -68,6 +68,82 @@ class S3StorageProviderBackend(StorageProvider):
return config["bucket"] return config["bucket"]
class _S3DownloadThread(threading.Thread):
"""Attempts to download a file from S3.
Args:
bucket (str): The S3 bucket which may have the file
key (str): The key of the file
deferred (Deferred[_S3Responder|None]): If files exists
resolved with an _S3Responder instance, if it doesn't
exist then resolves with None.
Attributes:
READ_CHUNK_SIZE (int): The chunk size in bytes used when downloading
file.
"""
READ_CHUNK_SIZE = 16 * 1024
def __init__(self, bucket, key, deferred):
super(_S3DownloadThread, self).__init__(name="s3-download")
self.bucket = bucket
self.key = key
self.deferred = deferred
def run(self):
session = boto3.session.Session()
s3 = session.client('s3')
try:
resp = s3.get_object(Bucket=self.bucket, Key=self.key)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
reactor.callFromThread(self.deferred.callback, None)
return
reactor.callFromThread(self.deferred.errback, Failure())
return
# Triggered by responder when more data has been requested (or
# stop_event has been triggered)
wakeup_event = threading.Event()
# Trigered by responder when we should abort the download.
stop_event = threading.Event()
producer = _S3Responder(wakeup_event, stop_event)
reactor.callFromThread(self.deferred.callback, producer)
try:
body = resp["Body"]
while not stop_event.is_set():
# We wait for the producer to signal that the consumer wants
# more data (or we should abort)
wakeup_event.wait()
# Check if we were woken up so that we abort the download
if stop_event.is_set():
return
chunk = body.read(self.READ_CHUNK_SIZE)
if not chunk:
return
# We clear the wakeup_event flag just before we write the data
# to producer.
wakeup_event.clear()
reactor.callFromThread(producer._write, chunk)
except Exception:
reactor.callFromThread(producer._error, Failure())
return
finally:
reactor.callFromThread(producer._finish)
if body:
body.close()
class _S3Responder(Responder): class _S3Responder(Responder):
"""A Responder for S3. Created by _S3DownloadThread """A Responder for S3. Created by _S3DownloadThread
@ -143,79 +219,3 @@ class _S3Responder(Responder):
if not self.deferred.called: if not self.deferred.called:
self.deferred.callback(None) self.deferred.callback(None)
class _S3DownloadThread(threading.Thread):
"""Attempts to download a file from S3.
Args:
bucket (str): The S3 bucket which may have the file
key (str): The key of the file
deferred (Deferred[_S3Responder|None]): If files exists
resolved with an _S3Responder instance, if it doesn't
exist then resolves with None.
Attributes:
READ_CHUNK_SIZE (int): The chunk size in bytes used when downloading
file.
"""
READ_CHUNK_SIZE = 16 * 1024
def __init__(self, bucket, key, deferred):
super(_S3DownloadThread, self).__init__(name="s3-download")
self.bucket = bucket
self.key = key
self.deferred = deferred
def run(self):
session = boto3.session.Session()
s3 = session.client('s3')
try:
resp = s3.get_object(Bucket=self.bucket, Key=self.key)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
reactor.callFromThread(self.deferred.callback, None)
return
reactor.callFromThread(self.deferred.errback, Failure())
return
# Triggered by responder when more data has been requested (or
# stop_event has been triggered)
wakeup_event = threading.Event()
# Trigered by responder when we should abort the download.
stop_event = threading.Event()
producer = _S3Responder(wakeup_event, stop_event)
reactor.callFromThread(self.deferred.callback, producer)
try:
body = resp["Body"]
while not stop_event.is_set():
# We wait for the producer to signal that the consumer wants
# more data (or we should abort)
wakeup_event.wait()
# Check if we were woken up so that we abort the download
if stop_event.is_set():
return
chunk = body.read(self.READ_CHUNK_SIZE)
if not chunk:
return
# We clear the wakeup_event flag just before we write the data
# to producer.
wakeup_event.clear()
reactor.callFromThread(producer._write, chunk)
except Exception:
reactor.callFromThread(producer._error, Failure())
return
finally:
reactor.callFromThread(producer._finish)
if body:
body.close()