diff --git a/src/daemon-client.ts b/src/daemon-client.ts index d9b5c34..fa6ce82 100644 --- a/src/daemon-client.ts +++ b/src/daemon-client.ts @@ -22,6 +22,7 @@ import { removePidFile, removeSocketFile, } from './daemon.js'; +import { sendIpcRequest } from './ipc.js'; // ============================================================================ // Daemon Connection @@ -55,40 +56,7 @@ async function sendRequest( socketPath: string, request: DaemonRequest, ): Promise { - return new Promise((resolve, reject) => { - const socket = Bun.connect({ - unix: socketPath, - socket: { - open(socket) { - socket.write(JSON.stringify(request)); - }, - data(socket, data) { - try { - const response = JSON.parse(data.toString().trim()); - socket.end(); - resolve(response); - } catch (err) { - socket.end(); - reject(new Error('Invalid response from daemon')); - } - }, - error(socket, error) { - reject(error); - }, - close() { - // Connection closed - }, - connectError(socket, error) { - reject(error); - }, - }, - }); - - // Timeout after 5 seconds (fast fallback to direct connection) - setTimeout(() => { - reject(new Error('Daemon request timeout')); - }, 5000); - }); + return sendIpcRequest(socketPath, request) as Promise; } /** diff --git a/src/daemon.ts b/src/daemon.ts index 944cc5c..44dbbf3 100644 --- a/src/daemon.ts +++ b/src/daemon.ts @@ -28,6 +28,7 @@ import { getSocketDir, getSocketPath, } from './config.js'; +import { createIpcServer } from './ipc.js'; // ============================================================================ // Types @@ -348,26 +349,8 @@ export async function runDaemon( // Start Unix socket server try { - server = Bun.listen({ - unix: socketPath, - socket: { - open(socket) { - activeConnections.add(socket); - debug(`[daemon:${serverName}] Client connected`); - }, - async data(socket, data) { - const response = await handleRequest(data); - socket.write(`${JSON.stringify(response)}\n`); - }, - close(socket) { - activeConnections.delete(socket); - debug(`[daemon:${serverName}] Client disconnected`); - }, - error(socket, error) { - debug(`[daemon:${serverName}] Socket error: ${error.message}`); - activeConnections.delete(socket); - }, - }, + server = createIpcServer(socketPath, async (request) => { + return await handleRequest(Buffer.from(JSON.stringify(request))); }); debug(`[daemon:${serverName}] Listening on ${socketPath}`); diff --git a/src/ipc.ts b/src/ipc.ts new file mode 100644 index 0000000..d9ad796 --- /dev/null +++ b/src/ipc.ts @@ -0,0 +1,126 @@ +/** + * IPC primitives for daemon socket communication. + * + * Handles large message transmission over Unix sockets where Bun's + * socket.write() may only write up to the kernel buffer size (~8KB). + * Uses newline-delimited JSON as the wire protocol. + */ + +/** + * Create a Unix socket server that receives JSON requests and sends + * newline-delimited JSON responses, handling partial writes via drain. + */ +export function createIpcServer( + socketPath: string, + handler: (request: unknown) => unknown, +) { + const readBuffers = new Map(); + const writeBuffers = new Map(); + + function writeAll(socket: { write(data: string): number }, payload: string) { + const written = socket.write(payload); + if (written < payload.length) { + writeBuffers.set(socket, payload.slice(written)); + } + } + + return Bun.listen({ + unix: socketPath, + socket: { + async data(socket, data) { + if (!readBuffers.has(socket)) readBuffers.set(socket, ''); + readBuffers.set(socket, readBuffers.get(socket)! + data.toString()); + const buf = readBuffers.get(socket)!; + try { + JSON.parse(buf); + readBuffers.delete(socket); + } catch { + return; // incomplete JSON, wait for more + } + const request = JSON.parse(buf); + const response = await handler(request); + writeAll(socket, JSON.stringify(response) + '\n'); + }, + drain(socket) { + const remaining = writeBuffers.get(socket); + if (remaining) { + writeBuffers.delete(socket); + writeAll(socket, remaining); + } + }, + close(socket) { + readBuffers.delete(socket); + writeBuffers.delete(socket); + }, + error(socket) { + readBuffers.delete(socket); + writeBuffers.delete(socket); + }, + }, + }); +} + +/** + * Send a JSON request to a Unix socket and receive a newline-delimited + * JSON response, buffering chunks until the full message arrives. + */ +export function sendIpcRequest( + socketPath: string, + request: unknown, + timeoutMs = 5000, +): Promise { + return new Promise((resolve, reject) => { + let buffer = ''; + let settled = false; + + function settle(fn: () => void) { + if (!settled) { + settled = true; + fn(); + } + } + + Bun.connect({ + unix: socketPath, + socket: { + open(socket) { + socket.write(JSON.stringify(request)); + }, + data(socket, data) { + buffer += data.toString(); + const newlineIndex = buffer.indexOf('\n'); + if (newlineIndex === -1) return; + const message = buffer.slice(0, newlineIndex).trim(); + try { + const response = JSON.parse(message); + socket.end(); + settle(() => resolve(response)); + } catch { + socket.end(); + settle(() => reject(new Error('Invalid response from daemon'))); + } + }, + close() { + if (buffer.length > 0) { + try { + const response = JSON.parse(buffer.trim()); + settle(() => resolve(response)); + } catch { + // timeout will handle it + } + } + }, + error(_socket, error) { + settle(() => reject(error)); + }, + connectError(_socket, error) { + settle(() => reject(error)); + }, + }, + }); + + setTimeout(() => { + settle(() => reject(new Error('Daemon request timeout'))); + }, timeoutMs); + }); +} diff --git a/tests/daemon-socket.test.ts b/tests/daemon-socket.test.ts new file mode 100644 index 0000000..7da38be --- /dev/null +++ b/tests/daemon-socket.test.ts @@ -0,0 +1,105 @@ +/** + * Integration tests for daemon IPC socket communication. + * + * Bun's socket.write() may only write up to the kernel buffer size + * (typically 8192 bytes). These tests verify that the daemon correctly + * transmits large responses (like listTools with many tools) by + * exercising the real daemon process and CLI client code paths. + */ + +import { describe, test, expect, beforeAll, afterAll } from 'bun:test'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { mkdtemp, rm, writeFile } from 'node:fs/promises'; +import { $ } from 'bun'; + +describe('Daemon large message handling', () => { + let tempDir: string; + let configPath: string; + const mockServerPath = join(import.meta.dir, 'fixtures', 'mock-mcp-server.ts'); + + beforeAll(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'mcp-cli-daemon-test-')); + configPath = join(tempDir, 'mcp_servers.json'); + await writeFile( + configPath, + JSON.stringify({ + mcpServers: { + 'mock-large': { + command: 'bun', + args: ['run', mockServerPath], + }, + }, + }), + ); + }); + + afterAll(async () => { + // Kill any daemon processes we spawned + try { + await $`pkill -f 'daemon.*mock-large'`.nothrow(); + } catch {} + await rm(tempDir, { recursive: true, force: true }); + }); + + test('lists all 50 tools via direct connection (baseline)', async () => { + const result = + await $`MCP_NO_DAEMON=1 MCP_TIMEOUT=15000 bun run ${join(import.meta.dir, '..', 'src', 'index.ts')} -c ${configPath}` + .quiet() + .nothrow() + .then((r) => ({ + stdout: r.stdout.toString(), + stderr: r.stderr.toString(), + exitCode: r.exitCode, + })); + + expect(result.exitCode).toBe(0); + expect(result.stdout).toContain('mock_tool_0'); + expect(result.stdout).toContain('mock_tool_49'); + + // Count tool lines (each starts with " • ") + const toolLines = result.stdout.split('\n').filter((l: string) => l.trim().startsWith('•')); + expect(toolLines.length).toBe(50); + }, 20000); + + test('lists all 50 tools via daemon (exercises large IPC message)', async () => { + // This is the key test: the daemon's listTools response for 50 tools + // exceeds the 8KB kernel socket buffer, requiring drain-based writes. + // Without the fix, only the first ~8KB is sent and the client times out. + const result = + await $`MCP_TIMEOUT=15000 bun run ${join(import.meta.dir, '..', 'src', 'index.ts')} -c ${configPath}` + .quiet() + .nothrow() + .then((r) => ({ + stdout: r.stdout.toString(), + stderr: r.stderr.toString(), + exitCode: r.exitCode, + })); + + expect(result.exitCode).toBe(0); + expect(result.stdout).toContain('mock_tool_0'); + expect(result.stdout).toContain('mock_tool_49'); + + const toolLines = result.stdout.split('\n').filter((l: string) => l.trim().startsWith('•')); + expect(toolLines.length).toBe(50); + }, 20000); + + test('daemon serves tool descriptions correctly for large payloads', async () => { + // With -d flag, descriptions are included — making the response even larger + const result = + await $`MCP_TIMEOUT=15000 bun run ${join(import.meta.dir, '..', 'src', 'index.ts')} -c ${configPath} -d` + .quiet() + .nothrow() + .then((r) => ({ + stdout: r.stdout.toString(), + stderr: r.stderr.toString(), + exitCode: r.exitCode, + })); + + expect(result.exitCode).toBe(0); + expect(result.stdout).toContain('mock_tool_0'); + expect(result.stdout).toContain('mock_tool_49'); + // Descriptions should be present + expect(result.stdout).toContain('intentionally long'); + }, 20000); +}); diff --git a/tests/fixtures/mock-mcp-server.ts b/tests/fixtures/mock-mcp-server.ts new file mode 100644 index 0000000..d855ea7 --- /dev/null +++ b/tests/fixtures/mock-mcp-server.ts @@ -0,0 +1,36 @@ +#!/usr/bin/env bun +/** + * Mock MCP server over stdio that returns a large number of tools. + * Used by daemon socket integration tests to ensure large IPC + * messages are transmitted correctly. + * + * Uses the official MCP SDK server/stdio transport. + */ + +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; +import { z } from 'zod'; + +const TOOL_COUNT = 50; + +const server = new McpServer({ + name: 'mock-large-server', + version: '1.0.0', +}); + +// Register many tools to create a large listTools response (>8KB) +for (let i = 0; i < TOOL_COUNT; i++) { + server.tool( + `mock_tool_${i}`, + `Mock tool number ${i}. This is a description that is intentionally long to inflate the payload size beyond the kernel socket buffer limit of 8192 bytes.`, + { + arg1: z.string().describe(`String argument for mock_tool_${i}`), + arg2: z.number().describe(`Number argument for mock_tool_${i}`), + arg3: z.boolean().describe(`Boolean argument for mock_tool_${i}`), + }, + async () => ({ content: [{ type: 'text' as const, text: `result from mock_tool_${i}` }] }), + ); +} + +const transport = new StdioServerTransport(); +await server.connect(transport);