Skip to content

Commit c3b39b5

Browse files
committed
Added single writer processing with isolation to SQLite3 connection pool
1 parent 2cd06e4 commit c3b39b5

4 files changed

Lines changed: 138 additions & 21 deletions

File tree

src/packages/dumbo/src/core/taskProcessing/executionGuards.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { v7 as uuid } from 'uuid';
2+
import { DumboError, TransientDatabaseError } from '../errors';
23
import { TaskProcessor } from './taskProcessor';
34

45
export type ExclusiveAccessGuard = {

src/packages/dumbo/src/storage/sqlite/sqlite3/index.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
type SQLite3Connection,
2323
type SQLite3ConnectionOptions,
2424
} from './connections';
25+
import { serializeSqlite3WriterPool } from './serializeWriter';
2526

2627
export type SQLite3DumboOptions = Omit<
2728
SQLitePoolOptions<SQLite3Connection, SQLite3ConnectionOptions>,
@@ -33,8 +34,8 @@ export type SQLite3PoolOptions = SQLite3DumboOptions;
3334

3435
export type Sqlite3Pool = SQLitePool<SQLite3Connection>;
3536

36-
export const sqlite3Pool = (options: SQLite3DumboOptions) =>
37-
sqlitePool(
37+
export const sqlite3Pool = (options: SQLite3DumboOptions) => {
38+
const pool = sqlitePool(
3839
toSqlitePoolOptions({
3940
...options,
4041
driverType: SQLite3DriverType,
@@ -51,6 +52,17 @@ export const sqlite3Pool = (options: SQLite3DumboOptions) =>
5152
}),
5253
);
5354

55+
// Ambient pools wrap a connection the caller already holds; serialising on
56+
// top of them would double-lock and defeat the purpose. Anything else gets
57+
// wrapped so writer-bound calls (withConnection, withTransaction, command,
58+
// batchCommand) serialise through a single TaskProcessor, with ALS-based
59+
// reentrancy so nested calls from inside an active writer task bypass the
60+
// queue instead of deadlocking.
61+
if ('connection' in options && options.connection) return pool;
62+
63+
return serializeSqlite3WriterPool(pool);
64+
};
65+
5466
const tryParseConnectionString = (connectionString: string) => {
5567
try {
5668
return SQLiteConnectionString(connectionString);
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { AsyncLocalStorage } from 'node:async_hooks';
2+
import { TaskProcessor } from '../../../core/taskProcessing';
3+
import type { SQLitePool } from '../core';
4+
import type { SQLite3Connection } from './connections';
5+
6+
export const serializeSqlite3WriterPool = (
7+
pool: SQLitePool<SQLite3Connection>,
8+
options?: { maxQueueSize?: number },
9+
): SQLitePool<SQLite3Connection> => {
10+
const taskProcessor = new TaskProcessor({
11+
maxActiveTasks: 1,
12+
maxQueueSize: options?.maxQueueSize ?? 1000,
13+
});
14+
const insideWriterTask = new AsyncLocalStorage<true>();
15+
16+
const enqueue = <Result>(op: () => Promise<Result>): Promise<Result> => {
17+
if (insideWriterTask.getStore() === true) {
18+
return op();
19+
}
20+
return taskProcessor.enqueue(({ ack }) =>
21+
insideWriterTask.run(true, async () => {
22+
try {
23+
return await op();
24+
} finally {
25+
ack();
26+
}
27+
}),
28+
);
29+
};
30+
31+
return {
32+
driverType: pool.driverType,
33+
connection: pool.connection.bind(pool),
34+
transaction: pool.transaction.bind(pool),
35+
withConnection: ((handle, connectionOptions) =>
36+
connectionOptions?.readonly
37+
? pool.withConnection(handle, connectionOptions)
38+
: enqueue(() =>
39+
pool.withConnection(handle, connectionOptions),
40+
)) as SQLitePool<SQLite3Connection>['withConnection'],
41+
withTransaction: ((handle, transactionOptions) =>
42+
enqueue(() =>
43+
pool.withTransaction(handle, transactionOptions),
44+
)) as SQLitePool<SQLite3Connection>['withTransaction'],
45+
execute: {
46+
query: pool.execute.query.bind(pool.execute),
47+
batchQuery: pool.execute.batchQuery.bind(pool.execute),
48+
command: (sql, commandOptions) =>
49+
enqueue(() => pool.execute.command(sql, commandOptions)),
50+
batchCommand: (sqls, commandOptions) =>
51+
enqueue(() => pool.execute.batchCommand(sqls, commandOptions)),
52+
},
53+
close: async () => {
54+
await taskProcessor.stop();
55+
await pool.close();
56+
},
57+
};
58+
};

src/packages/dumbo/src/storage/sqlite/sqlite3/transactions/transactions.int.spec.ts

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -552,12 +552,8 @@ describe('SQLite3 Transactions', () => {
552552
}
553553
});
554554

555-
// TODO: parallel pool.withTransaction calls share one underlying SQLite
556-
// transaction on the singleton writer connection. A rollback in one
557-
// sibling rolls back the other's writes. Fix requires a writer queue
558-
// (one real BEGIN/COMMIT per call), not just the finally-block tweak.
559-
it.fails(
560-
`isolates a failing parallel transaction from a successful one with ${testName} database`,
555+
it(
556+
`keeps a plain command outside of an open transaction with ${testName} database`,
561557
async () => {
562558
const pool = sqlite3Pool({
563559
fileName,
@@ -566,37 +562,87 @@ describe('SQLite3 Transactions', () => {
566562

567563
try {
568564
await pool.execute.command(
569-
SQL`CREATE TABLE isolation_test (id INTEGER PRIMARY KEY, value TEXT)`,
565+
SQL`CREATE TABLE plain_isolation (id INTEGER PRIMARY KEY, value TEXT)`,
570566
);
571567

572-
let releaseA: () => void = () => {};
573-
let releaseB: () => void = () => {};
574-
const aMayProceed = new Promise<void>((r) => {
575-
releaseA = r;
568+
let releaseTx: () => void = () => {};
569+
let plainMayStart: () => void = () => {};
570+
const txMayProceed = new Promise<void>((r) => {
571+
releaseTx = r;
572+
});
573+
const plainCanStart = new Promise<void>((r) => {
574+
plainMayStart = r;
576575
});
577-
const bMayProceed = new Promise<void>((r) => {
578-
releaseB = r;
576+
577+
const txPromise = pool.withTransaction(async (tx) => {
578+
await tx.execute.command(
579+
SQL`INSERT INTO plain_isolation (id, value) VALUES (1, 'tx-row')`,
580+
);
581+
plainMayStart();
582+
await txMayProceed;
583+
throw new Error('tx intentional rollback');
579584
});
580585

586+
await plainCanStart;
587+
588+
const plainPromise = pool.execute.command(
589+
SQL`INSERT INTO plain_isolation (id, value) VALUES (2, 'plain-row')`,
590+
);
591+
592+
releaseTx();
593+
594+
await assert.rejects(txPromise, /tx intentional rollback/);
595+
await plainPromise;
596+
597+
const rows = await pool.execute.query<{
598+
id: number;
599+
value: string;
600+
}>(SQL`SELECT id, value FROM plain_isolation ORDER BY id`);
601+
602+
assert.deepStrictEqual(
603+
rows.rows.map((r) => r.value),
604+
['plain-row'],
605+
);
606+
} finally {
607+
await pool.close();
608+
}
609+
},
610+
);
611+
612+
it(
613+
`isolates a failing parallel transaction from a successful one with ${testName} database`,
614+
async () => {
615+
const pool = sqlite3Pool({
616+
fileName,
617+
transactionOptions: { allowNestedTransactions: true },
618+
});
619+
620+
try {
621+
await pool.execute.command(
622+
SQL`CREATE TABLE isolation_test (id INTEGER PRIMARY KEY, value TEXT)`,
623+
);
624+
581625
const a = pool.withTransaction(async (tx) => {
582626
await tx.execute.command(
583627
SQL`INSERT INTO isolation_test (id, value) VALUES (1, 'a-commits')`,
584628
);
585-
releaseB();
586-
await aMayProceed;
587629
});
588630

589631
const b = pool.withTransaction(async (tx) => {
590-
await bMayProceed;
591632
await tx.execute.command(
592633
SQL`INSERT INTO isolation_test (id, value) VALUES (2, 'b-rolls-back')`,
593634
);
594635
throw new Error('B intentional failure');
595636
});
596637

597-
await assert.rejects(b, /B intentional failure/);
598-
releaseA();
599-
await a;
638+
const results = await Promise.allSettled([a, b]);
639+
640+
assert.strictEqual(results[0]?.status, 'fulfilled');
641+
assert.strictEqual(results[1]?.status, 'rejected');
642+
assert.match(
643+
(results[1] as PromiseRejectedResult).reason.message,
644+
/B intentional failure/,
645+
);
600646

601647
const rows = await pool.execute.query<{
602648
id: number;

0 commit comments

Comments
 (0)