From 273dbe88c276d25a2b364b248890d991c16f64d2 Mon Sep 17 00:00:00 2001 From: dewmal Date: Mon, 26 Aug 2024 00:14:07 +0530 Subject: [PATCH] modify task operator to run parallel tasks --- .../ceylon/ceylon/task/task_coordinator.py | 37 +++--- bindings/ceylon/ceylon/task/task_operation.py | 109 +++++++++++------- .../ceylon/tests/tasks/manage_tasks-agents.py | 57 +++++---- 3 files changed, 116 insertions(+), 87 deletions(-) diff --git a/bindings/ceylon/ceylon/task/task_coordinator.py b/bindings/ceylon/ceylon/task/task_coordinator.py index 275e909e..a35c854d 100644 --- a/bindings/ceylon/ceylon/task/task_coordinator.py +++ b/bindings/ceylon/ceylon/task/task_coordinator.py @@ -51,29 +51,34 @@ async def run_tasks(self): return for task in self.tasks: self.results[task.id] = [] - sub_task = task.get_next_subtask() - if sub_task is None: + sub_tasks = task.get_next_subtasks() + if len(sub_tasks) == 0: continue - subtask_name, subtask_ = sub_task - if subtask_.executor is None: - assigned_agent = await self.get_task_executor(subtask_) - subtask_ = task.update_subtask_executor(subtask_name, assigned_agent) - logger.debug(f"Assigned agent {subtask_.executor} to subtask {subtask_name}") - await self.broadcast_data( - TaskAssignment(task=subtask_, assigned_agent=subtask_.executor)) + for sub_task in sub_tasks: + if sub_task is None: + continue + subtask_name, subtask_ = sub_task + logger.info(f"Assigning agent to subtask {subtask_name}") + if subtask_.executor is None: + assigned_agent = await self.get_task_executor(subtask_) + subtask_ = task.update_subtask_executor(subtask_name, assigned_agent) + await self.broadcast_data( + TaskAssignment(task=subtask_, assigned_agent=subtask_.executor)) + logger.info(f"Assigned agent {subtask_.executor} to subtask {subtask_name}") @on_message(type=SubTaskResult) async def on_task_result(self, result: SubTaskResult): logger.info(f"Received task result: {result}") if result.status == TaskResultStatus.COMPLETED: for idx, task in enumerate(self.tasks): - sub_task = task.get_next_subtask() - print(result.task_id, sub_task[1].id, result.task_id == sub_task[1].id) - if sub_task is None or result.task_id != sub_task[1].id: - continue - if result.task_id == sub_task[1].id: - task.update_subtask_status(sub_task[1].name, result.result) - break + sub_tasks = task.get_next_subtasks() + for sub_task in sub_tasks: + print(result.task_id, sub_task[1].id, result.task_id == sub_task[1].id) + if sub_task is None or result.task_id != sub_task[1].id: + continue + if result.task_id == sub_task[1].id: + task.update_subtask_status(sub_task[1].name, result.result) + break # Task is completed for task in self.tasks: diff --git a/bindings/ceylon/ceylon/task/task_operation.py b/bindings/ceylon/ceylon/task/task_operation.py index e0b4d47b..687fd292 100644 --- a/bindings/ceylon/ceylon/task/task_operation.py +++ b/bindings/ceylon/ceylon/task/task_operation.py @@ -7,7 +7,6 @@ from loguru import logger from pydantic import BaseModel from pydantic import Field -import asyncio class SubTask(BaseModel): @@ -54,6 +53,15 @@ class TaskDeliverable(BaseModel): def __str__(self): return f"TaskDeliverable: {self.deliverable} - Key Features: {self.key_features} - Considerations: {self.considerations} - Objective: {self.objective}" + @staticmethod + def default(description: str) -> 'TaskDeliverable': + return TaskDeliverable( + objective="Complete the assigned task", + deliverable=description, + key_features=["Basic functionality"], + considerations=["Meet minimum requirements"] + ) + class Task(BaseModel): id: str = Field(default_factory=lambda: str(uuid4())) @@ -94,12 +102,14 @@ def _create_dependency_graph(self) -> nx.DiGraph: def validate_sub_tasks(self) -> bool: subtask_names = set(self.subtasks.keys()) + # Check if all dependencies are present for subtask in self.subtasks.values(): if not subtask.depends_on.issubset(subtask_names): missing_deps = subtask.depends_on - subtask_names logger.info(f"Subtask '{subtask.name}' has missing dependencies: {missing_deps}") return False + # Check for circular dependencies try: self._validate_dependencies() except ValueError as e: @@ -112,13 +122,20 @@ def get_execution_order(self) -> List[str]: graph = self._create_dependency_graph() return list(nx.topological_sort(graph)) - def get_ready_subtasks(self) -> List[Tuple[str, SubTask]]: - ready_subtasks = [] + # def get_next_subtask(self) -> Optional[Tuple[str, SubTask]]: + # for subtask_name in self.execution_order: + # subtask = self.subtasks[subtask_name] + # if all(self.subtasks[dep].completed for dep in subtask.depends_on): + # return (subtask_name, subtask) + # return None + + def get_next_subtasks(self) -> List[Tuple[str, SubTask]]: + subtasks = [] for subtask_name in self.execution_order: subtask = self.subtasks[subtask_name] - if not subtask.completed and all(self.subtasks[dep].completed for dep in subtask.depends_on): - ready_subtasks.append((subtask_name, subtask)) - return ready_subtasks + if all(self.subtasks[dep].completed for dep in subtask.depends_on): + subtasks.append((subtask_name, subtask)) + return subtasks def update_subtask_status(self, subtask_name: str, result: str): if subtask_name not in self.subtasks: @@ -128,6 +145,9 @@ def update_subtask_status(self, subtask_name: str, result: str): if result is not None: subtask.complete(result) + if subtask_name in self.execution_order: + self.execution_order.remove(subtask_name) + def update_subtask_executor(self, subtask_name: str, executor: str) -> SubTask: if subtask_name not in self.subtasks: raise ValueError(f"Subtask {subtask_name} not found") @@ -194,35 +214,46 @@ class TaskResult(BaseModel): final_answer: str -async def execute_subtask(subtask_name: str, subtask: SubTask) -> str: - print(f"Executing: {subtask}") - # Simulate subtask execution with a delay - await asyncio.sleep(1) - return f"Success: {subtask_name}" - - -async def execute_task(task: Task) -> None: - while not task.is_completed(): - ready_subtasks = task.get_ready_subtasks() - print(f"Ready subtasks: {ready_subtasks}") - - if not ready_subtasks: - await asyncio.sleep(0.1) - continue - - # Execute ready subtasks in parallel - subtask_coroutines = [execute_subtask(name, subtask) for name, subtask in ready_subtasks] - results = await asyncio.gather(*subtask_coroutines) - - # Update task status - for (subtask_name, _), result in zip(ready_subtasks, results): - task.update_subtask_status(subtask_name, result) - print(f"Completed: {subtask_name}") - - print("All subtasks completed successfully!") +if __name__ == "__main__": + def execute_task(task: Task) -> None: + execution_step = 0 + while True: + # Get the next subtask + next_subtasks: List[tuple[str, SubTask]] = task.get_next_subtasks() + print(f"Step {execution_step}: {next_subtasks}") + if next_subtasks is None or len(next_subtasks) == 0: + break + for next_subtask in next_subtasks: + # If there are no more subtasks, break the loop + if next_subtask is None: + break + + subtask_name, subtask = next_subtask + print(f"Executing: {subtask}") + + # Here you would actually execute the subtask + # For this example, we'll simulate execution with a simple print statement + print(f"Simulating execution of {subtask_name}") + + # Simulate a result (in a real scenario, this would be the outcome of the subtask execution) + result = "Success" + + # Update the subtask status + task.update_subtask_status(subtask_name, result) + + # Check if the entire task is completed + if task.is_completed(): + print("All subtasks completed successfully!") + break + + # Final check to see if all subtasks were completed + if task.is_completed(): + print("Task execution completed successfully!") + else: + print("Task execution incomplete. Some subtasks may have failed.") + execution_step += 1 -if __name__ == "__main__": # Create a task with initial subtasks web_app = Task.create_task("Build Web App", "Create a simple web application", subtasks=[ @@ -233,11 +264,8 @@ async def execute_task(task: Task) -> None: SubTask(name="testing", description="Perform unit and integration tests", depends_on={"backend", "frontend"}, required_specialty="Knowledge about testing tools"), - SubTask(name="qa_test_cases", description="Perform unit and integration tests", - depends_on={"backend", "frontend"}, - required_specialty="Knowledge about testing tools"), SubTask(name="frontend", description="Develop the frontend UI", - depends_on={"setup", "backend"}, + depends_on={"setup", "database"}, required_specialty="Knowledge about frontend tools"), SubTask(name="backend", description="Develop the backend API", depends_on={"setup", "database"}, @@ -249,23 +277,24 @@ async def execute_task(task: Task) -> None: depends_on={"deployment"}, required_specialty="Knowledge about delivery tools"), SubTask(name="qa", description="Perform quality assurance", - depends_on={"testing", "qa_test_cases"}, + depends_on={"testing"}, required_specialty="Knowledge about testing tools") ]) + # Execute the task print("Execution order:", [web_app.subtasks[task_id].name for task_id in web_app.get_execution_order()]) if web_app.validate_sub_tasks(): print("Subtasks are valid") print("\nExecuting task:") - asyncio.run(execute_task(task=web_app)) + execute_task(task=web_app) print("\nFinal task status:") print(web_app) else: print("Subtasks are invalid") - # Serialization example + # Serialization example print("\nSerialized Task:") print(web_app.model_dump_json(indent=2)) diff --git a/bindings/ceylon/tests/tasks/manage_tasks-agents.py b/bindings/ceylon/tests/tasks/manage_tasks-agents.py index 845ada56..9284dd62 100644 --- a/bindings/ceylon/tests/tasks/manage_tasks-agents.py +++ b/bindings/ceylon/tests/tasks/manage_tasks-agents.py @@ -1,7 +1,8 @@ from langchain_community.chat_models import ChatOllama -from ceylon.llm import LLMTaskAgent, LLMTaskManager +from ceylon.llm import LLMTaskCoordinator, LLMTaskOperator from ceylon.task import Task, SubTask +from ceylon.task.task_operation import TaskDeliverable # Example usage if __name__ == "__main__": @@ -32,7 +33,7 @@ required_specialty="Knowledge about testing tools") ]) - + web_app.task_deliverable = TaskDeliverable.default(web_app.description) tasks = [ web_app ] @@ -40,60 +41,54 @@ llm = ChatOllama(model="llama3.1:latest", temperature=0) # Create specialized agents agents = [ - LLMTaskAgent( + LLMTaskOperator( name="backend", role="Backend Developer", context="Knowledge about backend tools", skills=["Python", "Java", "Node.js"], # Example skills - tools=["Django", "Spring Boot", "Express.js"], # Example tools llm=llm ), - LLMTaskAgent( + LLMTaskOperator( name="frontend", role="Frontend Developer", context="Knowledge about frontend tools", skills=["HTML", "CSS", "JavaScript", "React"], # Example skills - tools=["React", "Angular", "Vue.js"], # Example tools llm=llm ), - LLMTaskAgent( + LLMTaskOperator( name="database", role="Database Administrator", context="Knowledge about database management tools", skills=["SQL", "NoSQL", "Database Design"], # Example skills - tools=["MySQL", "MongoDB", "PostgreSQL"], # Example tools llm=llm ), - - LLMTaskAgent( + # + LLMTaskOperator( name="deployment", role="Deployment Manager", context="Knowledge about deployment tools and CI tools", skills=["CI/CD", "Docker", "Kubernetes"], # Example skills - tools=["Jenkins", "Docker", "Kubernetes"], # Example tools llm=llm ), - - LLMTaskAgent( - name="qa", - role="Quality Assurance Engineer", - context="Knowledge about testing tools", - skills=["Automated Testing", "Manual Testing", "Test Case Design"], # Example skills - tools=["Selenium", "JUnit", "TestNG"], # Example tools - llm=llm - ), - - LLMTaskAgent( - name="delivery", - role="Delivery Manager", - context="Knowledge about delivery tools", - skills=["Release Management", "Continuous Delivery"], # Example skills - tools=["Jira", "Confluence", "GitLab CI"], # Example tools - llm=llm - ) + # + # LLMTaskOperator( + # name="qa", + # role="Quality Assurance Engineer", + # context="Knowledge about testing tools", + # skills=["Automated Testing", "Manual Testing", "Test Case Design"], # Example skills + # llm=llm + # ), + # + # LLMTaskOperator( + # name="delivery", + # role="Delivery Manager", + # context="Knowledge about delivery tools", + # skills=["Release Management", "Continuous Delivery"], # Example skills + # llm=llm + # ) ] - task_manager = LLMTaskManager(tasks, agents, tool_llm=llm) - task_manager.run_admin(inputs=b"", workers=agents) + task_manager = LLMTaskCoordinator(tasks, agents, llm=llm) + task_manager.do()