Skip to content

Commit d6fae7a

Browse files
committed
Merged serializeWriter into singleton pool to ensure reentrance
1 parent 7671a32 commit d6fae7a

6 files changed

Lines changed: 132 additions & 103 deletions

File tree

src/packages/dumbo/src/storage/sqlite/core/pool/pool.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ import {
1212
createSingletonConnectionPool,
1313
type ConnectionPool,
1414
} from '../../../../core';
15-
import {
16-
sqliteDualConnectionPool,
17-
type SQLiteDualPoolOptions,
18-
} from './dualPool';
1915

2016
export type SQLiteFileNameOrConnectionString =
2117
| {
@@ -151,7 +147,6 @@ export type SQLitePoolOptions<
151147
ConnectionOptions
152148
>
153149
| SQLiteAmbientConnectionPoolOptions<SQLiteConnectionType>
154-
| SQLiteDualPoolOptions<SQLiteConnectionType, ConnectionOptions>
155150
) & {
156151
driverType: SQLiteConnectionType['driverType'];
157152
serializer?: JSONSerializer;
@@ -204,10 +199,7 @@ export const toSqlitePoolOptions = <
204199
>;
205200
}
206201

207-
return { ...rest, dual: true } as SQLitePoolOptions<
208-
SQLiteConnectionType,
209-
ConnectionOptions
210-
>;
202+
return rest as SQLitePoolOptions<SQLiteConnectionType, ConnectionOptions>;
211203
};
212204

213205
export function sqlitePool<
@@ -234,12 +226,6 @@ export function sqlitePool<
234226
).connection,
235227
});
236228

