Skip to content
This repository was archived by the owner on Aug 4, 2021. It is now read-only.

Commit feacb6b

Browse files
authored
Merge pull request #1 from CSCfi/devel
Add resumablejs uploads and container / object replication
2 parents ef8cfd3 + 86c9250 commit feacb6b

File tree

8 files changed

+946
-28
lines changed

8 files changed

+946
-28
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
### swift-upload-runner – better browser file upload and download for swift-browser-ui
22

33
`swift-upload-runner` makes it possible to properly batch upload from browser
4-
into object storage backends using Openstack Swift API. It also will make possible
4+
into object storage backends using Openstack Swift API. It will also make possible
55
proper container uploads without plaintext passwords or tokens, to prevent
66
token expiration.
77

swift_upload_runner/api.py

+130-4
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33

44
import aiohttp.web
5-
# import asyncio
5+
import asyncio
66

7-
from .common import get_auth_instance
7+
from .common import get_auth_instance, get_upload_instance
8+
from .common import parse_multipart_in
89
from .download import FileDownloadProxy, ContainerArchiveDownloadProxy
10+
from .replicate import ObjectReplicationProxy
911

1012

1113
async def handle_get_object(
@@ -25,8 +27,8 @@ async def handle_get_object(
2527
resp = aiohttp.web.StreamResponse()
2628

2729
# Create headers
28-
resp.headers["Content-Type"] = download.get_type()
29-
resp.headers["Content-Length"] = str(download.get_size())
30+
resp.headers["Content-Type"] = await download.a_get_type()
31+
resp.headers["Content-Length"] = str(await download.a_get_size())
3032

3133
await resp.prepare(request)
3234

@@ -36,10 +38,134 @@ async def handle_get_object(
3638
return resp
3739

3840

41+
async def handle_replicate_container(
42+
request: aiohttp.web.Request
43+
) -> aiohttp.web.Response:
44+
"""Handle request to replicating a container from a source."""
45+
auth = get_auth_instance(request)
46+
47+
project = request.match_info["project"]
48+
container = request.match_info["container"]
49+
50+
source_project = request.query["from_project"]
51+
source_container = request.query["from_container"]
52+
53+
replicator = ObjectReplicationProxy(
54+
auth,
55+
request.app["client"],
56+
project,
57+
container,
58+
source_project,
59+
source_container
60+
)
61+
62+
asyncio.ensure_future(replicator.a_copy_from_container())
63+
64+
return aiohttp.web.Response(status=202)
65+
66+
67+
async def handle_replicate_object(
68+
request: aiohttp.web.Request
69+
) -> aiohttp.web.Response:
70+
"""Handle a request to replicating an object from a source."""
71+
auth = get_auth_instance(request)
72+
73+
project = request.match_info["project"]
74+
container = request.match_info["container"]
75+
76+
source_project = request.query["from_project"]
77+
source_container = request.query["from_container"]
78+
source_object = request.query["from_object"]
79+
80+
replicator = ObjectReplicationProxy(
81+
auth,
82+
request.app["client"],
83+
project,
84+
container,
85+
source_project,
86+
source_container
87+
)
88+
89+
asyncio.ensure_future(replicator.a_copy_object(source_object))
90+
91+
return aiohttp.web.Response(status=202)
92+
93+
94+
async def handle_post_object_chunk(
95+
request: aiohttp.web.Request
96+
) -> aiohttp.web.Response:
97+
"""Handle a request for posting an object chunk."""
98+
if "from_object" in request.query.keys():
99+
return await handle_replicate_object(request)
100+
if "from_container" in request.query.keys():
101+
return await handle_replicate_container(request)
102+
103+
project = request.match_info["project"]
104+
container = request.match_info["container"]
105+
106+
query, data = await parse_multipart_in(request)
107+
108+
upload_session = await get_upload_instance(
109+
request,
110+
project,
111+
container,
112+
p_query=query
113+
)
114+
115+
return await upload_session.a_add_chunk(
116+
query,
117+
data
118+
)
119+
120+
121+
async def handle_get_object_chunk(
122+
request: aiohttp.web.Request
123+
) -> aiohttp.web.Response:
124+
"""Handle a request for checking if a chunk exists."""
125+
get_auth_instance(request)
126+
127+
project = request.match_info["project"]
128+
container = request.match_info["container"]
129+
130+
try:
131+
# Infuriatingly resumable.js starts counting chunks from 1
132+
# thus, reducing said 1 from the resulting chunk number
133+
chunk_number = int(request.query["resumableChunkNumber"]) - 1
134+
except KeyError:
135+
raise aiohttp.web.HTTPBadRequest(reason="Malformed query string")
136+
137+
upload_session = await get_upload_instance(
138+
request,
139+
project,
140+
container
141+
)
142+
143+
return await upload_session.a_check_segment(
144+
chunk_number
145+
)
146+
147+
148+
async def handle_post_object_options(
149+
request: aiohttp.web.Request
150+
) -> aiohttp.web.Response:
151+
"""Handle options request for posting the object chunk."""
152+
resp = aiohttp.web.Response(
153+
headers={
154+
"Access-Control-Allow-Methods": "POST, OPTIONS, GET",
155+
"Access-Control-Max-Age": "84600",
156+
}
157+
)
158+
159+
return resp
160+
161+
39162
async def handle_get_container(
40163
request: aiohttp.web.Request
41164
) -> aiohttp.web.StreamResponse:
42165
"""Handle a request for getting container contents as an archive."""
166+
if "resumableChunkNumber" in request.query.keys():
167+
return await handle_get_object_chunk(request)
168+
43169
auth = get_auth_instance(request)
44170

45171
resp = aiohttp.web.StreamResponse()

swift_upload_runner/auth.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ async def handle_login(
3333
project = request.match_info["project"]
3434
login_form = await request.post()
3535
token = login_form["token"]
36-
request.app[session_key] = initiate_os_session(
36+
request.app[session_key] = {}
37+
request.app[session_key]["uploads"] = {}
38+
request.app[session_key]["auth"] = initiate_os_session(
3739
token,
3840
project
3941
)
@@ -96,7 +98,7 @@ async def handle_validate_authentication(
9698
path = request.url.path
9799
except KeyError:
98100
raise aiohttp.web.HTTPUnauthorized(
99-
reason="Query string missing validity or signature."
101+
reason="Query string missing validity or signature"
100102
)
101103

102104
await test_signature(

swift_upload_runner/common.py

+98-7
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55

66

77
import aiohttp.web
8-
98
import keystoneauth1.session
109

1110

11+
import swift_upload_runner.upload as upload
12+
13+
1214
def generate_download_url(
1315
host,
1416
container=None,
@@ -23,21 +25,110 @@ def generate_download_url(
2325
return f'{host}/{container}/{object_name}'
2426

2527

26-
def get_auth_instance(
28+
def get_download_host(
29+
auth: keystoneauth1.session.Session,
30+
project: str
31+
) -> str:
32+
"""Get the actual download host with shared container support."""
33+
ret = auth.get_endpoint(service_type="object-store")
34+
35+
if project not in ret:
36+
ret = ret.replace(ret.split("/")[-1], project)
37+
38+
return ret
39+
40+
41+
def get_session_id(
2742
request: aiohttp.web.Request
28-
) -> keystoneauth1.session.Session:
29-
"""Return the session specific keystone auth instance"""
43+
) -> str:
44+
"""Return the session id from request."""
3045
try:
31-
return request.app[request.cookies["RUNNER_SESSION_ID"]]
46+
return request.cookies["RUNNER_SESSION_ID"]
3247
except KeyError:
3348
try:
34-
return request.app[request.query["session"]]
49+
return request.query["session"]
3550
except KeyError:
3651
raise aiohttp.web.HTTPUnauthorized(
37-
reason="Runner session ID missing"
52+
reason="Missing runner session ID"
3853
)
3954

4055

56+
def get_auth_instance(
57+
request: aiohttp.web.Request
58+
) -> keystoneauth1.session.Session:
59+
"""Return the session specific keystone auth instance"""
60+
return request.app[get_session_id(request)]["auth"]
61+
62+
63+
async def parse_multipart_in(
64+
request: aiohttp.web.Request,
65+
) -> typing.Tuple[typing.Dict[str, typing.Any], aiohttp.MultipartReader]:
66+
"""Parse the form headers into a dictionary and chunk data as reader."""
67+
reader = await request.multipart()
68+
69+
ret_d = {}
70+
71+
while True:
72+
field = await reader.next()
73+
if field.name == "file": # type: ignore
74+
ret_d["filename"] = field.filename # type: ignore
75+
return ret_d, field # type: ignore
76+
if field.name == "resumableChunkNumber": # type: ignore
77+
ret_d["resumableChunkNumber"] = \
78+
int(await field.text()) # type: ignore
79+
else:
80+
ret_d[
81+
str(field.name) # type: ignore
82+
] = await field.text() # type: ignore
83+
84+
85+
async def get_upload_instance(
86+
request: aiohttp.web.Request,
87+
pro: str,
88+
cont: str,
89+
p_query: typing.Optional[dict] = None,
90+
) -> upload.ResumableFileUploadProxy:
91+
"""Return the specific upload proxy for the resumable upload."""
92+
session = get_session_id(request)
93+
94+
if p_query:
95+
query: dict = p_query
96+
else:
97+
query = request.query
98+
99+
# Check the existence of the dictionary structure
100+
try:
101+
request.app[session]["uploads"][pro]
102+
except KeyError:
103+
request.app[session]["uploads"][pro] = {}
104+
105+
try:
106+
request.app[session]["uploads"][pro][cont]
107+
except KeyError:
108+
request.app[session]["uploads"][pro][cont] = {}
109+
110+
try:
111+
ident = query["resumableIdentifier"]
112+
except KeyError:
113+
raise aiohttp.web.HTTPBadRequest(reason="Malformed query string")
114+
try:
115+
upload_session = request.app[session]["uploads"][pro][cont][ident]
116+
except KeyError:
117+
auth = get_auth_instance(request)
118+
upload_session = upload.ResumableFileUploadProxy(
119+
auth,
120+
query,
121+
request.match_info,
122+
request.app["client"]
123+
)
124+
await upload_session.a_check_container()
125+
if upload_session.get_segmented():
126+
await upload_session.a_sync_segments()
127+
request.app[session]["uploads"][pro][cont][ident] = upload_session
128+
129+
return upload_session
130+
131+
41132
def get_path_from_list(
42133
to_parse: typing.List[str],
43134
path_prefix: str

0 commit comments

Comments
 (0)