mirror of
https://github.com/matrix-org/synapse-s3-storage-provider.git
synced 2024-10-23 07:29:40 +00:00
Factor out generic streaming from s3 specific
This commit is contained in:
parent
1314baff14
commit
005f857917
1 changed files with 81 additions and 45 deletions
|
@ -34,6 +34,9 @@ logger = logging.getLogger("synapse.s3")
|
||||||
# The list of valid AWS storage class names
|
# The list of valid AWS storage class names
|
||||||
_VALID_STORAGE_CLASSES = ('STANDARD', 'REDUCED_REDUNDANCY', 'STANDARD_IA')
|
_VALID_STORAGE_CLASSES = ('STANDARD', 'REDUCED_REDUNDANCY', 'STANDARD_IA')
|
||||||
|
|
||||||
|
# Chunk size to use when reading from s3 connection in bytes
|
||||||
|
READ_CHUNK_SIZE = 16 * 1024
|
||||||
|
|
||||||
|
|
||||||
class S3StorageProviderBackend(StorageProvider):
|
class S3StorageProviderBackend(StorageProvider):
|
||||||
"""
|
"""
|
||||||
|
@ -97,14 +100,8 @@ class _S3DownloadThread(threading.Thread):
|
||||||
deferred (Deferred[_S3Responder|None]): If file exists
|
deferred (Deferred[_S3Responder|None]): If file exists
|
||||||
resolved with an _S3Responder instance, if it doesn't
|
resolved with an _S3Responder instance, if it doesn't
|
||||||
exist then resolves with None.
|
exist then resolves with None.
|
||||||
|
|
||||||
Attributes:
|
|
||||||
READ_CHUNK_SIZE (int): The chunk size in bytes used when downloading
|
|
||||||
file.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
READ_CHUNK_SIZE = 16 * 1024
|
|
||||||
|
|
||||||
def __init__(self, bucket, key, deferred):
|
def __init__(self, bucket, key, deferred):
|
||||||
super(_S3DownloadThread, self).__init__(name="s3-download")
|
super(_S3DownloadThread, self).__init__(name="s3-download")
|
||||||
self.bucket = bucket
|
self.bucket = bucket
|
||||||
|
@ -125,28 +122,51 @@ class _S3DownloadThread(threading.Thread):
|
||||||
reactor.callFromThread(self.deferred.errback, Failure())
|
reactor.callFromThread(self.deferred.errback, Failure())
|
||||||
return
|
return
|
||||||
|
|
||||||
# Triggered by responder when more data has been requested (or
|
producer = _S3Responder()
|
||||||
# stop_event has been triggered)
|
|
||||||
wakeup_event = threading.Event()
|
|
||||||
# Trigered by responder when we should abort the download.
|
|
||||||
stop_event = threading.Event()
|
|
||||||
|
|
||||||
producer = _S3Responder(wakeup_event, stop_event)
|
|
||||||
reactor.callFromThread(self.deferred.callback, producer)
|
reactor.callFromThread(self.deferred.callback, producer)
|
||||||
|
_stream_to_producer(self.deferred, resp["Body"], timeout=90.)
|
||||||
|
|
||||||
|
|
||||||
|
def _stream_to_producer(reactor, producer, body, status=None, timeout=None):
|
||||||
|
"""Streams a file like object to the producer.
|
||||||
|
|
||||||
|
Correctly handles producer being paused/resumed/stopped.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
reactor
|
||||||
|
producer (_S3Responder): Producer object to stream results to
|
||||||
|
body (file like): The object to read from
|
||||||
|
status (_ProducerStatus|None): Used to track whether we're currently
|
||||||
|
paused or not. Used for testing
|
||||||
|
timeout (float|None): Timeout in seconds to wait for consume to resume
|
||||||
|
after being paused
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Set when we should be producing, cleared when we are paused
|
||||||
|
wakeup_event = producer.wakeup_event
|
||||||
|
|
||||||
|
# Set if we should stop producing forever
|
||||||
|
stop_event = producer.stop_event
|
||||||
|
|
||||||
|
if not status:
|
||||||
|
status = _ProducerStatus()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
body = resp["Body"]
|
|
||||||
|
|
||||||
while not stop_event.is_set():
|
while not stop_event.is_set():
|
||||||
# We wait for the producer to signal that the consumer wants
|
# We wait for the producer to signal that the consumer wants
|
||||||
# more data (or we should abort)
|
# more data (or we should abort)
|
||||||
wakeup_event.wait()
|
if not wakeup_event.is_set():
|
||||||
|
status.set_paused(True)
|
||||||
|
ret = wakeup_event.wait(timeout)
|
||||||
|
if not ret:
|
||||||
|
raise Exception("Timed out waiting to resume")
|
||||||
|
status.set_paused(False)
|
||||||
|
|
||||||
# Check if we were woken up so that we abort the download
|
# Check if we were woken up so that we abort the download
|
||||||
if stop_event.is_set():
|
if stop_event.is_set():
|
||||||
return
|
return
|
||||||
|
|
||||||
chunk = body.read(self.READ_CHUNK_SIZE)
|
chunk = body.read(READ_CHUNK_SIZE)
|
||||||
if not chunk:
|
if not chunk:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -154,7 +174,6 @@ class _S3DownloadThread(threading.Thread):
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
reactor.callFromThread(producer._error, Failure())
|
reactor.callFromThread(producer._error, Failure())
|
||||||
return
|
|
||||||
finally:
|
finally:
|
||||||
reactor.callFromThread(producer._finish)
|
reactor.callFromThread(producer._finish)
|
||||||
if body:
|
if body:
|
||||||
|
@ -163,18 +182,13 @@ class _S3DownloadThread(threading.Thread):
|
||||||
|
|
||||||
class _S3Responder(Responder):
|
class _S3Responder(Responder):
|
||||||
"""A Responder for S3. Created by _S3DownloadThread
|
"""A Responder for S3. Created by _S3DownloadThread
|
||||||
|
|
||||||
Args:
|
|
||||||
wakeup_event (threading.Event): Used to signal to _S3DownloadThread
|
|
||||||
that consumer is ready for more data (or that we've triggered
|
|
||||||
stop_event).
|
|
||||||
stop_event (threading.Event): Used to signal to _S3DownloadThread that
|
|
||||||
it should stop producing. `wakeup_event` must also be set if
|
|
||||||
`stop_event` is used.
|
|
||||||
"""
|
"""
|
||||||
def __init__(self, wakeup_event, stop_event):
|
def __init__(self):
|
||||||
self.wakeup_event = wakeup_event
|
# Triggered by responder when more data has been requested (or
|
||||||
self.stop_event = stop_event
|
# stop_event has been triggered)
|
||||||
|
self.wakeup_event = threading.Event()
|
||||||
|
# Trigered by responder when we should abort the download.
|
||||||
|
self.stop_event = threading.Event()
|
||||||
|
|
||||||
# The consumer we're registered to
|
# The consumer we're registered to
|
||||||
self.consumer = None
|
self.consumer = None
|
||||||
|
@ -214,6 +228,7 @@ class _S3Responder(Responder):
|
||||||
# The consumer wants no more data ever, signal _S3DownloadThread
|
# The consumer wants no more data ever, signal _S3DownloadThread
|
||||||
self.stop_event.set()
|
self.stop_event.set()
|
||||||
self.wakeup_event.set()
|
self.wakeup_event.set()
|
||||||
|
if not self.deferred.called:
|
||||||
self.deferred.errback(Exception("Consumer ask to stop producing"))
|
self.deferred.errback(Exception("Consumer ask to stop producing"))
|
||||||
|
|
||||||
def _write(self, chunk):
|
def _write(self, chunk):
|
||||||
|
@ -242,3 +257,24 @@ class _S3Responder(Responder):
|
||||||
|
|
||||||
if not self.deferred.called:
|
if not self.deferred.called:
|
||||||
self.deferred.callback(None)
|
self.deferred.callback(None)
|
||||||
|
|
||||||
|
|
||||||
|
class _ProducerStatus(object):
|
||||||
|
"""Used to track whether the s3 download thread is currently paused
|
||||||
|
waiting for consumer to resume. Used for testing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.is_paused = threading.Event()
|
||||||
|
self.is_paused.clear()
|
||||||
|
|
||||||
|
def wait_until_paused(self, timeout=None):
|
||||||
|
is_paused = self.is_paused.wait(timeout)
|
||||||
|
if not is_paused:
|
||||||
|
raise Exception("Timed out waiting")
|
||||||
|
|
||||||
|
def set_paused(self, paused):
|
||||||
|
if paused:
|
||||||
|
self.is_paused.set()
|
||||||
|
else:
|
||||||
|
self.is_paused.clear()
|
||||||
|
|
Loading…
Reference in a new issue