Skip to content

[WIP] Add continue as new support for preserving events #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
@@ -48,7 +48,10 @@ class DurableOrchestrationContext:
# parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
def __init__(self,
history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs):
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0,
upperSchemaVersionNew: Optional[int] = None,
maximumShortTimerDuration: Optional[str] = None,
longRunningTimerIntervalDuration: Optional[str] = None, **kwargs):
self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
self._instance_id: str = instanceId
self._is_replaying: bool = isReplaying
@@ -65,7 +68,31 @@ def __init__(self,
self._new_uuid_counter = 0
self._function_context: FunctionContext = FunctionContext(**kwargs)
self._sequence_number = 0
self._replay_schema = ReplaySchema(upperSchemaVersion)

# maximum_short_timer_duration_in_days = None
# long_running_timer_interval_in_days = None
# if not (maximumShortTimerDuration is None)
# and not (longRunningTimerIntervalDuration is None):
# # Format is "DAYS.HOURS:MINUTES:SECONDS".
# For simplicity, we will only consider the days
# maximum_short_timer_duration_in_days =
# timedelta(days=int(maximumShortTimerDuration.split(".")[0]))
# long_running_timer_interval_in_days =
# timedelta(days=int(longRunningTimerIntervalDuration.split(".")[0]))

# self.max_short_timer_duration: Optional[timedelta] =
# maximum_short_timer_duration_in_days
# self.long_running_timer_interval_duration: Optional[timedelta] =
# long_running_timer_interval_in_days

if not(upperSchemaVersionNew is None):
# check if upperSchemaVersion can be parsed by ReplaySchema
if upperSchemaVersionNew > ReplaySchema.V4.value:
self._replay_schema = ReplaySchema.V4
else:
self._replay_schema = ReplaySchema(upperSchemaVersionNew)
else:
self._replay_schema = ReplaySchema(upperSchemaVersion)

self._action_payload_v1: List[List[Action]] = []
self._action_payload_v2: List[Action] = []
@@ -517,10 +544,10 @@ def _record_fire_and_forget_action(self, action: Action):
The action to append
"""
new_action: Union[List[Action], Action]
if self._replay_schema is ReplaySchema.V2:
new_action = action
else:
if self._replay_schema is ReplaySchema.V1:
new_action = [action]
else:
new_action = action
self._add_to_actions(new_action)
self._sequence_number += 1

@@ -586,15 +613,23 @@ def wait_for_external_event(self, name: str) -> TaskBase:
task = self._generate_task(action, id_=name)
return task

def continue_as_new(self, input_: Any):
def continue_as_new(self, input_: Any, preserve_unprocessed_events: bool = False):
"""Schedule the orchestrator to continue as new.

Parameters
----------
input_ : Any
The new starting input to the orchestrator.
preserve_unprocessed_events: bool
Whether to preserve unprocessed external events to the new orchestrator generation
"""
continue_as_new_action: Action = ContinueAsNewAction(input_)
if (preserve_unprocessed_events and self._replay_schema.value < ReplaySchema.V4.value):
error = "Preserving unprocessed events is unsupported in this Durable Functions "\
"extension version. Please ensure you're using "\
"'Microsoft.Azure.WebJobs.Extensions.DurableTask' >= 2.13.5"
raise Exception(error)

continue_as_new_action: Action = ContinueAsNewAction(input_, preserve_unprocessed_events)
self._record_fire_and_forget_action(continue_as_new_action)
self._continue_as_new_flag = True

@@ -641,7 +676,7 @@ def _add_to_actions(self, action_repr: Union[List[Action], Action]):

if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list):
self._action_payload_v1.append(action_repr)
elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action):
elif isinstance(action_repr, Action):
self._action_payload_v2.append(action_repr)
else:
raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}"
@@ -673,7 +708,7 @@ def _add_to_open_tasks(self, task: TaskBase):
self.open_tasks[task.id].append(task)

if task.id in self.deferred_tasks:
task_update_action = self.deferred_tasks[task.id]
task_update_action = self.deferred_tasks.pop(task.id)
task_update_action()
else:
for child in task.children:
2 changes: 2 additions & 0 deletions azure/durable_functions/models/ReplaySchema.py
Original file line number Diff line number Diff line change
@@ -6,3 +6,5 @@ class ReplaySchema(Enum):

V1 = 0
V2 = 1
V3 = 2
V4 = 3
Original file line number Diff line number Diff line change
@@ -14,8 +14,9 @@ class ContinueAsNewAction(Action):
and continue as new.
"""

def __init__(self, input_=None):
def __init__(self, input_=None, preserve_unprocessed_actions: bool = False):
self.input_ = dumps(input_, default=_serialize_custom_object)
self.preserveUnprocessedEvents = preserve_unprocessed_actions

@property
def action_type(self) -> int:
@@ -33,4 +34,5 @@ def to_json(self) -> Dict[str, Union[int, str]]:
json_dict: Dict[str, Union[int, str]] = {}
add_attrib(json_dict, self, 'action_type', 'actionType')
add_attrib(json_dict, self, 'input_', 'input')
add_attrib(json_dict, self, 'preserveUnprocessedEvents', 'preserveUnprocessedEvents')
return json_dict
1 change: 1 addition & 0 deletions tests/orchestrator/schemas/OrchetrationStateSchema.py
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
"functionName": {"type": "string"},
"actionType": {"type": "number"},
"input": {},
"preserveUnprocessedEvents": {"type": "boolean"},
"retryOptions": {
"type": "object",
"properties": {