Skip to content
This repository was archived by the owner on Jan 10, 2025. It is now read-only.

Commit ab2092d

Browse files
committed
improve metrics and persist cursors
1 parent 8c56670 commit ab2092d

File tree

5 files changed

+55
-24
lines changed

5 files changed

+55
-24
lines changed

pnpm-lock.yaml

Lines changed: 6 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

substream-listener/package.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
"dev": "tsx --watch ./src/index.mts | pino-pretty"
77
},
88
"dependencies": {
9-
"@substreams/core": "^0.16.0",
10-
"@substreams/manifest": "^0.15.0",
11-
"@substreams/node": "^0.6.2",
9+
"@connectrpc/connect-node": "1.4.0",
10+
"@substreams/core": "0.16.0",
11+
"@substreams/manifest": "0.15.0",
12+
"@substreams/node": "0.6.2",
1213
"bullmq": "^5.8.1",
1314
"fets": "^0.8.0",
1415
"ioredis": "^5.4.1",

substream-listener/src/index.mts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { createRouter, Response } from "fets";
22
import { App } from "uWebSockets.js";
33
import { isAddress } from "viem";
4-
import { sendWebhook } from "./send-webhook.mjs";
54
import {
65
invalidHttpRequests,
76
registry,

substream-listener/src/prometheus.mts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const DEFAULT_LABEL_NAMES = [
1212
"module_hash",
1313
"contract_address",
1414
"output_module",
15+
"app_id",
1516
] as const;
1617

1718
function calculateHeadBlockTimeDrift(clock: Clock) {
@@ -20,7 +21,6 @@ function calculateHeadBlockTimeDrift(clock: Clock) {
2021
}
2122

2223
// Counters
23-
2424
export const invalidHttpRequests = new promClient.Counter({
2525
name: "http_invalid_request",
2626
help: "The number of invalid HTTP requests received",
@@ -121,6 +121,7 @@ export function onPrometheusMetrics(
121121
substreamsEndpoint: string;
122122
contractAddress: string;
123123
moduleHash: string;
124+
appId: string;
124125
},
125126
) {
126127
manifest?.set(
@@ -132,6 +133,7 @@ export function onPrometheusMetrics(
132133
module_hash: options.moduleHash,
133134
contract_address: options.contractAddress,
134135
output_module: emitter.request.outputModule,
136+
app_id: options.appId,
135137
},
136138
1,
137139
);
@@ -146,6 +148,7 @@ export function onPrometheusMetrics(
146148
module_hash: options.moduleHash,
147149
contract_address: options.contractAddress,
148150
output_module: emitter.request.outputModule,
151+
app_id: options.appId,
149152
},
150153
1,
151154
);
@@ -157,6 +160,7 @@ export function onPrometheusMetrics(
157160
module_hash: options.moduleHash,
158161
contract_address: options.contractAddress,
159162
output_module: emitter.request.outputModule,
163+
app_id: options.appId,
160164
})
161165
.inc(1),
162166
);
@@ -167,6 +171,7 @@ export function onPrometheusMetrics(
167171
module_hash: options.moduleHash,
168172
contract_address: options.contractAddress,
169173
output_module: emitter.request.outputModule,
174+
app_id: options.appId,
170175
})
171176
.inc(1);
172177

@@ -175,6 +180,7 @@ export function onPrometheusMetrics(
175180
module_hash: options.moduleHash,
176181
contract_address: options.contractAddress,
177182
output_module: emitter.request.outputModule,
183+
app_id: options.appId,
178184
})
179185
?.inc(block.toBinary().byteLength);
180186

@@ -183,6 +189,7 @@ export function onPrometheusMetrics(
183189
module_hash: options.moduleHash,
184190
contract_address: options.contractAddress,
185191
output_module: emitter.request.outputModule,
192+
app_id: options.appId,
186193
},
187194
1,
188195
);
@@ -193,6 +200,7 @@ export function onPrometheusMetrics(
193200
module_hash: options.moduleHash,
194201
contract_address: options.contractAddress,
195202
output_module: emitter.request.outputModule,
203+
app_id: options.appId,
196204
},
197205
Number(block.clock.number),
198206
);
@@ -202,6 +210,7 @@ export function onPrometheusMetrics(
202210
module_hash: options.moduleHash,
203211
contract_address: options.contractAddress,
204212
output_module: emitter.request.outputModule,
213+
app_id: options.appId,
205214
},
206215
calculateHeadBlockTimeDrift(block.clock),
207216
);
@@ -211,6 +220,7 @@ export function onPrometheusMetrics(
211220
module_hash: options.moduleHash,
212221
contract_address: options.contractAddress,
213222
output_module: emitter.request.outputModule,
223+
app_id: options.appId,
214224
},
215225
Number(block.clock.timestamp?.seconds),
216226
);

