diff --git a/config/tool/tool-reasoning.yaml b/config/tool/tool-reasoning.yaml index ba9cc64d..e553b7c3 100644 --- a/config/tool/tool-reasoning.yaml +++ b/config/tool/tool-reasoning.yaml @@ -8,7 +8,7 @@ env: # These values will be loaded from the .env file at runtime OPENAI_API_KEY: "${oc.env:OPENROUTER_API_KEY}" OPENAI_BASE_URL: "${oc.env:OPENROUTER_BASE_URL,https://openrouter.ai/api/v1}" - OPENAI_MODEL_NAME: "${oc.env:OPENROUTER_MODEL_NAME,anthropic/claude-3-7-sonnet:thinking}" + OPENAI_MODEL_NAME: "${oc.env:OPENROUTER_MODEL_NAME,anthropic/claude-3.7-sonnet:thinking}" ANTHROPIC_API_KEY: "${oc.env:ANTHROPIC_API_KEY}" ANTHROPIC_BASE_URL: "${oc.env:ANTHROPIC_BASE_URL,https://api.anthropic.com}" ANTHROPIC_MODEL_NAME: "${oc.env:ANTHROPIC_MODEL_NAME,claude-3-7-sonnet-20250219}" diff --git a/docs/mkdocs/docs/contribute_tools.md b/docs/mkdocs/docs/contribute_tools.md index 6e3106eb..6a3d5079 100644 --- a/docs/mkdocs/docs/contribute_tools.md +++ b/docs/mkdocs/docs/contribute_tools.md @@ -27,7 +27,8 @@ async def tool_name(param: str) -> str: Explanation of the tool, its parameters, and return value. """ tool_result = ... # Your logic here - return tool_result + usage = ... # Usage of billing tools + return {"text": tool_result, "usage": usage} if __name__ == "__main__": mcp.run(transport="stdio") @@ -75,6 +76,9 @@ sub_agents: --- +### Step 4: Update Tool Usage Pricing + +You need to add your price per unit of usage to `"tool"` in `utils/usage/pricing.json`. This needs to be in the appropriate string format; it can be a formula or a conditional statement. If you need to add other formats, please make the corresponding modifications at `utils/usage/calculate_usage_from_log.py`. !!! 
info "Documentation Info" **Last Updated:** September 2025 · **Doc Contributor:** Team @ MiroMind AI \ No newline at end of file diff --git a/src/core/orchestrator.py b/src/core/orchestrator.py index 854b6832..9a62de29 100644 --- a/src/core/orchestrator.py +++ b/src/core/orchestrator.py @@ -528,6 +528,12 @@ async def run_sub_agent( sub_agent_name ].execute_tool_call(server_name, tool_name, arguments) + self.task_log.log_step( + "tool_usage", + str(tool_result["usage"]), + ) + tool_result.pop("usage", None) + call_end_time = time.time() call_duration_ms = int((call_end_time - call_start_time) * 1000) @@ -899,6 +905,12 @@ async def run_main_agent( ) ) + self.task_log.log_step( + "tool_usage", + str(tool_result["usage"]), + ) + tool_result.pop("usage", None) + call_end_time = time.time() call_duration_ms = int((call_end_time - call_start_time) * 1000) diff --git a/src/llm/provider_client_base.py b/src/llm/provider_client_base.py index a8aeb72c..60b5f45d 100644 --- a/src/llm/provider_client_base.py +++ b/src/llm/provider_client_base.py @@ -36,9 +36,11 @@ class LLMProviderClientBase(ABC): client: Any = dataclasses.field(init=False) # Usage tracking - cumulative for each agent session total_input_tokens: int = dataclasses.field(init=False, default=0) - total_input_cached_tokens: int = dataclasses.field(init=False, default=0) + total_input_cached_read_tokens: int = dataclasses.field(init=False, default=0) + total_input_cached_write_tokens: int = dataclasses.field(init=False, default=0) total_output_tokens: int = dataclasses.field(init=False, default=0) total_output_reasoning_tokens: int = dataclasses.field(init=False, default=0) + total_fee: float = dataclasses.field(init=False, default=0) def __post_init__(self): # Explicitly assign from cfg object @@ -208,11 +210,17 @@ async def create_message( usage = self._extract_usage_from_response(response) if usage: self.total_input_tokens += usage.get("input_tokens", 0) - self.total_input_cached_tokens += 
usage.get("cached_tokens", 0) + self.total_input_cached_read_tokens += usage.get( + "cached_read_tokens", 0 + ) + self.total_input_cached_write_tokens += usage.get( + "cached_write_tokens", 0 + ) self.total_output_tokens += usage.get("output_tokens", 0) self.total_output_reasoning_tokens += usage.get( "reasoning_tokens", 0 ) + self.total_fee += usage.get("fee", 0) except Exception as e: logger.warning(f"Failed to accumulate usage: {e}") @@ -341,9 +349,11 @@ def _extract_usage_from_response(self, response): if not hasattr(response, "usage"): return { "input_tokens": 0, - "cached_tokens": 0, + "cached_read_tokens": 0, + "cached_write_tokens": 0, "output_tokens": 0, "reasoning_tokens": 0, + "fee": 0, } usage = response.usage @@ -358,9 +368,11 @@ def _extract_usage_from_response(self, response): usage_dict = { "input_tokens": getattr(usage, "prompt_tokens", 0), - "cached_tokens": prompt_tokens_details.get("cached_tokens", 0), + "cached_read_tokens": prompt_tokens_details.get("cached_tokens", 0), + "cached_write_tokens": 0, "output_tokens": getattr(usage, "completion_tokens", 0), "reasoning_tokens": completion_tokens_details.get("reasoning_tokens", 0), + "fee": getattr(usage, "cost", 0), } return usage_dict @@ -369,20 +381,27 @@ def get_usage_log(self) -> str: """Get cumulative usage for current agent session as formatted string""" # Format: [Provider | Model] Total Input: X, Cache Input: Y, Output: Z, ... 
provider_model = f"[{self.provider_class} | {self.model_name}]" - input_uncached = self.total_input_tokens - self.total_input_cached_tokens + input_uncached = ( + self.total_input_tokens + - self.total_input_cached_read_tokens + - self.total_input_cached_write_tokens + ) output_response = self.total_output_tokens - self.total_output_reasoning_tokens total_tokens = self.total_input_tokens + self.total_output_tokens return ( f"Usage log: {provider_model}, " - f"Total Input: {self.total_input_tokens} (Cached: {self.total_input_cached_tokens}, Uncached: {input_uncached}), " + f"Total Input: {self.total_input_tokens} (Cached Read: {self.total_input_cached_read_tokens}, Cached Write: {self.total_input_cached_write_tokens}, Uncached: {input_uncached}), " f"Total Output: {self.total_output_tokens} (Reasoning: {self.total_output_reasoning_tokens}, Response: {output_response}), " - f"Total Tokens: {total_tokens}" + f"Total Tokens: {total_tokens}, " + f"Total Fee: {self.total_fee}" ) def reset_usage_stats(self): """Reset usage stats for new agent session""" self.total_input_tokens = 0 - self.total_input_cached_tokens = 0 + self.total_input_cached_read_tokens = 0 + self.total_input_cached_write_tokens = 0 self.total_output_tokens = 0 self.total_output_reasoning_tokens = 0 + self.total_fee = 0 diff --git a/src/llm/providers/claude_anthropic_client.py b/src/llm/providers/claude_anthropic_client.py index eecbcbf3..a946dd00 100644 --- a/src/llm/providers/claude_anthropic_client.py +++ b/src/llm/providers/claude_anthropic_client.py @@ -190,9 +190,11 @@ def _extract_usage_from_response(self, response): if not hasattr(response, "usage"): return { "input_tokens": 0, - "cached_tokens": 0, + "cached_read_tokens": 0, + "cached_write_tokens": 0, "output_tokens": 0, "reasoning_tokens": 0, + "fee": 0, } usage = response.usage @@ -205,9 +207,11 @@ def _extract_usage_from_response(self, response): "input_tokens": cache_creation_input_tokens + cache_read_input_tokens + input_tokens, - 
"cached_tokens": cache_read_input_tokens, + "cached_read_tokens": cache_read_input_tokens, + "cached_write_tokens": cache_creation_input_tokens, "output_tokens": output_tokens, "reasoning_tokens": 0, + "fee": getattr(usage, "cost", 0), } return usage_dict diff --git a/src/tool/manager.py b/src/tool/manager.py index 2d542c23..c360309e 100644 --- a/src/tool/manager.py +++ b/src/tool/manager.py @@ -318,6 +318,7 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: "server_name": server_name, "tool_name": tool_name, "error": error_message, + "usage": {}, } logger.info( @@ -342,22 +343,26 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: "server_name": server_name, "tool_name": tool_name, "result": f"Tool '{tool_name}' returned empty result - this may be expected (e.g., delete operations) or indicate an issue with tool execution", + "usage": {}, } return { "server_name": server_name, "tool_name": tool_name, "result": tool_result, + "usage": {}, } except Exception as e: return { "server_name": server_name, "tool_name": tool_name, "error": f"Tool call failed: {str(e)}", + "usage": {}, } else: try: result_content = None + usage = {} if isinstance(server_params, StdioServerParameters): async with stdio_client( update_server_params_with_context_var(server_params) @@ -371,8 +376,18 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: tool_name, arguments=arguments ) # Safely extract result content without changing original format - if tool_result.content and len(tool_result.content) > 0: - text_content = tool_result.content[-1].text + if tool_result.structuredContent: + text_content = tool_result.structuredContent["text"] + usage = tool_result.structuredContent.get( + "usage", {} + ) + if ( + not usage + and server_name == "tool-searching-serper" + ): + usage = {"SERPER": 1} + logger.info(f"Tool result content: {text_content}") + logger.info(f"Tool result usage: {usage}") if ( text_content is 
not None and text_content.strip() @@ -386,7 +401,7 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: result_content = f"Tool '{tool_name}' completed but returned no content - this may be expected or indicate an issue" # If result is empty, log warning - if not tool_result.content: + if not tool_result.structuredContent: logger.error( f"Tool '{tool_name}' returned empty content, tool_result.content: {tool_result.content}" ) @@ -400,6 +415,7 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: "server_name": server_name, "tool_name": tool_name, "error": f"Tool execution failed: {str(tool_error)}", + "usage": {}, } elif isinstance(server_params, str) and server_params.startswith( ("http://", "https://") @@ -414,8 +430,18 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: tool_name, arguments=arguments ) # Safely extract result content without changing original format - if tool_result.content and len(tool_result.content) > 0: - text_content = tool_result.content[-1].text + if tool_result.structuredContent: + text_content = tool_result.structuredContent["text"] + usage = tool_result.structuredContent.get( + "usage", {} + ) + if ( + not usage + and server_name == "tool-searching-serper" + ): + usage = {"SERPER": 1} + logger.info(f"Tool result content: {text_content}") + logger.info(f"Tool result usage: {usage}") if ( text_content is not None and text_content.strip() @@ -429,7 +455,7 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: result_content = f"Tool '{tool_name}' completed but returned no content - this may be expected or indicate an issue" # If result is empty, log warning - if not tool_result.content: + if not tool_result.structuredContent: logger.error( f"Tool '{tool_name}' returned empty content, tool_result.content: {tool_result.content}" ) @@ -443,6 +469,7 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: 
"server_name": server_name, "tool_name": tool_name, "error": f"Tool execution failed: {str(tool_error)}", + "usage": {}, } else: raise TypeError( @@ -470,6 +497,7 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: "server_name": server_name, "tool_name": tool_name, "result": result_content, # Return extracted text content + "usage": usage, } except Exception as outer_e: # Rename this to outer_e to avoid shadowing @@ -501,6 +529,7 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: "server_name": server_name, "tool_name": tool_name, "result": result.text_content, # Return extracted text content + "usage": {}, } except ( Exception @@ -514,4 +543,5 @@ async def execute_tool_call(self, server_name, tool_name, arguments) -> Any: "server_name": server_name, "tool_name": tool_name, "error": f"Tool call failed: {error_message}", + "usage": {}, } diff --git a/src/tool/mcp_servers/audio_mcp_server.py b/src/tool/mcp_servers/audio_mcp_server.py index b0f948bc..a9e2cdb4 100755 --- a/src/tool/mcp_servers/audio_mcp_server.py +++ b/src/tool/mcp_servers/audio_mcp_server.py @@ -129,7 +129,7 @@ def _encode_audio_file(audio_path: str) -> tuple[str, str]: @mcp.tool() -async def audio_transcription(audio_path_or_url: str) -> str: +async def audio_transcription(audio_path_or_url: str) -> dict: """ Transcribe audio file to text and return the transcription. 
Args: @@ -148,10 +148,13 @@ async def audio_transcription(audio_path_or_url: str) -> str: if os.path.exists(audio_path_or_url): # Check if the file exists locally with open(audio_path_or_url, "rb") as audio_file: transcription = client.audio.transcriptions.create( - model="gpt-4o-transcribe", file=audio_file + model=OPENAI_TRANSCRIPTION_MODEL_NAME, file=audio_file ) elif "home/user" in audio_path_or_url: - return "The audio_transcription tool cannot access to sandbox file, please use the local path provided by original instruction" + return { + "text": "The audio_transcription tool cannot access to sandbox file, please use the local path provided by original instruction", + "usage": {}, + } else: # download the audio file from the URL response = requests.get(audio_path_or_url) @@ -188,19 +191,35 @@ async def audio_transcription(audio_path_or_url: str) -> str: except requests.RequestException as e: retry += 1 if retry >= max_retries: - return f"[ERROR]: Audio transcription failed: Failed to download audio file - {e}.\nNote: Files from sandbox are not available. You should use local path given in the instruction. \nURLs must include the proper scheme (e.g., 'https://') and be publicly accessible. The file should be in a common audio format such as MP3, WAV, or M4A.\nNote: YouTube video URL is not supported." + return { + "text": f"[ERROR]: Audio transcription failed: Failed to download audio file - {e}.\nNote: Files from sandbox are not available. You should use local path given in the instruction. \nURLs must include the proper scheme (e.g., 'https://') and be publicly accessible. The file should be in a common audio format such as MP3, WAV, or M4A.\nNote: YouTube video URL is not supported.", + "usage": {}, + } await asyncio.sleep(5 * (2**retry)) except Exception as e: retry += 1 if retry >= max_retries: - return f"[ERROR]: Audio transcription failed: {e}\nNote: Files from sandbox are not available. You should use local path given in the instruction. 
The file should be in a common audio format such as MP3, WAV, or M4A.\nNote: YouTube video URL is not supported." + return { + "text": f"[ERROR]: Audio transcription failed: {e}\nNote: Files from sandbox are not available. You should use local path given in the instruction. The file should be in a common audio format such as MP3, WAV, or M4A.\nNote: YouTube video URL is not supported.", + "usage": {}, + } await asyncio.sleep(5 * (2**retry)) - return transcription.text + audio_usage = ( + getattr(transcription, "usage", {}) + .get("input_token_details", {}) + .get("audio_tokens", 0) + ) + return { + "text": transcription.text, + "usage": { + f"audio_transcription_openai_{OPENAI_TRANSCRIPTION_MODEL_NAME}": audio_usage + }, + } @mcp.tool() -async def audio_question_answering(audio_path_or_url: str, question: str) -> str: +async def audio_question_answering(audio_path_or_url: str, question: str) -> dict: """ Answer the question based on the given audio information. @@ -221,7 +240,10 @@ async def audio_question_answering(audio_path_or_url: str, question: str) -> str encoded_string, file_format = _encode_audio_file(audio_path_or_url) duration = _get_audio_duration(audio_path_or_url) elif "home/user" in audio_path_or_url: - return "The audio_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction" + return { + "text": "The audio_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction", + "usage": {}, + } else: # download the audio file from the URL response = requests.get( @@ -234,7 +256,10 @@ async def audio_question_answering(audio_path_or_url: str, question: str) -> str # Basic content validation - check if response has content if not response.content: - return "[ERROR]: Audio question answering failed: Downloaded file is empty.\nNote: Files from sandbox are not available. You should use local path given in the instruction. 
\nURLs must include the proper scheme (e.g., 'https://') and be publicly accessible. The file should be in a common audio format such as MP3.\nNote: YouTube video URL is not supported." + return { + "text": "[ERROR]: Audio question answering failed: Downloaded file is empty.\nNote: Files from sandbox are not available. You should use local path given in the instruction. \nURLs must include the proper scheme (e.g., 'https://') and be publicly accessible. The file should be in a common audio format such as MP3.\nNote: YouTube video URL is not supported.", + "usage": {}, + } # Check content type if available content_type = response.headers.get("content-type", "").lower() @@ -258,7 +283,10 @@ async def audio_question_answering(audio_path_or_url: str, question: str) -> str os.remove(temp_audio_path) if encoded_string is None or file_format is None: - return "[ERROR]: Audio question answering failed: Failed to encode audio file.\nNote: Files from sandbox are not available. You should use local path given in the instruction. \nURLs must include the proper scheme (e.g., 'https://') and be publicly accessible. The file should be in a common audio format such as MP3.\nNote: YouTube video URL is not supported." + return { + "text": "[ERROR]: Audio question answering failed: Failed to encode audio file.\nNote: Files from sandbox are not available. You should use local path given in the instruction. \nURLs must include the proper scheme (e.g., 'https://') and be publicly accessible. The file should be in a common audio format such as MP3.\nNote: YouTube video URL is not supported.", + "usage": {}, + } response = client.chat.completions.create( model=OPENAI_AUDIO_MODEL_NAME, @@ -283,12 +311,30 @@ async def audio_question_answering(audio_path_or_url: str, question: str) -> str ], ) except Exception as e: - return f"[ERROR]: Audio question answering failed when calling OpenAI API: {e}\nNote: Files from sandbox are not available. You should use local path given in the instruction. 
The file should be in a common audio format such as MP3, WAV, or M4A.\nNote: YouTube video URL is not supported." + return { + "text": f"[ERROR]: Audio question answering failed when calling OpenAI API: {e}\nNote: Files from sandbox are not available. You should use local path given in the instruction. The file should be in a common audio format such as MP3, WAV, or M4A.\nNote: YouTube video URL is not supported.", + "usage": {}, + } - response = response.choices[0].message.content - response += f"\n\nAudio duration: {duration} seconds" + content = response.choices[0].message.content + content += f"\n\nAudio duration: {duration} seconds" - return response + try: + audio_qa_usage_audio = response.usage.prompt_tokens_details.audio_tokens + audio_qa_usage_input_text = response.usage.prompt_tokens_details.text_tokens + audio_qa_usage_output_text = response.usage.completion_tokens + return { + "text": content, + "usage": { + f"audio_qa_openai_{OPENAI_AUDIO_MODEL_NAME}": { + "audio": audio_qa_usage_audio, + "input_text": audio_qa_usage_input_text, + "output_text": audio_qa_usage_output_text, + } + }, + } + except Exception: + return {"text": content, "usage": {}} if __name__ == "__main__": diff --git a/src/tool/mcp_servers/miroapi_serper_mcp_server.py b/src/tool/mcp_servers/miroapi_serper_mcp_server.py index 95fd25b1..bb8dadf5 100644 --- a/src/tool/mcp_servers/miroapi_serper_mcp_server.py +++ b/src/tool/mcp_servers/miroapi_serper_mcp_server.py @@ -100,17 +100,23 @@ def google_search( # Check for API key if not SERPER_API_KEY: return { - "success": False, - "error": "SERPER_API_KEY environment variable not set", - "results": [], + "text": { + "success": False, + "error": "SERPER_API_KEY environment variable not set", + "results": [], + }, + "usage": {}, } # Validate required parameter if not q or not q.strip(): return { - "success": False, - "error": "Search query 'q' is required and cannot be empty", - "results": [], + "text": { + "success": False, + "error": "Search 
query 'q' is required and cannot be empty", + "results": [], + }, + "usage": {}, } try: @@ -155,10 +161,17 @@ def google_search( response_data["organic"] = organic_results response_data = decode_http_urls_in_dict(response_data) - return response_data + return {"text": response_data, "usage": {"MIRO_SERPER": 1}} except Exception as e: - return {"success": False, "error": f"Unexpected error: {str(e)}", "results": []} + return { + "text": { + "success": False, + "error": f"Unexpected error: {str(e)}", + "results": [], + }, + "usage": {}, + } if __name__ == "__main__": diff --git a/src/tool/mcp_servers/python_server.py b/src/tool/mcp_servers/python_server.py index dbaaa5ea..8e2c0dcc 100755 --- a/src/tool/mcp_servers/python_server.py +++ b/src/tool/mcp_servers/python_server.py @@ -11,6 +11,8 @@ # Initialize FastMCP server from src.logging.logger import setup_mcp_logging +import time + setup_mcp_logging(tool_name=os.path.basename(__file__)) mcp = FastMCP("e2b-python-interpreter") @@ -135,7 +137,7 @@ async def _install_common_packages(sandbox, sandbox_id: str) -> bool: @mcp.tool() -async def create_sandbox() -> str: +async def create_sandbox() -> dict: """Create a linux sandbox and get the `sandbox_id` for safely executing commands and running python code. Note that the `sandbox_id` can only be assigned and cannot be manually specified. The sandbox may timeout and automatically shutdown. If so, you will need to create a new sandbox. 
@@ -155,16 +157,40 @@ async def create_sandbox() -> str: api_key=E2B_API_KEY, ) info = sandbox.get_info() + metrics = None + for _ in range(10): + await asyncio.sleep(5) + metrics = sandbox.get_metrics() + if metrics: + break + if not metrics: + raise TimeoutError("Failed to get metrics after 50s") + + cpu_count = metrics[0].cpu_count + mem_total = metrics[0].mem_total # Install common packages before running code # await _install_common_packages(sandbox, info.sandbox_id) tmpfiles_dir = os.path.join(LOGS_DIR, "tmpfiles") os.makedirs(tmpfiles_dir, exist_ok=True) - return f"Sandbox created with sandbox_id: {info.sandbox_id}" + return { + "text": f"Sandbox created with sandbox_id: {info.sandbox_id}", + "usage": { + f"sandbox_{info.sandbox_id}": { + "cpu": cpu_count, + "mem": mem_total, + "start_time": time.time(), + "end_time": time.time() + DEFAULT_TIMEOUT, + } + }, + } except Exception as e: if attempt == max_retries: - return f"Failed to create sandbox after {max_retries} attempts: {e}, please retry later." + return { + "text": f"Failed to create sandbox after {max_retries} attempts: {e}, please retry later.", + "usage": {}, + } await asyncio.sleep(attempt * 2) # Exponential backoff finally: # Set timeout before exit to prevent timeout after function exits @@ -175,7 +201,7 @@ async def create_sandbox() -> str: @mcp.tool() -async def run_command(sandbox_id: str, command: str) -> str: +async def run_command(sandbox_id: str, command: str) -> dict: """Execute a shell command in the linux sandbox. The sandbox is already installed with common system packages for the task. @@ -190,7 +216,10 @@ async def run_command(sandbox_id: str, command: str) -> str: try: sandbox = Sandbox.connect(sandbox_id, api_key=E2B_API_KEY) except Exception: - return f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. Make sure the sandbox is created and the id is correct." + return { + "text": f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. 
Make sure the sandbox is created and the id is correct.", + "usage": {}, + } max_retries = 5 for attempt in range(1, max_retries + 1): @@ -205,7 +234,12 @@ async def run_command(sandbox_id: str, command: str) -> str: if "pip install" in command or "apt-get" in command: result_str += "\n\n[PACKAGE INSTALL STATUS]: The system packages and Python packages required for the task have been installed. No need to install them again unless a missing package error occurs during execution." - return result_str + return { + "text": result_str, + "usage": { + f"sandbox_{sandbox_id}": {"end_time": time.time() + DEFAULT_TIMEOUT} + }, + } except Exception as e: if attempt == max_retries: error_msg = f"[ERROR]: Failed to run command after {max_retries} attempts. Exception type: {type(e).__name__}, Details: {e}. \n\n[HINT]: Shell commands can be error-prone. Consider using the `run_python_code` tool instead to accomplish the same task with Python code, which often provides better error handling and more detailed error messages.\n\n[PERMISSION HINT]: You are running as user, not root. If you encounter permission issues, use `sudo` for commands that require administrator privileges (e.g., `sudo apt-get install`, `sudo systemctl`, etc.)." @@ -214,7 +248,14 @@ async def run_command(sandbox_id: str, command: str) -> str: if "pip install" in command or "apt-get" in command: error_msg += "\n\n[PACKAGE INSTALL STATUS]: The system packages and Python packages required for the task have been installed. No need to install them again unless a missing package error occurs during execution." 
- return error_msg + return { + "text": error_msg, + "usage": { + f"sandbox_{sandbox_id}": { + "end_time": time.time() + DEFAULT_TIMEOUT + } + }, + } await asyncio.sleep(attempt * 2) # Exponential backoff finally: # Set timeout before exit to prevent timeout after function exits @@ -225,7 +266,7 @@ async def run_command(sandbox_id: str, command: str) -> str: @mcp.tool() -async def run_python_code(sandbox_id: str, code_block: str) -> str: +async def run_python_code(sandbox_id: str, code_block: str) -> dict: """Run python code in the sandbox and return the execution result. The sandbox is already installed with common python packages for the task. @@ -240,7 +281,10 @@ async def run_python_code(sandbox_id: str, code_block: str) -> str: try: sandbox = Sandbox.connect(sandbox_id=sandbox_id, api_key=E2B_API_KEY) except Exception: - return f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. Make sure the sandbox is created and the id is correct." + return { + "text": f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. Make sure the sandbox is created and the id is correct.", + "usage": {}, + } max_retries = 5 for attempt in range(1, max_retries + 1): @@ -250,10 +294,22 @@ async def run_python_code(sandbox_id: str, code_block: str) -> str: ) # refresh the timeout for each command execution execution = sandbox.run_code(code_block) - return str(execution) + return { + "text": str(execution), + "usage": { + f"sandbox_{sandbox_id}": {"end_time": time.time() + DEFAULT_TIMEOUT} + }, + } except Exception as e: if attempt == max_retries: - return f"[ERROR]: Failed to run code in sandbox {sandbox_id} after {max_retries} attempts. Exception type: {type(e).__name__}, Details: {e}." + return { + "text": f"[ERROR]: Failed to run code in sandbox {sandbox_id} after {max_retries} attempts. 
Exception type: {type(e).__name__}, Details: {e}.", + "usage": { + f"sandbox_{sandbox_id}": { + "end_time": time.time() + DEFAULT_TIMEOUT + } + }, + } await asyncio.sleep(attempt * 2) # Exponential backoff finally: # Set timeout before exit to prevent timeout after function exits @@ -266,7 +322,7 @@ async def run_python_code(sandbox_id: str, code_block: str) -> str: @mcp.tool() async def upload_file_from_local_to_sandbox( sandbox_id: str, local_file_path: str, sandbox_file_path: str = "/home/user" -) -> str: +) -> dict: """Upload a local file to the `/home/user` dir of the sandbox. Args: @@ -281,7 +337,10 @@ async def upload_file_from_local_to_sandbox( try: sandbox = Sandbox.connect(sandbox_id, api_key=E2B_API_KEY) except Exception: - return f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. Make sure the sandbox is created and the id is correct." + return { + "text": f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. Make sure the sandbox is created and the id is correct.", + "usage": {}, + } try: sandbox.set_timeout( @@ -297,9 +356,19 @@ async def upload_file_from_local_to_sandbox( with open(local_file_path, "rb") as f: sandbox.files.write(uploaded_file_path, f) - return f"File uploaded to {uploaded_file_path}\n\n[INFO]: For directly reading local files without uploading to sandbox, consider using the `read_file` tool which can read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) directly from local paths or URLs. Note that `read_file` doesn't support files already in the sandbox." + return { + "text": f"File uploaded to {uploaded_file_path}\n\n[INFO]: For directly reading local files without uploading to sandbox, consider using the `read_file` tool which can read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) directly from local paths or URLs. 
Note that `read_file` doesn't support files already in the sandbox.", + "usage": { + f"sandbox_{sandbox_id}": {"end_time": time.time() + DEFAULT_TIMEOUT} + }, + } except Exception as e: - return f"[ERROR]: Failed to upload file {local_file_path} to sandbox {sandbox_id}: {e}\n\n[INFO]: This tool is for uploading local files to the sandbox. For security reasons, downloading files from sandbox to local system is not supported. Alternatively, consider using the `read_file` tool which can directly read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) from local paths or URLs without uploading to sandbox." + return { + "text": f"[ERROR]: Failed to upload file {local_file_path} to sandbox {sandbox_id}: {e}\n\n[INFO]: This tool is for uploading local files to the sandbox. For security reasons, downloading files from sandbox to local system is not supported. Alternatively, consider using the `read_file` tool which can directly read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) from local paths or URLs without uploading to sandbox.", + "usage": { + f"sandbox_{sandbox_id}": {"end_time": time.time() + DEFAULT_TIMEOUT} + }, + } finally: # Set timeout before exit to prevent timeout after function exits try: @@ -311,7 +380,7 @@ async def upload_file_from_local_to_sandbox( @mcp.tool() async def download_file_from_internet_to_sandbox( sandbox_id: str, url: str, sandbox_file_path: str = "/home/user" -) -> str: +) -> dict: """Download a file from the internet to the `/home/user` dir of the sandbox. You should use this tool to download files from the internet. @@ -327,7 +396,10 @@ async def download_file_from_internet_to_sandbox( try: sandbox = Sandbox.connect(sandbox_id, api_key=E2B_API_KEY) except Exception: - return f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. Make sure the sandbox is created and the id is correct." + return { + "text": f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. 
Make sure the sandbox is created and the id is correct.", + "usage": {}, + } try: sandbox.set_timeout( @@ -341,14 +413,33 @@ async def download_file_from_internet_to_sandbox( for attempt in range(1, max_retries + 1): result = sandbox.commands.run(f"wget {url} -O {downloaded_file_path}") if result.exit_code == 0: - return f"File downloaded to {downloaded_file_path}\n\n[INFO]: For directly reading files from internet URLs without downloading to sandbox, consider using the `read_file` tool which can read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) directly from URLs. Note that `read_file` doesn't support files already in the sandbox." + return { + "text": f"File downloaded to {downloaded_file_path}\n\n[INFO]: For directly reading files from internet URLs without downloading to sandbox, consider using the `read_file` tool which can read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) directly from URLs. Note that `read_file` doesn't support files already in the sandbox.", + "usage": { + f"sandbox_{sandbox_id}": { + "end_time": time.time() + DEFAULT_TIMEOUT + } + }, + } elif attempt < max_retries: await asyncio.sleep(4**attempt) continue # Retry else: - return f"[ERROR]: Failed to download file from {url} to {downloaded_file_path} after {max_retries} attempts: {result}.\n\n[INFO]: This tool is for downloading files from the internet to the sandbox. To upload local files to the sandbox, use `upload_file_from_local_to_sandbox` instead. Alternatively, consider using the `read_file` tool which can directly read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) from internet URLs without downloading to sandbox." + return { + "text": f"[ERROR]: Failed to download file from {url} to {downloaded_file_path} after {max_retries} attempts: {result}.\n\n[INFO]: This tool is for downloading files from the internet to the sandbox. To upload local files to the sandbox, use `upload_file_from_local_to_sandbox` instead. 
Alternatively, consider using the `read_file` tool which can directly read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) from internet URLs without downloading to sandbox.", + "usage": { + f"sandbox_{sandbox_id}": { + "end_time": time.time() + DEFAULT_TIMEOUT + } + }, + } except Exception as e: - return f"[ERROR]: Failed to download file from {url}: {e}\n\n[INFO]: This tool is for downloading files from the internet to the sandbox. To upload local files to the sandbox, use `upload_file_from_local_to_sandbox` instead. Alternatively, consider using the `read_file` tool which can directly read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) from internet URLs without downloading to sandbox." + return { + "text": f"[ERROR]: Failed to download file from {url}: {e}\n\n[INFO]: This tool is for downloading files from the internet to the sandbox. To upload local files to the sandbox, use `upload_file_from_local_to_sandbox` instead. Alternatively, consider using the `read_file` tool which can directly read various file types (Doc, PPT, PDF, Excel, CSV, ZIP, etc.) from internet URLs without downloading to sandbox.", + "usage": { + f"sandbox_{sandbox_id}": {"end_time": time.time() + DEFAULT_TIMEOUT} + }, + } finally: # Set timeout before exit to prevent timeout after function exits try: @@ -360,7 +451,7 @@ async def download_file_from_internet_to_sandbox( @mcp.tool() async def download_file_from_sandbox_to_local( sandbox_id: str, sandbox_file_path: str, local_filename: str = None -) -> str: +) -> dict: """Download a file from the sandbox to local system. Files in sandbox cannot be processed by tools from other servers - only local files and internet URLs can be processed by them. Args: @@ -375,7 +466,10 @@ async def download_file_from_sandbox_to_local( try: sandbox = Sandbox.connect(sandbox_id, api_key=E2B_API_KEY) except Exception: - return f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. 
Make sure the sandbox is created and the id is correct." + return { + "text": f"[ERROR]: Failed to connect to sandbox {sandbox_id}, retry later. Make sure the sandbox is created and the id is correct.", + "usage": {}, + } try: sandbox.set_timeout( @@ -384,7 +478,12 @@ async def download_file_from_sandbox_to_local( # Create tmpfiles directory if it doesn't exist if not LOGS_DIR: - return "[ERROR]: LOGS_DIR environment variable is not set. Cannot determine where to save the file." + return { + "text": "[ERROR]: LOGS_DIR environment variable is not set. Cannot determine where to save the file.", + "usage": { + f"sandbox_{sandbox_id}": {"end_time": time.time() + DEFAULT_TIMEOUT} + }, + } tmpfiles_dir = os.path.join(LOGS_DIR, "tmpfiles") os.makedirs(tmpfiles_dir, exist_ok=True) @@ -402,9 +501,19 @@ async def download_file_from_sandbox_to_local( content = sandbox.files.read(sandbox_file_path, format="bytes") f.write(content) - return f"File downloaded successfully to: {local_file_path}\n\n[INFO]: The file can now be accessed by other tools (reading, question-answering, etc.) which only support local files and internet URLs, not sandbox files." + return { + "text": f"File downloaded successfully to: {local_file_path}\n\n[INFO]: The file can now be accessed by other tools (reading, question-answering, etc.) which only support local files and internet URLs, not sandbox files.", + "usage": { + f"sandbox_{sandbox_id}": {"end_time": time.time() + DEFAULT_TIMEOUT} + }, + } except Exception as e: - return f"[ERROR]: Failed to download file {sandbox_file_path} from sandbox {sandbox_id}: {e}\n\n[INFO]: This tool is for downloading files from the sandbox to local system. To upload local files to the sandbox, use `upload_file_from_local_to_sandbox` instead." + return { + "text": f"[ERROR]: Failed to download file {sandbox_file_path} from sandbox {sandbox_id}: {e}\n\n[INFO]: This tool is for downloading files from the sandbox to local system. 
To upload local files to the sandbox, use `upload_file_from_local_to_sandbox` instead.", + "usage": { + f"sandbox_{sandbox_id}": {"end_time": time.time() + DEFAULT_TIMEOUT} + }, + } finally: # Set timeout before exit to prevent timeout after function exits try: diff --git a/src/tool/mcp_servers/reading_mcp_server.py b/src/tool/mcp_servers/reading_mcp_server.py index 5e70fa15..06ee0f96 100644 --- a/src/tool/mcp_servers/reading_mcp_server.py +++ b/src/tool/mcp_servers/reading_mcp_server.py @@ -24,7 +24,7 @@ @mcp.tool() -async def read_file(uri: str) -> str: +async def read_file(uri: str) -> dict: """Read various types of resources (Doc, PPT, PDF, Excel, CSV, ZIP file etc.) described by an file: or data: URI. @@ -35,10 +35,16 @@ async def read_file(uri: str) -> str: str: The content of the resource, or an error message if reading fails. """ if not uri or not uri.strip(): - return "[ERROR]: URI parameter is required and cannot be empty." + return { + "text": "[ERROR]: URI parameter is required and cannot be empty.", + "usage": {}, + } if "home/user" in uri: - return "The read_file tool cannot access to sandbox file, please use the local path provided by original instruction" + return { + "text": "The read_file tool cannot access to sandbox file, please use the local path provided by original instruction", + "usage": {}, + } # Validate URI scheme valid_schemes = ["http:", "https:", "file:", "data:"] @@ -49,7 +55,10 @@ async def read_file(uri: str) -> str: # Validate URI scheme if not any(uri.lower().startswith(scheme) for scheme in valid_schemes): - return f"[ERROR]: Invalid URI scheme. Supported schemes are: {', '.join(valid_schemes)}" + return { + "text": f"[ERROR]: Invalid URI scheme. 
Supported schemes are: {', '.join(valid_schemes)}", + "usage": {}, + } # If it’s an HTTP(S) URL, download it first with a compliant UA: if uri.lower().startswith(("http://", "https://")): @@ -69,16 +78,24 @@ async def read_file(uri: str) -> str: if retry_count > 3: # Try scrape_website tool as fallback try: - scrape_result = await smart_request( + response = await smart_request( uri, env={ "SERPER_API_KEY": SERPER_API_KEY, "JINA_API_KEY": JINA_API_KEY, }, ) - return f"[INFO]: Download failed, automatically tried `scrape_website` tool instead.\n\n{scrape_result}" + scrape_result = response.get("text", None) + scrape_usage = response.get("usage", {}) + return { + "text": f"[INFO]: Download failed, automatically tried `scrape_website` tool instead.\n\n{scrape_result}", + "usage": scrape_usage, + } except Exception as scrape_error: - return f"[ERROR]: Failed to download {uri}: {e}. Also failed to scrape with `scrape_website` tool: {scrape_error}" + return { + "text": f"[ERROR]: Failed to download {uri}: {e}. Also failed to scrape with `scrape_website` tool: {scrape_error}", + "usage": {}, + } await asyncio.sleep(4**retry_count) # write to a temp file and switch URI to file: @@ -120,13 +137,17 @@ def _cleanup_tempfile(path): ) result_content += "\n\nNote: If the document contains instructions or important information, please review them thoroughly and ensure you follow all relevant guidance." except Exception as tool_error: - return f"[ERROR]: Tool execution failed: {str(tool_error)}.\nHint: The reading tool cannot access to sandbox file, use the local path provided by original instruction instead." 
+ return { + "text": f"[ERROR]: Tool execution failed: {str(tool_error)}.\nHint: The reading tool cannot access to sandbox file, use the local path provided by original instruction instead.", + "usage": {}, + } except Exception as session_error: - return ( - f"[ERROR]: Failed to connect to markitdown-mcp server: {str(session_error)}" - ) + return { + "text": f"[ERROR]: Failed to connect to markitdown-mcp server: {str(session_error)}", + "usage": {}, + } - return result_content + return {"text": result_content, "usage": {}} if __name__ == "__main__": diff --git a/src/tool/mcp_servers/reasoning_mcp_server.py b/src/tool/mcp_servers/reasoning_mcp_server.py index 6411ceac..6d52592b 100755 --- a/src/tool/mcp_servers/reasoning_mcp_server.py +++ b/src/tool/mcp_servers/reasoning_mcp_server.py @@ -26,7 +26,7 @@ @mcp.tool() -async def reasoning(question: str) -> str: +async def reasoning(question: str) -> dict: """This tool is for pure text-based reasoning, analysis, and logical thinking. It integrates collected information, organizes final logic, and provides planning insights. IMPORTANT: This tool cannot access the internet, read files, program, or process multimodal content. It only performs pure text reasoning. 
@@ -73,10 +73,34 @@ async def reasoning(question: str) -> str: # Check if content is empty and retry if so if content and content.strip(): - return content + if not hasattr(response, "usage"): + return {"text": content, "usage": {}} + else: + usage = response.usage + cache_tokens = getattr( + getattr(usage, "prompt_tokens_details", {}), + "cached_tokens", + 0, + ) + text_input_tokens = getattr(usage, "prompt_tokens", 0) + text_output_tokens = getattr(usage, "completion_tokens", 0) + return { + "text": content, + "usage": { + f"reasoning_openrouter_{OPENAI_MODEL_NAME}": { + "cache_read": cache_tokens, + "input_text": text_input_tokens, + "output_text": text_output_tokens, + "cost": getattr(usage, "cost", 0), + } + }, + } else: if attempt >= max_retries: - return f"Reasoning (OpenRouter Client) failed after {max_retries} retries: Empty response received\n" + return { + "text": f"Reasoning (OpenRouter Client) failed after {max_retries} retries: Empty response received\n", + "usage": {}, + } await asyncio.sleep( 5 * (2**attempt) ) # Exponential backoff with max 30s @@ -84,7 +108,10 @@ async def reasoning(question: str) -> str: except Exception as e: if attempt >= max_retries: - return f"Reasoning (OpenRouter Client) failed after {max_retries} retries: {e}\n" + return { + "text": f"Reasoning (OpenRouter Client) failed after {max_retries} retries: {e}\n", + "usage": {}, + } await asyncio.sleep( 5 * (2**attempt) ) # Exponential backoff with max 30s @@ -109,10 +136,29 @@ async def reasoning(question: str) -> str: # Check if content is empty and retry if so if content and content.strip(): - return content + if not hasattr(response, "usage"): + usage = {} + else: + usage_temp = response.usage + usage = { + f"reasoning_anthropic_{ANTHROPIC_MODEL_NAME}": { + "input": getattr(usage_temp, "input_tokens", 0), + "output": getattr(usage_temp, "output_tokens", 0), + "cache_read": getattr( + usage_temp, "cache_read_input_tokens", 0 + ), + "cache_write": getattr( + usage_temp, 
"cache_creation_input_tokens", 0 + ), + } + } + return {"text": content, "usage": usage} else: if attempt >= max_retries: - return f"[ERROR]: Reasoning (Anthropic Client) failed after {max_retries} retries: Empty response received\n" + return { + "text": f"[ERROR]: Reasoning (Anthropic Client) failed after {max_retries} retries: Empty response received\n", + "usage": {}, + } await asyncio.sleep( 5 * (2**attempt) ) # Exponential backoff with max 30s @@ -120,7 +166,10 @@ async def reasoning(question: str) -> str: except Exception as e: if attempt >= max_retries: - return f"[ERROR]: Reasoning (Anthropic Client) failed after {max_retries} retries: {e}\n" + return { + "text": f"[ERROR]: Reasoning (Anthropic Client) failed after {max_retries} retries: {e}\n", + "usage": {}, + } await asyncio.sleep( 5 * (2**attempt) ) # Exponential backoff with max 30s diff --git a/src/tool/mcp_servers/searching_mcp_server.py b/src/tool/mcp_servers/searching_mcp_server.py index 4187e4a1..7c10209d 100644 --- a/src/tool/mcp_servers/searching_mcp_server.py +++ b/src/tool/mcp_servers/searching_mcp_server.py @@ -8,6 +8,7 @@ import requests import datetime import calendar + from fastmcp import FastMCP from mcp.client.stdio import stdio_client from mcp import ClientSession, StdioServerParameters # (already imported in config.py) @@ -15,6 +16,7 @@ import asyncio from .utils.smart_request import smart_request, request_to_json from src.logging.logger import setup_mcp_logging +from collections import Counter SERPER_API_KEY = os.environ.get("SERPER_API_KEY", "") @@ -42,6 +44,13 @@ mcp = FastMCP("searching-mcp-server") +def merge_usage(*usage_dicts): + total = Counter() + for d in usage_dicts: + total.update(d) + return dict(total) + + def filter_google_search_result(result_content: str) -> str: """Filter google search result content based on environment variables. 
@@ -94,7 +103,7 @@ async def google_search( num: int = 10, tbs: str = None, page: int = 1, -) -> str: +) -> dict: """Perform google searches via Serper API and retrieve rich results. It is able to retrieve organic search results, people also ask, related searches, and knowledge graph. @@ -111,9 +120,10 @@ async def google_search( The search results. """ if SERPER_API_KEY == "": - return ( - "[ERROR]: SERPER_API_KEY is not set, google_search tool is not available." - ) + return { + "text": "[ERROR]: SERPER_API_KEY is not set, google_search tool is not available.", + "usage": {}, + } tool_name = "google_search" arguments = { "q": q, @@ -161,19 +171,28 @@ async def google_search( ), "Empty result from google_search tool, please try again." # Apply filtering based on environment variables filtered_result = filter_google_search_result(result_content) - return filtered_result # Success, exit retry loop + return { + "text": filtered_result, + "usage": {"SERPER": 1}, + } # Success, exit retry loop except Exception as error: retry_count += 1 if retry_count >= max_retries: - return f"[ERROR]: google_search tool execution failed after {max_retries} attempts: {str(error)}" + return { + "text": f"[ERROR]: google_search tool execution failed after {max_retries} attempts: {str(error)}", + "usage": {}, + } # Wait before retrying await asyncio.sleep(min(2**retry_count, 60)) - return "[ERROR]: Unknown error occurred in google_search tool, please try again." + return { + "text": "[ERROR]: Unknown error occurred in google_search tool, please try again.", + "usage": {}, + } @mcp.tool() -async def wiki_get_page_content(entity: str, first_sentences: int = 10) -> str: +async def wiki_get_page_content(entity: str, first_sentences: int = 10) -> dict: """Get specific Wikipedia page content for the specific entity (people, places, concepts, events) and return structured information. 
This tool searches Wikipedia for the given entity and returns either the first few sentences @@ -222,7 +241,7 @@ async def wiki_get_page_content(entity: str, first_sentences: int = 10) -> str: result_parts.append(f"URL: {page.url}") - return "\n\n".join(result_parts) + return {"text": "\n\n".join(result_parts), "usage": {}} except wikipedia.exceptions.DisambiguationError as e: options_list = "\n".join( @@ -238,11 +257,11 @@ async def wiki_get_page_content(entity: str, first_sentences: int = 10) -> str: search_results = wikipedia.search(entity, results=5) if search_results: output += f"Try to search {entity} in Wikipedia: {search_results}" - return output + return {"text": output, "usage": {}} except Exception: pass - return output + return {"text": output, "usage": {}} except wikipedia.exceptions.PageError: # Try a search if direct page lookup fails @@ -252,39 +271,60 @@ async def wiki_get_page_content(entity: str, first_sentences: int = 10) -> str: suggestion_list = "\n".join( [f"- {result}" for result in search_results[:5]] ) - return ( - f"Page Not Found: No Wikipedia page found for '{entity}'.\n\n" - f"Similar pages found:\n{suggestion_list}\n\n" - f"Try searching for one of these suggestions instead." - ) + return { + "text": ( + f"Page Not Found: No Wikipedia page found for '{entity}'.\n\n" + f"Similar pages found:\n{suggestion_list}\n\n" + f"Try searching for one of these suggestions instead." + ), + "usage": {}, + } else: - return ( - f"Page Not Found: No Wikipedia page found for '{entity}' " - f"and no similar pages were found. Please try a different search term." - ) + return { + "text": ( + f"Page Not Found: No Wikipedia page found for '{entity}' " + f"and no similar pages were found. Please try a different search term." + ), + "usage": {}, + } except Exception as search_error: - return ( - f"Page Not Found: No Wikipedia page found for '{entity}'. 
" - f"Search for alternatives also failed: {str(search_error)}" - ) + return { + "text": ( + f"Page Not Found: No Wikipedia page found for '{entity}'. " + f"Search for alternatives also failed: {str(search_error)}" + ), + "usage": {}, + } except wikipedia.exceptions.RedirectError: - return f"Redirect Error: Failed to follow redirect for '{entity}'" + return { + "text": f"Redirect Error: Failed to follow redirect for '{entity}'", + "usage": {}, + } except requests.exceptions.RequestException as e: - return f"Network Error: Failed to connect to Wikipedia: {str(e)}" + return { + "text": f"Network Error: Failed to connect to Wikipedia: {str(e)}", + "usage": {}, + } except wikipedia.exceptions.WikipediaException as e: - return f"Wikipedia Error: An error occurred while searching Wikipedia: {str(e)}" + return { + "text": f"Wikipedia Error: An error occurred while searching Wikipedia: {str(e)}", + "usage": {}, + } except Exception as e: - return f"Unexpected Error: An unexpected error occurred: {str(e)}" + return { + "text": f"Unexpected Error: An unexpected error occurred: {str(e)}", + "usage": {}, + } @mcp.tool() async def search_wiki_revision( entity: str, year: int, month: int, max_revisions: int = 50 -) -> str: +) -> dict: """Search for an entity in Wikipedia and return the revision history for a specific month. 
Args: @@ -357,7 +397,7 @@ async def search_wiki_revision( "rvprop": "timestamp|ids", } - content = await smart_request( + response = await smart_request( url=base_url, params=params, env={ @@ -367,32 +407,45 @@ async def search_wiki_revision( "JINA_BASE_URL": JINA_BASE_URL, }, ) + content, usage = response.get("text", None), response.get("usage", {}) data = request_to_json(content) # Check for API errors if "error" in data: - return f"[ERROR]: Wikipedia API Error: {data['error'].get('info', 'Unknown error')}" + return { + "text": f"[ERROR]: Wikipedia API Error: {data['error'].get('info', 'Unknown error')}", + "usage": usage, + } # Process the response pages = (data.get("query") or {}).get("pages", {}) if not pages: - return f"[ERROR]: No results found for entity '{entity}'" + return { + "text": f"[ERROR]: No results found for entity '{entity}'", + "usage": usage, + } # Check if page exists page_id = list(pages.keys())[0] if page_id == "-1": - return f"[ERROR]: Page Not Found: No Wikipedia page found for '{entity}'" + return { + "text": f"[ERROR]: Page Not Found: No Wikipedia page found for '{entity}'", + "usage": usage, + } page_info = pages[page_id] page_title = page_info.get("title", entity) if "revisions" not in page_info or not page_info["revisions"]: - return ( - adjustment_msg + f"Page Title: {page_title}\n\n" - f"No revisions found for '{entity}' in {year}-{month:02d}.\n\n" - f"The page may not have been edited during this time period." - ) + return { + "text": ( + adjustment_msg + f"Page Title: {page_title}\n\n" + f"No revisions found for '{entity}' in {year}-{month:02d}.\n\n" + f"The page may not have been edited during this time period." 
+ ), + "usage": usage, + } # Format the results result_parts = [ @@ -426,27 +479,42 @@ async def search_wiki_revision( if revisions_details: result_parts.append("Revisions:\n" + "\n\n".join(revisions_details)) - return ( - adjustment_msg - + "\n\n".join(result_parts) - + "\n\nHint: You can use the `scrape_website` tool to get the webpage content of a URL." - ) + return { + "text": ( + adjustment_msg + + "\n\n".join(result_parts) + + "\n\nHint: You can use the `scrape_website` tool to get the webpage content of a URL." + ), + "usage": usage, + } except requests.exceptions.Timeout: - return f"[ERROR]: Network Error: Request timed out while fetching revision history for '{entity}'" + return { + "text": f"[ERROR]: Network Error: Request timed out while fetching revision history for '{entity}'", + "usage": {}, + } except requests.exceptions.RequestException as e: - return f"[ERROR]: Network Error: Failed to connect to Wikipedia: {str(e)}" + return { + "text": f"[ERROR]: Network Error: Failed to connect to Wikipedia: {str(e)}", + "usage": {}, + } except ValueError as e: - return f"[ERROR]: Date Error: Invalid date values - {str(e)}" + return { + "text": f"[ERROR]: Date Error: Invalid date values - {str(e)}", + "usage": {}, + } except Exception as e: - return f"[ERROR]: Unexpected Error: An unexpected error occurred: {str(e)}" + return { + "text": f"[ERROR]: Unexpected Error: An unexpected error occurred: {str(e)}", + "usage": {}, + } @mcp.tool() -async def search_archived_webpage(url: str, year: int, month: int, day: int) -> str: +async def search_archived_webpage(url: str, year: int, month: int, day: int) -> dict: """Search the Wayback Machine (archive.org) for archived versions of a webpage, optionally for a specific date. Args: @@ -459,9 +527,15 @@ async def search_archived_webpage(url: str, year: int, month: int, day: int) -> str: Formatted archive information including archived URL, timestamp, and status. Returns error message if URL not found or other issues occur. 
""" + + usage = {} + # Handle empty URL if not url: - return f"[ERROR]: Invalid URL: '{url}'. URL cannot be empty." + return { + "text": f"[ERROR]: Invalid URL: '{url}'. URL cannot be empty.", + "usage": usage, + } # Auto-add https:// if no protocol is specified protocol_hint = "" @@ -521,7 +595,10 @@ async def search_archived_webpage(url: str, year: int, month: int, day: int) -> # Validate the final adjusted date datetime.datetime(year, month, day) except ValueError as e: - return f"[ERROR]: Invalid date: {year}-{month:02d}-{day:02d}. {str(e)}" + return { + "text": f"[ERROR]: Invalid date: {year}-{month:02d}-{day:02d}. {str(e)}", + "usage": usage, + } # Prepare adjustment message if any changes were made if adjustments: @@ -538,7 +615,7 @@ async def search_archived_webpage(url: str, year: int, month: int, day: int) -> retry_count = 0 # retry 5 times if the response is not valid while retry_count < 5: - content = await smart_request( + response = await smart_request( url=base_url, params={"url": url, "timestamp": date}, env={ @@ -548,7 +625,12 @@ async def search_archived_webpage(url: str, year: int, month: int, day: int) -> "JINA_BASE_URL": JINA_BASE_URL, }, ) + content, usage_temp = ( + response.get("text", None), + response.get("usage", {}), + ) data = request_to_json(content) + usage = merge_usage(usage, usage_temp) if ( "archived_snapshots" in data and "closest" in data["archived_snapshots"] @@ -564,17 +646,20 @@ async def search_archived_webpage(url: str, year: int, month: int, day: int) -> available = closest.get("available", True) if not available: - return ( - hint_message - + adjustment_msg - + ( - f"Archive Status: Snapshot exists but is not available\n\n" - f"Original URL: {url}\n" - f"Requested Date: {year:04d}-{month:02d}-{day:02d}\n" - f"Closest Snapshot: {archived_timestamp}\n\n" - f"Try a different date" - ) - ) + return { + "text": ( + hint_message + + adjustment_msg + + ( + f"Archive Status: Snapshot exists but is not available\n\n" + f"Original 
URL: {url}\n" + f"Requested Date: {year:04d}-{month:02d}-{day:02d}\n" + f"Closest Snapshot: {archived_timestamp}\n\n" + f"Try a different date" + ) + ), + "usage": usage, + } # Format timestamp for better readability try: @@ -583,25 +668,28 @@ async def search_archived_webpage(url: str, year: int, month: int, day: int) -> except Exception: formatted_time = archived_timestamp - return ( - protocol_hint - + hint_message - + adjustment_msg - + ( - f"Archive Found: Archived version located\n\n" - f"Original URL: {url}\n" - f"Requested Date: {year:04d}-{month:02d}-{day:02d}\n" - f"Archived URL: {archived_url}\n" - f"Archived Timestamp: {formatted_time}\n" - ) - + "\n\nHint: You can also use the `scrape_website` tool to get the webpage content of a URL." - ) + return { + "text": ( + protocol_hint + + hint_message + + adjustment_msg + + ( + f"Archive Found: Archived version located\n\n" + f"Original URL: {url}\n" + f"Requested Date: {year:04d}-{month:02d}-{day:02d}\n" + f"Archived URL: {archived_url}\n" + f"Archived Timestamp: {formatted_time}\n" + ) + + "\n\nHint: You can also use the `scrape_website` tool to get the webpage content of a URL." 
+ ), + "usage": usage, + } # Search without specific date (most recent) retry_count = 0 # retry 5 times if the response is not valid while retry_count < 5: - content = await smart_request( + response = await smart_request( url=base_url, params={"url": url}, env={ @@ -611,6 +699,8 @@ async def search_archived_webpage(url: str, year: int, month: int, day: int) -> "JINA_BASE_URL": JINA_BASE_URL, }, ) + content, usage_temp = response.get("text", None), response.get("usage", {}) + usage = merge_usage(usage, usage_temp) data = request_to_json(content) if "archived_snapshots" in data and "closest" in data["archived_snapshots"]: break @@ -624,16 +714,19 @@ async def search_archived_webpage(url: str, year: int, month: int, day: int) -> available = closest.get("available", True) if not available: - return ( - protocol_hint - + hint_message - + ( - f"Archive Status: Most recent snapshot exists but is not available\n\n" - f"Original URL: {url}\n" - f"Most Recent Snapshot: {archived_timestamp}\n\n" - f"The URL may have been archived but access is restricted" - ) - ) + return { + "text": ( + protocol_hint + + hint_message + + ( + f"Archive Status: Most recent snapshot exists but is not available\n\n" + f"Original URL: {url}\n" + f"Most Recent Snapshot: {archived_timestamp}\n\n" + f"The URL may have been archived but access is restricted" + ) + ), + "usage": usage, + } # Format timestamp for better readability try: @@ -642,43 +735,58 @@ async def search_archived_webpage(url: str, year: int, month: int, day: int) -> except Exception: formatted_time = archived_timestamp - return ( - protocol_hint - + hint_message - + ( - f"Archive Found: Most recent archived version\n\n" - f"Original URL: {url}\n" - f"Archived URL: {archived_url}\n" - f"Archived Timestamp: {formatted_time}\n" - ) - + "\n\nHint: You can also use the `scrape_website` tool to get the webpage content of a URL." 
- ) + return { + "text": ( + protocol_hint + + hint_message + + ( + f"Archive Found: Most recent archived version\n\n" + f"Original URL: {url}\n" + f"Archived URL: {archived_url}\n" + f"Archived Timestamp: {formatted_time}\n" + ) + + "\n\nHint: You can also use the `scrape_website` tool to get the webpage content of a URL." + ), + "usage": usage, + } else: - return ( - protocol_hint - + hint_message - + ( - f"Archive Not Found: No archived versions available\n\n" - f"Original URL: {url}\n\n" - f"The URL '{url}' has not been archived by the Wayback Machine.\n" - f"You may want to:\n" - f"- Check if the URL is correct\n" - f"- Try a different URL and date\n" - ) - ) + return { + "text": ( + protocol_hint + + hint_message + + ( + f"Archive Not Found: No archived versions available\n\n" + f"Original URL: {url}\n\n" + f"The URL '{url}' has not been archived by the Wayback Machine.\n" + f"You may want to:\n" + f"- Check if the URL is correct\n" + f"- Try a different URL and date\n" + ) + ), + "usage": usage, + } except requests.exceptions.RequestException as e: - return f"[ERROR]: Network Error: Failed to connect to Wayback Machine: {str(e)}" + return { + "text": f"[ERROR]: Network Error: Failed to connect to Wayback Machine: {str(e)}", + "usage": usage, + } except ValueError as e: - return f"[ERROR]: Data Error: Failed to parse response from Wayback Machine: {str(e)}" + return { + "text": f"[ERROR]: Data Error: Failed to parse response from Wayback Machine: {str(e)}", + "usage": usage, + } except Exception as e: - return f"[ERROR]: Unexpected Error: An unexpected error occurred: {str(e)}" + return { + "text": f"[ERROR]: Unexpected Error: An unexpected error occurred: {str(e)}", + "usage": usage, + } @mcp.tool() -async def scrape_website(url: str) -> str: +async def scrape_website(url: str) -> dict: """This tool is used to scrape a website for its content. Search engines are not supported by this tool. 
This tool can also be used to get YouTube video non-visual information (however, it may be incomplete), such as video subtitles, titles, descriptions, key moments, etc. Args: diff --git a/src/tool/mcp_servers/utils/smart_request.py b/src/tool/mcp_servers/utils/smart_request.py index 728856a7..0e44d97b 100644 --- a/src/tool/mcp_servers/utils/smart_request.py +++ b/src/tool/mcp_servers/utils/smart_request.py @@ -25,7 +25,10 @@ def request_to_json(content: str) -> dict: async def smart_request(url: str, params: dict = None, env: dict = None) -> str: # Handle empty URL if not url: - return f"[ERROR]: Invalid URL: '{url}'. URL cannot be empty." + return { + "text": f"[ERROR]: Invalid URL: '{url}'. URL cannot be empty.", + "usage": {}, + } if env: JINA_API_KEY = env.get("JINA_API_KEY", "") @@ -36,7 +39,10 @@ async def smart_request(url: str, params: dict = None, env: dict = None) -> str: SERPER_API_KEY = "" if JINA_API_KEY == "" and SERPER_API_KEY == "": - return "[ERROR]: JINA_API_KEY and SERPER_API_KEY are not set, smart_request is not available." + return { + "text": "[ERROR]: JINA_API_KEY and SERPER_API_KEY are not set, smart_request is not available.", + "usage": {}, + } IS_MIRO_API = True if "miro" in JINA_BASE_URL else False @@ -52,7 +58,10 @@ async def smart_request(url: str, params: dict = None, env: dict = None) -> str: # Check for restricted domains if "huggingface.co/datasets" in url or "huggingface.co/spaces" in url: - return "You are trying to scrape a Hugging Face dataset for answers, please do not use the scrape tool for this purpose." 
+ return { + "text": "You are trying to scrape a Hugging Face dataset for answers, please do not use the scrape tool for this purpose.", + "usage": {}, + } retry_count = 0 max_retries = 3 @@ -68,24 +77,36 @@ async def smart_request(url: str, params: dict = None, env: dict = None) -> str: ): youtube_hint = "[NOTE]: If you need to get information about its visual or audio content, please use tool 'visual_audio_youtube_analyzing' instead. This tool may not be able to provide visual and audio content of a YouTube Video.\n\n" - content, jina_err = await scrape_jina(url, JINA_API_KEY, JINA_BASE_URL) + jina_response = await scrape_jina(url, JINA_API_KEY, JINA_BASE_URL) + content = jina_response.get("text", None) + jina_err = jina_response.get("error", None) + jina_usage = jina_response.get("usage", {}) if jina_err: error_msg += f"Failed to get content from Jina.ai: {jina_err}\n" elif content is None or content.strip() == "": error_msg += "No content got from Jina.ai.\n" else: - return protocol_hint + youtube_hint + content + return { + "text": protocol_hint + youtube_hint + content, + "usage": jina_usage, + } if not IS_MIRO_API: # Try Serper API for scraping if not using Miro API # (Miro API does not support caching Serper scraping results) - content, serper_err = await scrape_serper(url, SERPER_API_KEY) + serper_response = await scrape_serper(url, SERPER_API_KEY) + content = serper_response.get("text", None) + serper_err = serper_response.get("error", None) + serper_usage = serper_response.get("usage", {}) if serper_err: error_msg += f"Failed to get content from SERPER: {serper_err}\n" elif content is None or content.strip() == "": error_msg += "No content got from SERPER.\n" else: - return protocol_hint + youtube_hint + content + return { + "text": protocol_hint + youtube_hint + content, + "usage": serper_usage, + } content, request_err = scrape_request(url) if request_err: @@ -93,29 +114,29 @@ async def smart_request(url: str, params: dict = None, env: dict = None) -> 
str: elif content is None or content.strip() == "": error_msg += "No content got from requests.\n" else: - return protocol_hint + youtube_hint + content + return {"text": protocol_hint + youtube_hint + content, "usage": {}} raise Exception(error_msg) except Exception as e: retry_count += 1 if retry_count >= max_retries: - return f"[ERROR]: {str(e)}" + return {"text": f"[ERROR]: {str(e)}", "usage": {}} else: await asyncio.sleep(4**retry_count) -async def scrape_jina( - url: str, jina_api_key: str, jina_base_url: str -) -> tuple[str, str]: +async def scrape_jina(url: str, jina_api_key: str, jina_base_url: str) -> dict: # Use Jina.ai reader API to convert URL to LLM-friendly text if jina_api_key == "": - return ( - None, - "JINA_API_KEY is not set, JINA scraping is not available.", - ) + return { + "text": None, + "error": "JINA_API_KEY is not set, JINA scraping is not available.", + "usage": {}, + } jina_headers = { + "Accept": "application/json", "Authorization": f"Bearer {jina_api_key}", "X-Base": "final", "X-Engine": "browser", @@ -129,12 +150,13 @@ async def scrape_jina( response = requests.get(jina_url, headers=jina_headers, timeout=120) if response.status_code == 422: # Return as error to allow fallback to other tools and retries - return ( - None, - "Tool execution failed with Jina 422 error, which may indicate the URL is a file. This tool does not support files. If you believe the URL might point to a file, you should try using other applicable tools, or try to process it in the sandbox.", - ) + return { + "text": None, + "error": "Tool execution failed with Jina 422 error, which may indicate the URL is a file. This tool does not support files. 
If you believe the URL might point to a file, you should try using other applicable tools, or try to process it in the sandbox.", + "usage": {}, + } response.raise_for_status() - content = response.text + content = response.json().get("data", {}).get("content", "") if ( "Warning: This page maybe not yet fully loaded, consider explicitly specify a timeout." in content @@ -142,27 +164,42 @@ async def scrape_jina( # Try with longer timeout response = requests.get(jina_url, headers=jina_headers, timeout=300) if response.status_code == 422: - return ( - None, - "Tool execution failed with Jina 422 error, which may indicate the URL is a file. This tool does not support files. If you believe the URL might point to a file, you should try using other applicable tools, or try to process it in the sandbox.", - ) + return { + "text": None, + "error": "Tool execution failed with Jina 422 error, which may indicate the URL is a file. This tool does not support files. If you believe the URL might point to a file, you should try using other applicable tools, or try to process it in the sandbox.", + "usage": {}, + } response.raise_for_status() - content = response.text - return content, None + content = response.json().get("data", {}).get("content", "") + return { + "text": content, + "error": None, + "usage": { + "JINA": response.json() + .get("meta", {}) + .get("usage", {}) + .get("tokens", 0) + }, + } except Exception as e: - return None, f"Failed to get content from Jina.ai: {str(e)}\n" + return { + "text": None, + "error": f"Failed to get content from Jina.ai: {str(e)}\n", + "usage": {}, + } -async def scrape_serper(url: str, serper_api_key: str) -> tuple[str, str]: +async def scrape_serper(url: str, serper_api_key: str) -> dict: """This function uses SERPER for scraping a website. Args: url: The URL of the website to scrape. 
""" if serper_api_key == "": - return ( - None, - "SERPER_API_KEY is not set, SERPER scraping is not available.", - ) + return { + "text": None, + "error": "SERPER_API_KEY is not set, SERPER scraping is not available.", + "usage": {}, + } server_params = StdioServerParameters( command="npx", @@ -179,9 +216,9 @@ async def scrape_serper(url: str, serper_api_key: str) -> tuple[str, str]: result_content = ( tool_result.content[-1].text if tool_result.content else "" ) - return result_content, None + return {"text": result_content, "error": None, "usage": {"SERPER": 1}} except Exception as e: - return None, f"Tool execution failed: {str(e)}" + return {"text": None, "error": f"Tool execution failed: {str(e)}", "usage": {}} def scrape_request(url: str) -> tuple[str, str]: diff --git a/src/tool/mcp_servers/vision_mcp_server.py b/src/tool/mcp_servers/vision_mcp_server.py index a60a4e3a..3393443b 100755 --- a/src/tool/mcp_servers/vision_mcp_server.py +++ b/src/tool/mcp_servers/vision_mcp_server.py @@ -13,6 +13,7 @@ import requests import asyncio from src.logging.logger import setup_mcp_logging +from collections import Counter # Anthropic credentials @@ -30,13 +31,20 @@ OPENAI_MODEL_NAME = os.environ.get("OPENAI_MODEL_NAME", "gpt-4o") GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "") -GEMINI_MODEL_NAME = os.environ.get("GEMINI_MODEL_NAME", "gemini-2.5-pro") +GEMINI_MODEL_NAME = os.environ.get("GEMINI_MODEL_NAME", "gemini-3-pro-preview") # Initialize FastMCP server setup_mcp_logging(tool_name=os.path.basename(__file__)) mcp = FastMCP("vision-mcp-server") +def merge_usage(*usage_dicts): + total = Counter() + for d in usage_dicts: + total.update(d) + return dict(total) + + async def detect_image_format(file_path: str) -> str: try: with open(file_path, "rb") as f: @@ -71,7 +79,7 @@ async def guess_mime_media_type_from_extension(file_path: str) -> str: return "image/jpeg" # Default to JPEG if unknown -async def call_claude_vision(image_path_or_url: str, question: str) -> str: 
+async def call_claude_vision(image_path_or_url: str, question: str) -> dict: """Call Claude vision API.""" messages_for_llm = [ { @@ -104,7 +112,10 @@ async def call_claude_vision(image_path_or_url: str, question: str) -> str: data=image_data, ) elif "home/user" in image_path_or_url: - return "The visual_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction" + return { + "text": "The visual_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction", + "usage": {}, + } else: # Otherwise, assume it's a URL # Convert to https URL for Claude vision API url = image_path_or_url @@ -127,6 +138,19 @@ async def call_claude_vision(image_path_or_url: str, question: str) -> str: max_tokens=4096, messages=messages_for_llm, ) + if not hasattr(response, "usage"): + usage = {} + else: + usage_temp = response.usage + usage = { + "input": getattr(usage_temp, "input_tokens", 0), + "output": getattr(usage_temp, "output_tokens", 0), + "cache_read": getattr(usage_temp, "cache_read_input_tokens", 0), + "cache_write": getattr( + usage_temp, "cache_creation_input_tokens", 0 + ), + } + result = response.content[0].text # Check if response.text is None or empty after stripping @@ -140,13 +164,13 @@ async def call_claude_vision(image_path_or_url: str, question: str) -> str: break await asyncio.sleep(4**attempt) # Exponential backoff - return result + return {"text": result, "usage": usage} except Exception as e: - return f"[ERROR]: Claude Error: {e}" + return {"text": f"[ERROR]: Claude Error: {e}", "usage": {}} -async def call_openai_vision(image_path_or_url: str, question: str) -> str: +async def call_openai_vision(image_path_or_url: str, question: str) -> dict: """Call OpenAI vision API.""" try: if os.path.exists(image_path_or_url): # Check if the file exists locally @@ -158,7 +182,10 @@ async def call_openai_vision(image_path_or_url: str, question: str) -> str: "image_url": 
{"url": f"data:{mime_type};base64,{image_data}"}, } elif "home/user" in image_path_or_url: - return "The visual_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction" + return { + "text": "The visual_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction", + "usage": {}, + } else: # Otherwise, assume it's a URL image_content = { "type": "image_url", @@ -188,14 +215,27 @@ async def call_openai_vision(image_path_or_url: str, question: str) -> str: max_tokens=4096, messages=messages_for_llm, ) - - return response.choices[0].message.content + if not hasattr(response, "usage"): + usage = {} + else: + usage = response.usage + cache_tokens = getattr( + getattr(usage, "prompt_tokens_details", {}), "cached_tokens", 0 + ) + text_input_tokens = getattr(usage, "prompt_tokens", 0) + text_output_tokens = getattr(usage, "completion_tokens", 0) + usage = { + "cache_read": cache_tokens, + "input_text": text_input_tokens, + "output_text": text_output_tokens, + } + return {"text": response.choices[0].message.content, "usage": usage} except Exception as e: - return f"[ERROR]: OpenAI Error: {e}" + return {"text": f"[ERROR]: OpenAI Error: {e}", "usage": {}} -async def call_gemini_vision(image_path_or_url: str, question: str) -> str: +async def call_gemini_vision(image_path_or_url: str, question: str) -> dict: """Call Gemini vision API.""" try: mime_type = await detect_image_format(image_path_or_url) @@ -207,7 +247,10 @@ async def call_gemini_vision(image_path_or_url: str, question: str) -> str: mime_type=mime_type, ) elif "home/user" in image_path_or_url: - return "The visual_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction" + return { + "text": "The visual_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction", + "usage": {}, + } else: headers = 
{ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" @@ -232,7 +275,10 @@ async def call_gemini_vision(image_path_or_url: str, question: str) -> str: mime_type=mime_type, ) except Exception as e: - return f"[ERROR]: Failed to get image data {image_path_or_url}: {e}.\nNote: The visual_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction or http url. If you are using http url, make sure it is an image file url." + return { + "text": f"[ERROR]: Failed to get image data {image_path_or_url}: {e}.\nNote: The visual_question_answering tool cannot access to sandbox file, please use the local path provided by original instruction or http url. If you are using http url, make sure it is an image file url.", + "usage": {}, + } retry_count = 0 max_retry = 3 # 3 retries with smart timing to avoid thundering herd @@ -241,7 +287,7 @@ async def call_gemini_vision(image_path_or_url: str, question: str) -> str: client = genai.Client(api_key=GEMINI_API_KEY) response = client.models.generate_content( - model="gemini-2.5-pro", + model="gemini-3-pro-preview", contents=[ image, types.Part(text=question), @@ -249,11 +295,19 @@ async def call_gemini_vision(image_path_or_url: str, question: str) -> str: # config=types.GenerateContentConfig(temperature=0.1), ) + usage = response.usage_metadata + input_tokens = usage.prompt_token_count + output_tokens = usage.total_token_count - input_tokens + usage = { + "input": input_tokens, + "output": output_tokens, + } + # Check if response.text is None or empty after stripping if response.text is None or response.text.strip() == "": raise Exception("Response text is None or empty") - return response.text + return {"text": response.text, "usage": usage} except Exception as e: # Only retry for rate limit and server errors, or empty response @@ -265,7 +319,10 @@ async def call_gemini_vision(image_path_or_url: str, 
question: str) -> str: ): retry_count += 1 if retry_count > max_retry: - return f"[ERROR]: Gemini Error after {retry_count} retries: {e}" + return { + "text": f"[ERROR]: Gemini Error after {retry_count} retries: {e}", + "usage": {}, + } # Rate limit is per minute, spread 5 requests across different minute windows if retry_count == 1: @@ -280,11 +337,11 @@ async def call_gemini_vision(image_path_or_url: str, question: str) -> str: await asyncio.sleep(wait_time) else: - return f"[ERROR]: Gemini Error: {e}" + return {"text": f"[ERROR]: Gemini Error: {e}", "usage": {}} @mcp.tool() -async def visual_question_answering(image_path_or_url: str, question: str) -> str: +async def visual_question_answering(image_path_or_url: str, question: str) -> dict: """This tool is used to ask question about an image or a video and get the answer with Gemini vision language models. It also automatically performs OCR (text extraction) on the image for additional context. Args: @@ -310,13 +367,19 @@ async def visual_question_answering(image_path_or_url: str, question: str) -> st Return only the extracted text content, maintaining the original formatting and structure as much as possible. If there is no text in the image, respond with 'No text found'. If there are areas where text may exist but is unreadable or ambiguous, describe these as well.""" if ANTHROPIC_API_KEY: - ocr_result = await call_claude_vision(image_path_or_url, ocr_prompt) + response = await call_claude_vision(image_path_or_url, ocr_prompt) elif OPENAI_API_KEY: - ocr_result = await call_openai_vision(image_path_or_url, ocr_prompt) + response = await call_openai_vision(image_path_or_url, ocr_prompt) elif GEMINI_API_KEY: - ocr_result = await call_gemini_vision(image_path_or_url, ocr_prompt) + response = await call_gemini_vision(image_path_or_url, ocr_prompt) else: - return "[ERROR]: No API key is set, visual_question_answering tool is not available." 
+ return { + "text": "[ERROR]: No API key is set, visual_question_answering tool is not available.", + "usage": {}, + } + + ocr_result = response.get("text", "") + ocr_usage = response.get("usage", {}) vqa_prompt = f"""You are a highly attentive visual analysis assistant. Your task is to carefully examine the image and provide a thorough, accurate answer to the question. @@ -341,22 +404,45 @@ async def visual_question_answering(image_path_or_url: str, question: str) -> st # Before answering, carefully analyze both the question and the image. Identify and briefly list potential subtle or easily overlooked VQA pitfalls or ambiguities that could arise in interpreting this question or image (e.g., confusing similar objects, missing small details, misreading text, ambiguous context, etc.). For each, suggest a method or strategy to avoid or mitigate these issues. Only after this analysis, proceed to answer the question, providing a thorough and detailed observation and reasoning process. if ANTHROPIC_API_KEY: - vqa_result = await call_claude_vision(image_path_or_url, vqa_prompt) + response = await call_claude_vision(image_path_or_url, vqa_prompt) + vqa_result = response.get("text", "") + vqa_usage = response.get("usage", {}) + usage = { + f"vision_anthropic_{ANTHROPIC_MODEL_NAME}": merge_usage( + ocr_usage, vqa_usage + ) + } elif OPENAI_API_KEY: - vqa_result = await call_openai_vision(image_path_or_url, vqa_prompt) + response = await call_openai_vision(image_path_or_url, vqa_prompt) + vqa_result = response.get("text", "") + vqa_usage = response.get("usage", {}) + usage = { + f"vision_openai_{OPENAI_MODEL_NAME}": merge_usage(ocr_usage, vqa_usage) + } elif GEMINI_API_KEY: - vqa_result = await call_gemini_vision(image_path_or_url, vqa_prompt) + response = await call_gemini_vision(image_path_or_url, vqa_prompt) + vqa_result = response.get("text", "") + vqa_usage = response.get("usage", {}) + usage = { + f"vision_gemini_{GEMINI_MODEL_NAME}": merge_usage(ocr_usage, vqa_usage) 
+ } else: - return "[ERROR]: No API key is set, visual_question_answering tool is not available." + return { + "text": "[ERROR]: No API key is set, visual_question_answering tool is not available.", + "usage": {}, + } - return f"OCR results:\n{ocr_result}\n\nVQA result:\n{vqa_result}" + return { + "text": f"OCR results:\n{ocr_result}\n\nVQA result:\n{vqa_result}", + "usage": usage, + } # The tool visual_audio_youtube_analyzing only support single YouTube URL as input for now, though GEMINI can support multiple URLs up to 10 per request. @mcp.tool() async def visual_audio_youtube_analyzing( url: str, question: str = "", provide_transcribe: bool = False -) -> str: +) -> dict: """Analyzes public YouTube video audiovisual content to answer questions or provide transcriptions. This tool processes both audio tracks and visual frames from YouTube videos. This tool could be primarily used when analyzing YouTube video content. Only supports YouTube Video URLs containing youtube.com/watch, youtube.com/shorts, or youtube.com/live for now. Args: @@ -368,19 +454,30 @@ async def visual_audio_youtube_analyzing( The answer to the question or the transcription of the video. """ if GEMINI_API_KEY == "": - return "[ERROR]: GEMINI_API_KEY is not set, visual_audio_youtube_analyzing tool is not available." + return { + "text": "[ERROR]: GEMINI_API_KEY is not set, visual_audio_youtube_analyzing tool is not available.", + "usage": {}, + } if ( "youtube.com/watch" not in url and "youtube.com/shorts" not in url and "youtube.com/live" not in url ): - return f"[ERROR]: Invalid URL: '{url}'. YouTube Video URL must contain youtube.com/watch, youtube.com/shorts, or youtube.com/live" + return { + "text": f"[ERROR]: Invalid URL: '{url}'. 
YouTube Video URL must contain youtube.com/watch, youtube.com/shorts, or youtube.com/live.", + "usage": {}, + } if question == "" and not provide_transcribe: - return "[ERROR]: You must provide a question to ask about the video content or set provide_transcribe to True." + return { + "text": "[ERROR]: You must provide a question to ask about the video content or set provide_transcribe to True.", + "usage": {}, + } client = genai.Client(api_key=GEMINI_API_KEY) + usage_transcribe = {} + usage_answer = {} if provide_transcribe: # prompt from GEMINI official document prompt = "Transcribe the audio from this video, giving timestamps for salient events in the video. Also provide visual descriptions." @@ -389,7 +486,7 @@ async def visual_audio_youtube_analyzing( while retry_count <= max_retry: try: transcribe_response = client.models.generate_content( - model="gemini-2.5-pro", + model="gemini-3-pro-preview", contents=types.Content( parts=[ types.Part(file_data=types.FileData(file_uri=url)), @@ -397,6 +494,15 @@ async def visual_audio_youtube_analyzing( ] ), ) + usage_transcribe = transcribe_response.usage_metadata + input_tokens_transcribe = usage_transcribe.prompt_token_count + output_tokens_transcribe = ( + usage_transcribe.total_token_count - input_tokens_transcribe + ) + usage_transcribe = { + "input": input_tokens_transcribe, + "output": output_tokens_transcribe, + } # Check if response.text is None or empty after stripping if ( @@ -455,7 +561,7 @@ async def visual_audio_youtube_analyzing( while retry_count <= max_retry: try: response = client.models.generate_content( - model="gemini-2.5-pro", + model="gemini-3-pro-preview", contents=types.Content( parts=[ types.Part(file_data=types.FileData(file_uri=url)), @@ -463,7 +569,15 @@ async def visual_audio_youtube_analyzing( ] ), ) - + usage_answer = response.usage_metadata + input_tokens_answer = usage_answer.prompt_token_count + output_tokens_answer = ( + usage_answer.total_token_count - input_tokens_answer + ) + 
usage_answer = {
+                    "input": input_tokens_answer,
+                    "output": output_tokens_answer,
+                }
                 # Check if response.text is None or empty after stripping
                 if response.text is None or response.text.strip() == "":
                     raise Exception("Response text is None or empty")
