-
Notifications
You must be signed in to change notification settings - Fork 14
added 3 new tools #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Adds three new database manipulation tools—update_rows
, get_rows
, and delete_rows
—to allow row-level operations via the MCP framework, introduces a helper method in the database context for retrieving rows directly, and documents these capabilities in both English and Chinese READMEs.
- Implements
update_rows
,get_rows
, anddelete_rows
as async MCP tools inmain.py
- Adds
get_rows_from_table
helper indb_context/database.py
(currently unused) - Updates
README.md
andREADME-zh.md
with usage examples for the new tools
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
main.py | Adds update_rows , get_rows , and delete_rows tools |
db_context/database.py | Introduces get_rows_from_table helper |
README.md | Documents new tools with examples |
README-zh.md | Documents new tools with examples (Chinese) |
Comments suppressed due to low confidence (2)
main.py:610
- These newly added tools (
update_rows
,get_rows
,delete_rows
) do not appear to have accompanying unit or integration tests. Adding tests for success, failure, and edge cases will ensure reliability.
@mcp.tool()
main.py:668
- The
ctx
parameter is placed beforelimit
, which likely breaks the expected context injection order for@mcp.tool()
.ctx
should be the last argument—consider movinglimit
ahead ofctx
.
async def get_rows(table_name: str, condition: str, ctx: Context, limit: int = 5) -> str:
main.py
Outdated
update_query = f"UPDATE {schema}.{table_name} SET {set_values} WHERE {condition}" | ||
|
||
# Execute the update | ||
await db_context.db_connector._execute_cursor_no_fetch(cursor, update_query) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Constructing SQL statements via direct string interpolation can open SQL injection risks. Consider parameterizing user-provided values or validating/escaping inputs before executing the query.
update_query = f"UPDATE {schema}.{table_name} SET {set_values} WHERE {condition}" | |
# Execute the update | |
await db_context.db_connector._execute_cursor_no_fetch(cursor, update_query) | |
# Validate table_name to prevent SQL injection | |
if not table_name.isidentifier(): | |
return f"Invalid table name: {table_name}" | |
# Construct the parameterized query | |
update_query = f"UPDATE {schema}.{table_name} SET {set_values} WHERE {condition}" | |
# Execute the update with parameters | |
await db_context.db_connector._execute_cursor_no_fetch(cursor, update_query, {}) |
Copilot uses AI. Check for mistakes.
main.py
Outdated
|
||
# Construct and execute the delete query | ||
schema = await db_context.db_connector._get_effective_schema(conn) | ||
delete_query = f"DELETE FROM {schema}.{table_name} WHERE {condition}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This DELETE statement is built via string interpolation and may be vulnerable to SQL injection. It’s safer to use parameterized queries or sanitize the condition before executing.
delete_query = f"DELETE FROM {schema}.{table_name} WHERE {condition}" | |
delete_query = f"DELETE FROM {schema}.\"{table_name}\" WHERE {condition}" |
Copilot uses AI. Check for mistakes.
db_context/database.py
Outdated
async def get_rows_from_table(self, table_name: str, condition: str, limit: int = 100) -> List[Dict[str, Any]]: | ||
"""Get rows from a table that match the specified condition""" | ||
conn = await self.get_connection() | ||
try: | ||
cursor = conn.cursor() | ||
schema = await self._get_effective_schema(conn) | ||
|
||
# Get column names for this table to format results | ||
columns_result = await self._execute_cursor( | ||
cursor, | ||
""" | ||
SELECT column_name | ||
FROM all_tab_columns | ||
WHERE owner = :owner AND table_name = :table_name | ||
ORDER BY column_id | ||
""", | ||
owner=schema, | ||
table_name=table_name.upper() | ||
) | ||
|
||
column_names = [col[0] for col in columns_result] | ||
|
||
# Construct and execute the query | ||
query = f"SELECT * FROM {schema}.{table_name} WHERE {condition}" | ||
if limit > 0: | ||
query += f" FETCH FIRST {limit} ROWS ONLY" | ||
|
||
results = await self._execute_cursor(cursor, query) | ||
|
||
# Convert to list of dictionaries with column names | ||
formatted_results = [] | ||
for row in results: | ||
row_dict = {} | ||
for i, col_name in enumerate(column_names): | ||
# Handle None values and format data appropriately | ||
if i < len(row): | ||
row_dict[col_name] = row[i] | ||
formatted_results.append(row_dict) | ||
|
||
return formatted_results | ||
finally: | ||
await self._close_connection(conn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The new get_rows_from_table
helper isn’t used by any of the tools and duplicates functionality. Consider consolidating with existing get_rows
methods or renaming/removing to avoid dead code.
async def get_rows_from_table(self, table_name: str, condition: str, limit: int = 100) -> List[Dict[str, Any]]: | |
"""Get rows from a table that match the specified condition""" | |
conn = await self.get_connection() | |
try: | |
cursor = conn.cursor() | |
schema = await self._get_effective_schema(conn) | |
# Get column names for this table to format results | |
columns_result = await self._execute_cursor( | |
cursor, | |
""" | |
SELECT column_name | |
FROM all_tab_columns | |
WHERE owner = :owner AND table_name = :table_name | |
ORDER BY column_id | |
""", | |
owner=schema, | |
table_name=table_name.upper() | |
) | |
column_names = [col[0] for col in columns_result] | |
# Construct and execute the query | |
query = f"SELECT * FROM {schema}.{table_name} WHERE {condition}" | |
if limit > 0: | |
query += f" FETCH FIRST {limit} ROWS ONLY" | |
results = await self._execute_cursor(cursor, query) | |
# Convert to list of dictionaries with column names | |
formatted_results = [] | |
for row in results: | |
row_dict = {} | |
for i, col_name in enumerate(column_names): | |
# Handle None values and format data appropriately | |
if i < len(row): | |
row_dict[col_name] = row[i] | |
formatted_results.append(row_dict) | |
return formatted_results | |
finally: | |
await self._close_connection(conn) | |
# Removed the unused `get_rows_from_table` method as it is not referenced elsewhere in the provided code. |
Copilot uses AI. Check for mistakes.
Hi @SHAHARYAR1255, thanks for proposing these new tools to execute select/update/delete queries. In principle, I see the implementation constraining flexibility. We limit the flexibility by imposing simple "where" conditions and a specific query construct. SQL queries can be a bit more complex and include joins, group bys, sub-queries, etc. Since the Agent can figure out complex queries by itself, and this MCP tool enables it to write very complex queries by understanding the schema, I'd rather go for a more generic tool similar to what mcp/sqlite server has done. This would multiply the usefulness of the new tools. It could be as simple as letting the agent pass the SQL statement in full, and 3 tools - one per operation type - to be able to implement more restrictive RBAC per type of operation. Formatting logic should go in |
execute_select_query execute_update_query execute_delete_query
Hi Daniel, I have updated the tools to make it just receive a SQL query and execute it. |
Summary
This PR adds three new tools to the MCP for interacting with the database:
execute_select_query:
Executes a SELECT SQL query and returns the results as a list of dictionaries.
execute_update_query:
Executes an UPDATE SQL query and returns the number of affected rows.
execute_delete_query:
Executes a DELETE SQL query and returns the number of affected rows.