diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index df101003..07cf9f27 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -48,7 +48,10 @@ class DurableOrchestrationContext: # parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, - parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs): + parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, + upperSchemaVersionNew: Optional[int] = None, + maximumShortTimerDuration: Optional[str] = None, + longRunningTimerIntervalDuration: Optional[str] = None, **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] self._instance_id: str = instanceId self._is_replaying: bool = isReplaying @@ -65,7 +68,31 @@ def __init__(self, self._new_uuid_counter = 0 self._function_context: FunctionContext = FunctionContext(**kwargs) self._sequence_number = 0 - self._replay_schema = ReplaySchema(upperSchemaVersion) + + # maximum_short_timer_duration_in_days = None + # long_running_timer_interval_in_days = None + # if not (maximumShortTimerDuration is None) + # and not (longRunningTimerIntervalDuration is None): + # # Format is "DAYS.HOURS:MINUTES:SECONDS". + # For simplicity, we will only consider the days + # maximum_short_timer_duration_in_days = + # timedelta(days=int(maximumShortTimerDuration.split(".")[0])) + # long_running_timer_interval_in_days = + # timedelta(days=int(longRunningTimerIntervalDuration.split(".")[0])) + + # self.max_short_timer_duration: Optional[timedelta] = + # maximum_short_timer_duration_in_days + # self.long_running_timer_interval_duration: Optional[timedelta] = + # long_running_timer_interval_in_days + + if not(upperSchemaVersionNew is None): + # check if upperSchemaVersion can be parsed by ReplaySchema + if upperSchemaVersionNew > ReplaySchema.V4.value: + self._replay_schema = ReplaySchema.V4 + else: + self._replay_schema = ReplaySchema(upperSchemaVersionNew) + else: + self._replay_schema = ReplaySchema(upperSchemaVersion) self._action_payload_v1: List[List[Action]] = [] self._action_payload_v2: List[Action] = [] @@ -517,10 +544,10 @@ def _record_fire_and_forget_action(self, action: Action): The action to append """ new_action: Union[List[Action], Action] - if self._replay_schema is ReplaySchema.V2: - new_action = action - else: + if self._replay_schema is ReplaySchema.V1: new_action = [action] + else: + new_action = action self._add_to_actions(new_action) self._sequence_number += 1 @@ -586,15 +613,23 @@ def wait_for_external_event(self, name: str) -> TaskBase: task = self._generate_task(action, id_=name) return task - def continue_as_new(self, input_: Any): + def continue_as_new(self, input_: Any, preserve_unprocessed_events: bool = False): """Schedule the orchestrator to continue as new. Parameters ---------- input_ : Any The new starting input to the orchestrator. + preserve_unprocessed_events: bool + Whether to preserve unprocessed external events to the new orchestrator generation """ - continue_as_new_action: Action = ContinueAsNewAction(input_) + if (preserve_unprocessed_events and self._replay_schema.value < ReplaySchema.V4.value): + error = "Preserving unprocessed events is unsupported in this Durable Functions "\ + "extension version. Please ensure you're using "\ + "'Microsoft.Azure.WebJobs.Extensions.DurableTask' >= 2.13.5" + raise Exception(error) + + continue_as_new_action: Action = ContinueAsNewAction(input_, preserve_unprocessed_events) self._record_fire_and_forget_action(continue_as_new_action) self._continue_as_new_flag = True @@ -641,7 +676,7 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]): if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list): self._action_payload_v1.append(action_repr) - elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action): + elif isinstance(action_repr, Action): self._action_payload_v2.append(action_repr) else: raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}" @@ -673,7 +708,7 @@ def _add_to_open_tasks(self, task: TaskBase): self.open_tasks[task.id].append(task) if task.id in self.deferred_tasks: - task_update_action = self.deferred_tasks[task.id] + task_update_action = self.deferred_tasks.pop(task.id) task_update_action() else: for child in task.children: diff --git a/azure/durable_functions/models/ReplaySchema.py b/azure/durable_functions/models/ReplaySchema.py index 1fb79b95..33f2a7f8 100644 --- a/azure/durable_functions/models/ReplaySchema.py +++ b/azure/durable_functions/models/ReplaySchema.py @@ -6,3 +6,5 @@ class ReplaySchema(Enum): V1 = 0 V2 = 1 + V3 = 2 + V4 = 3 diff --git a/azure/durable_functions/models/actions/ContinueAsNewAction.py b/azure/durable_functions/models/actions/ContinueAsNewAction.py index 7af0508b..24e07c9c 100644 --- a/azure/durable_functions/models/actions/ContinueAsNewAction.py +++ b/azure/durable_functions/models/actions/ContinueAsNewAction.py @@ -14,8 +14,9 @@ class ContinueAsNewAction(Action): and continue as new. """ - def __init__(self, input_=None): + def __init__(self, input_=None, preserve_unprocessed_actions: bool = False): self.input_ = dumps(input_, default=_serialize_custom_object) + self.preserveUnprocessedEvents = preserve_unprocessed_actions @property def action_type(self) -> int: @@ -33,4 +34,5 @@ def to_json(self) -> Dict[str, Union[int, str]]: json_dict: Dict[str, Union[int, str]] = {} add_attrib(json_dict, self, 'action_type', 'actionType') add_attrib(json_dict, self, 'input_', 'input') + add_attrib(json_dict, self, 'preserveUnprocessedEvents', 'preserveUnprocessedEvents') return json_dict diff --git a/tests/orchestrator/schemas/OrchetrationStateSchema.py b/tests/orchestrator/schemas/OrchetrationStateSchema.py index eb33e44b..f18f1a08 100644 --- a/tests/orchestrator/schemas/OrchetrationStateSchema.py +++ b/tests/orchestrator/schemas/OrchetrationStateSchema.py @@ -22,6 +22,7 @@ "functionName": {"type": "string"}, "actionType": {"type": "number"}, "input": {}, + "preserveUnprocessedEvents": {"type": "boolean"}, "retryOptions": { "type": "object", "properties": {