Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
41dcb7d
Add run_sql_query tool for executing read-only SQL queries
kevingatera Jul 17, 2025
bf3b5f7
Add wrap_untrusted function to handle untrusted data and reduce promp…
kevingatera Jul 21, 2025
aafca5a
Add read-only mode support to improve security and prevent accidental…
kevingatera Jul 21, 2025
1b2f571
Externalize formatter as per Daniel's comment
kevingatera Jul 21, 2025
2187fd3
Improved SQL result formatter now pads each column to the widest head…
kevingatera Jul 21, 2025
ac757c5
Fixed the error handling in run_sql_query to properly catch exception…
kevingatera Jul 21, 2025
78412fb
Created a unified _assert_query_executable method that checks both qu…
kevingatera Jul 21, 2025
6cb2557
Provide clear documentation about the tool's capabilities in both rea…
kevingatera Jul 21, 2025
eecc32d
Update main.py
danielmeppiel Jul 22, 2025
08c896c
Update db_context/schema/formatter.py
kevingatera Jul 22, 2025
0eec9a2
Commit only on explicit write ops
kevingatera Jul 22, 2025
7a1fae4
Provide a bit more robust SQL injection
kevingatera Jul 22, 2025
4184091
Centralize cursor management in execute_sql_query
kevingatera Jul 22, 2025
c07eb0f
Use try catch for exceptions coming from Database API
kevingatera Jul 22, 2025
da22779
Restore to single-query column search with result cache
kevingatera Jul 22, 2025
ba09a78
Add sqlparse dependency and update package metadata
kevingatera Jul 28, 2025
7ee8b5c
Implemented sqlparse instead of regex-based validation
kevingatera Jul 28, 2025
848200f
Fix wrap_untrusted with XML escaping and clearer boundaries
kevingatera Jul 31, 2025
63c9833
Add defensive read-only check and fix token validation
kevingatera Jul 31, 2025
3b71142
CI: add GitHub Actions workflow for unit & integration tests with Ora…
danielmeppiel Aug 22, 2025
bc6ef40
refactor: move wrap_untrusted into db_context.utils and fix test imports
danielmeppiel Aug 22, 2025
6e9fb38
merge: incorporate wrap_untrusted refactor into PR branch
danielmeppiel Aug 22, 2025
f138a31
ci: run unit tests on push and PR; gate integration tests on unit tests
danielmeppiel Aug 22, 2025
f6362e5
docs: tighten MCP tool docstrings with Use/Compose/Avoid guidance
danielmeppiel Aug 22, 2025
5811da4
feat: add query result formatting improvements, explain plan tool, an…
danielmeppiel Aug 22, 2025
9c1aaeb
fix: correct cross-schema foreign key detection in get_related_tables…
danielmeppiel Aug 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ By intelligently caching and serving database schema information, this server al
- **Relationship Mapping**: Understand foreign key relationships between tables
- **Oracle Database Support**: Built specifically for Oracle databases
- **MCP Integration**: Works seamlessly with GitHub Copilot in VSCode, Claude, ChatGPT, and other AI assistants that support MCP
- **Read-Only Mode**: Default security mode that prevents write operations while allowing full read access

## Usage

Expand Down Expand Up @@ -104,7 +105,8 @@ In VSCode Insiders, go to your user or workspace `settings.json` file and add th
"TARGET_SCHEMA":"",
"CACHE_DIR":".cache",
"THICK_MODE":"", // Optional: set to "1" to enable thick mode
"ORACLE_CLIENT_LIB_DIR":"" // Optional: in case you use thick mode and you want to set a non-default directory for client libraries
"ORACLE_CLIENT_LIB_DIR":"", // Optional: in case you use thick mode and you want to set a non-default directory for client libraries
"READ_ONLY_MODE":"1" // Optional: set to "0" to allow write operations (default: "1" for read-only)
}
}
}
Expand Down Expand Up @@ -179,7 +181,8 @@ In VSCode Insiders, go to your user or workspace `settings.json` file and add th
"TARGET_SCHEMA":"",
"CACHE_DIR":".cache",
"THICK_MODE":"", // Optional: set to "1" to enable thick mode
"ORACLE_CLIENT_LIB_DIR":"" // Optional: in case you use thick mode and if you want to set a non-default directory for client libraries
"ORACLE_CLIENT_LIB_DIR":"", // Optional: in case you use thick mode and if you want to set a non-default directory for client libraries
"READ_ONLY_MODE":"1" // Optional: set to "0" to allow write operations (default: "1" for read-only)
}
}
}
Expand All @@ -191,6 +194,7 @@ For both options:
- Replace the `ORACLE_CONNECTION_STRING` with your actual database connection string
- The `TARGET_SCHEMA` is optional, it will default to the user's schema
- The `CACHE_DIR` is optional, defaulting to `.cache` within the MCP server root folder
- The `READ_ONLY_MODE` defaults to "1" (read-only) for security. Set to "0" only when write operations are needed

