-
Notifications
You must be signed in to change notification settings - Fork 382
Dropbox implementation + add existing implementation in the registry #207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
ba37f48
1e84ce2
5f862ff
aa6517f
4b06da3
fda2e8d
8149569
7f6a462
d193f04
5577251
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,239 @@ | ||
import requests | ||
import dropbox | ||
from ..spec import AbstractFileSystem, AbstractBufferedFile | ||
from ..caching import AllBytes | ||
|
||
|
||
class DropboxDriveFileSystem(AbstractFileSystem):
    """Interface to Dropbox to connect, list and manage files.

    Parameters
    ----------
    token : str
        OAuth2 access token generated by adding a Dropbox app in the user's
        Dropbox account. Needs to be created by the user.
    """

    def __init__(self, token, *args, **storage_options):
        super().__init__(token=token, *args, **storage_options)
        self.token = token
        self.kwargs = storage_options
        self.connect()

    def connect(self):
        """Connect to the Dropbox account with the given token."""
        self.dbx = dropbox.Dropbox(self.token)
        self.session = requests.Session()
        # BUG FIX: ``Session.auth`` expects HTTP basic-auth credentials, so the
        # previous ``("Authorization", token)`` tuple sent a bogus user/password
        # pair. Dropbox expects an OAuth2 bearer token in the request header.
        self.session.headers.update({"Authorization": "Bearer %s" % self.token})

    def ls(self, path, detail=True, **kwargs):
        """List objects at ``path`` (recursively).

        Parameters
        ----------
        path : str
            Folder to list.
        detail : bool
            If True, return a list of dicts with ``name``, ``size`` and
            ``type`` keys; otherwise a plain list of path names.
        """
        # Dropbox rejects paths containing duplicated separators.
        while "//" in path:
            path = path.replace("//", "/")
        listing = self.dbx.files_list_folder(
            path, recursive=True, include_media_info=True
        )
        items = listing.entries
        # Results are paginated: keep following the cursor until exhausted.
        while listing.has_more:
            listing = self.dbx.files_list_folder_continue(listing.cursor)
            items = listing.entries + items

        if not detail:
            # BUG FIX: previously iterated ``list_item.entries`` (the last
            # page only), silently dropping earlier pages of results.
            return [item.path_display for item in items]

        list_file = []
        for item in items:  # BUG FIX: iterate the accumulated pages, not just the last
            if isinstance(item, dropbox.files.FileMetadata):
                list_file.append(
                    {"name": item.path_display, "size": item.size, "type": "file"}
                )
            elif isinstance(item, dropbox.files.FolderMetadata):
                list_file.append(
                    {"name": item.path_display, "size": None, "type": "folder"}
                )
            else:
                # BUG FIX: unknown metadata types have no reliable ``size``
                # attribute (previously accessed ``item.size`` and could raise);
                # also fixes the "unknow" typo in the type tag.
                list_file.append(
                    {"name": item.path_display, "size": None, "type": "unknown"}
                )
        return list_file

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs
    ):
        """Open a Dropbox file for reading or writing."""
        return DropboxDriveFile(
            self,
            path,
            mode=mode,
            # Dropbox upload sessions operate on 4 MB chunks; keep that as the
            # default, but honour an explicit caller override (the
            # ``block_size`` argument was previously ignored).
            block_size=block_size or 4 * 1024 * 1024,
            autocommit=autocommit,
            cache_options=cache_options,
            **kwargs
        )

    def info(self, url, **kwargs):
        """Return a metadata dict (``name``, ``size``, ``type``) for ``url``."""
        metadata = self.dbx.files_get_metadata(url)
        if isinstance(metadata, dropbox.files.FileMetadata):
            return {
                "name": metadata.path_display,
                "size": metadata.size,
                "type": "file",
            }
        elif isinstance(metadata, dropbox.files.FolderMetadata):
            return {"name": metadata.path_display, "size": None, "type": "folder"}
        else:
            # Typo fix: type tag was "unknow".
            return {"name": url, "size": None, "type": "unknown"}
