From cd5644af202563bd0765d359c4fed7de999b2e32 Mon Sep 17 00:00:00 2001 From: dewmal Date: Sun, 25 Aug 2024 23:36:30 +0530 Subject: [PATCH] modify task operator to run parallel tasks --- bindings/ceylon/ceylon/task/task_operation.py | 71 +++++++++---------- 1 file changed, 32 insertions(+), 39 deletions(-) diff --git a/bindings/ceylon/ceylon/task/task_operation.py b/bindings/ceylon/ceylon/task/task_operation.py index 40adcd46..e0b4d47b 100644 --- a/bindings/ceylon/ceylon/task/task_operation.py +++ b/bindings/ceylon/ceylon/task/task_operation.py @@ -7,6 +7,7 @@ from loguru import logger from pydantic import BaseModel from pydantic import Field +import asyncio class SubTask(BaseModel): @@ -93,14 +94,12 @@ def _create_dependency_graph(self) -> nx.DiGraph: def validate_sub_tasks(self) -> bool: subtask_names = set(self.subtasks.keys()) - # Check if all dependencies are present for subtask in self.subtasks.values(): if not subtask.depends_on.issubset(subtask_names): missing_deps = subtask.depends_on - subtask_names logger.info(f"Subtask '{subtask.name}' has missing dependencies: {missing_deps}") return False - # Check for circular dependencies try: self._validate_dependencies() except ValueError as e: @@ -113,12 +112,13 @@ def get_execution_order(self) -> List[str]: graph = self._create_dependency_graph() return list(nx.topological_sort(graph)) - def get_next_subtask(self) -> Optional[Tuple[str, SubTask]]: + def get_ready_subtasks(self) -> List[Tuple[str, SubTask]]: + ready_subtasks = [] for subtask_name in self.execution_order: subtask = self.subtasks[subtask_name] - if all(self.subtasks[dep].completed for dep in subtask.depends_on): - return (subtask_name, subtask) - return None + if not subtask.completed and all(self.subtasks[dep].completed for dep in subtask.depends_on): + ready_subtasks.append((subtask_name, subtask)) + return ready_subtasks def update_subtask_status(self, subtask_name: str, result: str): if subtask_name not in self.subtasks: @@ -128,9 +128,6 @@ def update_subtask_status(self, subtask_name: str, result: str): if result is not None: subtask.complete(result) - if subtask_name in self.execution_order: - self.execution_order.remove(subtask_name) - def update_subtask_executor(self, subtask_name: str, executor: str) -> SubTask: if subtask_name not in self.subtasks: raise ValueError(f"Subtask {subtask_name} not found") @@ -197,41 +194,35 @@ class TaskResult(BaseModel): final_answer: str -if __name__ == "__main__": - def execute_task(task: Task) -> None: - while True: - # Get the next subtask - next_subtask: Optional[tuple[str, SubTask]] = task.get_next_subtask() +async def execute_subtask(subtask_name: str, subtask: SubTask) -> str: + print(f"Executing: {subtask}") + # Simulate subtask execution with a delay + await asyncio.sleep(1) + return f"Success: {subtask_name}" - # If there are no more subtasks, break the loop - if next_subtask is None: - break - subtask_name, subtask = next_subtask - print(f"Executing: {subtask}") +async def execute_task(task: Task) -> None: + while not task.is_completed(): + ready_subtasks = task.get_ready_subtasks() + print(f"Ready subtasks: {ready_subtasks}") - # Here you would actually execute the subtask - # For this example, we'll simulate execution with a simple print statement - print(f"Simulating execution of {subtask_name}") + if not ready_subtasks: + await asyncio.sleep(0.1) + continue - # Simulate a result (in a real scenario, this would be the outcome of the subtask execution) - result = "Success" + # Execute ready subtasks in parallel + subtask_coroutines = [execute_subtask(name, subtask) for name, subtask in ready_subtasks] + results = await asyncio.gather(*subtask_coroutines) - # Update the subtask status + # Update task status + for (subtask_name, _), result in zip(ready_subtasks, results): task.update_subtask_status(subtask_name, result) + print(f"Completed: {subtask_name}") - # Check if the entire task is completed - if task.is_completed(): - print("All subtasks completed successfully!") - break - - # Final check to see if all subtasks were completed - if task.is_completed(): - print("Task execution completed successfully!") - else: - print("Task execution incomplete. Some subtasks may have failed.") + print("All subtasks completed successfully!") +if __name__ == "__main__": # Create a task with initial subtasks web_app = Task.create_task("Build Web App", "Create a simple web application", subtasks=[ @@ -242,6 +233,9 @@ def execute_task(task: Task) -> None: SubTask(name="testing", description="Perform unit and integration tests", depends_on={"backend", "frontend"}, required_specialty="Knowledge about testing tools"), + SubTask(name="qa_test_cases", description="Perform unit and integration tests", + depends_on={"backend", "frontend"}, + required_specialty="Knowledge about testing tools"), SubTask(name="frontend", description="Develop the frontend UI", depends_on={"setup", "backend"}, required_specialty="Knowledge about frontend tools"), @@ -255,24 +249,23 @@ def execute_task(task: Task) -> None: depends_on={"deployment"}, required_specialty="Knowledge about delivery tools"), SubTask(name="qa", description="Perform quality assurance", - depends_on={"testing"}, + depends_on={"testing", "qa_test_cases"}, required_specialty="Knowledge about testing tools") ]) - # Execute the task print("Execution order:", [web_app.subtasks[task_id].name for task_id in web_app.get_execution_order()]) if web_app.validate_sub_tasks(): print("Subtasks are valid") print("\nExecuting task:") - execute_task(task=web_app) + asyncio.run(execute_task(task=web_app)) print("\nFinal task status:") print(web_app) else: print("Subtasks are invalid") - # Serialization example + # Serialization example print("\nSerialized Task:") print(web_app.model_dump_json(indent=2))