From 503fd7ba38fb95a515fffce942b6bb915aa3d877 Mon Sep 17 00:00:00 2001 From: Saad Rhoulam Date: Tue, 21 Aug 2018 01:02:43 -0400 Subject: [PATCH 1/6] Generalize for usage in other S3-like services. --- s3_storage_provider.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index 1ec4951..bb97c66 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -46,12 +46,15 @@ class S3StorageProviderBackend(StorageProvider): self.cache_directory = hs.config.media_store_path self.bucket = config["bucket"] self.storage_class = config["storage_class"] + self.api_kwargs = {} + if "endpoint_url" in config: + self.api_kwargs["endpoint_url"] = config["endpoint_url"] def store_file(self, path, file_info): """See StorageProvider.store_file""" def _store_file(): - boto3.resource('s3').Bucket(self.bucket).upload_file( + boto3.resource('s3', **self.api_kwargs).Bucket(self.bucket).upload_file( Filename=os.path.join(self.cache_directory, path), Key=path, ExtraArgs={"StorageClass": self.storage_class}, @@ -64,7 +67,7 @@ class S3StorageProviderBackend(StorageProvider): def fetch(self, path, file_info): """See StorageProvider.fetch""" d = defer.Deferred() - _S3DownloadThread(self.bucket, path, d).start() + _S3DownloadThread(self.bucket, self.api_kwargs, path, d).start() return make_deferred_yieldable(d) @staticmethod @@ -82,17 +85,23 @@ class S3StorageProviderBackend(StorageProvider): assert isinstance(bucket, basestring) assert storage_class in _VALID_STORAGE_CLASSES - return { + result = { "bucket": bucket, "storage_class": storage_class, } + if "endpoint_url" in config: + result["endpoint_url"] = config["endpoint_url"] + + return result + class _S3DownloadThread(threading.Thread): """Attempts to download a file from S3. Args: bucket (str): The S3 bucket which may have the file + api_kwargs (dict): Keyword arguments to pass when invoking the API. Generally `endpoint_url`. key (str): The key of the file deferred (Deferred[_S3Responder|None]): If file exists resolved with an _S3Responder instance, if it doesn't @@ -105,15 +114,16 @@ class _S3DownloadThread(threading.Thread): READ_CHUNK_SIZE = 16 * 1024 - def __init__(self, bucket, key, deferred): + def __init__(self, bucket, api_kwargs, key, deferred): super(_S3DownloadThread, self).__init__(name="s3-download") self.bucket = bucket + self.api_kwargs = api_kwargs self.key = key self.deferred = deferred def run(self): session = boto3.session.Session() - s3 = session.client('s3') + s3 = session.client('s3', **self.api_kwargs) try: resp = s3.get_object(Bucket=self.bucket, Key=self.key) From 05d9bf6bd6503276ff692bedc369bf4cb7677418 Mon Sep 17 00:00:00 2001 From: Martin Honermeyer Date: Thu, 13 Sep 2018 23:37:21 +0200 Subject: [PATCH 2/6] Create a separate boto3 session for each file store thread This prevents threading problems, see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#multithreading-multiprocessing. --- s3_storage_provider.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index bb97c66..8cd9482 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -54,7 +54,8 @@ class S3StorageProviderBackend(StorageProvider): """See StorageProvider.store_file""" def _store_file(): - boto3.resource('s3', **self.api_kwargs).Bucket(self.bucket).upload_file( + session = boto3.session.Session() + session.resource('s3', **self.api_kwargs).Bucket(self.bucket).upload_file( Filename=os.path.join(self.cache_directory, path), Key=path, ExtraArgs={"StorageClass": self.storage_class}, From f4df1a69eb34a1b3f480e995cde2ac537c1658cd Mon Sep 17 00:00:00 2001 From: Adam Hellberg Date: Thu, 14 Mar 2019 22:51:16 +0100 Subject: [PATCH 3/6] Allow configuration of region and access keys Adds the option to set more boto3 options: region_name, aws_access_key_id, and aws_secret_access_key. This makes it easier to configure without having to be careful about some CLI tool having the correct configuration. Also allows setting the region name. --- s3_storage_provider.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index ecfb074..d15d35c 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -51,9 +51,19 @@ class S3StorageProviderBackend(StorageProvider): self.bucket = config["bucket"] self.storage_class = config["storage_class"] self.api_kwargs = {} + + if "region_name" in config: + self.api_kwargs["region_name"] = config["region_name"] + if "endpoint_url" in config: self.api_kwargs["endpoint_url"] = config["endpoint_url"] + if "access_key_id" in config: + self.api_kwargs["aws_access_key_id"] = config["access_key_id"] + + if "secret_access_key" in config: + self.api_kwargs["aws_secret_access_key"] = config["secret_access_key"] + def store_file(self, path, file_info): """See StorageProvider.store_file""" @@ -95,9 +105,18 @@ class S3StorageProviderBackend(StorageProvider): "storage_class": storage_class, } + if "region_name" in config: + result["region_name"] = config["region_name"] + if "endpoint_url" in config: result["endpoint_url"] = config["endpoint_url"] + if "access_key_id" in config: + result["access_key_id"] = config["access_key_id"] + + if "secret_access_key" in config: + result["secret_access_key"] = config["secret_access_key"] + return result From dd1d03f955f12e51a1ada30856d75830ae0ee945 Mon Sep 17 00:00:00 2001 From: Adam Hellberg Date: Thu, 14 Mar 2019 22:54:41 +0100 Subject: [PATCH 4/6] Fix line too long --- s3_storage_provider.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index d15d35c..28cfe2d 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -125,7 +125,8 @@ class _S3DownloadThread(threading.Thread): Args: bucket (str): The S3 bucket which may have the file - api_kwargs (dict): Keyword arguments to pass when invoking the API. Generally `endpoint_url`. + api_kwargs (dict): Keyword arguments to pass when invoking the API. + Generally `endpoint_url`. key (str): The key of the file deferred (Deferred[_S3Responder|None]): If file exists resolved with an _S3Responder instance, if it doesn't From 5fa3b097dcf4302deebb8ea7da2362c80c3232ab Mon Sep 17 00:00:00 2001 From: Adam Hellberg Date: Thu, 14 Mar 2019 22:56:23 +0100 Subject: [PATCH 5/6] Update README --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 90f3e50..37c41e9 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,12 @@ media_storage_providers: store_synchronous: True config: bucket: + # All of the below options are optional, for use with non-AWS S3-like + # services, or to specify access tokens here instead of some external method. + region_name: + endpoint_url: + access_key_id: + secret_access_key: ``` This module uses `boto3`, and so the credentials should be specified as From af9ee9df531e05d649ab7d02c3057947f48c1d02 Mon Sep 17 00:00:00 2001 From: Adam Hellberg Date: Thu, 14 Mar 2019 23:10:00 +0100 Subject: [PATCH 6/6] Fix test_s3.py to work with python 2 --- test_s3.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/test_s3.py b/test_s3.py index 379d381..b7a1f61 100644 --- a/test_s3.py +++ b/test_s3.py @@ -18,7 +18,13 @@ from twisted.python.failure import Failure from twisted.test.proto_helpers import MemoryReactorClock from twisted.trial import unittest -from queue import Queue +import sys +is_py2 = sys.version[0] == '2' +if is_py2: + from Queue import Queue +else: + from queue import Queue + from threading import Event, Thread from mock import Mock