|
||
|
||
class DropboxDriveFile(AbstractBufferedFile):
    """Buffered file object for Dropbox.

    The ``read``, ``_fetch_all`` and ``_fetch_range`` methods are based on the
    http implementation: data is downloaded over a short-lived temporary link.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        block_size="default",
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        **kwargs
    ):
        """
        Open a file.

        Parameters
        ----------
        fs: instance of DropboxDriveFileSystem
        path : str
            file path to inspect in dropbox
        mode: str
            Normal file modes. 'rb' or 'wb'
        block_size: int or None
            The amount of read-ahead to do, in bytes. Default is 5MB, or the
            value configured for the FileSystem creating this file
        """
        # BUG FIX: removed a leftover debug loop that printed every kwarg.
        # Reuse the filesystem's HTTP session when it provides one.
        self.session = fs.session if fs.session is not None else requests.Session()
        super().__init__(fs=fs, path=path, mode=mode, block_size=block_size, **kwargs)

        self.path = path
        self.dbx = self.fs.dbx
        # Dropbox rejects paths containing duplicated separators.
        while "//" in path:
            path = path.replace("//", "/")
        # Short-lived direct-download URL used by the fetch methods below.
        # NOTE(review): this is also requested in "wb" mode, where the file may
        # not exist yet — confirm behaviour on the write path.
        self.url = self.dbx.files_get_temporary_link(path).link

    def read(self, length=-1):
        """Read bytes from file.

        Parameters
        ----------
        length: int
            Read up to this many bytes. If negative, read all content to end of
            file. If the server has not supplied the filesize, attempting to
            read only part of the data will raise a ValueError.
        """
        if (
            (length < 0 and self.loc == 0)  # explicit read-all from the start
            or (length > (self.size or length))  # read more than there is
            or (
                self.size and self.size < self.blocksize
            )  # all fits in one block anyway
        ):
            self._fetch_all()
        if self.size is None:
            if length < 0:
                self._fetch_all()
        else:
            length = min(self.size - self.loc, length)
        return super().read(length)

    def _fetch_all(self):
        """Read whole file in one shot, without caching.

        This is only called when position is still at zero,
        and read() is called without a byte-count.
        """
        if not isinstance(self.cache, AllBytes):
            r = self.session.get(self.url, **self.kwargs)
            r.raise_for_status()
            out = r.content
            # Replace the cache with one that holds the entire payload.
            self.cache = AllBytes(out)
            self.size = len(out)

    def _fetch_range(self, start, end):
        """Download a block of data.

        The expectation is that the server returns only the requested bytes,
        with HTTP code 206. If this is not the case, we first check the headers,
        and then stream the output - if the data size is bigger than we
        requested, an exception is raised.
        """
        kwargs = self.kwargs.copy()
        headers = kwargs.pop("headers", {})
        # HTTP Range headers are inclusive, hence ``end - 1``.
        headers["Range"] = "bytes=%i-%i" % (start, end - 1)
        r = self.session.get(self.url, headers=headers, stream=True, **kwargs)
        if r.status_code == 416:
            # range request outside file
            return b""
        r.raise_for_status()
        if r.status_code == 206:
            # partial content, as expected
            out = r.content
        elif "Content-Length" in r.headers:
            cl = int(r.headers["Content-Length"])
            if cl <= end - start:
                # data size OK
                out = r.content
            else:
                raise ValueError(
                    "Got more bytes (%i) than requested (%i)" % (cl, end - start)
                )
        else:
            # Data size unknown: stream and bail out if it grows too big.
            cl = 0
            out = []
            for chunk in r.iter_content(chunk_size=2 ** 20):
                if chunk:
                    out.append(chunk)
                    cl += len(chunk)
                    if cl > end - start:
                        raise ValueError(
                            "Got more bytes so far (>%i) than requested (%i)"
                            % (cl, end - start)
                        )
                else:
                    break
            out = b"".join(out)
        return out

    def _upload_chunk(self, final=False):
        """Upload the current buffer as one chunk of the upload session."""
        # Seek-to-end returns the buffer length; advance the session cursor by
        # that many bytes. NOTE(review): the offset is advanced *before* the
        # append call — confirm this matches Dropbox's expected cursor offset.
        self.cursor.offset += self.buffer.seek(0, 2)
        if final:
            self.dbx.files_upload_session_finish(
                self.buffer.getvalue(), self.cursor, self.commit
            )
        else:
            # NOTE(review): ``files_upload_session_append`` is the legacy API;
            # the SDK also offers ``files_upload_session_append_v2`` — confirm
            # before switching.
            self.dbx.files_upload_session_append(
                self.buffer.getvalue(), self.cursor.session_id, self.cursor.offset
            )

    def _initiate_upload(self):
        """Initiate the upload session with the first buffered chunk."""
        session = self.dbx.files_upload_session_start(self.buffer.getvalue())
        self.commit = dropbox.files.CommitInfo(path=self.path)
        self.cursor = dropbox.files.UploadSessionCursor(
            session_id=session.session_id, offset=self.offset
        )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is copied from the http implementation, which seems unnecessary duplication. Indeed, if we can fetch ranges, then this is not required anymore at all
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see my comment below about "copy-paste".
Also, your colleague advised me to put this class in the caching file. If the http implementation is modified to use it from the caching module, it will remove the redundancy problem, at least for this part of the code, and it also seems more coherent. But as the pull request is not about the http implementation, I didn't want to modify it myself here.
Tell me what you prefer about this please. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree that this is clearly a caching method and should be included in the caching module, although it is not particularly useful for most files, since it just keeps the whole thing in memory. Moving the def out of the http module now is fine - there are tests for that functionality that would fail if somehow moving the code went wrong.