From b01ce7aa6454cded48f9c1c829239f75f4998194 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 24 Jun 2024 19:36:57 -0700 Subject: [PATCH 1/6] first draft --- .../models/DurableOrchestrationContext.py | 46 +++++++++++++++---- .../durable_functions/models/ReplaySchema.py | 3 +- .../models/actions/ContinueAsNewAction.py | 4 +- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index df101003..f4e084ec 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -25,6 +25,8 @@ from typing import DefaultDict, List, Any, Dict, Optional, Tuple, Union, Callable from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID from datetime import timezone +import warnings +from datetime import timedelta from .RetryOptions import RetryOptions from .FunctionContext import FunctionContext @@ -48,7 +50,9 @@ class DurableOrchestrationContext: # parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, - parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs): + parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, + upperSchemaVersionNew: Optional[int] = None, maximumShortTimerDuration: Optional[str] = None, + longRunningTimerIntervalDuration: Optional[str] = None, **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] self._instance_id: str = instanceId self._is_replaying: bool = isReplaying @@ -65,7 +69,25 @@ def __init__(self, self._new_uuid_counter = 0 self._function_context: FunctionContext = FunctionContext(**kwargs) self._sequence_number = 0 - self._replay_schema = ReplaySchema(upperSchemaVersion) + + maximum_short_timer_duration_in_days = None + long_running_timer_interval_in_days = None + if not (maximumShortTimerDuration is None) and not (longRunningTimerIntervalDuration is None): + # Format is "DAYS.HOURS:MINUTES:SECONDS". For simplicity, we will only consider the days + maximum_short_timer_duration_in_days = timedelta(days=int(maximumShortTimerDuration.split(".")[0])) + long_running_timer_interval_in_days = timedelta(days=int(longRunningTimerIntervalDuration.split(".")[0])) + + self.max_short_timer_duration: Optional[timedelta] = maximum_short_timer_duration_in_days + self.long_running_timer_interval_duration: Optional[timedelta] = long_running_timer_interval_in_days + + if not(upperSchemaVersionNew is None): + # check if upperSchemaVersion can be parsed by ReplaySchema + if upperSchemaVersionNew > ReplaySchema.V4.value: + self._replay_schema = ReplaySchema.V4 + else: + self._replay_schema = ReplaySchema(upperSchemaVersionNew) + else: + self._replay_schema = ReplaySchema(upperSchemaVersion) self._action_payload_v1: List[List[Action]] = [] self._action_payload_v2: List[Action] = [] @@ -517,10 +539,10 @@ def _record_fire_and_forget_action(self, action: Action): The action to append """ new_action: Union[List[Action], Action] - if self._replay_schema is ReplaySchema.V2: - new_action = action - else: + if self._replay_schema is ReplaySchema.V1: new_action = [action] + else: + new_action = action self._add_to_actions(new_action) self._sequence_number += 1 @@ -586,15 +608,21 @@ def wait_for_external_event(self, name: str) -> TaskBase: task = self._generate_task(action, id_=name) return task - def continue_as_new(self, input_: Any): + def continue_as_new(self, input_: Any, preserve_unprocessed_events: bool = False): """Schedule the orchestrator to continue as new. Parameters ---------- input_ : Any The new starting input to the orchestrator. + preserve_unprocessed_events: bool + Whether to preserve unprocessed external events to the new orchestrator generation """ - continue_as_new_action: Action = ContinueAsNewAction(input_) + if (preserve_unprocessed_events and self._replay_schema.value < ReplaySchema.V4.value): + raise Exception("Preserving unprocessed events is unsupported in this Durable Functions extension version. "\ + "Please ensure you're using 'Microsoft.Azure.WebJobs.Extensions.DurableTask' >= 2.13.5"); + + continue_as_new_action: Action = ContinueAsNewAction(input_, preserve_unprocessed_events) self._record_fire_and_forget_action(continue_as_new_action) self._continue_as_new_flag = True @@ -641,7 +669,7 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]): if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list): self._action_payload_v1.append(action_repr) - elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action): + elif isinstance(action_repr, Action): self._action_payload_v2.append(action_repr) else: raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}" @@ -673,7 +701,7 @@ def _add_to_open_tasks(self, task: TaskBase): self.open_tasks[task.id].append(task) if task.id in self.deferred_tasks: - task_update_action = self.deferred_tasks[task.id] + task_update_action = self.deferred_tasks.pop(task.id) task_update_action() else: for child in task.children: diff --git a/azure/durable_functions/models/ReplaySchema.py b/azure/durable_functions/models/ReplaySchema.py index 1fb79b95..b171bcf9 100644 --- a/azure/durable_functions/models/ReplaySchema.py +++ b/azure/durable_functions/models/ReplaySchema.py @@ -1,8 +1,9 @@ from enum import Enum - class ReplaySchema(Enum): """Enum representing the ReplaySchemas supported by this SDK version.""" V1 = 0 V2 = 1 + V3 = 2 + V4 = 3 diff --git a/azure/durable_functions/models/actions/ContinueAsNewAction.py b/azure/durable_functions/models/actions/ContinueAsNewAction.py index 7af0508b..24e07c9c 100644 --- a/azure/durable_functions/models/actions/ContinueAsNewAction.py +++ b/azure/durable_functions/models/actions/ContinueAsNewAction.py @@ -14,8 +14,9 @@ class ContinueAsNewAction(Action): and continue as new. """ - def __init__(self, input_=None): + def __init__(self, input_=None, preserve_unprocessed_actions: bool = False): self.input_ = dumps(input_, default=_serialize_custom_object) + self.preserveUnprocessedEvents = preserve_unprocessed_actions @property def action_type(self) -> int: @@ -33,4 +34,5 @@ def to_json(self) -> Dict[str, Union[int, str]]: json_dict: Dict[str, Union[int, str]] = {} add_attrib(json_dict, self, 'action_type', 'actionType') add_attrib(json_dict, self, 'input_', 'input') + add_attrib(json_dict, self, 'preserveUnprocessedEvents', 'preserveUnprocessedEvents') return json_dict From 0c8450a77c7a2ea08cfa79e97864396ca8a446f4 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 24 Jun 2024 19:50:27 -0700 Subject: [PATCH 2/6] pass linter --- .../models/DurableOrchestrationContext.py | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index f4e084ec..35582e96 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -25,7 +25,6 @@ from typing import DefaultDict, List, Any, Dict, Optional, Tuple, Union, Callable from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID from datetime import timezone -import warnings from datetime import timedelta from .RetryOptions import RetryOptions @@ -51,7 +50,8 @@ class DurableOrchestrationContext: def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, - upperSchemaVersionNew: Optional[int] = None, maximumShortTimerDuration: Optional[str] = None, + upperSchemaVersionNew: Optional[int] = None, + maximumShortTimerDuration: Optional[str] = None, longRunningTimerIntervalDuration: Optional[str] = None, **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] self._instance_id: str = instanceId @@ -69,16 +69,22 @@ def __init__(self, self._new_uuid_counter = 0 self._function_context: FunctionContext = FunctionContext(**kwargs) self._sequence_number = 0 - - maximum_short_timer_duration_in_days = None - long_running_timer_interval_in_days = None - if not (maximumShortTimerDuration is None) and not (longRunningTimerIntervalDuration is None): - # Format is "DAYS.HOURS:MINUTES:SECONDS". For simplicity, we will only consider the days - maximum_short_timer_duration_in_days = timedelta(days=int(maximumShortTimerDuration.split(".")[0])) - long_running_timer_interval_in_days = timedelta(days=int(longRunningTimerIntervalDuration.split(".")[0])) - self.max_short_timer_duration: Optional[timedelta] = maximum_short_timer_duration_in_days - self.long_running_timer_interval_duration: Optional[timedelta] = long_running_timer_interval_in_days + # maximum_short_timer_duration_in_days = None + # long_running_timer_interval_in_days = None + # if not (maximumShortTimerDuration is None) + # and not (longRunningTimerIntervalDuration is None): + # # Format is "DAYS.HOURS:MINUTES:SECONDS". + # For simplicity, we will only consider the days + # maximum_short_timer_duration_in_days = + # timedelta(days=int(maximumShortTimerDuration.split(".")[0])) + # long_running_timer_interval_in_days = + # timedelta(days=int(longRunningTimerIntervalDuration.split(".")[0])) + + # self.max_short_timer_duration: Optional[timedelta] = + # maximum_short_timer_duration_in_days + # self.long_running_timer_interval_duration: Optional[timedelta] = + # long_running_timer_interval_in_days if not(upperSchemaVersionNew is None): # check if upperSchemaVersion can be parsed by ReplaySchema @@ -619,8 +625,9 @@ def continue_as_new(self, input_: Any, preserve_unprocessed_events: bool = False Whether to preserve unprocessed external events to the new orchestrator generation """ if (preserve_unprocessed_events and self._replay_schema.value < ReplaySchema.V4.value): - raise Exception("Preserving unprocessed events is unsupported in this Durable Functions extension version. "\ - "Please ensure you're using 'Microsoft.Azure.WebJobs.Extensions.DurableTask' >= 2.13.5"); + error = "Preserving unprocessed events is unsupported in this Durable Functions extension version. "\ + "Please ensure you're using 'Microsoft.Azure.WebJobs.Extensions.DurableTask' >= 2.13.5" + raise Exception(error); continue_as_new_action: Action = ContinueAsNewAction(input_, preserve_unprocessed_events) self._record_fire_and_forget_action(continue_as_new_action) From 994f3a45de3b71ac0bfdd356c55fd3b1ac538d04 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 24 Jun 2024 19:52:50 -0700 Subject: [PATCH 3/6] pass linter --- .../models/DurableOrchestrationContext.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 35582e96..e9c23d2a 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -74,16 +74,16 @@ def __init__(self, # long_running_timer_interval_in_days = None # if not (maximumShortTimerDuration is None) # and not (longRunningTimerIntervalDuration is None): - # # Format is "DAYS.HOURS:MINUTES:SECONDS". + # # Format is "DAYS.HOURS:MINUTES:SECONDS". # For simplicity, we will only consider the days - # maximum_short_timer_duration_in_days = + # maximum_short_timer_duration_in_days = # timedelta(days=int(maximumShortTimerDuration.split(".")[0])) - # long_running_timer_interval_in_days = + # long_running_timer_interval_in_days = # timedelta(days=int(longRunningTimerIntervalDuration.split(".")[0])) - # self.max_short_timer_duration: Optional[timedelta] = + # self.max_short_timer_duration: Optional[timedelta] = # maximum_short_timer_duration_in_days - # self.long_running_timer_interval_duration: Optional[timedelta] = + # self.long_running_timer_interval_duration: Optional[timedelta] = # long_running_timer_interval_in_days if not(upperSchemaVersionNew is None): From 7f6f16cb839d54a56f5714bb62b3f398210a503a Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 24 Jun 2024 19:56:45 -0700 Subject: [PATCH 4/6] pass linter --- .../models/DurableOrchestrationContext.py | 8 ++++---- azure/durable_functions/models/ReplaySchema.py | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index e9c23d2a..bdeb94bc 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -25,7 +25,6 @@ from typing import DefaultDict, List, Any, Dict, Optional, Tuple, Union, Callable from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID from datetime import timezone -from datetime import timedelta from .RetryOptions import RetryOptions from .FunctionContext import FunctionContext @@ -625,9 +624,10 @@ def continue_as_new(self, input_: Any, preserve_unprocessed_events: bool = False Whether to preserve unprocessed external events to the new orchestrator generation """ if (preserve_unprocessed_events and self._replay_schema.value < ReplaySchema.V4.value): - error = "Preserving unprocessed events is unsupported in this Durable Functions extension version. "\ - "Please ensure you're using 'Microsoft.Azure.WebJobs.Extensions.DurableTask' >= 2.13.5" - raise Exception(error); + error = "Preserving unprocessed events is unsupported in this Durable Functions "\ + "extension version. Please ensure you're using "\ + "'Microsoft.Azure.WebJobs.Extensions.DurableTask' >= 2.13.5" + raise Exception(error) continue_as_new_action: Action = ContinueAsNewAction(input_, preserve_unprocessed_events) self._record_fire_and_forget_action(continue_as_new_action) diff --git a/azure/durable_functions/models/ReplaySchema.py b/azure/durable_functions/models/ReplaySchema.py index b171bcf9..33f2a7f8 100644 --- a/azure/durable_functions/models/ReplaySchema.py +++ b/azure/durable_functions/models/ReplaySchema.py @@ -1,5 +1,6 @@ from enum import Enum + class ReplaySchema(Enum): """Enum representing the ReplaySchemas supported by this SDK version.""" From ad1af507283f293ede7aab5381f042fff5f59f86 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 24 Jun 2024 19:58:33 -0700 Subject: [PATCH 5/6] pass linter --- azure/durable_functions/models/DurableOrchestrationContext.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index bdeb94bc..07cf9f27 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -628,7 +628,7 @@ def continue_as_new(self, input_: Any, preserve_unprocessed_events: bool = False "extension version. Please ensure you're using "\ "'Microsoft.Azure.WebJobs.Extensions.DurableTask' >= 2.13.5" raise Exception(error) - + continue_as_new_action: Action = ContinueAsNewAction(input_, preserve_unprocessed_events) self._record_fire_and_forget_action(continue_as_new_action) self._continue_as_new_flag = True From b9d868c23c8bc7a18acb748a4d393d123959ab51 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 24 Jun 2024 20:07:04 -0700 Subject: [PATCH 6/6] edit schema --- tests/orchestrator/schemas/OrchetrationStateSchema.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/orchestrator/schemas/OrchetrationStateSchema.py b/tests/orchestrator/schemas/OrchetrationStateSchema.py index eb33e44b..f18f1a08 100644 --- a/tests/orchestrator/schemas/OrchetrationStateSchema.py +++ b/tests/orchestrator/schemas/OrchetrationStateSchema.py @@ -22,6 +22,7 @@ "functionName": {"type": "string"}, "actionType": {"type": "number"}, "input": {}, + "preserveUnprocessedEvents": {"type": "boolean"}, "retryOptions": { "type": "object", "properties": {