Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add comments to the opendal packages #246

Merged
merged 5 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

@dataclass(frozen=True, kw_only=True, slots=True)
class AsyncFileStream(ObjectStream[bytes]):
"""
wrapper type for opendal.AsyncFile to make it compatible with anyio streams
"""

file: AsyncFile

async def send(self, item: bytes):
Expand Down Expand Up @@ -42,21 +46,22 @@ async def aclose(self):

@dataclass(kw_only=True)
class OpendalAdapter(ClientAdapter):
operator: Operator
path: str
mode: Literal["rb", "wb"] = "rb"
operator: Operator # opendal.Operator for the storage backend
path: str # file path in storage backend relative to the storage root
mode: Literal["rb", "wb"] = "rb" # binary read or binary write mode

async def __aenter__(self):
# if the access mode is binary read, and the storage backend supports presigned read requests
if self.mode == "rb" and self.operator.capability().presign_read:
# create presigned url for the specified file with a 60 second expiration
presigned = await self.operator.to_async_operator().presign_read(self.path, expire_second=60)
return PresignedRequestResource(
headers=presigned.headers, url=presigned.url, method=presigned.method
).model_dump(mode="json")
# otherwise stream the file content from the client to the exporter
else:
file = await self.operator.to_async_operator().open(self.path, self.mode)

self.resource = self.client.resource_async(AsyncFileStream(file=file))

return await self.resource.__aenter__()

async def __aexit__(self, exc_type, exc_value, traceback):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,23 @@
def test_drivers_mock_storage_mux_fs(monkeypatch: pytest.MonkeyPatch):
with serve(MockStorageMux()) as client:
with TemporaryDirectory() as tempdir:
# original file on the client to be pushed to the exporter
original = Path(tempdir) / "original"
# new file read back from the exporter to the client
readback = Path(tempdir) / "readback"

# absolute path
# test accessing files with absolute path

# fill the original file with random bytes
original.write_bytes(randbytes(1024 * 1024 * 10))
# write the file to the storage on the exporter
client.write_local_file(str(original))
# read the storage on the exporter to a local file
client.read_local_file(str(readback))
# ensure the contents are equal
assert original.read_bytes() == readback.read_bytes()

# relative path
# test accessing files with relative path
with monkeypatch.context() as m:
m.chdir(tempdir)

Expand All @@ -39,6 +46,7 @@ def test_drivers_mock_storage_mux_fs(monkeypatch: pytest.MonkeyPatch):


def test_drivers_mock_storage_mux_http():
# dummy HTTP server returning static test content
class StaticHandler(BaseHTTPRequestHandler):
def do_HEAD(self):
self.send_response(200)
Expand All @@ -52,11 +60,13 @@ def do_GET(self):
self.wfile.write(b"testcontent" * 1000)

with serve(MockStorageMux()) as client:
# start the HTTP server
server = HTTPServer(("127.0.0.1", 8080), StaticHandler)
server_thread = Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()

# write a remote file from the http server to the exporter
fs = Operator("http", endpoint="http://127.0.0.1:8080")
client.write_file(fs, "test")

Expand Down