"""
ffmpeg_streaming.clouds
~~~~~~~~~~~~
Upload and download files -> clouds
:copyright: (c) 2020 by Amin Yazdanpanah.
:website: https://www.aminyazdanpanah.com
:email: [email protected]
:license: MIT, see LICENSE for more details.
"""
import abc
import logging
import tempfile
from os import listdir
from os.path import isfile, join, basename


class Clouds(abc.ABC):
    """
    Abstract base class for cloud storage backends. Implementations upload a
    local directory to the cloud and download a single object to a local file.
    """

    @abc.abstractmethod
    def upload_directory(self, directory: str, **options) -> None:
        pass

    @abc.abstractmethod
    def download(self, filename: str = None, **options) -> str:
        pass


class S3(Clouds):
    def __init__(self, **options):
        """
        Create an Amazon S3 client. All keyword arguments (e.g.
        ``aws_access_key_id``, ``aws_secret_access_key``, ``region_name``)
        are passed directly to ``boto3.client``.
        """
        try:
            import boto3
            from botocore.exceptions import ClientError
        except ImportError as e:
            raise ImportError("boto3 is not installed! Install the package via pip:\n\n"
                              "pip install boto3") from e

        self.s3 = boto3.client('s3', **options)
        self.err = ClientError

    def upload_directory(self, directory, **options):
        bucket_name = options.pop('bucket_name', None)
        folder = options.pop('folder', '')

        if bucket_name is None:
            raise ValueError('You should pass a bucket name')

        files = [f for f in listdir(directory) if isfile(join(directory, f))]

        try:
            for file in files:
                self.s3.upload_file(join(directory, file), bucket_name, join(folder, file).replace("\\", "/"))
        except self.err as e:
            logging.error(e)
            raise RuntimeError(e)

        logging.info("The {} directory was uploaded to Amazon S3 successfully".format(directory))

    def download(self, filename=None, **options):
        bucket_name = options.pop('bucket_name', None)
        key = options.pop('key', None)

        if bucket_name is None or key is None:
            raise ValueError('You should pass a bucket name and a key')

        if filename is None:
            filename = tempfile.NamedTemporaryFile(prefix=basename(key), delete=False)
        else:
            filename = open(filename, 'wb')

        try:
            with filename as f:
                self.s3.download_fileobj(bucket_name, key, f)
                logging.info(f'The {filename.name} file was downloaded')
        except self.err as e:
            logging.error(e)
            raise RuntimeError(e)

        return filename.name
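
# A minimal usage sketch for the S3 backend (the credentials, bucket name,
# and paths below are hypothetical placeholders):
#
#   s3 = S3(aws_access_key_id='YOUR_KEY', aws_secret_access_key='YOUR_SECRET',
#           region_name='us-east-1')
#   s3.upload_directory('/tmp/hls', bucket_name='my-bucket', folder='videos/hls')
#   local_path = s3.download(bucket_name='my-bucket', key='videos/hls/playlist.m3u8')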


class GCS(Clouds):
    CLIENT = None

    def __init__(self, **options):
        """
        Create a Google Cloud Storage client. All keyword arguments (e.g.
        ``project``, ``credentials``) are passed directly to
        ``google.cloud.storage.Client``.
        """
        try:
            from google.cloud import storage
        except ImportError as e:
            raise ImportError("google-cloud-storage is not installed! Install the package via pip:\n\n"
                              "pip install google-cloud-storage") from e

        GCS.CLIENT = storage.Client(**options)

    def upload_directory(self, directory, **options):
        bucket_name = options.pop('bucket_name', None)
        if bucket_name is None:
            raise ValueError('You should pass a bucket name')

        bucket = GCS.CLIENT.get_bucket(bucket_name)
        folder = options.pop('folder', '')

        files = [f for f in listdir(directory) if isfile(join(directory, f))]
        for file in files:
            blob = bucket.blob(join(folder, file).replace("\\", "/"), **options)
            blob.upload_from_filename(join(directory, file))

    def download(self, filename=None, **options):
        bucket_name = options.pop('bucket_name', None)
        if bucket_name is None:
            raise ValueError('You should pass a bucket name')

        bucket = GCS.CLIENT.get_bucket(bucket_name)

        object_name = options.pop('object_name', None)
        if object_name is None:
            raise ValueError('You should pass an object name')

        if filename is None:
            with tempfile.NamedTemporaryFile(prefix=basename(object_name), delete=False) as tmp:
                filename = tmp.name

        # get_blob returns None when the object does not exist; fail clearly
        # instead of raising an AttributeError below.
        blob = bucket.get_blob(object_name, **options)
        if blob is None:
            raise RuntimeError(f'The {object_name} object was not found in the {bucket_name} bucket')
        blob.download_to_filename(filename)

        return filename
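
# A minimal usage sketch for the GCS backend. Credentials are typically
# resolved from the GOOGLE_APPLICATION_CREDENTIALS environment variable;
# the bucket name and paths below are hypothetical placeholders:
#
#   gcs = GCS()
#   gcs.upload_directory('/tmp/hls', bucket_name='my-bucket', folder='videos/hls')
#   local_path = gcs.download(bucket_name='my-bucket', object_name='videos/hls/playlist.m3u8')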


class MAS(Clouds):
    def __init__(self, **options):
        """
        Create a Microsoft Azure Storage blob service. All keyword arguments
        (e.g. ``account_name``, ``account_key``, ``connection_string``) are
        passed directly to ``BlockBlobService``, which is part of the legacy
        azure-storage-blob SDK (versions before 12).
        """
        try:
            from azure.storage.blob import BlockBlobService
        except ImportError as e:
            raise ImportError("azure-storage-blob is not installed! Install the package via pip:\n\n"
                              "pip install azure-storage-blob") from e

        self.block_blob_service = BlockBlobService(**options)

    def upload_directory(self, directory, **options):
        container = options.pop('container', None)
        if container is None:
            raise ValueError('You should pass a container name')

        files = [f for f in listdir(directory) if isfile(join(directory, f))]

        try:
            for file in files:
                self.block_blob_service.create_blob_from_path(container, file, join(directory, file))
        except Exception as e:
            logging.error(e)
            raise RuntimeError("An error occurred while uploading the directory") from e

    def download(self, filename=None, **options):
        container = options.pop('container', None)
        blob = options.pop('blob', None)

        if container is None or blob is None:
            raise ValueError('You should pass a container name and a blob name')

        if filename is None:
            with tempfile.NamedTemporaryFile(prefix=basename(blob), delete=False) as tmp:
                filename = tmp.name

        try:
            self.block_blob_service.get_blob_to_path(container, blob, filename)
            logging.info(f'The {filename} file was downloaded')
        except Exception as e:
            logging.error(e)
            raise RuntimeError("An error occurred while downloading the file") from e

        return filename
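
# A minimal usage sketch for the MAS backend (the account name, key, and
# container below are hypothetical placeholders):
#
#   mas = MAS(account_name='my-account', account_key='MY_KEY')
#   mas.upload_directory('/tmp/hls', container='videos')
#   local_path = mas.download(container='videos', blob='playlist.m3u8')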


class CloudManager:
    def __init__(self, filename: str = None):
        """
        Manage a collection of cloud backends so that a single transfer call
        is applied to all of them.
        """
        self.filename = filename
        self.clouds = []

    def add(self, cloud: Clouds, **options):
        self.clouds.append((cloud, options))
        return self

    def transfer(self, method, path):
        for cloud, options in self.clouds:
            getattr(cloud, method)(path, **options)


__all__ = [
    'Clouds',
    'CloudManager',
    'S3',
    'GCS',
    'MAS'
]
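
# A minimal sketch of fanning one directory out to several providers with
# CloudManager (all credentials and names are hypothetical placeholders):
#
#   manager = CloudManager()
#   manager.add(S3(region_name='us-east-1'), bucket_name='my-s3-bucket', folder='hls')
#   manager.add(GCS(), bucket_name='my-gcs-bucket')
#   manager.transfer('upload_directory', '/tmp/hls')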