Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/modules/config/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class MCPServerConfig(BaseModel):
help_email: Optional[str] = None # Contact email for help/support
groups: List[str] = Field(default_factory=list)
enabled: bool = True
allow_discovery: bool = False # Allow users to discover server even without access (shows overview and contact info only)
command: Optional[List[str]] = None # Command to run server (for stdio servers)
cwd: Optional[str] = None # Working directory for command
env: Optional[Dict[str, str]] = None # Environment variables for stdio servers
Expand Down
41 changes: 40 additions & 1 deletion backend/modules/mcp_tools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from fastmcp import Client
from modules.config import config_manager
from core.auth_utils import create_authorization_manager
from core.utils import sanitize_for_logging
from modules.config.config_manager import resolve_env_var
from domain.messages.models import ToolCall, ToolResult
Expand Down Expand Up @@ -628,6 +627,46 @@ async def get_authorized_servers(self, user_email: str, auth_check_func) -> List
authorized_servers.append(server_name)
return authorized_servers

async def get_discoverable_servers(self, user_email: str, auth_check_func) -> Dict[str, Dict[str, Any]]:
"""Get servers that are discoverable but not authorized for the user.

Returns a dict mapping server names to their basic info (description, author, help_email, groups).
Only includes servers with allow_discovery=true where the user lacks access.
"""
discoverable_servers = {}
for server_name, server_config in self.servers_config.items():
if not server_config.get("enabled", True):
continue

# Skip if discovery is not allowed
if not server_config.get("allow_discovery", False):
continue

required_groups = server_config.get("groups", [])

# Skip servers with no groups (they're accessible to everyone)
if not required_groups:
continue

# Check if user is in any of the required groups
group_checks = [await auth_check_func(user_email, group) for group in required_groups]

# Only include if user does NOT have access
if not any(group_checks):
discoverable_servers[server_name] = {
'server': server_name,
'description': server_config.get('description', ''),
'author': server_config.get('author', ''),
'short_description': server_config.get('short_description', ''),
'help_email': server_config.get('help_email', ''),
'groups': required_groups,
'compliance_level': server_config.get('compliance_level'),
'is_discoverable': True,
'has_access': False
}

return discoverable_servers

