Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 2 additions & 34 deletions src/daemon-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
removePidFile,
removeSocketFile,
} from './daemon.js';
import { sendIpcRequest } from './ipc.js';

// ============================================================================
// Daemon Connection
Expand Down Expand Up @@ -55,40 +56,7 @@ async function sendRequest(
socketPath: string,
request: DaemonRequest,
): Promise<DaemonResponse> {
return new Promise((resolve, reject) => {
const socket = Bun.connect({
unix: socketPath,
socket: {
open(socket) {
socket.write(JSON.stringify(request));
},
data(socket, data) {
try {
const response = JSON.parse(data.toString().trim());
socket.end();
resolve(response);
} catch (err) {
socket.end();
reject(new Error('Invalid response from daemon'));
}
},
error(socket, error) {
reject(error);
},
close() {
// Connection closed
},
connectError(socket, error) {
reject(error);
},
},
});

// Timeout after 5 seconds (fast fallback to direct connection)
setTimeout(() => {
reject(new Error('Daemon request timeout'));
}, 5000);
});
return sendIpcRequest(socketPath, request) as Promise<DaemonResponse>;
}

/**
Expand Down
23 changes: 3 additions & 20 deletions src/daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
getSocketDir,
getSocketPath,
} from './config.js';
import { createIpcServer } from './ipc.js';

// ============================================================================
// Types
Expand Down Expand Up @@ -348,26 +349,8 @@ export async function runDaemon(

// Start Unix socket server
try {
server = Bun.listen({
unix: socketPath,
socket: {
open(socket) {
activeConnections.add(socket);
debug(`[daemon:${serverName}] Client connected`);
},
async data(socket, data) {
const response = await handleRequest(data);
socket.write(`${JSON.stringify(response)}\n`);
},
close(socket) {
activeConnections.delete(socket);
debug(`[daemon:${serverName}] Client disconnected`);
},
error(socket, error) {
debug(`[daemon:${serverName}] Socket error: ${error.message}`);
activeConnections.delete(socket);
},
},
server = createIpcServer(socketPath, async (request) => {
return await handleRequest(Buffer.from(JSON.stringify(request)));
});

debug(`[daemon:${serverName}] Listening on ${socketPath}`);
Expand Down
126 changes: 126 additions & 0 deletions src/ipc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/**
* IPC primitives for daemon socket communication.
*
* Handles large message transmission over Unix sockets where Bun's
* socket.write() may only write up to the kernel buffer size (~8KB).
* Uses newline-delimited JSON as the wire protocol.
*/

/**
* Create a Unix socket server that receives JSON requests and sends
* newline-delimited JSON responses, handling partial writes via drain.
*/
export function createIpcServer(
socketPath: string,
handler: (request: unknown) => unknown,
) {
const readBuffers = new Map<unknown, string>();
const writeBuffers = new Map<unknown, string>();

function writeAll(socket: { write(data: string): number }, payload: string) {
const written = socket.write(payload);
if (written < payload.length) {
writeBuffers.set(socket, payload.slice(written));
}
}

return Bun.listen({
unix: socketPath,
socket: {
async data(socket, data) {
if (!readBuffers.has(socket)) readBuffers.set(socket, '');
readBuffers.set(socket, readBuffers.get(socket)! + data.toString());
const buf = readBuffers.get(socket)!;
try {
JSON.parse(buf);
readBuffers.delete(socket);
} catch {
return; // incomplete JSON, wait for more
}
const request = JSON.parse(buf);
const response = await handler(request);
writeAll(socket, JSON.stringify(response) + '\n');
},
drain(socket) {
const remaining = writeBuffers.get(socket);
if (remaining) {
writeBuffers.delete(socket);
writeAll(socket, remaining);
}
},
close(socket) {
readBuffers.delete(socket);
writeBuffers.delete(socket);
},
error(socket) {
readBuffers.delete(socket);
writeBuffers.delete(socket);
},
},
});
}

/**
* Send a JSON request to a Unix socket and receive a newline-delimited
* JSON response, buffering chunks until the full message arrives.
*/
export function sendIpcRequest(
socketPath: string,
request: unknown,
timeoutMs = 5000,
): Promise<unknown> {
return new Promise((resolve, reject) => {
let buffer = '';
let settled = false;

function settle(fn: () => void) {
if (!settled) {
settled = true;
fn();
}
}

Bun.connect({
unix: socketPath,
socket: {
open(socket) {
socket.write(JSON.stringify(request));
},
data(socket, data) {
buffer += data.toString();
const newlineIndex = buffer.indexOf('\n');
if (newlineIndex === -1) return;
const message = buffer.slice(0, newlineIndex).trim();
try {
const response = JSON.parse(message);
socket.end();
settle(() => resolve(response));
} catch {
socket.end();
settle(() => reject(new Error('Invalid response from daemon')));
}
},
close() {
if (buffer.length > 0) {
try {
const response = JSON.parse(buffer.trim());
settle(() => resolve(response));
} catch {
// timeout will handle it
}
}
},
error(_socket, error) {
settle(() => reject(error));
},
connectError(_socket, error) {
settle(() => reject(error));
},
},
});

setTimeout(() => {
settle(() => reject(new Error('Daemon request timeout')));
}, timeoutMs);
});
}
105 changes: 105 additions & 0 deletions tests/daemon-socket.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Integration tests for daemon IPC socket communication.
*
* Bun's socket.write() may only write up to the kernel buffer size
* (typically 8192 bytes). These tests verify that the daemon correctly
* transmits large responses (like listTools with many tools) by
* exercising the real daemon process and CLI client code paths.
*/

