Skip to content

Commit 2e7d223

Browse files
authored
Handle OpenViking outages without blocking OpenClaw (#1158)
1 parent 27d48bd commit 2e7d223

File tree

8 files changed

+734
-5
lines changed

8 files changed

+734
-5
lines changed

examples/openclaw-plugin/context-engine.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ export function createMemoryOpenVikingContextEngine(params: {
460460
cfg: Required<MemoryOpenVikingConfig>;
461461
logger: Logger;
462462
getClient: () => Promise<OpenVikingClient>;
463+
quickPrecheck?: () => Promise<{ ok: true } | { ok: false; reason: string }>;
463464
/** Extra args help match hook-populated routing when OpenClaw provides sessionKey / OV session id. */
464465
resolveAgentId: (sessionId: string, sessionKey?: string, ovSessionId?: string) => string;
465466
rememberSessionAgentId?: (ctx: {
@@ -476,6 +477,7 @@ export function createMemoryOpenVikingContextEngine(params: {
476477
cfg,
477478
logger,
478479
getClient,
480+
quickPrecheck,
479481
resolveAgentId,
480482
rememberSessionAgentId,
481483
} = params;
@@ -543,6 +545,30 @@ export function createMemoryOpenVikingContextEngine(params: {
543545
return typeof agentId === "string" && agentId.trim() ? agentId.trim() : undefined;
544546
}
545547

548+
async function runLocalPrecheck(
549+
stage: "assemble" | "afterTurn",
550+
sessionId: string,
551+
extra: Record<string, unknown> = {},
552+
): Promise<boolean> {
553+
if (cfg.mode !== "local" || !quickPrecheck) {
554+
return true;
555+
}
556+
const result = await quickPrecheck();
557+
if (result.ok) {
558+
return true;
559+
}
560+
warnOrInfo(
561+
logger,
562+
`openviking: ${stage} precheck failed for session=${sessionId}: ${result.reason}`,
563+
);
564+
diag(`${stage}_skip`, sessionId, {
565+
reason: "precheck_failed",
566+
precheckReason: result.reason,
567+
...extra,
568+
});
569+
return false;
570+
}
571+
546572
return {
547573
info: {
548574
id,
@@ -586,6 +612,11 @@ export function createMemoryOpenVikingContextEngine(params: {
586612
});
587613

588614
try {
615+
if (!(await runLocalPrecheck("assemble", OVSessionId, {
616+
tokenBudget,
617+
}))) {
618+
return { messages, estimatedTokens: roughEstimate(messages) };
619+
}
589620
const client = await getClient();
590621
const routingRef =
591622
assembleParams.sessionId ?? sessionKey ?? OVSessionId;
@@ -767,6 +798,13 @@ export function createMemoryOpenVikingContextEngine(params: {
767798
messages: newMsgFull,
768799
});
769800

801+
if (!(await runLocalPrecheck("afterTurn", OVSessionId, {
802+
totalMessages: messages.length,
803+
newMessageCount: newCount,
804+
prePromptMessageCount: start,
805+
}))) {
806+
return;
807+
}
770808
const client = await getClient();
771809
const turnText = newTexts.join("\n");
772810
const sanitized = turnText.replace(/<relevant-memories>[\s\S]*?<\/relevant-memories>/gi, " ").replace(/\s+/g, " ").trim();

examples/openclaw-plugin/index.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import {
2323
} from "./memory-ranking.js";
2424
import {
2525
IS_WIN,
26-
waitForHealth,
26+
waitForHealthOrExit,
2727
quickHealthCheck,
2828
quickRecallPrecheck,
2929
withTimeout,
@@ -343,6 +343,10 @@ const contextEnginePlugin = {
343343
entry.resolve = resolve;
344344
entry.reject = reject;
345345
});
346+
// Service startup can reject this shared promise before any hook/tool
347+
// awaits it. Attach a sink now so expected local-startup failures do
348+
// not surface as process-level unhandled rejections.
349+
void entry.promise.catch(() => {});
346350
clientPromise = entry.promise;
347351
localClientPendingPromises.set(localCacheKey, entry);
348352
}
@@ -990,6 +994,10 @@ const contextEnginePlugin = {
990994
cfg,
991995
logger: api.logger,
992996
getClient,
997+
quickPrecheck:
998+
cfg.mode === "local"
999+
? () => quickRecallPrecheck(cfg.mode, cfg.baseUrl, cfg.port, localProcess)
1000+
: undefined,
9931001
resolveAgentId,
9941002
rememberSessionAgentId,
9951003
});
@@ -1094,7 +1102,7 @@ const contextEnginePlugin = {
10941102
api.logger.warn(`openviking: subprocess exited (code=${code}, signal=${signal})${out}`);
10951103
});
10961104
try {
1097-
await waitForHealth(baseUrl, timeoutMs, intervalMs);
1105+
await waitForHealthOrExit(baseUrl, timeoutMs, intervalMs, child);
10981106
const client = new OpenVikingClient(
10991107
baseUrl,
11001108
cfg.apiKey,
@@ -1171,7 +1179,7 @@ const contextEnginePlugin = {
11711179
api.logger.warn(`openviking: re-spawned subprocess exited (code=${code}, signal=${signal})`);
11721180
});
11731181
try {
1174-
await waitForHealth(baseUrl, timeoutMs, intervalMs);
1182+
await waitForHealthOrExit(baseUrl, timeoutMs, intervalMs, child);
11751183
const client = new OpenVikingClient(baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs);
11761184
localClientCache.set(localCacheKey, { client, process: child });
11771185
if (resolveLocalClient) {

examples/openclaw-plugin/process-manager.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,68 @@ export function waitForHealth(baseUrl: string, timeoutMs: number, intervalMs: nu
2929
});
3030
}
3131

32+
export function waitForHealthOrExit(
33+
baseUrl: string,
34+
timeoutMs: number,
35+
intervalMs: number,
36+
child: ReturnType<typeof spawn>,
37+
): Promise<void> {
38+
const exited =
39+
child.killed || child.exitCode !== null || child.signalCode !== null;
40+
if (exited) {
41+
return Promise.reject(
42+
new Error(
43+
`OpenViking subprocess exited before health check ` +
44+
`(code=${child.exitCode}, signal=${child.signalCode})`,
45+
),
46+
);
47+
}
48+
49+
return new Promise((resolve, reject) => {
50+
let settled = false;
51+
52+
const cleanup = () => {
53+
child.off?.("error", onError);
54+
child.off?.("exit", onExit);
55+
};
56+
57+
const finishResolve = () => {
58+
if (settled) {
59+
return;
60+
}
61+
settled = true;
62+
cleanup();
63+
resolve();
64+
};
65+
66+
const finishReject = (err: unknown) => {
67+
if (settled) {
68+
return;
69+
}
70+
settled = true;
71+
cleanup();
72+
reject(err instanceof Error ? err : new Error(String(err)));
73+
};
74+
75+
const onError = (err: Error) => {
76+
finishReject(err);
77+
};
78+
79+
const onExit = (code: number | null, signal: string | null) => {
80+
finishReject(
81+
new Error(
82+
`OpenViking subprocess exited before health check ` +
83+
`(code=${code}, signal=${signal})`,
84+
),
85+
);
86+
};
87+
88+
child.once("error", onError);
89+
child.once("exit", onExit);
90+
waitForHealth(baseUrl, timeoutMs, intervalMs).then(finishResolve, finishReject);
91+
});
92+
}
93+
3294
export function withTimeout<T>(promise: Promise<T>, timeoutMs: number, timeoutMessage: string): Promise<T> {
3395
return new Promise((resolve, reject) => {
3496
const timer = setTimeout(() => reject(new Error(timeoutMessage)), timeoutMs);

examples/openclaw-plugin/tests/ut/context-engine-afterTurn.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ function makeEngine(opts?: {
1717
commitTokenThreshold?: number;
1818
getSession?: Record<string, unknown>;
1919
addSessionMessageError?: Error;
20+
cfgOverrides?: Record<string, unknown>;
21+
quickPrecheck?: () => Promise<{ ok: true } | { ok: false; reason: string }>;
2022
}) {
2123
const cfg = memoryOpenVikingConfigSchema.parse({
2224
mode: "remote",
@@ -26,6 +28,7 @@ function makeEngine(opts?: {
2628
ingestReplyAssist: false,
2729
commitTokenThreshold: opts?.commitTokenThreshold ?? 20000,
2830
emitStandardDiagnostics: true,
31+
...(opts?.cfgOverrides ?? {}),
2932
});
3033
const logger = makeLogger();
3134

@@ -63,6 +66,7 @@ function makeEngine(opts?: {
6366
cfg,
6467
logger,
6568
getClient,
69+
quickPrecheck: opts?.quickPrecheck,
6670
resolveAgentId,
6771
});
6872

@@ -108,6 +112,34 @@ describe("context-engine afterTurn()", () => {
108112
);
109113
});
110114

115+
it("skips immediately when local precheck reports OpenViking unavailable", async () => {
116+
const quickPrecheck = vi.fn().mockResolvedValue({
117+
ok: false as const,
118+
reason: "local process is not running",
119+
});
120+
const { engine, client, getClient, logger } = makeEngine({
121+
cfgOverrides: {
122+
mode: "local",
123+
port: 1933,
124+
},
125+
quickPrecheck,
126+
});
127+
128+
await engine.afterTurn!({
129+
sessionId: "s1",
130+
sessionFile: "",
131+
messages: [{ role: "user", content: "hello" }],
132+
prePromptMessageCount: 0,
133+
});
134+
135+
expect(quickPrecheck).toHaveBeenCalledTimes(1);
136+
expect(getClient).not.toHaveBeenCalled();
137+
expect(client.addSessionMessage).not.toHaveBeenCalled();
138+
expect(logger.warn).toHaveBeenCalledWith(
139+
expect.stringContaining("afterTurn precheck failed"),
140+
);
141+
});
142+
111143
it("skips when no new user/assistant messages after prePromptMessageCount", async () => {
112144
const { engine, client, logger } = makeEngine();
113145

examples/openclaw-plugin/tests/ut/context-engine-assemble.test.ts

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,34 @@ function makeStats() {
3535
};
3636
}
3737

38-
function makeEngine(contextResult: unknown) {
38+
function makeEngine(
39+
contextResult: unknown,
40+
opts?: {
41+
cfgOverrides?: Record<string, unknown>;
42+
quickPrecheck?: () => Promise<{ ok: true } | { ok: false; reason: string }>;
43+
},
44+
) {
3945
const logger = makeLogger();
4046
const client = {
4147
getSessionContext: vi.fn().mockResolvedValue(contextResult),
4248
} as unknown as OpenVikingClient;
4349
const getClient = vi.fn().mockResolvedValue(client);
4450
const resolveAgentId = vi.fn((sessionId: string) => `agent:${sessionId}`);
51+
const localCfg = opts?.cfgOverrides
52+
? memoryOpenVikingConfigSchema.parse({
53+
...cfg,
54+
...opts.cfgOverrides,
55+
})
56+
: cfg;
4557

4658
const engine = createMemoryOpenVikingContextEngine({
4759
id: "openviking",
4860
name: "Context Engine (OpenViking)",
4961
version: "test",
50-
cfg,
62+
cfg: localCfg,
5163
logger,
5264
getClient,
65+
quickPrecheck: opts?.quickPrecheck,
5366
resolveAgentId,
5467
});
5568

@@ -142,6 +155,47 @@ describe("context-engine assemble()", () => {
142155
});
143156
});
144157

158+
it("falls back immediately when local precheck reports OpenViking unavailable", async () => {
159+
const quickPrecheck = vi.fn().mockResolvedValue({
160+
ok: false as const,
161+
reason: "local process is not running",
162+
});
163+
const { engine, client, getClient, logger } = makeEngine(
164+
{
165+
latest_archive_overview: "unused",
166+
pre_archive_abstracts: [],
167+
messages: [],
168+
estimatedTokens: 123,
169+
stats: makeStats(),
170+
},
171+
{
172+
cfgOverrides: {
173+
mode: "local",
174+
port: 1933,
175+
},
176+
quickPrecheck,
177+
},
178+
);
179+
180+
const liveMessages = [{ role: "user", content: "fallback live message" }];
181+
const result = await engine.assemble({
182+
sessionId: "session-local",
183+
messages: liveMessages,
184+
tokenBudget: 4096,
185+
});
186+
187+
expect(quickPrecheck).toHaveBeenCalledTimes(1);
188+
expect(getClient).not.toHaveBeenCalled();
189+
expect(client.getSessionContext).not.toHaveBeenCalled();
190+
expect(result).toEqual({
191+
messages: liveMessages,
192+
estimatedTokens: roughEstimate(liveMessages),
193+
});
194+
expect(logger.warn).toHaveBeenCalledWith(
195+
expect.stringContaining("assemble precheck failed"),
196+
);
197+
});
198+
145199
it("emits a non-error toolResult for a running tool (not a synthetic error)", async () => {
146200
const { engine } = makeEngine({
147201
latest_archive_overview: "",

0 commit comments

Comments
 (0)