237-
if ('dual' in options && options.dual) {
238-
return sqliteDualConnectionPool(
239-
options as SQLiteDualPoolOptions<SQLiteConnectionType, ConnectionOptions>,
240-
);
241-
}
242-
243229
if (
244230
options.singleton === true &&
245231
(

src/packages/dumbo/src/storage/sqlite/sqlite3/index.ts

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
} from '../../../core';
99
import {
1010
DefaultSQLiteMigratorOptions,
11+
isInMemoryDatabase,
1112
SQLiteConnectionString,
1213
sqliteFormatter,
1314
sqliteMetadata,
@@ -22,7 +23,8 @@ import {
2223
type SQLite3Connection,
2324
type SQLite3ConnectionOptions,
2425
} from './connections';
25-
import { serializeSqlite3WriterPool } from './serializeWriter';
26+
import { sqliteDualConnectionPool } from './pool/dualPool';
27+
import { sqlite3SingletonPool } from './pool/singletonPool';
2628

2729
export type SQLite3DumboOptions = Omit<
2830
SQLitePoolOptions<SQLite3Connection, SQLite3ConnectionOptions>,
@@ -34,33 +36,50 @@ export type SQLite3PoolOptions = SQLite3DumboOptions;
3436

3537
export type Sqlite3Pool = SQLitePool<SQLite3Connection>;
3638

37-
export const sqlite3Pool = (options: SQLite3DumboOptions) => {
38-
const pool = sqlitePool(
39-
toSqlitePoolOptions({
40-
...options,
41-
driverType: SQLite3DriverType,
42-
...('connection' in options
43-
? {}
44-
: {
45-
connectionOptions: options,
46-
sqliteConnectionFactory: (opts: SQLite3ConnectionOptions) =>
47-
sqlite3Connection({
48-
...opts,
49-
serializer: options.serializer ?? JSONSerializer,
50-
}),
51-
}),
52-
}),
53-
);
39+
export const sqlite3Pool = (
40+
options: SQLite3DumboOptions,
41+
): SQLitePool<SQLite3Connection> => {
42+
// Ambient: caller-managed connection. No acquisition, no serialisation.
43+
if ('connection' in options && options.connection) {
44+
return sqlitePool(
45+
toSqlitePoolOptions({
46+
...options,
47+
driverType: SQLite3DriverType,
48+
}),
49+
);
50+
}
51+
52+
const sqliteConnectionFactory = (opts: SQLite3ConnectionOptions) =>
53+
sqlite3Connection({
54+
...opts,
55+
serializer: options.serializer ?? JSONSerializer,
56+
});
5457

55-
// Ambient pools wrap a connection the caller already holds; serialising on
56-
// top of them would double-lock and defeat the purpose. Anything else gets
57-
// wrapped so writer-bound calls (withConnection, withTransaction, command,
58-
// batchCommand) serialise through a single TaskProcessor, with ALS-based
59-
// reentrancy so nested calls from inside an active writer task bypass the
60-
// queue instead of deadlocking.
61-
if ('connection' in options && options.connection) return pool;
58+
// Singleton-shaped: in-memory DBs, an explicit client, or `singleton: true`.
59+
// One connection shared across callers — wrap it so concurrent callers
60+
// serialise through a single-slot TaskProcessor with ALS-based reentrancy.
61+
const isSingleton =
62+
isInMemoryDatabase(options) ||
63+
('client' in options && Boolean(options.client)) ||
64+
options.singleton === true;
65+
66+
if (isSingleton) {
67+
return sqlite3SingletonPool<SQLite3Connection>({
68+
driverType: SQLite3DriverType,
69+
getConnection: () => sqliteConnectionFactory(options),
70+
});
71+
}
6272

63-
return serializeSqlite3WriterPool(pool);
73+
// Default: file-backed dual pool. Its writer side is serialised inside
74+
// sqliteDualConnectionPool via the same primitive.
75+
const readerPoolSize = (options as { readerPoolSize?: number })
76+
.readerPoolSize;
77+
return sqliteDualConnectionPool({
78+
driverType: SQLite3DriverType,
79+
sqliteConnectionFactory,
80+
connectionOptions: options,
81+
...(readerPoolSize !== undefined ? { readerPoolSize } : {}),
82+
});
6483
};
6584

6685
const tryParseConnectionString = (connectionString: string) => {

src/packages/dumbo/src/storage/sqlite/core/pool/dualPool.int.spec.ts renamed to src/packages/dumbo/src/storage/sqlite/sqlite3/pool/dualPool.int.spec.ts

File renamed without changes.

src/packages/dumbo/src/storage/sqlite/core/pool/dualPool.ts renamed to src/packages/dumbo/src/storage/sqlite/sqlite3/pool/dualPool.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
import { cpus } from 'os';
2-
import {
3-
createBoundedConnectionPool,
4-
createSingletonConnectionPool,
5-
} from '../../../../core';
2+
import { createBoundedConnectionPool } from '../../../../core';
63
import { guardInitializedOnce } from '../../../../core/taskProcessing';
74
import type {
85
AnySQLiteConnection,
96
SQLiteConnectionFactory,
107
SQLiteConnectionOptions,
11-
} from '../connections';
12-
import { mapSqliteError } from '../errors';
13-
import type { SQLitePool } from './pool';
8+
SQLitePool,
9+
} from '../../core';
10+
import { mapSqliteError } from '../../core/errors';
11+
import { sqlite3SingletonPool } from './singletonPool';
1412

1513
export type SQLiteDualPoolOptions<
1614
SQLiteConnectionType extends AnySQLiteConnection,
@@ -89,7 +87,7 @@ export const sqliteDualConnectionPool = <
8987
return connection;
9088
};
9189

92-
const writerPool = createSingletonConnectionPool({
90+
const writerPool = sqlite3SingletonPool<SQLiteConnectionType>({
9391
driverType: options.driverType,
9492
getConnection: () => wrappedConnectionFactory(false, connectionOptions),
9593
});
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import { AsyncLocalStorage } from 'node:async_hooks';
2+
import {
3+
createSingletonConnectionPool,
4+
type ConnectionPool,
5+
} from '../../../../core';
6+
import { TaskProcessor } from '../../../../core/taskProcessing';
7+
import type { AnySQLiteConnection } from '../../core';
8+
9+
export type SQLite3SingletonPoolOptions<
10+
SQLiteConnectionType extends AnySQLiteConnection,
11+
> = {
12+
driverType: SQLiteConnectionType['driverType'];
13+
getConnection: () => SQLiteConnectionType | Promise<SQLiteConnectionType>;
14+
closeConnection?: (connection: SQLiteConnectionType) => void | Promise<void>;
15+
maxQueueSize?: number;
16+
};
17+
18+
// Creates a singleton-connection pool whose callers serialise through a
19+
// single-slot TaskProcessor. An AsyncLocalStorage flag lets re-entrant calls
20+
// (made from inside an active task on this pool) bypass the queue instead of
21+
// deadlocking — that's the path emmett relies on when a workflow handler
22+
// internally calls back through the same pool (e.g. messageStore.appendToStream).
23+
export const sqlite3SingletonPool = <
24+
SQLiteConnectionType extends AnySQLiteConnection,
25+
>(
26+
options: SQLite3SingletonPoolOptions<SQLiteConnectionType>,
27+
): ConnectionPool<SQLiteConnectionType> => {
28+
const inner = createSingletonConnectionPool<SQLiteConnectionType>({
29+
driverType: options.driverType,
30+
getConnection: options.getConnection,
31+
...(options.closeConnection
32+
? { closeConnection: options.closeConnection }
33+
: {}),
34+
});
35+
36+
const taskProcessor = new TaskProcessor({
37+
maxActiveTasks: 1,
38+
maxQueueSize: options.maxQueueSize ?? 1000,
39+
});
40+
const insideWriterTask = new AsyncLocalStorage<true>();
41+
42+
const enqueue = <Result>(op: () => Promise<Result>): Promise<Result> => {
43+
if (insideWriterTask.getStore() === true) {
44+
return op();
45+
}
46+
return taskProcessor.enqueue(({ ack }) =>
47+
insideWriterTask.run(true, async () => {
48+
try {
49+
return await op();
50+
} finally {
51+
ack();
52+
}
53+
}),
54+
);
55+
};
56+
57+
return {
58+
driverType: inner.driverType,
59+
connection: inner.connection.bind(inner),
60+
transaction: inner.transaction.bind(inner),
61+
withConnection: (handle, connectionOptions) =>
62+
enqueue(() => inner.withConnection(handle, connectionOptions)),
63+
withTransaction: (handle, transactionOptions) =>
64+
enqueue(() => inner.withTransaction(handle, transactionOptions)),
65+
execute: {
66+
query: (sql, queryOptions) =>
67+
enqueue(() => inner.execute.query(sql, queryOptions)),
68+
batchQuery: (sqls, queryOptions) =>
69+
enqueue(() => inner.execute.batchQuery(sqls, queryOptions)),
70+
command: (sql, commandOptions) =>
71+
enqueue(() => inner.execute.command(sql, commandOptions)),
72+
batchCommand: (sqls, commandOptions) =>
73+
enqueue(() => inner.execute.batchCommand(sqls, commandOptions)),
74+
},
75+
close: async () => {
76+
await taskProcessor.stop();
77+
await inner.close();
78+
},
79+
};
80+
};

src/packages/dumbo/src/storage/sqlite/sqlite3/serializeWriter.ts

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)