def get_available_tools(self) -> List[str]:
"""Get list of available tool names."""
available_tools = []
Expand Down
9 changes: 9 additions & 0 deletions backend/routes/config_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ async def get_config(
tools_info = []
prompts_info = []
authorized_servers = []
discoverable_servers_info = []

if app_settings.feature_tools_enabled:
# Get MCP manager
Expand All @@ -123,6 +124,9 @@ async def get_config(
# Get authorized servers for the user - this filters out unauthorized servers completely
authorized_servers = await mcp_manager.get_authorized_servers(current_user, is_user_in_group)

# Get discoverable servers (servers user can see but not access)
discoverable_servers = await mcp_manager.get_discoverable_servers(current_user, is_user_in_group)

# Add canvas pseudo-tool to authorized servers (available to all users)
authorized_servers.append("canvas")

Expand Down Expand Up @@ -197,6 +201,9 @@ async def get_config(
'help_email': server_config.get('help_email', ''),
'compliance_level': server_config.get('compliance_level')
})

# Build discoverable servers info (limited information for servers user can't access)
discoverable_servers_info = list(discoverable_servers.values())

# Read help page configuration (supports new config directory layout + legacy paths)
help_config = {}
Expand Down Expand Up @@ -243,6 +250,7 @@ async def get_config(
# Log what the user can see for debugging
logger.info(
f"User {sanitize_for_logging(current_user)} has access to {len(authorized_servers)} servers: {authorized_servers}\n"
f"User can discover {len(discoverable_servers_info)} additional servers: {[s['server'] for s in discoverable_servers_info]}\n"
f"Returning {len(tools_info)} server tool groups to frontend for user {sanitize_for_logging(current_user)}"
)
# Build models list with compliance levels
Expand Down Expand Up @@ -284,6 +292,7 @@ async def get_config(
"models": models_list,
"tools": tools_info, # Only authorized servers are included
"prompts": prompts_info, # Available prompts from authorized servers
"discoverable_servers": discoverable_servers_info, # Servers user can see but not access
"data_sources": rag_data_sources, # RAG data sources for the user
"rag_servers": rag_servers, # Optional richer structure for RAG UI
"user": current_user,
Expand Down
221 changes: 221 additions & 0 deletions backend/tests/test_mcp_discoverable_servers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
"""Test get_discoverable_servers functionality."""

import pytest
from modules.mcp_tools.client import MCPToolManager


@pytest.mark.asyncio
async def test_get_discoverable_servers_basic():
"""Test that get_discoverable_servers returns servers user can discover but not access."""

# Create a mock MCPToolManager with test server config
mcp_manager = MCPToolManager(None)
mcp_manager.servers_config = {
"public_server": {
"enabled": True,
"groups": [],
"allow_discovery": True,
"description": "Public server",
"author": "Test Team",
"help_email": "[email protected]"
},
"admin_server": {
"enabled": True,
"groups": ["admin"],
"allow_discovery": True,
"description": "Admin only server",
"author": "Test Team",
"short_description": "Admin tools",
"help_email": "[email protected]",
"compliance_level": "SOC2"
},
"hidden_server": {
"enabled": True,
"groups": ["superusers"],
"allow_discovery": False, # Not discoverable
"description": "Hidden server",
"author": "Test Team",
"help_email": "[email protected]"
},
"user_server": {
"enabled": True,
"groups": ["users"],
"allow_discovery": True,
"description": "User server",
"author": "Test Team",
"help_email": "[email protected]"
}
}

# Mock async auth function - user has no group access
async def mock_auth_check(user_email: str, group: str) -> bool:
"""Mock auth check that returns False for all groups."""
return False

# Test with user who has no access
discoverable = await mcp_manager.get_discoverable_servers("[email protected]", mock_auth_check)

# Should include admin_server and user_server (discoverable, user lacks access)
# Should NOT include public_server (no groups required, so user has access)
# Should NOT include hidden_server (allow_discovery is False)
assert "admin_server" in discoverable
assert "user_server" in discoverable
assert "public_server" not in discoverable
assert "hidden_server" not in discoverable

# Check that discoverable servers have the right structure
assert discoverable["admin_server"]["server"] == "admin_server"
assert discoverable["admin_server"]["description"] == "Admin only server"
assert discoverable["admin_server"]["author"] == "Test Team"
assert discoverable["admin_server"]["help_email"] == "[email protected]"
assert discoverable["admin_server"]["groups"] == ["admin"]
assert discoverable["admin_server"]["compliance_level"] == "SOC2"
assert discoverable["admin_server"]["is_discoverable"] is True
assert discoverable["admin_server"]["has_access"] is False


@pytest.mark.asyncio
async def test_get_discoverable_servers_with_partial_access():
"""Test discoverable servers when user has access to some servers."""

mcp_manager = MCPToolManager(None)
mcp_manager.servers_config = {
"server1": {
"enabled": True,
"groups": ["users"],
"allow_discovery": True,
"description": "Server 1",
"author": "Test Team",
"help_email": "[email protected]"
},
"server2": {
"enabled": True,
"groups": ["admin"],
"allow_discovery": True,
"description": "Server 2",
"author": "Test Team",
"help_email": "[email protected]"
},
"server3": {
"enabled": True,
"groups": ["superusers"],
"allow_discovery": True,
"description": "Server 3",
"author": "Test Team",
"help_email": "[email protected]"
}
}

# User has access to 'users' group only
async def mock_auth_check(user_email: str, group: str) -> bool:
return group == "users"

discoverable = await mcp_manager.get_discoverable_servers("[email protected]", mock_auth_check)

# Should include server2 and server3 (discoverable, user lacks access)
# Should NOT include server1 (user has access via 'users' group)
assert "server1" not in discoverable
assert "server2" in discoverable
assert "server3" in discoverable


@pytest.mark.asyncio
async def test_get_discoverable_servers_disabled_servers():
"""Test that disabled servers are not discoverable."""

mcp_manager = MCPToolManager(None)
mcp_manager.servers_config = {
"enabled_server": {
"enabled": True,
"groups": ["admin"],
"allow_discovery": True,
"description": "Enabled server",
"author": "Test Team",
"help_email": "[email protected]"
},
"disabled_server": {
"enabled": False,
"groups": ["admin"],
"allow_discovery": True,
"description": "Disabled server",
"author": "Test Team",
"help_email": "[email protected]"
}
}

async def mock_auth_check(user_email: str, group: str) -> bool:
return False

discoverable = await mcp_manager.get_discoverable_servers("[email protected]", mock_auth_check)

# Should include only enabled_server
# Should NOT include disabled_server
assert "enabled_server" in discoverable
assert "disabled_server" not in discoverable


@pytest.mark.asyncio
async def test_get_discoverable_servers_empty():
"""Test when no servers are discoverable."""

mcp_manager = MCPToolManager(None)
mcp_manager.servers_config = {
"server1": {
"enabled": True,
"groups": ["admin"],
"allow_discovery": False, # Not discoverable
"description": "Server 1",
"author": "Test Team",
"help_email": "[email protected]"
},
"server2": {
"enabled": True,
"groups": [], # No groups required
"allow_discovery": True,
"description": "Server 2",
"author": "Test Team",
"help_email": "[email protected]"
}
}

async def mock_auth_check(user_email: str, group: str) -> bool:
return False

discoverable = await mcp_manager.get_discoverable_servers("[email protected]", mock_auth_check)

# Should be empty - server1 is not discoverable, server2 has no groups
assert discoverable == {}


@pytest.mark.asyncio
async def test_get_discoverable_servers_all_access():
"""Test when user has access to all servers."""

mcp_manager = MCPToolManager(None)
mcp_manager.servers_config = {
"server1": {
"enabled": True,
"groups": ["admin"],
"allow_discovery": True,
"description": "Server 1",
"author": "Test Team",
"help_email": "[email protected]"
},
"server2": {
"enabled": True,
"groups": ["users"],
"allow_discovery": True,
"description": "Server 2",
"author": "Test Team",
"help_email": "[email protected]"
}
}

# User has access to all groups
async def mock_auth_check(user_email: str, group: str) -> bool:
return True

discoverable = await mcp_manager.get_discoverable_servers("[email protected]", mock_auth_check)

# Should be empty - user has access to all servers
assert discoverable == {}
Loading