2
2
3
3
4
4
import logging
5
+ import typing
5
6
6
7
import aiohttp .web
7
8
import aiohttp .client
15
16
LOGGER .setLevel (logging .DEBUG )
16
17
17
18
19
+ # The replication process needs a generous timeout, due to aiohttp having
20
+ # a default value of 5 minutes. This is too low for the replication
21
+ # process, as the segments can be up to 5 GiB in size.
22
+ # The new value is 16384 seconds, i.e. approximately 4.5 hours.
23
+ REPL_TIMEOUT = 16384
24
+
25
+
18
26
class ObjectReplicationProxy ():
19
27
"""A class for replicating objects."""
20
28
@@ -34,6 +42,8 @@ def __init__(
34
42
self .source_project = source_project
35
43
self .source_container = source_container
36
44
45
+ self .client = client
46
+
37
47
self .auth = auth
38
48
self .host : str = common .get_download_host (self .auth , self .project )
39
49
self .source_host : str = common .get_download_host (
@@ -46,23 +56,14 @@ async def a_generate_object_from_reader(
46
56
resp : aiohttp .client .ClientResponse
47
57
):
48
58
"""Generate uploaded object chunks from a response."""
49
- chunk = await resp .content .read (1048576 )
50
-
51
- while chunk :
52
- yield chunk
53
- chunk = await resp .content .read (1048576 )
54
-
55
- async def a_generate_object_segment_from_reader (
56
- self ,
57
- resp : aiohttp .client .ClientResponse
58
- ):
59
- """Generate next uploaded object segment from a response."""
60
- # Yield at max 5GiB of information in a segment
61
- for i in range (0 , 5120 ):
59
+ number = 0
60
+ while True :
62
61
chunk = await resp .content .read (1048576 )
63
62
if not chunk :
64
63
break
65
64
yield chunk
65
+ number += 1
66
+ LOGGER .debug (f"Response stream complete." )
66
67
67
68
async def a_create_container (
68
69
self ,
@@ -88,6 +89,98 @@ async def a_create_container(
88
89
)
89
90
LOGGER .debug (f"Created container { container } " )
90
91
92
async def a_sync_object_segments(
        self,
        manifest: str
) -> str:
    """Copy the segments referenced by an object manifest.

    The manifest is expected in the form ``<container>/<prefix>``. The
    source segment container is listed, every object whose name starts
    with the prefix is streamed from the source host into the
    ``<self.container>_segments`` container on the destination host, and
    the manifest value pointing at the copied segments is returned.

    :param manifest: manifest value of the source object
                     (``<container>/<prefix>``)
    :returns: the manifest with the container part replaced by
              ``<self.container>_segments``
    :raises aiohttp.web.HTTPNotFound: the segment container or one of
        the segments could not be fetched
    :raises aiohttp.web.HTTPForbidden: the segment container listing was
        denied
    :raises aiohttp.web.HTTPRequestTimeout: a segment upload answered
        with status 408
    :raises aiohttp.web.HTTPBadRequest: a segment upload answered with
        anything other than 201 or 202
    """
    # Split on the first slash only. Using str.replace() with the
    # container name would also rewrite any later occurrence of that
    # name inside the prefix.
    source_container, _, prefix = manifest.partition('/')
    prefix = prefix.lstrip('/')
    segment_container = f"{self.container}_segments"

    async with self.client.get(
        common.generate_download_url(
            self.source_host,
            container=source_container
        ),
        headers={
            "X-Auth-Token": self.auth.get_token(),
            "Accept-Encoding": "identity"
        },
        timeout=REPL_TIMEOUT
    ) as resp:
        if resp.status == 404:
            raise aiohttp.web.HTTPNotFound(
                reason="Couldn't find segment container."
            )
        if resp.status == 403:
            raise aiohttp.web.HTTPForbidden(
                reason="Not allowed to access segment container."
            )
        LOGGER.debug(f"Segment prefix: {prefix}")
        segments_str = await resp.text()

    segments_list = segments_str.strip().split('\n')
    LOGGER.debug(f"Segments before filtering: {segments_list}")
    # Segments are the objects whose names *begin* with the prefix; a
    # plain substring test would also pick up unrelated objects. Blank
    # names (empty listing) are dropped so no bogus request is made.
    segments = [
        name for name in segments_list
        if name and name.startswith(prefix)
    ]

    LOGGER.debug(f"Got following segments: {segments}")

    for segment in segments:
        from_url = common.generate_download_url(
            self.source_host,
            container=source_container,
            object_name=segment
        )
        LOGGER.debug(f"Getting segment from url: {from_url}")
        async with self.client.get(
            from_url,
            headers={
                "X-Auth-Token": self.auth.get_token(),
                "Accept-Encoding": "identity"
            },
            timeout=REPL_TIMEOUT
        ) as resp_g:
            # Validate the response before reading its headers, so a
            # failed download surfaces as HTTPNotFound rather than as a
            # KeyError/ValueError from a missing header.
            if resp_g.status not in {200, 201, 202}:
                raise aiohttp.web.HTTPNotFound(
                    reason="Couldn't find segment"
                )
            LOGGER.debug(f"Copying segment {segment}")
            headers = {
                "X-Auth-Token": self.auth.get_token(),
                "Content-Length": str(
                    int(resp_g.headers["Content-Length"])
                ),
                "Content-Type": resp_g.headers["Content-Type"],
                "ETag": resp_g.headers["ETag"],
            }

            to_url = common.generate_download_url(
                self.host,
                container=segment_container,
                object_name=segment
            )
            LOGGER.debug(f"Posting segment to url: {to_url}")
            # The download body is streamed straight into the upload
            # through the chunk generator.
            async with self.client.put(
                to_url,
                data=self.a_generate_object_from_reader(resp_g),
                headers=headers,
                timeout=REPL_TIMEOUT
            ) as resp_p:
                LOGGER.debug(f"Segment {segment} status {resp_p.status}")
                if resp_p.status == 408:
                    raise aiohttp.web.HTTPRequestTimeout()
                if resp_p.status not in {201, 202}:
                    raise aiohttp.web.HTTPBadRequest(
                        reason="Couldn't upload object segment"
                    )
            LOGGER.debug(f"Success in copying segment {segment}")

    # Swap only the leading container part of the manifest.
    return f"{segment_container}{manifest[len(source_container):]}"
183
+
91
184
async def a_copy_object (
92
185
self ,
93
186
object_name
@@ -101,8 +194,10 @@ async def a_copy_object(
101
194
object_name = object_name
102
195
),
103
196
headers = {
104
- "X-Auth-Token" : self .auth .get_token ()
105
- }
197
+ "X-Auth-Token" : self .auth .get_token (),
198
+ "Accept-Encoding" : "identity"
199
+ },
200
+ timeout = REPL_TIMEOUT
106
201
) as resp_g :
107
202
# If the source object doesn't exist, abort
108
203
if resp_g .status != 200 :
@@ -114,6 +209,8 @@ async def a_copy_object(
114
209
# Ensure that the upload container exists
115
210
await self .a_create_container ()
116
211
212
+ LOGGER .debug (f"Got headers: { resp_g .headers } " )
213
+
117
214
headers = {
118
215
"X-Auth-Token" : self .auth .get_token ()
119
216
}
@@ -137,52 +234,30 @@ async def a_copy_object(
137
234
self .container ,
138
235
object_name
139
236
),
140
- data = self .a_generate_object_from_response (resp_g ),
141
- headers = headers
237
+ data = self .a_generate_object_from_reader (resp_g ),
238
+ headers = headers ,
239
+ timeout = REPL_TIMEOUT
142
240
) as resp_p :
143
241
if resp_p .status == 408 :
144
242
raise aiohttp .web .HTTPRequestTimeout ()
243
+ if resp_p .status not in {201 , 202 }:
244
+ raise aiohttp .web .HTTPBadRequest (
245
+ reason = "Couldn't upload object segment"
246
+ )
145
247
LOGGER .debug (f"Success in copying object { object_name } " )
146
248
else :
147
249
# Ensure the segment container exists, since performing
148
250
# segmented upload
149
251
LOGGER .debug (f"Copying object { object_name } in segments." )
150
252
await self .a_create_container (segmented = True )
151
- total_chunks = length // 5368709120 + 1
152
- size_left = length
153
-
154
- LOGGER .debug (f"Copying in total { total_chunks } chunks" )
155
-
156
- for i in range (0 , total_chunks ):
157
- async with self .client .put (
158
- common .generate_download_url (
159
- self .host ,
160
- container = f"{ self .container } _segments" ,
161
- object_name = f"{ object_name } /{ i :08d} "
162
- ),
163
- data = self .a_generate_object_segment_from_reader (
164
- resp_g
165
- ),
166
- headers = {
167
- "X-Auth-Token" : self .auth .get_token (),
168
- "Content-Length" : (
169
- size_left if size_left > 5368709120
170
- else 5368709120
171
- ),
172
- "Content-Type" :
173
- "application/swiftclient-segment"
174
- }
175
- ) as resp :
176
- if resp .status == 408 :
177
- raise aiohttp .web .HTTPRequestTimeout ()
178
- size_left -= 5368709120
179
- LOGGER .debug (f"Copied chunk { i } " )
180
- LOGGER .debug (f"{ size_left } bytes left" )
253
+
254
+ manifest = await self .a_sync_object_segments (
255
+ resp_g .headers ["X-Object-Manifest" ]
256
+ )
181
257
182
258
LOGGER .debug ("Uploading manifest" )
183
259
# Add manifest headers
184
- manifest = f"{ self .container } _segments/{ object_name } /"
185
- headers ["X-Object-Manifest" ]: manifest
260
+ headers ["X-Object-Manifest" ] = manifest
186
261
# Create manifest file
187
262
async with self .client .put (
188
263
common .generate_download_url (
@@ -191,7 +266,8 @@ async def a_copy_object(
191
266
object_name = object_name
192
267
),
193
268
data = b"" ,
194
- headers = headers
269
+ headers = headers ,
270
+ timeout = REPL_TIMEOUT
195
271
) as resp :
196
272
if resp .status != 201 :
197
273
raise aiohttp .web .HTTPInternalServerError (
@@ -201,21 +277,33 @@ async def a_copy_object(
201
277
202
278
async def a_copy_from_container(self):
    """Copy every object listed in the source container.

    Fetches the object listing of ``self.source_container`` from the
    source host and copies the listed objects one at a time with
    ``a_copy_object``.

    :raises aiohttp.web.HTTPBadRequest: the container listing did not
        answer with status 200
    """
    LOGGER.debug(
        f"Fetching objects from container {self.source_container}"
    )
    container_url = common.generate_download_url(
        self.source_host,
        container=self.source_container
    )
    LOGGER.debug(f"Container url: {container_url}")
    # Reuse the URL that was just logged instead of generating it again.
    async with self.client.get(
        container_url,
        headers={
            "X-Auth-Token": self.auth.get_token()
        },
        timeout=REPL_TIMEOUT
    ) as resp:
        if resp.status != 200:
            LOGGER.debug(
                f"Container fetch failed with status {resp.status}"
            )
            raise aiohttp.web.HTTPBadRequest(
                reason="Couldn't fetch the source container"
            )
        LOGGER.debug("Got container object listing")
        listing = await resp.text()

    # The listing has been read in full, so the copies run after the
    # listing response is released rather than keeping it open for the
    # whole replication.
    for object_name in listing.strip().split("\n"):
        # An empty listing splits into a single empty string; skip it
        # instead of requesting an object with no name.
        if not object_name:
            continue
        await self.a_copy_object(object_name)
0 commit comments