import { describe, test, expect, beforeAll, afterAll } from 'bun:test';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { mkdtemp, rm, writeFile } from 'node:fs/promises';
import { $ } from 'bun';

describe('Daemon large message handling', () => {
let tempDir: string;
let configPath: string;
const mockServerPath = join(import.meta.dir, 'fixtures', 'mock-mcp-server.ts');

beforeAll(async () => {
tempDir = await mkdtemp(join(tmpdir(), 'mcp-cli-daemon-test-'));
configPath = join(tempDir, 'mcp_servers.json');
await writeFile(
configPath,
JSON.stringify({
mcpServers: {
'mock-large': {
command: 'bun',
args: ['run', mockServerPath],
},
},
}),
);
});

afterAll(async () => {
// Kill any daemon processes we spawned
try {
await $`pkill -f 'daemon.*mock-large'`.nothrow();
} catch {}
await rm(tempDir, { recursive: true, force: true });
});

test('lists all 50 tools via direct connection (baseline)', async () => {
const result =
await $`MCP_NO_DAEMON=1 MCP_TIMEOUT=15000 bun run ${join(import.meta.dir, '..', 'src', 'index.ts')} -c ${configPath}`
.quiet()
.nothrow()
.then((r) => ({
stdout: r.stdout.toString(),
stderr: r.stderr.toString(),
exitCode: r.exitCode,
}));

expect(result.exitCode).toBe(0);
expect(result.stdout).toContain('mock_tool_0');
expect(result.stdout).toContain('mock_tool_49');

// Count tool lines (each starts with " • ")
const toolLines = result.stdout.split('\n').filter((l: string) => l.trim().startsWith('•'));
expect(toolLines.length).toBe(50);
}, 20000);

test('lists all 50 tools via daemon (exercises large IPC message)', async () => {
// This is the key test: the daemon's listTools response for 50 tools
// exceeds the 8KB kernel socket buffer, requiring drain-based writes.
// Without the fix, only the first ~8KB is sent and the client times out.
const result =
await $`MCP_TIMEOUT=15000 bun run ${join(import.meta.dir, '..', 'src', 'index.ts')} -c ${configPath}`
.quiet()
.nothrow()
.then((r) => ({
stdout: r.stdout.toString(),
stderr: r.stderr.toString(),
exitCode: r.exitCode,
}));

expect(result.exitCode).toBe(0);
expect(result.stdout).toContain('mock_tool_0');
expect(result.stdout).toContain('mock_tool_49');

const toolLines = result.stdout.split('\n').filter((l: string) => l.trim().startsWith('•'));
expect(toolLines.length).toBe(50);
}, 20000);

test('daemon serves tool descriptions correctly for large payloads', async () => {
// With -d flag, descriptions are included — making the response even larger
const result =
await $`MCP_TIMEOUT=15000 bun run ${join(import.meta.dir, '..', 'src', 'index.ts')} -c ${configPath} -d`
.quiet()
.nothrow()
.then((r) => ({
stdout: r.stdout.toString(),
stderr: r.stderr.toString(),
exitCode: r.exitCode,
}));

expect(result.exitCode).toBe(0);
expect(result.stdout).toContain('mock_tool_0');
expect(result.stdout).toContain('mock_tool_49');
// Descriptions should be present
expect(result.stdout).toContain('intentionally long');
}, 20000);
});
36 changes: 36 additions & 0 deletions tests/fixtures/mock-mcp-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/usr/bin/env bun
/**
* Mock MCP server over stdio that returns a large number of tools.
* Used by daemon socket integration tests to ensure large IPC
* messages are transmitted correctly.
*
* Uses the official MCP SDK server/stdio transport.
*/

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { z } from 'zod';

const TOOL_COUNT = 50;

const server = new McpServer({
name: 'mock-large-server',
version: '1.0.0',
});

// Register many tools to create a large listTools response (>8KB)
for (let i = 0; i < TOOL_COUNT; i++) {
server.tool(
`mock_tool_${i}`,
`Mock tool number ${i}. This is a description that is intentionally long to inflate the payload size beyond the kernel socket buffer limit of 8192 bytes.`,
{
arg1: z.string().describe(`String argument for mock_tool_${i}`),
arg2: z.number().describe(`Number argument for mock_tool_${i}`),
arg3: z.boolean().describe(`Boolean argument for mock_tool_${i}`),
},
async () => ({ content: [{ type: 'text' as const, text: `result from mock_tool_${i}` }] }),
);
}

const transport = new StdioServerTransport();
await server.connect(transport);