@@ -511,7 +625,14 @@ async def visual_audio_youtube_analyzing(
                 break
 
     hint = "\n\nHint: Large videos may trigger rate limits causing failures. If you need more website information rather than video visual content itself (such as video subtitles, titles, descriptions, key moments), you can also call tool `scrape_website` tool."
-    return transcribe_content + answer_content + hint
+    return {
+        "text": transcribe_content + answer_content + hint,
+        "usage": {
+            "youtube_gemini_gemini-3-pro-preview": merge_usage(
+                usage_transcribe, usage_answer
+            )
+        },
+    }
 
 
 if __name__ == "__main__":
diff --git a/utils/usage/calculate_usage_from_log.py b/utils/usage/calculate_usage_from_log.py
new file mode 100644
index 00000000..932b91a5
--- /dev/null
+++ b/utils/usage/calculate_usage_from_log.py
@@ -0,0 +1,309 @@
+import glob
+import json
+import os
+import sys
+import re
+import ast
+import types
+
+
+def str_to_pricing(pricing_str):
+    if "fee" not in pricing_str and "token" not in pricing_str:
+        return float(eval(pricing_str))
+    else:
+        return eval(f"lambda {'fee' if 'fee' in pricing_str else 'token'}: {pricing_str}")
+
+
+def _update_tool_usage(total_usage: dict, usage: dict):
+    for key, value in usage.items():
+        if key not in total_usage:
+            total_usage[key] = value
+        else:
+            if not isinstance(value, dict):
+                try:
+                    total_usage[key] += value
+                except TypeError:
+                    raise TypeError(
+                        f"Type error when updating tool usage with {total_usage[key]} and {value}"
+                    )
+
+            elif isinstance(value, dict) and "sandbox_" not in key:
+                for sub_key, sub_value in value.items():
+                    if sub_key not in total_usage[key]:
+                        total_usage[key][sub_key] = sub_value
+                    else:
+                        try:
+                            total_usage[key][sub_key] += sub_value
+                        except TypeError:
+                            raise TypeError(
+                                f"Type error when updating tool usage with {total_usage[key][sub_key]} and 
{sub_value}"
+                            )
+            else:
+                for sub_key, sub_value in value.items():
+                    total_usage[key][sub_key] = sub_value
+
+
+def extract_tool_usage_from_log(run_dir, tool_usage_dict):
+    # Traverse all task_{task_id}_attempt_*.log files to extract score
+    log_files = glob.glob(os.path.join(run_dir, "task_*_attempt_*.json"))
+    for log_file in log_files:
+        task_id = log_file.split("/")[-1].split("_")[1]
+        tool_usage_dict[task_id] = {}
+        total_usage = {}
+        with open(log_file, "r") as f:
+            data = json.load(f)
+        for d in data["step_logs"]:
+            if d["step_name"] == "tool_usage":
+                log = d["message"]
+                log = ast.literal_eval(log)
+                _update_tool_usage(total_usage, log)
+        for tool, usage in total_usage.items():
+            if tool.startswith("sandbox_"):
+                if f"E2B_{usage['cpu']}_{usage['mem']}" not in tool_usage_dict[task_id]:
+                    tool_usage_dict[task_id][f"E2B_{usage['cpu']}_{usage['mem']}"] = 0
+                tool_usage_dict[task_id][f"E2B_{usage['cpu']}_{usage['mem']}"] += (
+                    usage["end_time"] - usage["start_time"]
+                )
+                continue
+            if tool not in tool_usage_dict[task_id]:
+                tool_usage_dict[task_id][tool] = usage
+            else:
+                if not isinstance(usage, dict):
+                    try:
+                        tool_usage_dict[task_id][tool] += usage
+                    except TypeError:
+                        raise TypeError(
+                            f"Type error when updating tool usage with {tool_usage_dict[task_id][tool]} and {usage}"
+                        )
+                else:
+                    for sub_key, sub_value in usage.items():
+                        if sub_key not in tool_usage_dict[task_id][tool]:
+                            tool_usage_dict[task_id][tool][sub_key] = sub_value
+                        else:
+                            try:
+                                tool_usage_dict[task_id][tool][sub_key] += sub_value
+                            except TypeError:
+                                raise TypeError(
+                                    f"Type error when updating tool usage with {tool_usage_dict[task_id][tool][sub_key]} and {sub_value}"
+                                )
+
+
+def extract_llm_usage_from_log(run_dir, llm_usage_dict):
+    # Traverse all task_{task_id}_attempt_*.log files to extract score
+    log_files = glob.glob(os.path.join(run_dir, "task_*_attempt_*.json"))
+    for log_file in log_files:
+        task_id = log_file.split("/")[-1].split("_")[1]
+        llm_usage_dict[task_id] = {}
+        
with open(log_file, "r") as f: + data = json.load(f) + for d in data["step_logs"]: + if d["step_name"] == "usage_calculation": + log = d["message"] + model_match = re.search(r"Usage log:\s*\[(.*?)\]", log) + if model_match: + model_raw = model_match.group(1) + model_formatted = model_raw.replace(" | ", "|") + else: + raise ValueError(f"Model not found in log: {log}") + + cached_read_match = re.search(r"Cached Read:\s*(\d+)", log) + cached_read_input = ( + int(cached_read_match.group(1)) if cached_read_match else None + ) + cached_write_match = re.search(r"Cached Write:\s*(\d+)", log) + cached_write_input = ( + int(cached_write_match.group(1)) if cached_write_match else None + ) + + uncached_match = re.search(r"Uncached:\s*(\d+)", log) + uncached_input = ( + int(uncached_match.group(1)) if uncached_match else None + ) + + total_output_match = re.search(r"Total Output:\s*(\d+)", log) + total_output = ( + int(total_output_match.group(1)) if total_output_match else None + ) + + total_fee_match = re.search(r"Total Fee:\s*(\d+(?:\.\d+)?)", log) + total_fee = float(total_fee_match.group(1)) if total_fee_match else None + + if model_formatted in llm_usage_dict[task_id]: + llm_usage_dict[task_id][model_formatted]["cached_read_input"] += ( + cached_read_input + ) + llm_usage_dict[task_id][model_formatted]["cached_write_input"] += ( + cached_write_input + ) + llm_usage_dict[task_id][model_formatted]["uncached_input"] += ( + uncached_input + ) + llm_usage_dict[task_id][model_formatted]["total_output"] += ( + total_output + ) + llm_usage_dict[task_id][model_formatted]["total_fee"] += total_fee + else: + llm_usage_dict[task_id][model_formatted] = { + "cached_read_input": cached_read_input, + "cached_write_input": cached_write_input, + "uncached_input": uncached_input, + "total_output": total_output, + "total_fee": total_fee, + } + + +def calculate_tool_fee(usage_dict): + total_fee = 0 + with open("utils/usage/pricing.json", "r") as f: + pricing = json.load(f) + pricing = 
pricing["tool"] + for _, _usage in usage_dict.items(): + for tool, usage in _usage.items(): + ToolInPricing = False + if "openrouter" in tool: + total_fee += usage["cost"] + ToolInPricing = True + elif "E2B" in tool: + mem = int(int(tool.split("_")[-1]) / (1024 * 1024)) + cpu = int(tool.split("_")[1]) + total_fee += ( + eval(pricing["E2B"]["cpu"]) * cpu + + eval(pricing["E2B"]["mem"]) * mem + ) + ToolInPricing = True + else: + for key in pricing.keys(): + if key.lower() in tool.lower(): + if not isinstance(pricing[key], dict): + pricing_temp = str_to_pricing(pricing[key]) + total_fee += ( + pricing_temp * usage + if not isinstance(pricing_temp, types.FunctionType) + else pricing_temp(usage) * usage + ) + ToolInPricing = True + else: + for model, fee in pricing[key].items(): + if model in tool: + if isinstance(fee, dict): + for i in fee.keys(): + if i not in usage: + continue + # total_fee += eval(fee[i]) * usage[i] + pricing_temp = str_to_pricing(fee[i]) + total_fee += ( + pricing_temp * usage[i] + if not isinstance( + pricing_temp, types.FunctionType + ) + else pricing_temp(usage[i]) * usage[i] + ) + else: + # total_fee += eval(fee) * usage + pricing_temp = str_to_pricing(fee) + total_fee += ( + pricing_temp * usage + if not isinstance( + pricing_temp, types.FunctionType + ) + else pricing_temp(usage) * usage + ) + ToolInPricing = True + break + break + if not ToolInPricing: + raise ValueError(f"Tool {tool} not found in pricing") + return total_fee + + +def calculate_llm_fee(usage_dict): + total_fee = 0 + with open("utils/usage/pricing.json", "r") as f: + pricing = json.load(f) + pricing = pricing["llm"] + for _, _usage in usage_dict.items(): + for model, usage in _usage.items(): + FindModel = False + if usage["total_fee"] > 0: + assert ( + "openrouter" in model.lower() + ), f"Model {model} not found in openrouter but has total fee!" 
+ total_fee += usage["total_fee"] * 1.055 + FindModel = True + else: + if "openrouter" not in model.lower(): + model_client = model.split("|")[0] + model_name = model.split("|")[1] + for client, model_dict in pricing.items(): + if client.lower() in model_client.lower(): + for name, fee in model_dict.items(): + if name.lower() == model_name.lower(): + FindModel = True + for token_type in fee.keys(): + if token_type == "total_fee": + continue + # total_fee += eval(fee[token_type]) * usage[token_type] + pricing_temp = str_to_pricing(fee[token_type]) + total_fee += ( + pricing_temp * usage[token_type] + if not isinstance( + pricing_temp, types.FunctionType + ) + else pricing_temp(usage[token_type]) + * usage[token_type] + ) + break + break + if not FindModel: + raise ValueError(f"Model {model} not found in pricing") + return total_fee + + +def calculate_average_fee(results_dir): + if not os.path.exists(results_dir): + print(f"Results directory does not exist: {results_dir}") + sys.exit(1) + + # print(f"Analyzing results from: {results_dir}") + + tool_usage_dict = {} + llm_usage_dict = {} + extract_tool_usage_from_log(results_dir, tool_usage_dict) + extract_llm_usage_from_log(results_dir, llm_usage_dict) + # print(tool_usage_dict) + # print(llm_usage_dict) + total_tool_fee = calculate_tool_fee(tool_usage_dict) + total_llm_fee = calculate_llm_fee(llm_usage_dict) + # print(f"Total tool fee: {total_tool_fee}") + # print(f"Total llm fee: {total_llm_fee}") + # print(f"Total fee: {total_tool_fee + total_llm_fee}") + count = sum( + 1 + for f in os.listdir(results_dir) + if f.endswith(".json") and os.path.isfile(os.path.join(results_dir, f)) + ) + return (total_tool_fee + total_llm_fee) / count + + +def scan_error_logs(base_dir="logs"): + def _find_2025_dirs(root): + try: + with os.scandir(root) as it: + for entry in it: + if not entry.is_dir(): + continue + if entry.name.startswith("2025"): + yield entry.path + else: + yield from _find_2025_dirs(entry.path) + except 
PermissionError: + pass + except FileNotFoundError: + pass + + for d in _find_2025_dirs(base_dir): + print(d) + print("Average fee:", calculate_average_fee(d)) + + +scan_error_logs("logs") diff --git a/utils/usage/pricing.json b/utils/usage/pricing.json new file mode 100644 index 00000000..068ff957 --- /dev/null +++ b/utils/usage/pricing.json @@ -0,0 +1,69 @@ +{ + "tool": { + "JINA": "500 / 11000000000", + "SERPER": "3750 / 12500000", + "E2B": { + "cpu": "0.000014", + "mem": "0.0000045 / 1048576" + }, + "openai": { + "gpt-4o-transcribe": "2.50 / 1000000", + "gpt-4o-mini-transcribe": "1.25 / 1000000", + "gpt-4o-audio-preview": { + "OpenAI_audio_qa_audio_usage": "40 / 1000000", + "OpenAI_audio_qa_input_text_usage": "2.5 / 1000000", + "OpenAI_audio_qa_output_text_usage": "10 / 1000000" + } + }, + "openrouter": { + }, + "anthropic": { + "claude-3-7-sonnet-20250219": { + "input": "3 / 1000000", + "output": "15 / 1000000", + "cache_read": "0.3 / 1000000", + "cache_write": "3.75 / 1000000" + } + }, + "gemini": { + "gemini-3-pro-preview": { + "input": "2 / 1000000 if token < 200000 else 4 / 1000000", + "output": "12 / 1000000 if token < 200000 else 18 / 1000000" + } + } + }, + "llm": { + "openai": { + "gpt-5": { + "cached_read_input": "0.125 / 1000000", + "cached_write_input": "0", + "uncached_input": "1.25 / 1000000", + "total_output": "10 / 1000000" + } + }, + "anthropic": { + "claude-3-7-sonnet-20250219": { + "cached_read_input": "0.3 / 1000000", + "cached_write_input": "3.75 / 1000000", + "uncached_input": "3 / 1000000", + "total_output": "15 / 1000000" + } + }, + "mirothinker": { + "MODEL_NAME": { + "cached_read_input": "0.125 / 1000000", + "cached_write_input": "0", + "uncached_input": "1.25 / 1000000", + "total_output": "10 / 1000000" + }, + "DUMMY_MODEL_NAME": { + "cached_read_input": "0.125 / 1000000", + "cached_write_input": "0", + "uncached_input": "1.25 / 1000000", + "total_output": "10 / 1000000" + } + }, + "openrouter": { + } + } +} \ No newline at end of file