Skip to content

Commit de6eaa1

Browse files
committed
POC for onUpdate implementation for better-sqlite3.
1 parent 3e91fbf commit de6eaa1

File tree

6 files changed

+137
-0
lines changed

6 files changed

+137
-0
lines changed

src/driver-api.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ export interface SqliteDriverConnection {
1616
options?: ExecuteOptions
1717
): Promise<ResultSet>;
1818

19+
onUpdate(
20+
listener: UpdateListener,
21+
options?: { tables?: string[]; batchLimit?: number }
22+
): () => void;
23+
1924
close(): Promise<void>;
2025
}
2126

@@ -31,6 +36,23 @@ export interface SqliteDriverConnectionPool {
3136
): Promise<ReservedConnection>;
3237

3338
close(): Promise<void>;
39+
40+
onUpdate(
41+
listener: UpdateListener,
42+
options?: { tables?: string[]; batchLimit?: number }
43+
): () => void;
44+
}
45+
46+
export type UpdateListener = (event: BatchedUpdateEvent) => void;
47+
48+
export interface BatchedUpdateEvent {
49+
events: UpdateEvent[];
50+
}
51+
52+
export interface UpdateEvent {
53+
table: string;
54+
type: "insert" | "update" | "delete";
55+
rowId: bigint;
3456
}
3557

3658
export interface ReservedConnection {

src/driver-util.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
ReservedConnection,
44
SqliteDriverConnection,
55
SqliteDriverConnectionPool,
6+
UpdateListener,
67
} from "./driver-api.js";
78

89
interface QueuedItem {
@@ -78,6 +79,15 @@ export class SingleConnectionPool implements SqliteDriverConnectionPool {
7879
break;
7980
}
8081
}
82+
83+
onUpdate(
84+
listener: UpdateListener,
85+
options?:
86+
| { tables?: string[] | undefined; batchLimit?: number | undefined }
87+
| undefined
88+
): () => void {
89+
return this.connection.onUpdate(listener, options);
90+
}
8191
}
8292

8393
export interface DriverFactory {
@@ -161,6 +171,16 @@ class MultiConnectionPool implements SqliteDriverConnectionPool {
161171
await con.close();
162172
}
163173
}
174+
175+
onUpdate(
176+
listener: UpdateListener,
177+
options?:
178+
| { tables?: string[] | undefined; batchLimit?: number | undefined }
179+
| undefined
180+
): () => void {
181+
// No-op
182+
return () => {};
183+
}
164184
}
165185

166186
export class ReadWriteConnectionPool implements SqliteDriverConnectionPool {
@@ -193,4 +213,13 @@ export class ReadWriteConnectionPool implements SqliteDriverConnectionPool {
193213
await this.readPool.close();
194214
await this.writePool?.close();
195215
}
216+
217+
onUpdate(
218+
listener: UpdateListener,
219+
options?:
220+
| { tables?: string[] | undefined; batchLimit?: number | undefined }
221+
| undefined
222+
): () => void {
223+
return this.writePool!.onUpdate(listener, options);
224+
}
196225
}

src/drivers/better-sqlite3-async-driver.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
RunResults,
66
SqliteDriverConnection,
77
SqliteDriverConnectionPool,
8+
UpdateListener,
89
} from "../driver-api.js";
910

1011
import { EventIterator } from "event-iterator";
@@ -104,4 +105,13 @@ export class BetterSqliteAsyncConnection implements SqliteDriverConnection {
104105
}
105106
return results!;
106107
}
108+
109+
onUpdate(
110+
listener: UpdateListener,
111+
options?:
112+
| { tables?: string[] | undefined; batchLimit?: number | undefined }
113+
| undefined
114+
): () => void {
115+
throw new Error("Not implemented");
116+
}
107117
}

src/drivers/better-sqlite3-driver.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
RunResults,
77
SqliteDriverConnection,
88
SqliteDriverConnectionPool,
9+
UpdateListener,
910
} from "../driver-api.js";
1011
const Database = require("better-sqlite3");
1112

