diff --git a/s3_storage_provider.py b/s3_storage_provider.py index 1ec4951..8ff65c8 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -150,9 +150,6 @@ class _S3DownloadThread(threading.Thread): if not chunk: return - # We clear the wakeup_event flag just before we write the data - # to producer. - wakeup_event.clear() reactor.callFromThread(producer._write, chunk) except Exception: @@ -190,9 +187,10 @@ class _S3Responder(Responder): """See Responder.write_to_consumer """ self.consumer = consumer - # We are a IPullProducer, so we expect consumer to call resumeProducing - # each time they want a new chunk of data. - consumer.registerProducer(self, False) + # We are a IPushProducer, so we start producing immediately until we + # get a pauseProducing or stopProducing + consumer.registerProducer(self, True) + self.wakeup_event.set() return self.deferred def __exit__(self, exc_type, exc_val, exc_tb): @@ -200,13 +198,18 @@ class _S3Responder(Responder): self.wakeup_event.set() def resumeProducing(self): - """See IPullProducer.resumeProducing + """See IPushProducer.resumeProducing """ # The consumer is asking for more data, signal _S3DownloadThread self.wakeup_event.set() + def pauseProducing(self): + """See IPushProducer.stopProducing + """ + self.wakeup_event.clear() + def stopProducing(self): - """See IPullProducer.stopProducing + """See IPushProducer.stopProducing """ # The consumer wants no more data ever, signal _S3DownloadThread self.stop_event.set()