mirror of
https://github.com/matrix-org/synapse-s3-storage-provider.git
synced 2024-10-23 07:29:40 +00:00
parent
fa0194e225
commit
410c4382ab
4 changed files with 33 additions and 29 deletions
|
@ -33,8 +33,12 @@ logger = logging.getLogger("synapse.s3")
|
||||||
|
|
||||||
|
|
||||||
# The list of valid AWS storage class names
|
# The list of valid AWS storage class names
|
||||||
_VALID_STORAGE_CLASSES = ('STANDARD', 'REDUCED_REDUNDANCY',
|
_VALID_STORAGE_CLASSES = (
|
||||||
'STANDARD_IA', 'INTELLIGENT_TIERING')
|
"STANDARD",
|
||||||
|
"REDUCED_REDUNDANCY",
|
||||||
|
"STANDARD_IA",
|
||||||
|
"INTELLIGENT_TIERING",
|
||||||
|
)
|
||||||
|
|
||||||
# Chunk size to use when reading from s3 connection in bytes
|
# Chunk size to use when reading from s3 connection in bytes
|
||||||
READ_CHUNK_SIZE = 16 * 1024
|
READ_CHUNK_SIZE = 16 * 1024
|
||||||
|
@ -70,15 +74,13 @@ class S3StorageProviderBackend(StorageProvider):
|
||||||
|
|
||||||
def _store_file():
|
def _store_file():
|
||||||
session = boto3.session.Session()
|
session = boto3.session.Session()
|
||||||
session.resource('s3', **self.api_kwargs).Bucket(self.bucket).upload_file(
|
session.resource("s3", **self.api_kwargs).Bucket(self.bucket).upload_file(
|
||||||
Filename=os.path.join(self.cache_directory, path),
|
Filename=os.path.join(self.cache_directory, path),
|
||||||
Key=path,
|
Key=path,
|
||||||
ExtraArgs={"StorageClass": self.storage_class},
|
ExtraArgs={"StorageClass": self.storage_class},
|
||||||
)
|
)
|
||||||
|
|
||||||
return make_deferred_yieldable(
|
return make_deferred_yieldable(reactor.callInThread(_store_file))
|
||||||
reactor.callInThread(_store_file)
|
|
||||||
)
|
|
||||||
|
|
||||||
def fetch(self, path, file_info):
|
def fetch(self, path, file_info):
|
||||||
"""See StorageProvider.fetch"""
|
"""See StorageProvider.fetch"""
|
||||||
|
@ -146,12 +148,12 @@ class _S3DownloadThread(threading.Thread):
|
||||||
if not hasattr(local_data, "b3_session"):
|
if not hasattr(local_data, "b3_session"):
|
||||||
local_data.b3_session = boto3.session.Session()
|
local_data.b3_session = boto3.session.Session()
|
||||||
session = local_data.b3_session
|
session = local_data.b3_session
|
||||||
s3 = session.client('s3', **self.api_kwargs)
|
s3 = session.client("s3", **self.api_kwargs)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
resp = s3.get_object(Bucket=self.bucket, Key=self.key)
|
resp = s3.get_object(Bucket=self.bucket, Key=self.key)
|
||||||
except botocore.exceptions.ClientError as e:
|
except botocore.exceptions.ClientError as e:
|
||||||
if e.response['Error']['Code'] in ("404", "NoSuchKey",):
|
if e.response["Error"]["Code"] in ("404", "NoSuchKey",):
|
||||||
reactor.callFromThread(self.deferred.callback, None)
|
reactor.callFromThread(self.deferred.callback, None)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -160,7 +162,7 @@ class _S3DownloadThread(threading.Thread):
|
||||||
|
|
||||||
producer = _S3Responder()
|
producer = _S3Responder()
|
||||||
reactor.callFromThread(self.deferred.callback, producer)
|
reactor.callFromThread(self.deferred.callback, producer)
|
||||||
_stream_to_producer(reactor, producer, resp["Body"], timeout=90.)
|
_stream_to_producer(reactor, producer, resp["Body"], timeout=90.0)
|
||||||
|
|
||||||
|
|
||||||
def _stream_to_producer(reactor, producer, body, status=None, timeout=None):
|
def _stream_to_producer(reactor, producer, body, status=None, timeout=None):
|
||||||
|
@ -219,6 +221,7 @@ def _stream_to_producer(reactor, producer, body, status=None, timeout=None):
|
||||||
class _S3Responder(Responder):
|
class _S3Responder(Responder):
|
||||||
"""A Responder for S3. Created by _S3DownloadThread
|
"""A Responder for S3. Created by _S3DownloadThread
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# Triggered by responder when more data has been requested (or
|
# Triggered by responder when more data has been requested (or
|
||||||
# stop_event has been triggered)
|
# stop_event has been triggered)
|
||||||
|
|
18
setup.py
18
setup.py
|
@ -3,18 +3,18 @@ from setuptools import setup
|
||||||
__version__ = "1.0"
|
__version__ = "1.0"
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name='synapse-s3-storage-provider',
|
name="synapse-s3-storage-provider",
|
||||||
version=__version__,
|
version=__version__,
|
||||||
zip_safe=False,
|
zip_safe=False,
|
||||||
url="https://github.com/matrix-org/synapse-s3-storage-provider",
|
url="https://github.com/matrix-org/synapse-s3-storage-provider",
|
||||||
py_modules=['s3_storage_provider'],
|
py_modules=["s3_storage_provider"],
|
||||||
scripts=['scripts/s3_media_upload'],
|
scripts=["scripts/s3_media_upload"],
|
||||||
install_requires=[
|
install_requires=[
|
||||||
'boto3>=1.9.23<2.0',
|
"boto3>=1.9.23<2.0",
|
||||||
'botocore>=1.12.23<2.0',
|
"botocore>=1.12.23<2.0",
|
||||||
'humanize>=0.5.1<0.6',
|
"humanize>=0.5.1<0.6",
|
||||||
'psycopg2>=2.7.5<3.0',
|
"psycopg2>=2.7.5<3.0",
|
||||||
'PyYAML>=3.13<4.0',
|
"PyYAML>=3.13<4.0",
|
||||||
'tqdm>=4.26.0<5.0'
|
"tqdm>=4.26.0<5.0",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
17
test_s3.py
17
test_s3.py
|
@ -19,7 +19,8 @@ from twisted.test.proto_helpers import MemoryReactorClock
|
||||||
from twisted.trial import unittest
|
from twisted.trial import unittest
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
is_py2 = sys.version[0] == '2'
|
|
||||||
|
is_py2 = sys.version[0] == "2"
|
||||||
if is_py2:
|
if is_py2:
|
||||||
from Queue import Queue
|
from Queue import Queue
|
||||||
else:
|
else:
|
||||||
|
@ -29,9 +30,7 @@ from threading import Event, Thread
|
||||||
|
|
||||||
from mock import Mock
|
from mock import Mock
|
||||||
|
|
||||||
from s3_storage_provider import (
|
from s3_storage_provider import _stream_to_producer, _S3Responder, _ProducerStatus
|
||||||
_stream_to_producer, _S3Responder, _ProducerStatus,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class StreamingProducerTestCase(unittest.TestCase):
|
class StreamingProducerTestCase(unittest.TestCase):
|
||||||
|
@ -52,10 +51,7 @@ class StreamingProducerTestCase(unittest.TestCase):
|
||||||
self.thread = Thread(
|
self.thread = Thread(
|
||||||
target=_stream_to_producer,
|
target=_stream_to_producer,
|
||||||
args=(self.reactor, self.producer, self.body),
|
args=(self.reactor, self.producer, self.body),
|
||||||
kwargs={
|
kwargs={"status": self.producer_status, "timeout": 1.0},
|
||||||
"status": self.producer_status,
|
|
||||||
"timeout": 1.0,
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
self.thread.daemon = True
|
self.thread.daemon = True
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
|
@ -94,12 +90,12 @@ class StreamingProducerTestCase(unittest.TestCase):
|
||||||
self.producer.pauseProducing()
|
self.producer.pauseProducing()
|
||||||
self.body.write(" string")
|
self.body.write(" string")
|
||||||
self.wait_for_thread()
|
self.wait_for_thread()
|
||||||
self.producer_status.wait_until_paused(10.)
|
self.producer_status.wait_until_paused(10.0)
|
||||||
self.assertEqual("test string", self.written)
|
self.assertEqual("test string", self.written)
|
||||||
|
|
||||||
# If we write again we remain paused and nothing gets written
|
# If we write again we remain paused and nothing gets written
|
||||||
self.body.write(" second")
|
self.body.write(" second")
|
||||||
self.producer_status.wait_until_paused(10.)
|
self.producer_status.wait_until_paused(10.0)
|
||||||
self.assertEqual("test string", self.written)
|
self.assertEqual("test string", self.written)
|
||||||
|
|
||||||
# If we call resumeProducing the buffered data gets read and written.
|
# If we call resumeProducing the buffered data gets read and written.
|
||||||
|
@ -165,6 +161,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
|
||||||
class Channel(object):
|
class Channel(object):
|
||||||
"""Simple channel to mimic a thread safe file like object
|
"""Simple channel to mimic a thread safe file like object
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._queue = Queue()
|
self._queue = Queue()
|
||||||
|
|
||||||
|
|
6
tox.ini
6
tox.ini
|
@ -46,7 +46,11 @@ skip_install = True
|
||||||
basepython = python3.6
|
basepython = python3.6
|
||||||
deps =
|
deps =
|
||||||
flake8
|
flake8
|
||||||
commands = /bin/sh -c "flake8 s3_storage_provider.py setup.py {env:PEP8SUFFIX:}"
|
# We pin so that our tests don't start failing on new releases of black.
|
||||||
|
black==19.10b0
|
||||||
|
commands =
|
||||||
|
python -m black --check --diff .
|
||||||
|
/bin/sh -c "flake8 s3_storage_provider.py setup.py {env:PEP8SUFFIX:}"
|
||||||
|
|
||||||
[testenv:check_isort]
|
[testenv:check_isort]
|
||||||
skip_install = True
|
skip_install = True
|
||||||
|
|
Loading…
Reference in a new issue