@@ -118,4 +119,46 @@ export class BetterSqliteConnection implements SqliteDriverConnection {
118119
dispose(): void {
119120
// No-op
120121
}
122+
123+
onUpdate(
124+
listener: UpdateListener,
125+
options?:
126+
| { tables?: string[] | undefined; batchLimit?: number | undefined }
127+
| undefined
128+
): () => void {
129+
// Proof-of-concept implementation, based on the idea here:
130+
// https://github.com/WiseLibs/better-sqlite3/issues/62
131+
// TODO:
132+
// 1. Handle multiple registrations.
133+
// 2. Don't re-register triggers.
134+
// 3. De-register listener.
135+
// 4. Batching.
136+
//
137+
// More fundamental limitations:
138+
// 1. The table needs to exist before registering the listener.
139+
// 2. Deleting and re-creating the same will dereigster the listener for that table.
140+
141+
this.con.function("_logger", function (table: any, type: any, rowid: any) {
142+
listener({ events: [{ table, rowId: rowid, type }] });
143+
});
144+
let tables = options?.tables;
145+
if (tables == null) {
146+
tables = this.con
147+
.prepare(`select name from sqlite_master where type = 'table'`)
148+
.all()
149+
.map((row) => (row as any).name as string);
150+
}
151+
for (let table of tables) {
152+
this.con.exec(
153+
`CREATE TEMPORARY TRIGGER IF NOT EXISTS _logger_notification_${table}__update AFTER UPDATE ON ${table} BEGIN SELECT _logger('${table}', 'update', NEW.rowid); END`
154+
);
155+
this.con.exec(
156+
`CREATE TEMPORARY TRIGGER IF NOT EXISTS _logger_notification_${table}__insert AFTER INSERT ON ${table} BEGIN SELECT _logger('${table}', 'insert', NEW.rowid); END`
157+
);
158+
this.con.exec(
159+
`CREATE TEMPORARY TRIGGER IF NOT EXISTS _logger_notification_${table}__delete AFTER DELETE ON ${table} BEGIN SELECT _logger('${table}', 'delete', OLD.rowid); END`
160+
);
161+
}
162+
return () => {};
163+
}
121164
}

src/drivers/sqlite3-driver.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
RunResults,
77
SqliteDriverConnection,
88
SqliteDriverConnectionPool,
9+
UpdateListener,
910
} from "../driver-api.js";
1011

1112
import { ReadWriteConnectionPool } from "../driver-util.js";
@@ -93,6 +94,15 @@ export class Sqlite3Connection implements SqliteDriverConnection {
9394
async close() {
9495
this.con.close();
9596
}
97+
98+
onUpdate(
99+
listener: UpdateListener,
100+
options?:
101+
| { tables?: string[] | undefined; batchLimit?: number | undefined }
102+
| undefined
103+
): () => void {
104+
throw new Error("Not implemented");
105+
}
96106
}
97107

98108
function transformResults(rows: any[], options?: ExecuteOptions): ResultSet {

test/tests/drivers.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,5 +165,28 @@ export function describeDriverTests(
165165
release();
166166
}
167167
});
168+
169+
test.skip("onUpdate", async () => {
170+
// Skipped: Not properly implemented yet.
171+
172+
const driver = await open();
173+
const { connection, release } = await driver.reserveConnection();
174+
try {
175+
await connection.run(
176+
"create table test_data(id integer primary key, data text)"
177+
);
178+
// TODO: test the results
179+
connection.onUpdate(({ events }) => {
180+
console.log("update", events);
181+
});
182+
183+
await connection.run(
184+
"insert into test_data(data) values(123) returning id"
185+
);
186+
await connection.run("update test_data set data = data || 'test'");
187+
} finally {
188+
release();
189+
}
190+
});
168191
});
169192
}

0 commit comments

Comments
 (0)