diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index c95e2ba3..7f5c47cc 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -1,7 +1,7 @@ from collections import defaultdict from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction -from azure.durable_functions.models.Task import TaskBase, TimerTask +from azure.durable_functions.models.Task import TaskBase from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \ @@ -46,7 +46,7 @@ class DurableOrchestrationContext: # parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, - parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs): + parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, tasks = [], **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] self._instance_id: str = instanceId self._is_replaying: bool = isReplaying @@ -78,6 +78,11 @@ def __init__(self, self.open_tasks = defaultdict(list) self.deferred_tasks: Dict[Union[int, str], Tuple[HistoryEvent, bool, str]] = {} + # we do a list comprehension to unpack the data in a proper Python tuple. + # this is a hack, and shouldn't be necessary once I figure out C#'s serialization options :) + tasks = [(data["Item1"], (data["Item2"]["Item1"], data["Item2"]["Item2"])) for data in tasks] + self.tasks = dict(tasks) + @classmethod def from_json(cls, json_string: str): """Convert the value passed into a new instance of the class. diff --git a/azure/durable_functions/models/TaskOrchestrationExecutor.py b/azure/durable_functions/models/TaskOrchestrationExecutor.py index 11ea74bb..5efa0ded 100644 --- a/azure/durable_functions/models/TaskOrchestrationExecutor.py +++ b/azure/durable_functions/models/TaskOrchestrationExecutor.py @@ -73,10 +73,7 @@ def execute(self, context: DurableOrchestrationContext, # them with values when the history provides them if isinstance(evaluated_user_code, GeneratorType): self.generator = evaluated_user_code - for event in history: - self.process_event(event) - if self.has_execution_completed: - break + self.__execute() # Due to backwards compatibility reasons, it's possible # for the `continue_as_new` API to be called without `yield` statements. @@ -87,6 +84,59 @@ def execute(self, context: DurableOrchestrationContext, self.output = evaluated_user_code return self.get_orchestrator_state_str() + def lookup_result(self, task: TaskBase): + # This looks up a task's result in the extension's payload + found_task = False + if isinstance(task, AtomicTask): + # First, assign this task an ID + if task.id is None: + task.id = self.context._sequence_number + self.context._sequence_number += 1 + + key = task.id + # Now, see if TaskID can be found in the extension's payload + if key in self.context.tasks.keys(): + (payload, is_error) = self.context.tasks[key] + if is_error: + payload = Exception(payload) + task.set_value(is_error, payload) + found_task = True + # We've set the result, we're ready to continue! + else: + for child in task.children: + # this is a simplified algorithm, I think there are a few + # more cases here to manage. This does the trick for now though :) + found_task = self.lookup_result(child) + return found_task + + def __execute(self): + # Replay user-code until a task is yielded that does not have + # a result according to the extension's payload. + + task_result = None + should_throw = False + have_task_result = True + while have_task_result: + execute_code = self.generator.throw if should_throw else self.generator.send + try: + # Run user-code, feed in prev Task's result + task = execute_code(task_result) + except StopIteration as stop_exception: + self.orchestrator_returned = True + self.output = stop_exception.value + return + except Exception as exception: + self.exception = exception + return + # record actions, to send back to C# + self.context._add_to_actions(task.action_repr) + # lookup task result + have_task_result = self.lookup_result(task) + + if have_task_result: + task_result = task.result + should_throw = task.state is TaskState.FAILED + def process_event(self, event: HistoryEvent): """Evaluate a history event.