Skip to content
This repository was archived by the owner on May 28, 2025. It is now read-only.

Commit 90a0967

Browse files
committed
Add queue based trigger
1 parent c314139 commit 90a0967

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

src/AIDocumentPipeline/host.json

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,15 @@
1313
"extensionBundle": {
1414
"id": "Microsoft.Azure.Functions.ExtensionBundle",
1515
"version": "[4.*, 5.0.0)"
16+
},
17+
"extensions": {
18+
"queues": {
19+
"maxPollingInterval": "00:00:02",
20+
"visibilityTimeout": "00:00:30",
21+
"batchSize": 16,
22+
"maxDequeueCount": 5,
23+
"newBatchThreshold": 8,
24+
"messageEncoding": "base64"
25+
}
1626
}
17-
}
27+
}

src/AIDocumentPipeline/invoices/process_invoice_batch_workflow.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,22 @@ async def process_invoice_batch_http(req: func.HttpRequest, client: df.DurableOr
2727
return client.create_check_status_response(req, instance_id)
2828

2929

30+
@bp.function_name(queue_trigger_name)
31+
@bp.queue_trigger(arg_name="msg", queue_name="invoices", connection="INVOICES_QUEUE_CONNECTION")
32+
@bp.durable_client_input(client_name="client")
33+
async def process_invoice_batch_queue(msg: func.QueueMessage, client: df.DurableOrchestrationClient):
34+
request_body = msg.get_json()
35+
invoice_batch_request = InvoiceBatchRequest.from_dict(request_body)
36+
37+
instance_id = await client.start_new(name, client_input=invoice_batch_request)
38+
39+
logging.info(f"Started workflow with instance ID: {instance_id}")
40+
41+
response = client.create_http_management_payload(instance_id)
42+
43+
logging.info(f"Response: {response}")
44+
45+
3046
@bp.function_name(name)
3147
@bp.orchestration_trigger(context_name="context", orchestration=name)
3248
def run(context: df.DurableOrchestrationContext) -> WorkflowResult:

0 commit comments

Comments
 (0)