From 660fba59b0b24f7fae750e304e0c7b6336587848 Mon Sep 17 00:00:00 2001 From: Kevin Ingersoll Date: Fri, 18 Oct 2024 17:58:25 +0100 Subject: [PATCH] merge: alvs/ws-keepalive --- packages/store-sync/src/wiresaw.ts | 138 ++++++++++++++++++----------- 1 file changed, 86 insertions(+), 52 deletions(-) diff --git a/packages/store-sync/src/wiresaw.ts b/packages/store-sync/src/wiresaw.ts index 038016a83d..1e5206d978 100644 --- a/packages/store-sync/src/wiresaw.ts +++ b/packages/store-sync/src/wiresaw.ts @@ -4,6 +4,7 @@ import { StorageAdapterBlock, StoreEventsLog } from "./common"; import { storeEventsAbi } from "@latticexyz/store"; import { logSort } from "@latticexyz/common"; import { SocketRpcClient, getWebSocketRpcClient } from "viem/utils"; +import { debug } from "./debug"; type WatchLogsInput = { url: string; @@ -16,70 +17,103 @@ type WatchLogsResult = { }; export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLogsResult { - // Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed - let caughtUp = false; - const logBuffer: StoreEventsLog[] = []; - const topics = [ storeEventsAbi.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name })), ] as LogTopic[]; // https://github.com/wevm/viem/blob/63a5ac86eb9a2962f7323b4cc76ef54f9f5ef7ed/src/actions/public/getLogs.ts#L171 + let resumeBlock = fromBlock; + let keepAliveInterval: ReturnType | undefined = undefined; const logs$ = new Observable((subscriber) => { let client: SocketRpcClient; - getWebSocketRpcClient(url, { keepAlive: true, reconnect: true }).then(async (_client) => { - client = _client; - client.socket.addEventListener("error", (error) => - subscriber.error({ code: -32603, message: "WebSocket error", data: error }), - ); - - // Start watching pending logs - const subscriptionId: Hex = ( - await client.requestAsync({ - body: { - method: "wiresaw_watchLogs", - params: [{ address, topics }], - }, - }) - ).result; - - // Listen for wiresaw_watchLogs subscription - // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. - // (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178) - client.socket.addEventListener("message", (message) => { - const response = JSON.parse(message.data); - if ("error" in response) { - // Return JSON-RPC errors to the subscriber - subscriber.error(response.error); - return; - } - // Parse the logs from wiresaw_watchLogs - if ("params" in response && response.params.subscription === subscriptionId) { - const logs: RpcLog[] = response.params.result; - const formattedLogs = logs.map((log) => formatLog(log)); - const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); - if (caughtUp) { - const blockNumber = parsedLogs[0].blockNumber; - subscriber.next({ blockNumber, logs: parsedLogs }); - } else { - logBuffer.push(...parsedLogs); + function setupClient(): void { + // Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed + let caughtUp = false; + const logBuffer: StoreEventsLog[] = []; + + getWebSocketRpcClient(url, { + keepAlive: false, // keepAlive is handled below + }).then(async (_client) => { + client = _client; + client.socket.addEventListener("error", (error) => + subscriber.error({ code: -32603, message: "WebSocket error", data: error }), + ); + + // Start watching pending logs + const subscriptionId: Hex = ( + await client.requestAsync({ + body: { + method: "wiresaw_watchLogs", + params: [{ address, topics }], + }, + }) + ).result; + + // Listen for wiresaw_watchLogs subscription + // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. + // (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178) + client.socket.addEventListener("message", (message) => { + const response = JSON.parse(message.data); + if ("error" in response) { + // Return JSON-RPC errors to the subscriber + subscriber.error(response.error); + return; + } + + // Parse the logs from wiresaw_watchLogs + if ("params" in response && response.params.subscription === subscriptionId) { + const logs: RpcLog[] = response.params.result; + const formattedLogs = logs.map((log) => formatLog(log)); + const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); + if (caughtUp) { + const blockNumber = parsedLogs[0].blockNumber; + subscriber.next({ blockNumber, logs: parsedLogs }); + resumeBlock = blockNumber + 1n; + } else { + logBuffer.push(...parsedLogs); + } } + }); + + // Catch up to the pending logs + try { + const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics }); + const logs = [...initialLogs, ...logBuffer].sort(logSort); + const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock; + subscriber.next({ blockNumber, logs: initialLogs }); + resumeBlock = blockNumber + 1n; + caughtUp = true; + } catch (e) { + subscriber.error("Could not fetch initial wiresaw logs"); } + + // Keep websocket alive and reconnect if it's not alive anymore + keepAliveInterval = setInterval(async () => { + try { + await Promise.race([ + client.requestAsync({ body: { method: "net_version" } }), + new Promise((_, reject) => { + setTimeout(reject, 2000); + }), + ]); + } catch { + debug("Detected unresponsive websocket, reconnecting..."); + clearInterval(keepAliveInterval); + client.close(); + setupClient(); + } + }, 3000); }); + } - // Catch up to the pending logs - try { - const initialLogs = await fetchInitialLogs({ client, address, fromBlock, topics }); - const logs = [...initialLogs, ...logBuffer].sort(logSort); - const blockNumber = logs.at(-1)?.blockNumber ?? fromBlock; - subscriber.next({ blockNumber, logs: initialLogs }); - caughtUp = true; - } catch (e) { - subscriber.error("Could not fetch initial wiresaw logs"); - } - }); + setupClient(); - return () => client?.close(); + return () => { + client?.close(); + if (keepAliveInterval != null) { + clearInterval(keepAliveInterval); + } + }; }); return { logs$ };