### Starting the Server locally

Expand Down Expand Up @@ -308,6 +312,15 @@ Example:
What tables are related to the ORDERS table?
```

#### `run_sql_query`
Execute a SQL query and return the results in a formatted table.
Example:
```
Can you run this query for me? SELECT * FROM EMPLOYEES WHERE DEPARTMENT_ID = 10
```

**Note**: In read-only mode (default), only SELECT statements are permitted. Write operations (INSERT, UPDATE, DELETE) are blocked for security. When read-only mode is deactivated (`READ_ONLY_MODE="0"`), this tool can execute both read and write operations.

## Architecture

This MCP server employs a three-layer architecture optimized for large-scale Oracle databases:
Expand Down Expand Up @@ -350,6 +363,15 @@ You can specify a custom location for the Oracle Client libraries using the `ORA

Note: When using Docker, you don't need to worry about installing Oracle Client libraries as they are included in the container (Oracle Instant Client v23.7). The container supports Oracle databases versions 19c up to 23ai in both linux/arm64 and linux/amd64 architectures.

## Read-Only Mode

The MCP server operates in read-only mode by default for increased security. This prevents any write operations (INSERT, UPDATE, DELETE, DDL) while allowing full read access to the database. It protects against unintended changes from AI-generated queries.

### Configuration

- **Default**: `READ_ONLY_MODE="1"` (read-only, secure)
- **Write Access**: `READ_ONLY_MODE="0"` (allows write operations)

## System Requirements

- **Python**: Version 3.12 or higher (required for optimal performance)
Expand Down
8 changes: 6 additions & 2 deletions db_context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@


class DatabaseContext:
def __init__(self, connection_string: str, cache_path: Path, target_schema: Optional[str] = None, use_thick_mode: bool = False, lib_dir: Optional[str] = None):
self.db_connector = DatabaseConnector(connection_string, target_schema, use_thick_mode, lib_dir)
def __init__(self, connection_string: str, cache_path: Path, target_schema: Optional[str] = None, use_thick_mode: bool = False, lib_dir: Optional[str] = None, read_only: bool = True):
self.db_connector = DatabaseConnector(connection_string, target_schema, use_thick_mode, lib_dir, read_only)
self.schema_manager = SchemaManager(self.db_connector, cache_path)
# Set the schema manager reference in the connector
self.db_connector.set_schema_manager(self.schema_manager)
Expand Down Expand Up @@ -133,6 +133,10 @@ async def get_related_tables(self, table_name: str) -> Dict[str, List[str]]:
await self.schema_manager.save_cache()
return result

async def run_sql_query(self, sql: str, params: Optional[Dict[str, Any]] = None, max_rows: int = 100) -> Dict[str, Any]:
"""Runs a SQL query and returns the results."""
return await self.db_connector.execute_sql_query(sql, params, max_rows)

async def explain_query_plan(self, query: str) -> Dict[str, Any]:
"""Get execution plan for an SQL query with optimization suggestions"""
return await self.db_connector.explain_query_plan(query)
166 changes: 148 additions & 18 deletions db_context/database.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
import sys
import oracledb
import re
import time
import asyncio
from typing import Dict, List, Set, Optional, Any
from pathlib import Path
from .models import SchemaManager

class DatabaseConnector:
def __init__(self, connection_string: str, target_schema: Optional[str] = None, use_thick_mode: bool = False, lib_dir: Optional[str] = None):
def __init__(self, connection_string: str, target_schema: Optional[str] = None, use_thick_mode: bool = False, lib_dir: Optional[str] = None, read_only: bool = True):
"""Create a new connector.

Args:
connection_string: Oracle connection string
target_schema: Optional schema override
use_thick_mode: Whether to use thick mode for Oracle client
lib_dir: Optional Oracle client library directory
read_only: When True (default) all write operations will be blocked.
"""
self.connection_string = connection_string
self.schema_manager: Optional[SchemaManager] = None # Will be set by DatabaseContext
self.target_schema: Optional[str] = target_schema
self.thick_mode = use_thick_mode
self.read_only = read_only
self._pool = None
self._pool_lock = asyncio.Lock()

