Skip to content
This repository was archived by the owner on Aug 4, 2021. It is now read-only.

Commit 3300895

Browse files
committed
tidy up segmented upload concatenation
1 parent 05cef3f commit 3300895

File tree

2 files changed

+11
-95
lines changed

2 files changed

+11
-95
lines changed

swift_upload_runner/common.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ async def get_upload_instance(
122122
request.app["client"]
123123
)
124124
await upload_session.a_check_container()
125-
if upload_session.get_segmented():
126-
await upload_session.a_sync_segments()
127125
request.app[session]["uploads"][pro][cont][ident] = upload_session
128126

129127
return upload_session

swift_upload_runner/upload.py

Lines changed: 11 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -88,32 +88,6 @@ def __init__(
8888
if self.total_size >= 5368709120:
8989
self.segmented = True
9090

91-
async def a_sync_segments(
92-
self
93-
) -> None:
94-
"""Synchronize segments from storage."""
95-
# segments: typing.Union[typing.List, str]
96-
# async with self.client.get(
97-
# common.generate_download_url(
98-
# common.get_download_host(self.auth, self.project),
99-
# self.container + "_segments"
100-
# ),
101-
# headers={
102-
# "X-Auth-Token": self.auth.get_token()
103-
# },
104-
# ssl=ssl_context
105-
# ) as resp:
106-
# if resp.status in {200}:
107-
# segments = await resp.text()
108-
# segments = segments.rstrip().lstrip().split("\n")
109-
# segments = list(filter(
110-
# lambda i, # type: ignore
111-
# path=self.path: path in i, segments
112-
# ))
113-
# if segments:
114-
# for segment in segments:
115-
# self.done_chunks.add(int(segment.split("/")[-1]))
116-
11791
async def a_create_container(
11892
self,
11993
segmented: bool = False
@@ -251,52 +225,6 @@ async def a_add_chunk(
251225

252226
return aiohttp.web.Response(status=201)
253227

254-
# if self.segmented:
255-
# LOGGER.debug(f"Uploading chunk {chunk_number}")
256-
# async with self.client.put(
257-
# common.generate_download_url(
258-
# self.host,
259-
# container=self.container + "_segments",
260-
# object_name=f"""{self.path}/{chunk_number:08d}"""
261-
# ),
262-
# data=chunk_reader,
263-
# headers={
264-
# "X-Auth-Token": self.auth.get_token(),
265-
# "Content-Length": query["resumableCurrentChunkSize"],
266-
# "Content-Type": "application/swiftclient-segment",
267-
# },
268-
# timeout=UPL_TIMEOUT,
269-
# ssl=ssl_context
270-
# ) as resp:
271-
# if resp.status == 408:
272-
# raise aiohttp.web.HTTPRequestTimeout()
273-
# self.total_uploaded += int(query["resumableCurrentChunkSize"])
274-
# if self.total_uploaded == self.total_size:
275-
# await self.a_add_manifest()
276-
# self.done_chunks.add(chunk_number)
277-
# LOGGER.debug(f"Success in uploding chunk {chunk_number}")
278-
# return aiohttp.web.Response(status=201)
279-
# else:
280-
# LOGGER.debug(f"Concatenating chunk {chunk_number}")
281-
# await self.q.put((
282-
# # Using chunk number as priority, to handle chunks in any
283-
# # order
284-
# chunk_number,
285-
# {"query": query, "data": chunk_reader}
286-
# ))
287-
288-
# if not self.done_chunks:
289-
# LOGGER.debug("Scheduling upload coroutine")
290-
# self.coro_upload = asyncio.ensure_future(self.upload_file())
291-
292-
# if chunk_number + 1 == self.total_chunks:
293-
# LOGGER.debug("Waiting for upload to finish")
294-
# await self.coro_upload
295-
# else:
296-
# await self.a_wait_for_chunk(chunk_number)
297-
298-
# return aiohttp.web.Response(status=201)
299-
300228
async def upload_file(self) -> None:
301229
"""Upload the file with concatenated segments."""
302230
if not self.segmented:
@@ -328,7 +256,7 @@ async def upload_file(self) -> None:
328256
container=self.container + "_segments",
329257
object_name=f"""{self.path}/{segment_number:08d}"""
330258
),
331-
data=self.generate_segment_from_queue(),
259+
data=self.generate_from_queue(),
332260
headers={
333261
"X-Auth-Token": self.auth.get_token(),
334262
"Content-Type": "application/swiftclient-segment",
@@ -344,29 +272,12 @@ async def upload_file(self) -> None:
344272
segment_number += 1
345273
return
346274

347-
async def generate_segment_from_queue(self) -> typing.AsyncGenerator:
348-
"""Generate segment data from the internal queue."""
349-
LOGGER.debug("Generating next segment from the internal queue.""")
350-
initial_uploaded = self.total_uploaded
351-
while len(self.done_chunks) < self.total_chunks:
352-
chunk_number, segment = await self.q.get()
353-
LOGGER.debug(f"Geting chunk {chunk_number}")
354-
chunk_reader = segment["data"]
355-
chunk = await chunk_reader.read_chunk()
356-
while chunk:
357-
yield chunk
358-
chunk = await chunk_reader.read_chunk()
359-
LOGGER.debug(f"Chunk {chunk_number} exhausted.")
360-
self.total_uploaded += \
361-
int(segment["query"]["resumableCurrentChunkSize"])
362-
self.done_chunks.add(chunk_number)
363-
364-
if self.total_uploaded - initial_uploaded >= 1073741824:
365-
break
366-
367275
async def generate_from_queue(self) -> typing.AsyncGenerator:
368276
"""Generate the response data from the internal queue."""
369277
LOGGER.debug("Generating upload data from a queue.")
278+
279+
initial_uploaded = self.total_uploaded
280+
370281
while len(self.done_chunks) < self.total_chunks:
371282
chunk_number, segment = await self.q.get()
372283

@@ -384,6 +295,13 @@ async def generate_from_queue(self) -> typing.AsyncGenerator:
384295
int(segment["query"]["resumableCurrentChunkSize"])
385296
self.done_chunks.add(chunk_number)
386297

298+
# In case of a segmented upload cut the chunk at 1GiB or over
299+
if (
300+
self.segmented
301+
and self.total_uploaded - initial_uploaded >= 1073741824
302+
):
303+
break
304+
387305
async def a_wait_for_chunk(
388306
self,
389307
chunk_number: int

0 commit comments

Comments (0)