substream-listener/src/scheduler.mts

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ import { Redis } from "ioredis";
33
import pTimeout from "p-timeout";
44
import { type Logger } from "pino";
55
import * as prometheus from "./prometheus.mjs";
6+
import { createGrpcWebTransport } from "@connectrpc/connect-node";
67
import {
78
applyParams,
9+
createAuthInterceptor,
810
createModuleHashHex,
911
createRegistry,
1012
createRequest,
1113
} from "@substreams/core";
1214
import { readPackage } from "@substreams/manifest";
1315
import { BlockEmitter } from "@substreams/node";
14-
import { createNodeTransport } from "@substreams/node/createNodeTransport";
1516
import { fileURLToPath } from "node:url";
1617
import path from "node:path";
1718
import { Svix } from "svix";
@@ -93,7 +94,13 @@ export function createScheduler(config: {
9394
const worker = new Worker<Input>(
9495
config.queueName,
9596
async (job) => {
96-
logger.info(
97+
const jobLogger = logger.child({
98+
jobId: job.id,
99+
jobName: job.name,
100+
});
101+
const cursorKey = `cursor:${job.name}:${job.id}`.toLowerCase();
102+
103+
jobLogger.info(
97104
{
98105
payload: job.data,
99106
},
@@ -114,7 +121,7 @@ export function createScheduler(config: {
114121
throw new Error("No modules found in substream package");
115122
}
116123

117-
logger.debug(
124+
jobLogger.debug(
118125
{ contractAddress },
119126
"Applying params to substream package",
120127
);
@@ -127,18 +134,26 @@ export function createScheduler(config: {
127134
substreamPackage.modules,
128135
outputModule,
129136
);
130-
logger.debug({ moduleHash }, "Module hash");
137+
jobLogger.debug({ moduleHash }, "Module hash");
131138

132139
const registry = createRegistry(substreamPackage);
133-
const transport = createNodeTransport(
134-
substreamsEndpoint,
135-
token,
136-
registry,
137-
);
140+
const transport = createGrpcWebTransport({
141+
baseUrl: substreamsEndpoint,
142+
httpVersion: "2",
143+
interceptors: [createAuthInterceptor(token)],
144+
jsonOptions: {
145+
typeRegistry: registry,
146+
},
147+
});
148+
149+
const startCursor =
150+
(await redisConnection?.get(cursorKey)) || undefined;
151+
logger.debug({ startCursor }, "Starting from cursor");
138152
const request = createRequest({
139153
substreamPackage,
140154
outputModule,
141155
startBlockNum: startBlock,
156+
startCursor,
142157
});
143158

144159
// NodeJS Events
@@ -150,16 +165,19 @@ export function createScheduler(config: {
150165
substreamsEndpoint,
151166
contractAddress,
152167
moduleHash,
168+
appId,
169+
});
170+
171+
emitter.on("cursor", async (cursor) => {
172+
if (cursor) {
173+
await redisConnection?.set(cursorKey, cursor);
174+
}
153175
});
154176

155177
// Stream Blocks
156178
emitter.on("anyMessage", async (message, cursor, clock) => {
157179
const transfers = (message.transfers || []) as any[];
158180

159-
logger.debug(
160-
{ transfers: transfers.length },
161-
"Sending transfers to Svix",
162-
);
163181
const events = transfers.map((transfer) => {
164182
return svix.message.create(appId, {
165183
eventType: "erc721.transfer",
@@ -173,7 +191,7 @@ export function createScheduler(config: {
173191
try {
174192
await Promise.all(events);
175193
} catch (e) {
176-
logger.error({ error: e }, "Error sending events to Svix");
194+
jobLogger.error({ error: e }, "Error sending events to Svix");
177195
}
178196
});
179197

@@ -184,15 +202,15 @@ export function createScheduler(config: {
184202
// End of Stream
185203
emitter.on("close", (error) => {
186204
if (error) {
187-
logger.error({ error }, "Error closing stream");
205+
jobLogger.error({ error }, "Error closing stream");
188206
reject(error);
189207
}
190208
resolve("Stream closed");
191209
});
192210

193211
// Fatal Error
194212
emitter.on("fatalError", (error) => {
195-
logger.fatal({ error }, "Fatal error in stream");
213+
jobLogger.fatal({ error }, "Fatal error in stream");
196214
reject(error);
197215
});
198216
});

0 commit comments

Comments
 (0)