Skip to content
This repository was archived by the owner on Aug 4, 2021. It is now read-only.

Commit 05cef3f

Browse files
committed
add segmented upload concatenation
1 parent b7bd4ed commit 05cef3f

File tree

1 file changed

+136
-71
lines changed

1 file changed

+136
-71
lines changed

swift_upload_runner/upload.py

Lines changed: 136 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -92,27 +92,27 @@ async def a_sync_segments(
9292
self
9393
) -> None:
9494
"""Synchronize segments from storage."""
95-
segments: typing.Union[typing.List, str]
96-
async with self.client.get(
97-
common.generate_download_url(
98-
common.get_download_host(self.auth, self.project),
99-
self.container + "_segments"
100-
),
101-
headers={
102-
"X-Auth-Token": self.auth.get_token()
103-
},
104-
ssl=ssl_context
105-
) as resp:
106-
if resp.status in {200}:
107-
segments = await resp.text()
108-
segments = segments.rstrip().lstrip().split("\n")
109-
segments = list(filter(
110-
lambda i, # type: ignore
111-
path=self.path: path in i, segments
112-
))
113-
if segments:
114-
for segment in segments:
115-
self.done_chunks.add(int(segment.split("/")[-1]))
95+
# segments: typing.Union[typing.List, str]
96+
# async with self.client.get(
97+
# common.generate_download_url(
98+
# common.get_download_host(self.auth, self.project),
99+
# self.container + "_segments"
100+
# ),
101+
# headers={
102+
# "X-Auth-Token": self.auth.get_token()
103+
# },
104+
# ssl=ssl_context
105+
# ) as resp:
106+
# if resp.status in {200}:
107+
# segments = await resp.text()
108+
# segments = segments.rstrip().lstrip().split("\n")
109+
# segments = list(filter(
110+
# lambda i, # type: ignore
111+
# path=self.path: path in i, segments
112+
# ))
113+
# if segments:
114+
# for segment in segments:
115+
# self.done_chunks.add(int(segment.split("/")[-1]))
116116

