mirror of
https://github.com/matrix-org/synapse-s3-storage-provider.git
synced 2024-10-23 07:29:40 +00:00
Convert to be a PushProducer
This commit is contained in:
parent
931fd197cf
commit
1314baff14
1 changed files with 11 additions and 8 deletions
|
@ -150,9 +150,6 @@ class _S3DownloadThread(threading.Thread):
|
|||
if not chunk:
|
||||
return
|
||||
|
||||
# We clear the wakeup_event flag just before we write the data
|
||||
# to producer.
|
||||
wakeup_event.clear()
|
||||
reactor.callFromThread(producer._write, chunk)
|
||||
|
||||
except Exception:
|
||||
|
@ -190,9 +187,10 @@ class _S3Responder(Responder):
|
|||
"""See Responder.write_to_consumer
|
||||
"""
|
||||
self.consumer = consumer
|
||||
# We are a IPullProducer, so we expect consumer to call resumeProducing
|
||||
# each time they want a new chunk of data.
|
||||
consumer.registerProducer(self, False)
|
||||
# We are a IPushProducer, so we start producing immediately until we
|
||||
# get a pauseProducing or stopProducing
|
||||
consumer.registerProducer(self, True)
|
||||
self.wakeup_event.set()
|
||||
return self.deferred
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
|
@ -200,13 +198,18 @@ class _S3Responder(Responder):
|
|||
self.wakeup_event.set()
|
||||
|
||||
def resumeProducing(self):
|
||||
"""See IPullProducer.resumeProducing
|
||||
"""See IPushProducer.resumeProducing
|
||||
"""
|
||||
# The consumer is asking for more data, signal _S3DownloadThread
|
||||
self.wakeup_event.set()
|
||||
|
||||
def pauseProducing(self):
|
||||
"""See IPushProducer.stopProducing
|
||||
"""
|
||||
self.wakeup_event.clear()
|
||||
|
||||
def stopProducing(self):
|
||||
"""See IPullProducer.stopProducing
|
||||
"""See IPushProducer.stopProducing
|
||||
"""
|
||||
# The consumer wants no more data ever, signal _S3DownloadThread
|
||||
self.stop_event.set()
|
||||
|
|
Loading…
Reference in a new issue