Skip to content

Commit 651e281

Browse files
feat: use a single Postgres connection for all namespaces
The adapter will now create one single Postgres connection for all namespaces, instead of one per namespace, which could lead to performance issues. Related: #3
1 parent 829a1f5 commit 651e281

File tree

1 file changed

+99
-68
lines changed

1 file changed

+99
-68
lines changed

Diff for: lib/index.ts

+99-68
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ export interface PostgresAdapterOptions {
125125
errorHandler: (err: Error) => void;
126126
}
127127

128+
const defaultErrorHandler = (err: Error) => debug(err);
129+
128130
/**
129131
* Returns a function that will create a PostgresAdapter instance.
130132
*
@@ -137,8 +139,104 @@ export function createAdapter(
137139
pool: Pool,
138140
opts: Partial<PostgresAdapterOptions> = {}
139141
) {
142+
const errorHandler = opts.errorHandler || defaultErrorHandler;
143+
const tableName = opts.tableName || "socket_io_attachments";
144+
const cleanupInterval = opts.cleanupInterval || 30000;
145+
146+
const channelToAdapters = new Map<string, PostgresAdapter>();
147+
let isConnectionInProgress = false;
148+
let client: any;
149+
let cleanupTimer: NodeJS.Timer;
150+
151+
const scheduleReconnection = () => {
152+
const reconnectionDelay = Math.floor(2000 * (0.5 + Math.random()));
153+
setTimeout(initClient, reconnectionDelay);
154+
};
155+
156+
const initClient = async () => {
157+
try {
158+
debug("fetching client from the pool");
159+
client = await pool.connect();
160+
isConnectionInProgress = false;
161+
162+
for (const [channel] of channelToAdapters) {
163+
debug("client listening to %s", channel);
164+
await client.query(`LISTEN "${channel}"`);
165+
}
166+
167+
client.on("notification", async (msg: any) => {
168+
try {
169+
await channelToAdapters.get(msg.channel)?.onEvent(msg.payload);
170+
} catch (err) {
171+
errorHandler(err);
172+
}
173+
});
174+
175+
client.on("error", () => {
176+
debug("client error");
177+
});
178+
179+
client.on("end", () => {
180+
debug("client was closed, scheduling reconnection...");
181+
scheduleReconnection();
182+
});
183+
} catch (err) {
184+
errorHandler(err);
185+
debug("error while initializing client, scheduling reconnection...");
186+
scheduleReconnection();
187+
}
188+
};
189+
190+
const scheduleCleanup = () => {
191+
cleanupTimer = setTimeout(async () => {
192+
try {
193+
await pool.query(
194+
`DELETE FROM ${tableName} WHERE created_at < now() - interval '${cleanupInterval} milliseconds'`
195+
);
196+
} catch (err) {
197+
errorHandler(err);
198+
}
199+
scheduleCleanup();
200+
}, cleanupInterval);
201+
};
202+
140203
return function (nsp: any) {
141-
return new PostgresAdapter(nsp, pool, opts);
204+
let adapter = new PostgresAdapter(nsp, pool, opts);
205+
206+
channelToAdapters.set(adapter.channel, adapter);
207+
208+
if (isConnectionInProgress) {
209+
// nothing to do
210+
} else if (client) {
211+
debug("client listening to %s", adapter.channel);
212+
client.query(`LISTEN "${adapter.channel}"`).catch(errorHandler);
213+
} else {
214+
isConnectionInProgress = true;
215+
initClient();
216+
217+
scheduleCleanup();
218+
}
219+
220+
const defaultClose = adapter.close;
221+
222+
adapter.close = () => {
223+
channelToAdapters.delete(adapter.channel);
224+
225+
if (channelToAdapters.size === 0) {
226+
if (client) {
227+
client.removeAllListeners("end");
228+
client.release();
229+
client = null;
230+
}
231+
if (cleanupTimer) {
232+
clearTimeout(cleanupTimer);
233+
}
234+
}
235+
236+
defaultClose.call(adapter);
237+
};
238+
239+
return adapter;
142240
};
143241
}
144242

@@ -150,14 +248,11 @@ export class PostgresAdapter extends Adapter {
150248
public heartbeatInterval: number;
151249
public heartbeatTimeout: number;
152250
public payloadThreshold: number;
153-
public cleanupInterval: number;
154251
public errorHandler: (err: Error) => void;
155252

156253
private readonly pool: Pool;
157-
private client: any;
158254
private nodesMap: Map<string, number> = new Map<string, number>(); // uid => timestamp of last message
159255
private heartbeatTimer: NodeJS.Timeout | undefined;
160-
private cleanupTimer: NodeJS.Timeout | undefined;
161256
private requests: Map<string, Request> = new Map();
162257
private ackRequests: Map<string, AckRequest> = new Map();
163258

@@ -185,69 +280,18 @@ export class PostgresAdapter extends Adapter {
185280
this.heartbeatInterval = opts.heartbeatInterval || 5000;
186281
this.heartbeatTimeout = opts.heartbeatTimeout || 10000;
187282
this.payloadThreshold = opts.payloadThreshold || 8000;
188-
this.cleanupInterval = opts.cleanupInterval || 30000;
189-
const defaultErrorHandler = (err: Error) => debug(err);
190283
this.errorHandler = opts.errorHandler || defaultErrorHandler;
191284

192-
this.initSubscription();
193285
this.publish({
194286
type: EventType.INITIAL_HEARTBEAT,
195287
});
196-
this.scheduleCleanup();
197288
}
198289

199290
close(): Promise<void> | void {
200291
debug("closing adapter");
201292
if (this.heartbeatTimer) {
202293
clearTimeout(this.heartbeatTimer);
203294
}
204-
if (this.cleanupTimer) {
205-
clearTimeout(this.cleanupTimer);
206-
}
207-
if (this.client) {
208-
this.client.removeAllListeners("end");
209-
this.client.release();
210-
this.client = null;
211-
}
212-
}
213-
214-
private async initSubscription() {
215-
try {
216-
debug("fetching client from the pool");
217-
const client = await this.pool.connect();
218-
debug("client listening to %s", this.channel);
219-
await client.query(`LISTEN "${this.channel}"`);
220-
221-
client.on("notification", async (msg: any) => {
222-
try {
223-
await this.onEvent(msg.payload);
224-
} catch (err) {
225-
this.errorHandler(err);
226-
}
227-
});
228-
229-
client.on("error", () => {
230-
debug("client error");
231-
});
232-
233-
client.on("end", () => {
234-
debug("client was closed, scheduling reconnection...");
235-
this.scheduleReconnection();
236-
});
237-
238-
this.client = client;
239-
} catch (err) {
240-
this.errorHandler(err);
241-
debug("error while initializing client, scheduling reconnection...");
242-
this.scheduleReconnection();
243-
}
244-
}
245-
246-
private scheduleReconnection() {
247-
const reconnectionDelay = Math.floor(2000 * (0.5 + Math.random()));
248-
setTimeout(() => {
249-
this.initSubscription();
250-
}, reconnectionDelay);
251295
}
252296

253297
public async onEvent(event: any) {
@@ -451,19 +495,6 @@ export class PostgresAdapter extends Adapter {
451495
}, this.heartbeatInterval);
452496
}
453497

454-
private scheduleCleanup() {
455-
this.cleanupTimer = setTimeout(async () => {
456-
try {
457-
await this.pool.query(
458-
`DELETE FROM ${this.tableName} WHERE created_at < now() - interval '${this.cleanupInterval} milliseconds'`
459-
);
460-
} catch (err) {
461-
this.errorHandler(err);
462-
}
463-
this.scheduleCleanup();
464-
}, this.cleanupInterval);
465-
}
466-
467498
private async publish(document: any) {
468499
document.uid = this.uid;
469500

0 commit comments

Comments
 (0)