diff --git a/apps/miroflow-agent/src/config/http.py b/apps/miroflow-agent/src/config/http.py new file mode 100644 index 00000000..1d20dd58 --- /dev/null +++ b/apps/miroflow-agent/src/config/http.py @@ -0,0 +1,23 @@ +from typing import Literal, TypedDict + + +class Config(TypedDict): + name: str + kind: Literal["streamable_http"] + url: str + + +def hydrate_mcp_client_with_streamable_http(tool_list: list[str]) -> list[Config]: + """ + assert all(tool.endswith("-http") for tool in tool_list) + """ + configs: list[Config] = [] + # for tool_name in tool_list: + # if tool_name == "tool-google-search-http": + # config = Config(name=tool_name, kind="streamable_http", url="whatever") + # else: + # print("not supported") + # continue + # configs.append(config) + + return configs diff --git a/apps/miroflow-agent/src/config/settings_v2.py b/apps/miroflow-agent/src/config/settings_v2.py new file mode 100644 index 00000000..c70678ea --- /dev/null +++ b/apps/miroflow-agent/src/config/settings_v2.py @@ -0,0 +1,35 @@ +import os + +from omegaconf import DictConfig + +from .http import hydrate_mcp_client_with_streamable_http +from .sse import hydrate_mcp_client_with_sse_transport +from .stdio import hydrate_mcp_client_with_stdio_transport + + +def create_mcp_server_parameters(cfg: DictConfig, agent_cfg: DictConfig): + os.environ["OPENAI_BASE_URL"] = ( + cfg.llm.get("openai_base_url") or "https://api.openai.com/v1" + ) + os.environ["ANTHROPIC_BASE_URL"] = ( + cfg.llm.get("anthropic_base_url") or "https://api.anthropic.com" + ) + + OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL") + ANTHROPIC_BASE_URL = os.environ.get("ANTHROPIC_BASE_URL") + + tool_list = agent_cfg.get("tools", []) + stdio_configs = hydrate_mcp_client_with_stdio_transport( + tool_list, + anthropic_base_url=ANTHROPIC_BASE_URL, + openai_base_url=OPENAI_BASE_URL, + ) + sse_configs = hydrate_mcp_client_with_sse_transport(tool_list) + http_configs = hydrate_mcp_client_with_streamable_http(tool_list) + + configs = 
[*stdio_configs, *sse_configs, *http_configs] + + blacklist = set() + for item in agent_cfg.get("tool_blacklist", []): + blacklist.add((item[0], item[1])) + return configs, blacklist diff --git a/apps/miroflow-agent/src/config/sse.py b/apps/miroflow-agent/src/config/sse.py new file mode 100644 index 00000000..6a4e41e3 --- /dev/null +++ b/apps/miroflow-agent/src/config/sse.py @@ -0,0 +1,23 @@ +from typing import Literal, TypedDict + + +class Config(TypedDict): + name: str + kind: Literal["sse"] + url: str + + +def hydrate_mcp_client_with_sse_transport(tool_list: list[str]) -> list[Config]: + """ + assert all(tool.endswith("-sse") for tool in tool_list) + """ + configs: list[Config] = [] + # for tool_name in tool_list: + # if tool_name == "tool-google-search-sse": + # config = Config(name=tool_name, kind="sse", url="whatever") + # else: + # print("not supported") + # continue + # configs.append(config) + + return configs diff --git a/apps/miroflow-agent/src/config/stdio.py b/apps/miroflow-agent/src/config/stdio.py new file mode 100644 index 00000000..8cad0ca3 --- /dev/null +++ b/apps/miroflow-agent/src/config/stdio.py @@ -0,0 +1,235 @@ +import sys +from typing import Literal, TypedDict + +from mcp import StdioServerParameters + +from .settings import ( + ANTHROPIC_API_KEY, + E2B_API_KEY, + JINA_API_KEY, + JINA_BASE_URL, + OPENAI_API_KEY, + REASONING_API_KEY, + REASONING_BASE_URL, + REASONING_MODEL_NAME, + SERPER_API_KEY, + SERPER_BASE_URL, + TENCENTCLOUD_SECRET_ID, + TENCENTCLOUD_SECRET_KEY, + VISION_API_KEY, + VISION_BASE_URL, + VISION_MODEL_NAME, + WHISPER_API_KEY, + WHISPER_BASE_URL, + WHISPER_MODEL_NAME, +) + + +class Config(TypedDict): + name: str + kind: Literal["stdio"] + params: StdioServerParameters + + +def hydrate_mcp_client_with_stdio_transport( + tool_list: list[str], anthropic_base_url: str, openai_base_url: str +) -> list[Config]: + configs: list[Config] = [] + for tool_name in tool_list: + if tool_name == "tool-google-search": + configs.append( + { + 
"name": "tool-google-search", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=[ + "-m", + "miroflow_tools.mcp_servers.searching_google_mcp_server", + ], + env={ + "SERPER_API_KEY": SERPER_API_KEY, + "SERPER_BASE_URL": SERPER_BASE_URL, + "JINA_API_KEY": JINA_API_KEY, + "JINA_BASE_URL": JINA_BASE_URL, + }, + ), + } + ) + elif tool_name == "tool-sougou-search": + configs.append( + { + "name": "tool-sougou-search", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=[ + "-m", + "miroflow_tools.mcp_servers.searching_sougou_mcp_server", + ], + env={ + "TENCENTCLOUD_SECRET_ID": TENCENTCLOUD_SECRET_ID, + "TENCENTCLOUD_SECRET_KEY": TENCENTCLOUD_SECRET_KEY, + "JINA_API_KEY": JINA_API_KEY, + "JINA_BASE_URL": JINA_BASE_URL, + }, + ), + } + ) + + elif tool_name == "tool-python": + configs.append( + { + "name": "tool-python", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.python_mcp_server"], + env={"E2B_API_KEY": E2B_API_KEY}, + ), + } + ) + elif tool_name == "tool-code": + configs.append( + { + "name": "tool-code", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.python_mcp_server"], + env={"E2B_API_KEY": E2B_API_KEY}, + ), + } + ) + elif tool_name == "tool-vqa": + configs.append( + { + "name": "tool-vqa", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.vision_mcp_server"], + env={ + "ANTHROPIC_API_KEY": ANTHROPIC_API_KEY, + "ANTHROPIC_BASE_URL": anthropic_base_url, + }, + ), + } + ) + elif tool_name == "tool-vqa-os": + configs.append( + { + "name": "tool-vqa-os", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.vision_mcp_server_os"], + env={ + "VISION_API_KEY": VISION_API_KEY, + "VISION_BASE_URL": 
VISION_BASE_URL, + "VISION_MODEL_NAME": VISION_MODEL_NAME, + }, + ), + } + ) + + elif tool_name == "tool-transcribe": + configs.append( + { + "name": "tool-transcribe", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.audio_mcp_server"], + env={ + "OPENAI_API_KEY": OPENAI_API_KEY, + "OPENAI_BASE_URL": openai_base_url, + }, + ), + } + ) + + elif tool_name == "tool-transcribe-os": + configs.append( + { + "name": "tool-transcribe-os", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "miroflow_tools.mcp_servers.audio_mcp_server_os"], + env={ + "WHISPER_BASE_URL": WHISPER_BASE_URL, + "WHISPER_API_KEY": WHISPER_API_KEY, + "WHISPER_MODEL_NAME": WHISPER_MODEL_NAME, + }, + ), + } + ) + + elif tool_name == "tool-reasoning": + configs.append( + { + "name": "tool-reasoning", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=[ + "-m", + "miroflow_tools.mcp_servers.reasoning_mcp_server", + ], + env={ + "ANTHROPIC_API_KEY": ANTHROPIC_API_KEY, + "ANTHROPIC_BASE_URL": anthropic_base_url, + }, + ), + } + ) + + elif tool_name == "tool-reasoning-os": + configs.append( + { + "name": "tool-reasoning-os", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=[ + "-m", + "miroflow_tools.mcp_servers.reasoning_mcp_server_os", + ], + env={ + "REASONING_API_KEY": REASONING_API_KEY, + "REASONING_BASE_URL": REASONING_BASE_URL, + "REASONING_MODEL_NAME": REASONING_MODEL_NAME, + }, + ), + } + ) + + # reader + elif tool_name == "tool-reader": + configs.append( + { + "name": "tool-reader", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", "markitdown_mcp"], + ), + } + ) + + elif tool_name == "tool-reading": + configs.append( + { + "name": "tool-reading", + "kind": "stdio", + "params": StdioServerParameters( + command=sys.executable, + args=["-m", 
"miroflow_tools.mcp_servers.reading_mcp_server"], + ), + } + ) + else: + print("not supported") + + return configs diff --git a/apps/miroflow-agent/src/core/pipeline.py b/apps/miroflow-agent/src/core/pipeline.py index 09478334..9ae3d462 100644 --- a/apps/miroflow-agent/src/core/pipeline.py +++ b/apps/miroflow-agent/src/core/pipeline.py @@ -38,7 +38,7 @@ async def execute_task_pipeline( task_description: str, task_file_name: str, main_agent_tool_manager: ToolManager, - sub_agent_tool_managers: List[Dict[str, ToolManager]], + sub_agent_tool_managers: Dict[str, ToolManager], output_formatter: OutputFormatter, ground_truth: Optional[Any] = None, log_dir: str = "logs", diff --git a/libs/miroflow-tools/src/miroflow_tools/manager_v2.py b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py new file mode 100644 index 00000000..caf8ae2d --- /dev/null +++ b/libs/miroflow-tools/src/miroflow_tools/manager_v2.py @@ -0,0 +1,222 @@ +from contextlib import AsyncExitStack, asynccontextmanager +from typing import Any, Literal, cast + +from mcp import ClientSession +from mcp.client.sse import sse_client +from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.client.streamable_http import streamablehttp_client +from mcp.types import TextContent +from miroflow_tools.manager import ToolManagerProtocol +from pydantic import BaseModel, HttpUrl + + +class ConfigBase(BaseModel): + name: str + + +class StdIOConfig(ConfigBase): + kind: Literal["stdio"] + params: StdioServerParameters + + +class SSEConfig(ConfigBase): + kind: Literal["sse"] + url: HttpUrl + + +class StreamableHttpConfig(ConfigBase): + kind: Literal["streamable_http"] + url: HttpUrl + + +Config = StdIOConfig | SSEConfig | StreamableHttpConfig + + +@asynccontextmanager +async def connect(cfg: Config): + """ + returns a mcp.ClientSession instance, depending on Config. 
+ """ + async with AsyncExitStack() as stack: + read, write = None, None + if cfg.kind == "stdio": + cfg = cast(StdIOConfig, cfg) + read, write = await stack.enter_async_context(stdio_client(cfg.params)) + elif cfg.kind == "sse": + cfg = cast(SSEConfig, cfg) + read, write = await stack.enter_async_context(sse_client(str(cfg.url))) + elif cfg.kind == "streamable_http": + cfg = cast(StreamableHttpConfig, cfg) + read, write, _ = await stack.enter_async_context( + streamablehttp_client(str(cfg.url)) + ) + else: # type: ignore + raise TypeError("unknown kind {} in cfg".format(cfg.kind)) + if read is not None and write is not None: + session = await stack.enter_async_context(ClientSession(read, write)) + await session.initialize() + yield session + + +class LoggingMixin: + """ + add logging instance (.task_log) and helper functions (info(), error()) to any class. + """ + + task_log: Any = None + + def add_log(self, logger: Any): + self.task_log = logger + + def _log(self, level: str, step_name: str, message: str, metadata=None): + """Helper method to log using task_log if available, otherwise skip logging.""" + if self.task_log: + self.task_log.log_step(level, step_name, message, metadata) + + def info(self, step_name: str, message: str): + self._log("info", f"ToolManagerV2 | {step_name}", message) + + def error(self, step_name: str, message: str): + self._log("error", f"ToolManagerV2 | {step_name}", message) + + +class ToolManagerV2(ToolManagerProtocol, LoggingMixin): + """ + implements a barebone ToolManager. Difference in Version 2: + 1. Deprecate huggingface block + browser session (tool name no longer matches). + 2. add supports for streamable_http. + """ + + def __init__(self, server_configs: list[dict[str, Any]]): + """ + Initialize ToolManager. 
+ :param server_configs: List returned by create_server_parameters() + """ + parsed_configs = [] + for config in server_configs: + kind = config.get("kind") + if kind == "stdio": + config = StdIOConfig.model_validate(config) + elif kind == "sse": + config = SSEConfig.model_validate(config) + elif kind == "streamable_http": + config = StreamableHttpConfig.model_validate(config) + else: + raise ValueError(f"unknown kind {kind} in config") + parsed_configs.append(config) + self.server_dict = {config.name: config for config in parsed_configs} + + async def get_all_tool_definitions(self): + """ + Connect to all configured servers and get their tool definitions. + Returns a list suitable for passing to the Prompt generator. + """ + + async def inner_list_tools(session: ClientSession): + """helper function to reduce indentation level""" + try: + response = await session.list_tools() + return response, None + except Exception as e: + return None, e + + final = [] + # Process remote server tools + for name, config in self.server_dict.items(): + self.info( + "Get Tool Definitions", + f"Getting tool definitions for server '{name}'...", + ) + curr = {"name": name, "tools": []} + try: + async with connect(config) as session: + response, error = await inner_list_tools(session) + if error is not None: + self.error( + "List Tools Error", + f"Unable to connect or get tools from server '{name}': {str(error)}", + ) + curr["tools"] = [ + {"error": f"Unable to fetch tools: {str(error)}"} + ] + if response is not None: + for tool in response.tools: + curr["tools"].append( + { + "name": tool.name, # type: ignore + "description": tool.description, + "schema": tool.inputSchema, + } + ) + except Exception as e: + self.error("MCP session Error", f"MCP session error: {str(e)}") + curr["tools"] = [{"error": f"MCP session error: {str(e)}"}] + finally: + final.append(curr) + + return final + + async def execute_tool_call( + self, *, server_name: str, tool_name: str, arguments: dict[str, Any] + ) 
-> Any: + """ + Execute a single tool call. + :param server_name: Server name + :param tool_name: Tool name + :param arguments: Tool arguments dictionary + :return: Dictionary containing result or error + """ + + def rv(*, exc: str | None = None, res: str | None = None): + common = {"server_name": server_name, "tool_name": tool_name} + depends = {"error": exc} if exc is not None else {"result": res} + return common | depends + + async def inner_call_tool( + session: ClientSession, tool_name: str, arguments: dict[str, Any] + ): + """helper function to reduce indentation level""" + try: + tool_result = await session.call_tool(tool_name, arguments=arguments) + final = "" + if tool_result is not None: + if ( + getattr(tool_result, "content", None) is not None + and len(tool_result.content) > 0 + ): + block = tool_result.content[-1] + if isinstance(block, TextContent): + final = block.text + return final, None + except Exception as e: + return None, e + + config = self.server_dict.get(server_name, None) + if config is None: + self.error( + "Server Not Found", + f"Attempting to call server '{server_name}' not found", + ) + return rv(exc=f"Server '{server_name}' not found.") + + self.info( + "Tool Call Start", + f"Connecting to server '{server_name}' to call tool '{tool_name}'", + ) + try: + async with connect(config) as session: + res, exc = await inner_call_tool(session, tool_name, arguments) + if exc is not None: + self.error( + "Tool Execution Error", + f"Tool execution error: {exc}", + ) + return rv(exc=f"Tool execution failed: {str(exc)}") + if res is not None: + return rv(res=res) + except Exception as e: + self.error( + "MCP Session Error", + f"MCP session error: {e}", + ) + return rv(exc=f"MCP session error: {str(e)}")