Skip to content

Commit 5f13912

Browse files
authored
feat: callback to intercept and modify predicates before re-registration (#675)
Closes #674 * First commit adds an optional callback that clients can use to modify an inactive predicate before it is re-registered. * Second commit fixes an issue where the the `setInterval` could result in overlapping healthcheck code running if something like the callback (which could be waiting on a postgres connection, for example) take a long time.
1 parent 8247900 commit 5f13912

File tree

3 files changed

+63
-25
lines changed

3 files changed

+63
-25
lines changed

components/client/typescript/src/index.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { Static, Type } from '@fastify/type-provider-typebox';
66
import { BitcoinIfThisOptionsSchema, BitcoinIfThisSchema } from './schemas/bitcoin/if_this';
77
import { StacksIfThisOptionsSchema, StacksIfThisSchema } from './schemas/stacks/if_this';
88
import { logger } from './util/logger';
9+
import { PredicateSchema } from './schemas/predicate';
910

1011
const EventObserverOptionsSchema = Type.Object({
1112
/** Event observer host name (usually '0.0.0.0') */
@@ -40,6 +41,9 @@ const EventObserverOptionsSchema = Type.Object({
4041
* up to date. If they become obsolete, we will attempt to re-register them.
4142
*/
4243
predicate_health_check_interval_ms: Type.Optional(Type.Integer({ default: 5000 })),
44+
predicate_re_register_callback: Type.Optional(
45+
Type.Function([PredicateSchema], Type.Promise(PredicateSchema))
46+
),
4347
});
4448
/** Chainhook event observer configuration options */
4549
export type EventObserverOptions = Static<typeof EventObserverOptionsSchema>;
@@ -126,14 +130,24 @@ export class ChainhookEventObserver {
126130
this.fastify = await buildServer(this.observer, this.chainhook, predicates, callback);
127131
await this.fastify.listen({ host: this.observer.hostname, port: this.observer.port });
128132
if (this.observer.predicate_health_check_interval_ms && this.healthCheckTimer === undefined) {
129-
this.healthCheckTimer = setInterval(() => {
130-
predicateHealthCheck(this.observer, this.chainhook).catch(err =>
131-
logger.error(err, `ChainhookEventObserver predicate health check error`)
132-
);
133-
}, this.observer.predicate_health_check_interval_ms);
133+
this.scheduleHealthCheck();
134134
}
135135
}
136136

137+
private scheduleHealthCheck() {
138+
this.healthCheckTimer = setTimeout(() => {
139+
void predicateHealthCheck(this.observer, this.chainhook)
140+
.catch(err => {
141+
logger.error(err, `ChainhookEventObserver predicate health check error`);
142+
})
143+
.finally(() => {
144+
if (this.healthCheckTimer) {
145+
this.scheduleHealthCheck();
146+
}
147+
});
148+
}, this.observer.predicate_health_check_interval_ms);
149+
}
150+
137151
/**
138152
* Stop the Chainhook event server gracefully.
139153
*/

components/client/typescript/src/predicates.ts

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import * as fs from 'fs';
1+
import * as fs from 'fs/promises';
22
import * as path from 'path';
33
import { logger } from './util/logger';
44
import {
@@ -17,17 +17,32 @@ const RegisteredPredicates = new Map<string, Predicate>();
1717

1818
const CompiledPredicateSchema = TypeCompiler.Compile(PredicateSchema);
1919

20+
// Async version of fs.existsSync
21+
async function pathExists(path: string): Promise<boolean> {
22+
try {
23+
await fs.access(path);
24+
return true;
25+
} catch (error) {
26+
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
27+
return false;
28+
}
29+
throw error; // Re-throw other errors (e.g., permission issues)
30+
}
31+
}
32+
2033
/**
2134
* Looks on disk and returns a map of registered Predicates, where the key is the predicate `name`
2235
* as defined by the user.
2336
*/
24-
export function recallPersistedPredicatesFromDisk(basePath: string): Map<string, Predicate> {
37+
export async function recallPersistedPredicatesFromDisk(
38+
basePath: string
39+
): Promise<Map<string, Predicate>> {
2540
RegisteredPredicates.clear();
2641
try {
27-
if (!fs.existsSync(basePath)) return RegisteredPredicates;
28-
for (const file of fs.readdirSync(basePath)) {
42+
if (!(await pathExists(basePath))) return RegisteredPredicates;
43+
for (const file of await fs.readdir(basePath)) {
2944
if (file.endsWith('.json')) {
30-
const text = fs.readFileSync(path.join(basePath, file), 'utf-8');
45+
const text = await fs.readFile(path.join(basePath, file), 'utf-8');
3146
const predicate = JSON.parse(text) as JSON;
3247
if (CompiledPredicateSchema.Check(predicate)) {
3348
logger.info(
@@ -44,11 +59,11 @@ export function recallPersistedPredicatesFromDisk(basePath: string): Map<string,
4459
return RegisteredPredicates;
4560
}
4661

47-
export function savePredicateToDisk(basePath: string, predicate: Predicate) {
62+
export async function savePredicateToDisk(basePath: string, predicate: Predicate) {
4863
const predicatePath = `${basePath}/predicate-${encodeURIComponent(predicate.name)}.json`;
4964
try {
50-
fs.mkdirSync(basePath, { recursive: true });
51-
fs.writeFileSync(predicatePath, JSON.stringify(predicate, null, 2));
65+
await fs.mkdir(basePath, { recursive: true });
66+
await fs.writeFile(predicatePath, JSON.stringify(predicate, null, 2));
5267
logger.info(
5368
`ChainhookEventObserver persisted predicate '${predicate.name}' (${predicate.uuid}) to disk`
5469
);
@@ -60,13 +75,18 @@ export function savePredicateToDisk(basePath: string, predicate: Predicate) {
6075
}
6176
}
6277

63-
function deletePredicateFromDisk(basePath: string, predicate: Predicate) {
78+
async function deletePredicateFromDisk(basePath: string, predicate: Predicate) {
6479
const predicatePath = `${basePath}/predicate-${encodeURIComponent(predicate.name)}.json`;
65-
if (fs.existsSync(predicatePath)) {
66-
fs.rmSync(predicatePath);
80+
try {
81+
await fs.rm(predicatePath);
6782
logger.info(
6883
`ChainhookEventObserver deleted predicate '${predicate.name}' (${predicate.uuid}) from disk`
6984
);
85+
} catch (error: unknown) {
86+
// ignore if the file doesn't exist
87+
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
88+
logger.error(error, `Failed to delete predicate`);
89+
}
7090
}
7191
}
7292

@@ -146,11 +166,15 @@ async function registerPredicate(
146166
authorization_header: `Bearer ${observer.auth_token}`,
147167
},
148168
};
149-
const newPredicate = pendingPredicate as Predicate;
169+
let newPredicate = pendingPredicate as Predicate;
150170
newPredicate.uuid = randomUUID();
151171
if (newPredicate.networks.mainnet) newPredicate.networks.mainnet.then_that = thenThat;
152172
if (newPredicate.networks.testnet) newPredicate.networks.testnet.then_that = thenThat;
153173

174+
if (observer.predicate_re_register_callback) {
175+
newPredicate = await observer.predicate_re_register_callback(newPredicate);
176+
}
177+
154178
const path = observer.node_type === 'chainhook' ? `/v1/chainhooks` : `/v1/observers`;
155179
await request(`${chainhook.base_url}${path}`, {
156180
method: 'POST',
@@ -161,7 +185,7 @@ async function registerPredicate(
161185
logger.info(
162186
`ChainhookEventObserver registered '${newPredicate.name}' predicate (${newPredicate.uuid})`
163187
);
164-
savePredicateToDisk(observer.predicate_disk_file_path, newPredicate);
188+
await savePredicateToDisk(observer.predicate_disk_file_path, newPredicate);
165189
RegisteredPredicates.set(newPredicate.name, newPredicate);
166190
} catch (error) {
167191
logger.error(error, `ChainhookEventObserver unable to register predicate`);
@@ -186,7 +210,7 @@ async function removePredicate(
186210
throwOnError: true,
187211
});
188212
logger.info(`ChainhookEventObserver removed predicate '${predicate.name}' (${predicate.uuid})`);
189-
deletePredicateFromDisk(observer.predicate_disk_file_path, predicate);
213+
await deletePredicateFromDisk(observer.predicate_disk_file_path, predicate);
190214
} catch (error) {
191215
logger.error(error, `ChainhookEventObserver unable to deregister predicate`);
192216
}
@@ -203,7 +227,7 @@ export async function registerAllPredicatesOnObserverReady(
203227
logger.info(`ChainhookEventObserver does not have predicates to register`);
204228
return;
205229
}
206-
const diskPredicates = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
230+
const diskPredicates = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
207231
for (const predicate of predicates)
208232
await registerPredicate(predicate, diskPredicates, observer, chainhook);
209233
}
@@ -213,7 +237,7 @@ export async function removeAllPredicatesOnObserverClose(
213237
observer: EventObserverOptions,
214238
chainhook: ChainhookNodeOptions
215239
) {
216-
const diskPredicates = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
240+
const diskPredicates = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
217241
if (diskPredicates.size === 0) {
218242
logger.info(`ChainhookEventObserver does not have predicates to close`);
219243
return;

components/client/typescript/tests/predicates.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ describe('predicates', () => {
8383
await server.start([testPredicate], async () => {});
8484

8585
expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true);
86-
const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
86+
const disk = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
8787
const storedPredicate = disk.get('test');
8888
expect(storedPredicate).not.toBeUndefined();
8989
expect(storedPredicate?.name).toBe(testPredicate.name);
@@ -102,8 +102,8 @@ describe('predicates', () => {
102102
});
103103

104104
describe('pre-stored', () => {
105-
beforeEach(() => {
106-
savePredicateToDisk(observer.predicate_disk_file_path, {
105+
beforeEach(async () => {
106+
await savePredicateToDisk(observer.predicate_disk_file_path, {
107107
uuid: 'e2777d77-473a-4c1d-9012-152deb36bf4c',
108108
name: 'test',
109109
version: 1,
@@ -164,7 +164,7 @@ describe('predicates', () => {
164164

165165
mockAgent.assertNoPendingInterceptors();
166166
expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true);
167-
const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
167+
const disk = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
168168
const storedPredicate = disk.get('test');
169169
// Should have a different uuid
170170
expect(storedPredicate?.uuid).not.toBe('e2777d77-473a-4c1d-9012-152deb36bf4c');

0 commit comments

Comments
 (0)