Expand Down Expand Up @@ -103,18 +114,41 @@ async def _execute_cursor(self, cursor, sql: str, **params):
await cursor.execute(sql, **params) # Async execution
return await cursor.fetchall()

def _assert_query_executable(self, sql: str) -> None:
"""Check if a query can be executed based on read-only mode and query type."""
if self.read_only and not self._is_select_query(sql):
raise PermissionError("Read-only mode: only SELECT statements are permitted.")

def _assert_write_allowed(self) -> None:
"""Raise if the connector is in read-only mode."""
if self.read_only:
raise PermissionError("Read-only mode: write operations are disabled")

async def _execute_cursor_no_fetch(self, cursor, sql: str, **params):
"""Helper method for cursor operations that don't need fetching (e.g. DELETE, UPDATE)"""
"""Helper method for statements that modify data (e.g. DELETE, UPDATE)."""
self._assert_query_executable(sql)
if self.thick_mode:
cursor.execute(sql, **params)
else:
await cursor.execute(sql, **params)

async def _execute_cursor_with_fetch(self, cursor, sql: str, max_rows: int = 100, **params):
"""Helper method for SELECT queries that need to fetch results."""
self._assert_query_executable(sql)
if self.thick_mode:
cursor.execute(sql, **params)
rows = cursor.fetchmany(max_rows)
else:
await cursor.execute(sql, **params)
rows = await cursor.fetchmany(max_rows)
return list(rows)

async def _commit(self, conn):
"""Commit the current transaction"""
self._assert_write_allowed()
if self.thick_mode:
conn.commit()
else:
else:
await conn.commit()


Expand Down Expand Up @@ -686,13 +720,13 @@ async def search_in_database(self, search_term: str, limit: int = 20) -> List[st
await self._close_connection(conn)

async def search_columns_in_database(self, table_names: List[str], search_term: str) -> Dict[str, List[Dict[str, Any]]]:
"""Search for columns in specified tables"""
"""Search for columns with a given pattern within a list of tables"""
conn = await self.get_connection()
try:
cursor = conn.cursor()
schema = await self._get_effective_schema(conn)
result = {}

# Get columns for the specified tables that match the search term
rows = await self._execute_cursor(cursor, """
SELECT /*+ RESULT_CACHE */
Expand All @@ -708,7 +742,7 @@ async def search_columns_in_database(self, table_names: List[str], search_term:
""", owner=schema,
table_names=table_names,
search_term=search_term.upper())

for table_name, column_name, data_type, nullable in rows:
if table_name not in result:
result[table_name] = []
Expand All @@ -717,14 +751,68 @@ async def search_columns_in_database(self, table_names: List[str], search_term:
"type": data_type,
"nullable": nullable == 'Y'
})

return result
finally:
await self._close_connection(conn)


async def execute_sql_query(self, sql: str, params: Optional[Dict[str, Any]] = None, max_rows: int = 100) -> Dict[str, Any]:
"""
Executes a SQL query and returns the results.

Args:
sql: The SQL query to execute.
params: A dictionary of bind parameters for the query.
max_rows: The maximum number of rows to return.

Returns:
A dictionary containing the query results, including column definitions and rows.
"""
conn = await self.get_connection()
try:
cursor = conn.cursor()

# Check if this is a SELECT query (has description)
if self._is_select_query(sql):
rows = await self._execute_cursor_with_fetch(cursor, sql, max_rows, **(params or {}))
columns = [desc[0] for desc in cursor.description] if cursor.description else []
result_rows = [dict(zip(columns, row)) for row in rows]

return {
"columns": columns,
"rows": result_rows,
"row_count": len(result_rows)
}
else:
await self._execute_cursor_no_fetch(cursor, sql, **(params or {}))
row_count = cursor.rowcount

# Only commit when the statement is an explicit DML or DDL operation
if self._is_write_operation(sql):
await self._commit(conn)
return {
"columns": [],
"rows": [],
"row_count": row_count,
"message": f"Statement executed successfully. {row_count} row(s) affected."
}

except oracledb.Error as e:
raise e
except PermissionError as e:
raise e
finally:
await self._close_connection(conn)

