diff --git a/azure_functions_worker/bindings/meta.py b/azure_functions_worker/bindings/meta.py index c6148344..a0e86f39 100644 --- a/azure_functions_worker/bindings/meta.py +++ b/azure_functions_worker/bindings/meta.py @@ -150,7 +150,7 @@ def has_implicit_output(bind_name: str) -> bool: return getattr(binding, 'has_implicit_output', lambda: False)() -def from_incoming_proto( +async def from_incoming_proto( binding: str, pb: protos.ParameterBinding, *, pytype: typing.Optional[type], @@ -180,11 +180,11 @@ def from_incoming_proto( try: # if the binding is an sdk type binding if is_deferred_binding: - return deferred_bindings_decode(binding=binding, - pb=pb, - pytype=pytype, - datum=datum, - metadata=metadata) + return await deferred_bindings_decode(binding=binding, + pb=pb, + pytype=pytype, + datum=datum, + metadata=metadata) return binding.decode(datum, trigger_metadata=metadata) except NotImplementedError: # Binding does not support the data. @@ -277,11 +277,11 @@ def to_outgoing_param_binding(binding: str, obj: typing.Any, *, data=rpc_val) -def deferred_bindings_decode(binding: typing.Any, - pb: protos.ParameterBinding, *, - pytype: typing.Optional[type], - datum: typing.Any, - metadata: typing.Any): +async def deferred_bindings_decode(binding: typing.Any, + pb: protos.ParameterBinding, *, + pytype: typing.Optional[type], + datum: typing.Any, + metadata: typing.Any): """ This cache holds deferred binding types (ie. BlobClient, ContainerClient) That have already been created, so that the worker can reuse the @@ -298,9 +298,9 @@ def deferred_bindings_decode(binding: typing.Any, pytype, datum.value.content)) else: - deferred_binding_type = binding.decode(datum, - trigger_metadata=metadata, - pytype=pytype) + deferred_binding_type = await binding.decode(datum, + trigger_metadata=metadata, + pytype=pytype) deferred_bindings_cache[(pb.name, pytype, datum.value.content)] = deferred_binding_type diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 807985cc..d7a39e87 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -567,7 +567,7 @@ async def _handle__invocation_request(self, request): else: trigger_metadata = None - args[pb.name] = bindings.from_incoming_proto( + args[pb.name] = await bindings.from_incoming_proto( pb_type_info.binding_name, pb, trigger_metadata=trigger_metadata, diff --git a/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py b/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py index 1a8062aa..22ce56a7 100644 --- a/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py +++ b/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py @@ -4,6 +4,7 @@ import azure.functions as func import azurefunctions.extensions.bindings.blob as blob +import azurefunctions.extensions.bindings.blob.aio as aioblob app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS) @@ -259,6 +260,43 @@ def blob_cache(req: func.HttpRequest, return cachedClient.download_blob(encoding='utf-8').readall() +@app.function_name(name="aio_blob_client") +@app.blob_input(arg_name="client", + path="python-worker-tests/test-blobclient-triggered.txt", + connection="AzureWebJobsStorage") +@app.route(route="aio_blob_client") +async def aio_blob_client(req: func.HttpRequest, + client: aioblob.BlobClient) -> str: + stream = await client.download_blob() + data = await stream.readall() + return str(data) + + +@app.function_name(name="aio_container_client") +@app.blob_input(arg_name="client", + path="python-worker-tests/test-containerclient-triggered.txt", + connection="AzureWebJobsStorage") +@app.route(route="aio_container_client") +async def aio_container_client(req: func.HttpRequest, + client: aioblob.ContainerClient) -> str: + stream = await client.download_blob("test-containerclient-triggered.txt", + encoding='utf-8') + data = await stream.readall() + return str(data) + + +@app.function_name(name="aio_ssd") +@app.blob_input(arg_name="stream", + path="python-worker-tests/test-ssd-triggered.txt", + connection="AzureWebJobsStorage") +@app.route(route="aio_ssd") +async def aio_ssd(req: func.HttpRequest, + stream: aioblob.StorageStreamDownloader) -> str: + file = await stream.readall() + decoded = file.decode('utf-8') + return str(decoded) + + @app.function_name(name="invalid_connection_info") @app.blob_input(arg_name="client", path="python-worker-tests/test-blobclient-triggered.txt", diff --git a/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py b/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py index da60861f..7ebdb02a 100644 --- a/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py +++ b/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py @@ -184,6 +184,24 @@ def test_caching(self): r = self.webhost.request('GET', 'blob_cache') self.assertEqual(r.status_code, 200) + def test_aio_clients(self): + r = self.webhost.request('GET', 'aio_blob_client') + self.assertEqual(r.status_code, 200) + self.assertEqual(r.text, + ('b\'{"name": "test-blobclient-trigger.txt",' + ' "length": 9, "content": ' + '"DummyData"}\'')) + + r = self.webhost.request('GET', 'aio_container_client') + self.assertEqual(r.status_code, 200) + self.assertEqual(r.text, + '{\r\n "name": "python-worker-tests",\r\n ' + '"content": "DummyData"\r\n}') + + r = self.webhost.request('GET', 'aio_ssd') + self.assertEqual(r.status_code, 200) + self.assertEqual(r.text, '{\r\n "content": "DummyData"\r\n}') + def test_failed_client_creation(self): r = self.webhost.request('GET', 'invalid_connection_info') # Without the http_v2_enabled default definition, this request would time out.