117117
async def a_create_container(
118118
self,
@@ -232,75 +232,140 @@ async def a_add_chunk(
232232
if chunk_number in self.done_chunks:
233233
return aiohttp.web.Response(status=200)
234234

235-
if self.segmented:
236-
LOGGER.debug(f"Uploading chunk {chunk_number}")
235+
LOGGER.debug(f"Adding chunk {chunk_number}")
236+
await self.q.put((
237+
# Using chunk number as priority, enabling out-of-order chunks
238+
chunk_number,
239+
{"query": query, "data": chunk_reader},
240+
))
241+
242+
if not self.done_chunks:
243+
LOGGER.debug("Scheduling upload coroutine")
244+
self.coro_upload = asyncio.ensure_future(self.upload_file())
245+
246+
if chunk_number + 1 == self.total_chunks:
247+
LOGGER.debug("Waiting for upload to finish")
248+
await self.coro_upload
249+
else:
250+
await self.a_wait_for_chunk(chunk_number)
251+
252+
return aiohttp.web.Response(status=201)
253+
254+
# if self.segmented:
255+
# LOGGER.debug(f"Uploading chunk {chunk_number}")
256+
# async with self.client.put(
257+
# common.generate_download_url(
258+
# self.host,
259+
# container=self.container + "_segments",
260+
# object_name=f"""{self.path}/{chunk_number:08d}"""
261+
# ),
262+
# data=chunk_reader,
263+
# headers={
264+
# "X-Auth-Token": self.auth.get_token(),
265+
# "Content-Length": query["resumableCurrentChunkSize"],
266+
# "Content-Type": "application/swiftclient-segment",
267+
# },
268+
# timeout=UPL_TIMEOUT,
269+
# ssl=ssl_context
270+
# ) as resp:
271+
# if resp.status == 408:
272+
# raise aiohttp.web.HTTPRequestTimeout()
273+
# self.total_uploaded += int(query["resumableCurrentChunkSize"])
274+
# if self.total_uploaded == self.total_size:
275+
# await self.a_add_manifest()
276+
# self.done_chunks.add(chunk_number)
277+
# LOGGER.debug(f"Success in uploding chunk {chunk_number}")
278+
# return aiohttp.web.Response(status=201)
279+
# else:
280+
# LOGGER.debug(f"Concatenating chunk {chunk_number}")
281+
# await self.q.put((
282+
# # Using chunk number as priority, to handle chunks in any
283+
# # order
284+
# chunk_number,
285+
# {"query": query, "data": chunk_reader}
286+
# ))
287+
288+
# if not self.done_chunks:
289+
# LOGGER.debug("Scheduling upload coroutine")
290+
# self.coro_upload = asyncio.ensure_future(self.upload_file())
291+
292+
# if chunk_number + 1 == self.total_chunks:
293+
# LOGGER.debug("Waiting for upload to finish")
294+
# await self.coro_upload
295+
# else:
296+
# await self.a_wait_for_chunk(chunk_number)
297+
298+
# return aiohttp.web.Response(status=201)
299+
300+
async def upload_file(self) -> None:
301+
"""Upload the file with concatenated segments."""
302+
if not self.segmented:
303+
async with self.client.put(
304+
self.url,
305+
data=self.generate_from_queue(),
306+
headers={
307+
"X-Auth-Token": self.auth.get_token(),
308+
"Content-Length": str(self.total_size),
309+
},
310+
timeout=UPL_TIMEOUT,
311+
ssl=ssl_context
312+
) as resp:
313+
if resp.status == 408:
314+
raise aiohttp.web.HTTPRequestTimeout()
315+
if resp.status == 411:
316+
raise aiohttp.web.HTTPLengthRequired()
317+
if resp.status == 422:
318+
raise aiohttp.web.HTTPUnprocessableEntity()
319+
else:
320+
return
321+
322+
# Otherwise segmented upload
323+
segment_number: int = 0
324+
while len(self.done_chunks) < self.total_chunks:
237325
async with self.client.put(
238326
common.generate_download_url(
239327
self.host,
240328
container=self.container + "_segments",
241-
object_name=f"""{self.path}/{chunk_number:08d}"""
329+
object_name=f"""{self.path}/{segment_number:08d}"""
242330
),
243-
data=chunk_reader,
331+
data=self.generate_segment_from_queue(),
244332
headers={
245333
"X-Auth-Token": self.auth.get_token(),
246-
"Content-Length": query["resumableCurrentChunkSize"],
247334
"Content-Type": "application/swiftclient-segment",
248335
},
249336
timeout=UPL_TIMEOUT,
250337
ssl=ssl_context
251338
) as resp:
252339
if resp.status == 408:
253340
raise aiohttp.web.HTTPRequestTimeout()
254-
self.total_uploaded += int(query["resumableCurrentChunkSize"])
255341
if self.total_uploaded == self.total_size:
256342
await self.a_add_manifest()
257-
self.done_chunks.add(chunk_number)
258-
LOGGER.debug(f"Success in uploding chunk {chunk_number}")
259-
return aiohttp.web.Response(status=201)
260-
else:
261-
LOGGER.debug(f"Concatenating chunk {chunk_number}")
262-
await self.q.put((
263-
# Using chunk number as priority, to handle chunks in any
264-
# order
265-
chunk_number,
266-
{"query": query, "data": chunk_reader}
267-
))
268-
269-
if not self.done_chunks:
270-
LOGGER.debug("Scheduling upload coroutine")
271-
self.coro_upload = asyncio.ensure_future(self.upload_file())
272-
273-
if chunk_number + 1 == self.total_chunks:
274-
LOGGER.debug("Waiting for upload to finish")
275-
await self.coro_upload
276-
else:
277-
await self.a_wait_for_chunk(chunk_number)
278-
279-
return aiohttp.web.Response(status=201)
343+
LOGGER.debug(f"Success in uploading chunk {segment_number}")
344+
segment_number += 1
345+
return
346+
347+
async def generate_segment_from_queue(self) -> typing.AsyncGenerator:
348+
"""Generate segment data from the internal queue."""
349+
LOGGER.debug("Generating next segment from the internal queue.")
350+
initial_uploaded = self.total_uploaded
351+
while len(self.done_chunks) < self.total_chunks:
352+
chunk_number, segment = await self.q.get()
353+
LOGGER.debug(f"Getting chunk {chunk_number}")
354+
chunk_reader = segment["data"]
355+
chunk = await chunk_reader.read_chunk()
356+
while chunk:
357+
yield chunk
358+
chunk = await chunk_reader.read_chunk()
359+
LOGGER.debug(f"Chunk {chunk_number} exhausted.")
360+
self.total_uploaded += \
361+
int(segment["query"]["resumableCurrentChunkSize"])
362+
self.done_chunks.add(chunk_number)
280363

281-
async def upload_file(self) -> None:
282-
"""Upload the file with concatenated segments."""
283-
async with self.client.put(
284-
self.url,
285-
data=self.generate_from_queue(),
286-
headers={
287-
"X-Auth-Token": self.auth.get_token(),
288-
"Content-Length": str(self.total_size),
289-
},
290-
timeout=UPL_TIMEOUT,
291-
ssl=ssl_context
292-
) as resp:
293-
if resp.status == 408:
294-
raise aiohttp.web.HTTPRequestTimeout()
295-
if resp.status == 411:
296-
raise aiohttp.web.HTTPLengthRequired()
297-
if resp.status == 422:
298-
raise aiohttp.web.HTTPUnprocessableEntity()
299-
else:
300-
return
364+
if self.total_uploaded - initial_uploaded >= 1073741824:
365+
break
301366

302367
async def generate_from_queue(self) -> typing.AsyncGenerator:
303-
"""Generate the response data form the internal queue."""
368+
"""Generate the response data from the internal queue."""
304369
LOGGER.debug("Generating upload data from a queue.")
305370
while len(self.done_chunks) < self.total_chunks:
306371
chunk_number, segment = await self.q.get()

0 commit comments

Comments
 (0)