diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/adapter.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/adapter.py index 1dde1034..07d09b52 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/adapter.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/adapter.py @@ -13,6 +13,10 @@ @dataclass(frozen=True, kw_only=True, slots=True) class AsyncFileStream(ObjectStream[bytes]): + """ + wrapper type for opendal.AsyncFile to make it compatible with anyio streams + """ + file: AsyncFile async def send(self, item: bytes): @@ -42,21 +46,22 @@ async def aclose(self): @dataclass(kw_only=True) class OpendalAdapter(ClientAdapter): - operator: Operator - path: str - mode: Literal["rb", "wb"] = "rb" + operator: Operator # opendal.Operator for the storage backend + path: str # file path in storage backend relative to the storage root + mode: Literal["rb", "wb"] = "rb" # binary read or binary write mode async def __aenter__(self): + # if the access mode is binary read, and the storage backend supports presigned read requests if self.mode == "rb" and self.operator.capability().presign_read: + # create presigned url for the specified file with a 60 second expiration presigned = await self.operator.to_async_operator().presign_read(self.path, expire_second=60) return PresignedRequestResource( headers=presigned.headers, url=presigned.url, method=presigned.method ).model_dump(mode="json") + # otherwise stream the file content from the client to the exporter else: file = await self.operator.to_async_operator().open(self.path, self.mode) - self.resource = self.client.resource_async(AsyncFileStream(file=file)) - return await self.resource.__aenter__() async def __aexit__(self, exc_type, exc_value, traceback): diff --git a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py index 195c2183..b80f1522 100644 --- a/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py +++ b/packages/jumpstarter-driver-opendal/jumpstarter_driver_opendal/driver_test.py @@ -14,16 +14,23 @@ def test_drivers_mock_storage_mux_fs(monkeypatch: pytest.MonkeyPatch): with serve(MockStorageMux()) as client: with TemporaryDirectory() as tempdir: + # original file on the client to be pushed to the exporter original = Path(tempdir) / "original" + # new file read back from the exporter to the client readback = Path(tempdir) / "readback" - # absolute path + # test accessing files with absolute path + + # fill the original file with random bytes original.write_bytes(randbytes(1024 * 1024 * 10)) + # write the file to the storage on the exporter client.write_local_file(str(original)) + # read the storage on the exporter to a local file client.read_local_file(str(readback)) + # ensure the contents are equal assert original.read_bytes() == readback.read_bytes() - # relative path + # test accessing files with relative path with monkeypatch.context() as m: m.chdir(tempdir) @@ -39,6 +46,7 @@ def test_drivers_mock_storage_mux_fs(monkeypatch: pytest.MonkeyPatch): def test_drivers_mock_storage_mux_http(): + # dummy HTTP server returning static test content class StaticHandler(BaseHTTPRequestHandler): def do_HEAD(self): self.send_response(200) @@ -52,11 +60,13 @@ def do_GET(self): self.wfile.write(b"testcontent" * 1000) with serve(MockStorageMux()) as client: + # start the HTTP server server = HTTPServer(("127.0.0.1", 8080), StaticHandler) server_thread = Thread(target=server.serve_forever) server_thread.daemon = True server_thread.start() + # write a remote file from the http server to the exporter fs = Operator("http", endpoint="http://127.0.0.1:8080") client.write_file(fs, "test")