async def explain_query_plan(self, query: str) -> Dict[str, Any]:
"""Get execution plan for a SQL query"""
"""
Get the execution plan for a given SQL query and provide optimization suggestions.
This tool uses 'EXPLAIN PLAN FOR' to analyze the query without executing it.
"""
# Check if explain plan is allowed
self._assert_query_executable(query)

conn = await self.get_connection()
try:
cursor = conn.cursor()
Expand Down Expand Up @@ -770,6 +858,12 @@ async def explain_query_plan(self, query: str) -> Dict[str, Any]:
"optimization_suggestions": ["Unable to generate execution plan due to error."],
"error": str(e)
}
except PermissionError as e:
return {
"execution_plan": [],
"optimization_suggestions": [],
"error": str(e)
}
finally:
await self._close_connection(conn)

Expand Down Expand Up @@ -811,12 +905,48 @@ def _analyze_query_for_optimization(self, query: str) -> List[str]:

return suggestions

async def _close_connection(self, conn):
"""Helper method to close connection based on mode"""
try:
if self.thick_mode:
conn.close() # Synchronous close for thick mode
else:
await conn.close() # Async close for thin mode
except Exception as e:
print(f"Error closing connection: {str(e)}", file=sys.stderr)
@staticmethod
def _is_select_query(sql: str) -> bool:
"""Return True if the statement appears to be a *pure* read-only SELECT/CTE.

Heuristics used (cheap but reasonably safe for most cases):
1. Statement must start with SELECT or WITH.
2. No semicolon exists elsewhere (prevents stacked statements).
3. No DML/DDL keywords appear anywhere in the text (outside quotes is
not fully parsed but good enough as a guard-rail). For more robust
validation a full SQL parser should be used.
"""
stripped = sql.lstrip()
upper_sql = stripped.upper()

if not (upper_sql.startswith("SELECT") or upper_sql.startswith("WITH")):
return False

# block attempts to chain multiple statements
if ";" in upper_sql:
return False

# simple keyword search for write operations
write_pattern = re.compile(r"\b(INSERT|UPDATE|DELETE|MERGE|CREATE|ALTER|DROP|TRUNCATE|GRANT|REVOKE)\b", re.IGNORECASE)
if write_pattern.search(upper_sql):
return False

return True

@staticmethod
def _is_write_operation(sql: str) -> bool:
"""Return True if the SQL statement modifies data or structure."""
stripped = sql.lstrip().upper()
write_keywords = (
"INSERT",
"UPDATE",
"DELETE",
"MERGE",
"CREATE",
"ALTER",
"DROP",
"TRUNCATE",
"GRANT",
"REVOKE",
)
return any(stripped.startswith(keyword) for keyword in write_keywords)
51 changes: 50 additions & 1 deletion db_context/schema/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,53 @@ def _format_relationship_groups(groups: List[Dict[str, Any]], result: List[str])
else:
result.append(f" - {group['pattern']}:")
for pattern in sorted(group['column_patterns']):
result.append(f" {pattern}")
result.append(f" {pattern}")

def format_sql_query_result(result: Dict[str, Any]) -> str:
"""
Format SQL query results as a markdown table.

Args:
result: Dictionary containing query results with 'columns' and 'rows' keys

Returns:
Formatted markdown table string
"""
if not result.get("rows"):
return "Query executed successfully, but returned no rows."

headers = [str(h) for h in result["columns"]]
rows = result["rows"]

# Determine the maximum width required for each column considering both header and cell values
col_widths: List[int] = []
for header in headers:
max_width = len(header)
for row in rows:
cell_len = len(str(row.get(header, "")))
if cell_len > max_width:
max_width = cell_len
col_widths.append(max_width)

def _pad(cell: str, width: int) -> str:
"""Left-justify cell content (right-pad) to match the column width for aligned output."""
return cell.ljust(width)

formatted_result: List[str] = []

# Header row
header_row = "| " + " | ".join(_pad(h, col_widths[i]) for i, h in enumerate(headers)) + " |"
formatted_result.append(header_row)

# Separator row – must have at least 3 dashes per Markdown spec
separator_row = "| " + " | ".join("-" * max(3, col_widths[i]) for i in range(len(headers))) + " |"
formatted_result.append(separator_row)

# Data rows
for row in rows:
data_row = "| " + " | ".join(
_pad(str(row.get(h, "")), col_widths[i]) for i, h in enumerate(headers)
) + " |"
formatted_result.append(data_row)

return "\n".join(formatted_result)
Loading