-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservices.py
59 lines (48 loc) · 1.51 KB
/
services.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import functools
import inspect
from contextlib import closing
from hashlib import md5
from typing import Optional

from storage import FileStorage
from db import Blob,Session
from storage import ExistsError,NotFoundError
from exceptions import DbCorruptionError
# Module-level storage backend instance; store() and retrieve() below write
# and read blob contents through it.
storage_backend = FileStorage()
def withsession(fn):
    """Decorate ``fn`` so that it always runs with a database session.

    ``fn`` must declare a parameter named ``sess``.  When the caller supplies
    a non-``None`` session (positionally or by keyword), ``fn`` is called with
    the caller's arguments unchanged and the session is not closed here.
    Otherwise a new ``Session()`` is created, passed to ``fn`` as ``sess``,
    and closed once ``fn`` returns or raises.

    Args:
        fn: Callable that declares a ``sess`` parameter.

    Returns:
        A wrapper accepting the same arguments as ``fn`` and returning
        whatever ``fn`` returns.

    Raises:
        TypeError: If ``fn`` does not declare a ``sess`` parameter.
    """
    signature = inspect.signature(fn)
    if "sess" not in signature.parameters:
        raise TypeError(f"{fn!r} must declare a 'sess' parameter")

    @functools.wraps(fn)
    def inner(*args, **kwargs):
        # Bind against fn's own signature so the session is recognised
        # wherever the caller put it, and ordinary positional arguments
        # (e.g. the payload) are never mistaken for the session.
        bound = signature.bind_partial(*args, **kwargs)
        if bound.arguments.get("sess") is not None:
            # Caller-supplied session: pass everything through untouched and
            # propagate the result.
            return fn(*args, **kwargs)
        with closing(Session()) as sess:
            # Inject the freshly created session into the bound arguments so
            # an explicit ``sess=None`` from the caller is replaced rather
            # than duplicated.
            bound.arguments["sess"] = sess
            return fn(*bound.args, **bound.kwargs)

    return inner
@withsession
def store(data: bytes,sess: Optional[Session] = None):
    """Persist ``data`` under its MD5 hex digest and return that digest.

    The digest doubles as the ``Blob`` row id and as the key handed to
    ``storage_backend.store``.

    Args:
        data: Raw bytes to store.
        sess: Session used for the lookup, the insert and the commit.

    Returns:
        The MD5 hex digest of ``data``, or ``None`` when ``sess`` is ``None``.

    Raises:
        ExistsError: If a ``Blob`` row with the same digest already exists.
    """
    # Without a session nothing is looked up or written.
    if sess is None:
        return None

    digest = md5(data).hexdigest()
    if sess.query(Blob).get(digest) is not None:
        raise ExistsError

    # Register the row first, then write the contents; the commit only
    # happens after the backend call has returned.
    sess.add(Blob(id=digest))
    storage_backend.store(data, digest)
    sess.commit()
    return digest
@withsession
def retrieve(did: str, sess: Optional[Session] = None):
    """Return the stored contents of the blob whose id is ``did``.

    Args:
        did: Id of the ``Blob`` row to look up.
        sess: Session used for the lookup and, if the row turns out to be
            orphaned, for deleting it.

    Returns:
        Whatever ``storage_backend.retrieve`` returns for the blob's id.

    Raises:
        NotFoundError: If no ``Blob`` row with id ``did`` exists.
        DbCorruptionError: If the row exists but the storage backend raises
            ``NotFoundError`` for it.  The orphaned row is deleted and the
            deletion committed before this is raised.
    """
    blob_entry = sess.query(Blob).get(did)
    if blob_entry is None:
        raise NotFoundError

    # Capture the id up front so the error message does not have to read an
    # attribute from a row that has already been deleted and committed.
    blob_id = blob_entry.id
    try:
        # Read from the backend exactly once and hand that result back,
        # instead of probing first and reading a second time.
        return storage_backend.retrieve(blob_id)
    except NotFoundError as exc:
        # The database references contents the backend does not have: drop
        # the dangling row, then report the inconsistency.
        sess.delete(blob_entry)
        sess.commit()
        raise DbCorruptionError(
            f"{blob_id} was not associated with a file"
        ) from exc
@withsession
def getlist(sess: Optional[Session] = None):
    """Return a list with the ``id`` of every ``Blob`` row queried via ``sess``.

    Args:
        sess: Session used to run the query.

    Returns:
        List of blob ids, in the order the query returns the rows.
    """
    return [entry.id for entry in sess.query(Blob).all()]