# DurableOrchestrationClient.py
import json
from datetime import datetime
from typing import List, Any, Optional, Dict, Union
from time import time
from asyncio import sleep
from urllib.parse import urlparse, quote
import azure.functions as func
from .PurgeHistoryResult import PurgeHistoryResult
from .DurableOrchestrationStatus import DurableOrchestrationStatus
from .EntityStateResponse import EntityStateResponse
from .RpcManagementOptions import RpcManagementOptions
from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings
from .utils.http_utils import get_async_request, post_async_request, delete_async_request
from .utils.entity_utils import EntityId
from azure.functions._durable_functions import _serialize_custom_object
class DurableOrchestrationClient:
    """Durable Orchestration Client.

    Client for starting, querying, terminating and raising events to
    orchestration instances.
    """

    def __init__(self, context: str):
        """Create a client from the serialized orchestration-client binding.

        Parameters
        ----------
        context : str
            The JSON payload passed to `DurableOrchestrationBindings.from_json`.
        """
        # Annotation-only declarations: neither attribute is assigned here.
        self.task_hub_name: str
        self._uniqueWebHookOrigins: List[str]

        # Placeholder tokens and query-string keys kept as instance attributes.
        self._event_name_placeholder: str = "{eventName}"
        self._function_name_placeholder: str = "{functionName}"
        self._instance_id_placeholder: str = "[/{instanceId}]"
        self._reason_placeholder: str = "{text}"
        self._created_time_from_query_key: str = "createdTimeFrom"
        self._created_time_to_query_key: str = "createdTimeTo"
        self._runtime_status_query_key: str = "runtimeStatus"
        self._show_history_query_key: str = "showHistory"
        self._show_history_output_query_key: str = "showHistoryOutput"
        self._show_input_query_key: str = "showInput"

        self._orchestration_bindings: DurableOrchestrationBindings = \
            DurableOrchestrationBindings.from_json(context)

        # The HTTP helpers are stored on the instance so they can be swapped out.
        self._post_async_request = post_async_request
        self._get_async_request = get_async_request
        self._delete_async_request = delete_async_request

    async def start_new(self,
                        orchestration_function_name: str,
                        instance_id: Optional[str] = None,
                        client_input: Optional[Any] = None) -> str:
        """Start a new instance of the specified orchestrator function.

        If an orchestration instance with the specified ID already exists, the
        existing instance will be silently replaced by this new instance.

        Parameters
        ----------
        orchestration_function_name : str
            The name of the orchestrator function to start.
        instance_id : Optional[str]
            The ID to use for the new orchestration instance. If no instance id is specified,
            the Durable Functions extension will generate a random GUID (recommended).
        client_input : Optional[Any]
            JSON-serializable input value for the orchestrator function.

        Raises
        ------
        Exception
            With the extension's `ExceptionMessage` on a 400 response, or with
            the raw response payload for any other unsuccessful response.

        Returns
        -------
        str
            The ID of the new orchestration instance.
        """
        request_url = self._get_start_new_url(
            instance_id=instance_id, orchestration_function_name=orchestration_function_name)

        response: List[Any] = await self._post_async_request(
            request_url, self._get_json_input(client_input))

        status_code: int = response[0]
        payload: Any = response[1]
        if status_code <= 202 and payload:
            return payload["id"]
        if status_code == 400:
            # Orchestrator not found, report clean exception
            raise Exception(payload["ExceptionMessage"])
        # Catch all: simply surface the durable-extension payload, stack trace
        # included, since this may be a more involved exception.
        raise Exception(payload)

    def create_check_status_response(
            self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse:
        """Create a HttpResponse with the information needed to check on an instance.

        Parameters
        ----------
        request : HttpRequest
            The HTTP request that triggered the current orchestration instance.
        instance_id : str
            The ID of the orchestration instance to check.

        Returns
        -------
        HttpResponse
            An HTTP 202 response with a Location header
            and a payload containing instance management URLs
        """
        http_management_payload = self.get_client_response_links(request, instance_id)
        return func.HttpResponse(
            status_code=202,
            body=json.dumps(http_management_payload),
            headers={
                "Content-Type": "application/json",
                "Location": http_management_payload["statusQueryGetUri"],
                "Retry-After": "10",
            },
        )

    def create_http_management_payload(self, instance_id: str) -> Dict[str, str]:
        """Create a dictionary of orchestrator management urls.

        Parameters
        ----------
        instance_id : str
            The ID of the orchestration instance to check.

        Returns
        -------
        Dict[str, str]
            a dictionary object of orchestrator instance management urls
        """
        return self.get_client_response_links(None, instance_id)

    async def read_entity_state(
        self,
        entityId: EntityId,
        task_hub_name: Optional[str] = None,
        connection_name: Optional[str] = None,
    ) -> EntityStateResponse:
        """Read the state of the entity.

        Parameters
        ----------
        entityId : EntityId
            The EntityId of the targeted entity.
        task_hub_name : Optional[str]
            The task hub name of the target entity.
        connection_name : Optional[str]
            The name of the connection string associated with [task_hub_name].

        Raises
        ------
        Exception:
            When an unexpected status code is returned

        Returns
        -------
        EntityStateResponse
            container object representing the state of the entity
        """
        options = RpcManagementOptions(
            connection_name=connection_name,
            task_hub_name=task_hub_name,
            entity_Id=entityId,
        )
        request_url = options.to_url(self._orchestration_bindings.rpc_base_url)
        response = await self._get_async_request(request_url)

        status_code = response[0]
        if status_code == 200:
            return EntityStateResponse(True, response[1])
        if status_code == 404:
            return EntityStateResponse(False)
        raise Exception(self._unexpected_status_message(status_code))

    def get_client_response_links(
            self,
            request: Optional[func.HttpRequest], instance_id: str) -> Dict[str, str]:
        """Create a dictionary of orchestrator management urls.

        Parameters
        ----------
        request : Optional[HttpRequest]
            The HTTP request that triggered the current orchestration instance.
            When given (and it has a URL), every management URL is rewritten to
            use the origin of that request.
        instance_id : str
            The ID of the orchestration instance to check.

        Returns
        -------
        Dict[str, str]
            a dictionary object of orchestrator instance management urls
        """
        management_urls = self._orchestration_bindings.management_urls
        payload = management_urls.copy()

        # Only values are reassigned, so iterating over a key snapshot is safe.
        for key in list(payload):
            url = payload[key]
            if request is not None and request.url:
                url = self._replace_url_origin(request.url, url)
            # The bindings' "id" entry is the placeholder embedded in each URL.
            payload[key] = url.replace(management_urls["id"], instance_id)

        return payload

    async def raise_event(
            self, instance_id: str, event_name: str, event_data: Any = None,
            task_hub_name: Optional[str] = None,
            connection_name: Optional[str] = None) -> None:
        """Send an event notification message to a waiting orchestration instance.

        In order to handle the event, the target orchestration instance must be
        waiting for an event named `eventName` using waitForExternalEvent API.

        Parameters
        ----------
        instance_id : str
            The ID of the orchestration instance that will handle the event.
        event_name : str
            The name of the event.
        event_data : Any, optional
            The JSON-serializable data associated with the event.
        task_hub_name : str, optional
            The TaskHubName of the orchestration that will handle the event.
        connection_name : str, optional
            The name of the connection string associated with `taskHubName.`

        Raises
        ------
        ValueError
            event name must be a non-empty string.
        Exception
            Raised for any response status other than 202.
        """
        # `not` also rejects None, which would otherwise end up in the URL.
        if not event_name:
            raise ValueError("event_name must be a non-empty string.")

        request_url = self._get_raise_event_url(
            instance_id, event_name, task_hub_name, connection_name)

        response = await self._post_async_request(request_url, json.dumps(event_data))

        status_code = response[0]
        if status_code == 202:
            return

        known_errors = {
            410: f"Instance with ID {instance_id} is gone: either completed or failed",
            404: f"No instance with ID {instance_id} found.",
            400: "Only application/json request content is supported",
        }
        raise Exception(known_errors.get(
            status_code, f"Webhook returned unrecognized status code {status_code}"))

    async def get_status(self, instance_id: str, show_history: bool = False,
                         show_history_output: bool = False,
                         show_input: bool = False) -> DurableOrchestrationStatus:
        """Get the status of the specified orchestration instance.

        Parameters
        ----------
        instance_id : str
            The ID of the orchestration instance to query.
        show_history: bool
            Boolean marker for including execution history in the response.
        show_history_output: bool
            Boolean marker for including output in the execution history response.
        show_input: bool
            Boolean marker for including the input in the response.

        Raises
        ------
        Exception
            When an unexpected status code is returned

        Returns
        -------
        DurableOrchestrationStatus
            The status of the requested orchestration instance
        """
        options = RpcManagementOptions(instance_id=instance_id, show_history=show_history,
                                       show_history_output=show_history_output,
                                       show_input=show_input)
        request_url = options.to_url(self._orchestration_bindings.rpc_base_url)
        response = await self._get_async_request(request_url)

        # Status codes whose body is parsed as an orchestration status:
        #   200 instance completed, 202 instance in progress,
        #   400 instance failed or terminated, 404 instance not found or pending,
        #   500 instance failed with unhandled exception
        status_code = response[0]
        if status_code not in (200, 202, 400, 404, 500):
            raise Exception(self._unexpected_status_message(status_code))
        return DurableOrchestrationStatus.from_json(response[1])

    async def get_status_all(self) -> List[DurableOrchestrationStatus]:
        """Get the status of all orchestration instances.

        Pages are requested until the response no longer carries an
        `x-ms-continuation-token` header.

        Raises
        ------
        Exception
            When an unexpected status code is returned

        Returns
        -------
        List[DurableOrchestrationStatus]
            The status of the requested orchestration instances
        """
        options = RpcManagementOptions()
        request_url = options.to_url(self._orchestration_bindings.rpc_base_url)
        all_statuses: List[DurableOrchestrationStatus] = []

        continuation_token = None
        while True:
            headers = {}
            if continuation_token:
                headers['x-ms-continuation-token'] = continuation_token
            response = await self._get_async_request(request_url, headers=headers)

            status_code = response[0]
            if status_code != 200:
                raise Exception(self._unexpected_status_message(status_code))

            statuses: List[Any] = response[1]
            all_statuses.extend(DurableOrchestrationStatus.from_json(o) for o in statuses)

            continuation_token = self._get_continuation_token(response)
            if not continuation_token:
                break

        return all_statuses

    async def get_status_by(self, created_time_from: Optional[datetime] = None,
                            created_time_to: Optional[datetime] = None,
                            runtime_status: Optional[List[OrchestrationRuntimeStatus]] = None) \
            -> List[DurableOrchestrationStatus]:
        """Get the status of all orchestration instances that match the specified conditions.

        Parameters
        ----------
        created_time_from : datetime
            Return orchestration instances which were created after this Date.
        created_time_to: datetime
            Return orchestration instances which were created before this Date.
        runtime_status: List[OrchestrationRuntimeStatus]
            Return orchestration instances which match any of the runtimeStatus values
            in this list.

        Raises
        ------
        Exception
            When an unexpected status code is returned

        Returns
        -------
        List[DurableOrchestrationStatus]
            The status of the requested orchestration instances
        """
        # TODO: do we really want folks to use this without specifying all the args?
        options = RpcManagementOptions(created_time_from=created_time_from,
                                       created_time_to=created_time_to,
                                       runtime_status=runtime_status)
        request_url = options.to_url(self._orchestration_bindings.rpc_base_url)
        response = await self._get_async_request(request_url)

        status_code = response[0]
        if status_code != 200:
            raise Exception(self._unexpected_status_message(status_code))
        return [DurableOrchestrationStatus.from_json(o) for o in response[1]]

    async def purge_instance_history(self, instance_id: str) -> PurgeHistoryResult:
        """Delete the history of the specified orchestration instance.

        Parameters
        ----------
        instance_id : str
            The ID of the orchestration instance to delete.

        Returns
        -------
        PurgeHistoryResult
            The results of the request to delete the orchestration instance
        """
        request_url = f"{self._orchestration_bindings.rpc_base_url}instances/{instance_id}"
        response = await self._delete_async_request(request_url)
        return self._parse_purge_instance_history_response(response)

    async def purge_instance_history_by(
            self, created_time_from: Optional[datetime] = None,
            created_time_to: Optional[datetime] = None,
            runtime_status: Optional[List[OrchestrationRuntimeStatus]] = None) \
            -> PurgeHistoryResult:
        """Delete the history of all orchestration instances that match the specified conditions.

        Parameters
        ----------
        created_time_from : Optional[datetime]
            Delete orchestration history which were created after this Date.
        created_time_to: Optional[datetime]
            Delete orchestration history which were created before this Date.
        runtime_status: Optional[List[OrchestrationRuntimeStatus]]
            Delete orchestration instances which match any of the runtimeStatus values
            in this list.

        Returns
        -------
        PurgeHistoryResult
            The results of the request to purge history
        """
        options = RpcManagementOptions(created_time_from=created_time_from,
                                       created_time_to=created_time_to,
                                       runtime_status=runtime_status)
        request_url = options.to_url(self._orchestration_bindings.rpc_base_url)
        response = await self._delete_async_request(request_url)
        return self._parse_purge_instance_history_response(response)

    async def terminate(self, instance_id: str, reason: str) -> None:
        """Terminate the specified orchestration instance.

        Parameters
        ----------
        instance_id : str
            The ID of the orchestration instance to query.
        reason: str
            The reason for terminating the instance.

        Raises
        ------
        Exception:
            When the terminate call failed with an unexpected status code

        Returns
        -------
        None
        """
        request_url = f"{self._orchestration_bindings.rpc_base_url}instances/{instance_id}/" \
                      f"terminate?reason={quote(reason)}"
        response = await self._post_async_request(request_url, None)

        status_code = response[0]
        # 202: instance in progress; 410: instance failed or terminated
        if status_code in (202, 410):
            return
        if status_code == 404:
            raise Exception(f"No instance with ID '{instance_id}' found.")
        raise Exception(self._unexpected_status_message(status_code))

    async def wait_for_completion_or_create_check_status_response(
            self, request, instance_id: str, timeout_in_milliseconds: int = 10000,
            retry_interval_in_milliseconds: int = 1000) -> func.HttpResponse:
        """Create an HTTP response.

        The response either contains a payload of management URLs for a non-completed instance or
        contains the payload containing the output of the completed orchestration.

        If the orchestration does not complete within the specified timeout, then the HTTP response
        will be identical to that of `create_check_status_response`.

        Parameters
        ----------
        request
            The HTTP request that triggered the current function.
        instance_id:
            The unique ID of the instance to check.
        timeout_in_milliseconds:
            Total allowed timeout for output from the durable function.
            The default value is 10 seconds.
        retry_interval_in_milliseconds:
            The timeout between checks for output from the durable function.
            The default value is 1 second.

        Raises
        ------
        Exception
            When the retry interval is larger than the total timeout.
        """
        if retry_interval_in_milliseconds > timeout_in_milliseconds:
            raise Exception(f'Total timeout {timeout_in_milliseconds} (ms) should be bigger than '
                            f'retry timeout {retry_interval_in_milliseconds} (ms)')

        # Runtime statuses that end the wait, mapped to the response they produce.
        # Any other status (or none at all) keeps polling.
        terminal_responses = {
            OrchestrationRuntimeStatus.Completed:
                lambda status: self._create_http_response(200, status.output),
            OrchestrationRuntimeStatus.Canceled:
                lambda status: self._create_http_response(200, status.to_json()),
            OrchestrationRuntimeStatus.Terminated:
                lambda status: self._create_http_response(200, status.to_json()),
            OrchestrationRuntimeStatus.Failed:
                lambda status: self._create_http_response(500, status.to_json()),
        }

        start_time = time()
        while True:
            status = await self.get_status(instance_id)
            if status:
                build_response = terminal_responses.get(status.runtime_status)
                if build_response:
                    return build_response(status)

            elapsed_in_milliseconds = (time() - start_time) * 1000
            if elapsed_in_milliseconds >= timeout_in_milliseconds:
                return self.create_check_status_response(request, instance_id)

            # Never sleep past the overall deadline.
            remaining_time = timeout_in_milliseconds - elapsed_in_milliseconds
            sleep_time = min(retry_interval_in_milliseconds, remaining_time)
            await sleep(sleep_time / 1000)

    async def signal_entity(self, entityId: EntityId, operation_name: str,
                            operation_input: Optional[Any] = None,
                            task_hub_name: Optional[str] = None,
                            connection_name: Optional[str] = None) -> None:
        """Signals an entity to perform an operation.

        Parameters
        ----------
        entityId : EntityId
            The EntityId of the targeted entity to perform operation.
        operation_name: str
            The name of the operation.
        operation_input: Optional[Any]
            The content for the operation.
        task_hub_name: Optional[str]
            The task hub name of the target entity.
        connection_name: Optional[str]
            The name of the connection string associated with [task_hub_name].

        Raises
        ------
        Exception:
            When the signal entity call failed with an unexpected status code

        Returns
        -------
        None
        """
        options = RpcManagementOptions(operation_name=operation_name,
                                       connection_name=connection_name,
                                       task_hub_name=task_hub_name,
                                       entity_Id=entityId)
        request_url = options.to_url(self._orchestration_bindings.rpc_base_url)

        # Compare against None so falsy inputs (0, False, "", []) are still sent.
        request_body = json.dumps(operation_input) if operation_input is not None else None
        response = await self._post_async_request(request_url, request_body)

        status_code = response[0]
        if status_code != 202:  # 202: signal accepted
            raise Exception(self._unexpected_status_message(status_code))

    @staticmethod
    def _unexpected_status_message(status_code: Any) -> str:
        """Format the error message used for an unhandled HTTP status code."""
        return f"The operation failed with an unexpected status code {status_code}"

    @staticmethod
    def _get_continuation_token(response: Any) -> Optional[str]:
        """Extract the `x-ms-continuation-token` response header, if any.

        NOTE(review): the exact shape returned by the HTTP helper is not visible
        from this module. A `headers` attribute is honoured when present;
        otherwise a third sequence element is assumed to hold the headers.
        Confirm against `utils.http_utils`.

        Returns
        -------
        Optional[str]
            The token, or None when no headers (or no token) are available.
        """
        headers = getattr(response, "headers", None)
        if headers is None:
            try:
                headers = response[2]
            except (IndexError, KeyError, TypeError):
                return None
        if not hasattr(headers, "get"):
            return None
        return headers.get('x-ms-continuation-token')

    @staticmethod
    def _create_http_response(
            status_code: int, body: Union[str, Any]) -> func.HttpResponse:
        """Build a JSON `HttpResponse`; non-string bodies are JSON-serialized."""
        body_as_json = body if isinstance(body, str) else json.dumps(body)
        return func.HttpResponse(
            status_code=status_code,
            body=body_as_json,
            mimetype="application/json",
            headers={
                "Content-Type": "application/json",
            },
        )

    @staticmethod
    def _get_json_input(client_input: object) -> Optional[str]:
        """Serialize the orchestrator input.

        Parameters
        ----------
        client_input: object
            The client's input, which we need to serialize

        Returns
        -------
        Optional[str]
            If `client_input` is not None, return a string representing
            the JSON-serialization of `client_input`. Otherwise, returns
            None

        Exceptions
        ----------
        TypeError
            If the JSON serialization failed, see `serialize_custom_object`
        """
        if client_input is None:
            return None
        return json.dumps(client_input, default=_serialize_custom_object)

    @staticmethod
    def _replace_url_origin(request_url: str, value_url: str) -> str:
        """Return `value_url` with its `scheme://netloc/` swapped for that of `request_url`."""
        request_parsed_url = urlparse(request_url)
        value_parsed_url = urlparse(value_url)
        request_url_origin = '{url.scheme}://{url.netloc}/'.format(url=request_parsed_url)
        value_url_origin = '{url.scheme}://{url.netloc}/'.format(url=value_parsed_url)
        return value_url.replace(value_url_origin, request_url_origin)

    @staticmethod
    def _parse_purge_instance_history_response(
            response: List[Any]) -> PurgeHistoryResult:
        """Convert a purge response into a `PurgeHistoryResult`.

        Raises
        ------
        Exception
            When the status code is neither 200 nor 404.
        """
        status_code = response[0]
        result: Any
        if status_code == 200:  # instance completed
            result = PurgeHistoryResult.from_json(response[1])
        elif status_code == 404:  # instance not found
            result = PurgeHistoryResult(instancesDeleted=0)
        else:
            result = DurableOrchestrationClient._unexpected_status_message(status_code)

        if isinstance(result, PurgeHistoryResult):
            return result
        raise Exception(result)

    def _get_start_new_url(
            self, instance_id: Optional[str], orchestration_function_name: str) -> str:
        """Build the URL that starts `orchestration_function_name`.

        The instance ID is appended as a trailing path segment only when given.
        """
        instance_path = f'/{instance_id}' if instance_id is not None else ''
        return f'{self._orchestration_bindings.rpc_base_url}orchestrators/' \
               f'{orchestration_function_name}{instance_path}'

    def _get_raise_event_url(
            self, instance_id: str, event_name: str,
            task_hub_name: Optional[str], connection_name: Optional[str]) -> str:
        """Build the raise-event URL, adding task hub / connection query args when set."""
        request_url = f'{self._orchestration_bindings.rpc_base_url}' \
                      f'instances/{instance_id}/raiseEvent/{event_name}'

        query: List[str] = []
        if task_hub_name:
            query.append(f'taskHub={task_hub_name}')
        if connection_name:
            query.append(f'connection={connection_name}')

        if query:
            request_url += "?" + "&".join(query)
        return request_url

    async def rewind(self,
                     instance_id: str,
                     reason: str,
                     task_hub_name: Optional[str] = None,
                     connection_name: Optional[str] = None):
        """Return / "rewind" a failed orchestration instance to a prior "healthy" state.

        Parameters
        ----------
        instance_id: str
            The ID of the orchestration instance to rewind.
        reason: str
            The reason for rewinding the orchestration instance.
        task_hub_name: Optional[str]
            The TaskHub of the orchestration to rewind
        connection_name: Optional[str]
            Name of the application setting containing the storage
            connection string to use.

        Raises
        ------
        Exception:
            In case of a failure, it reports the reason for the exception
        """
        rpc_base_url = self._orchestration_bindings.rpc_base_url
        if not rpc_base_url:
            raise Exception("The Python SDK only supports RPC endpoints. "
                            "Please remove the `localRpcEnabled` setting from host.json")

        # URL-encode the free-text reason, exactly as `terminate` does.
        query: List[str] = [f"reason={quote(reason)}"]
        if task_hub_name is not None:
            query.append(f"taskHub={task_hub_name}")
        if connection_name is not None:
            query.append(f"connection={connection_name}")
        request_url = f"{rpc_base_url}instances/{instance_id}/rewind?" + "&".join(query)

        response = await self._post_async_request(request_url, None)
        status: int = response[0]
        if status in (200, 202):
            return
        if status == 404:
            raise Exception(f"No instance with ID {instance_id} found.")
        if status == 410:
            raise Exception(
                "The rewind operation is only supported on failed orchestration instances.")
        if isinstance(response[1], str):
            raise Exception(response[1])
        raise Exception(
            "Received unexpected payload from the durable-extension: " + str(response))