-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathOPSQLiteConnection.ts
86 lines (75 loc) · 2.36 KB
/
OPSQLiteConnection.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import { DB, SQLBatchTuple } from '@op-engineering/op-sqlite';
import { BaseObserver, DBAdapterListener, QueryResult, RowUpdateType } from '@powersync/common';
export type OPSQLiteConnectionOptions = {
baseDB: DB;
};
export class OPSQLiteConnection extends BaseObserver<DBAdapterListener> {
protected DB: DB;
constructor(protected options: OPSQLiteConnectionOptions) {
super();
this.DB = options.baseDB;
// link table update commands
this.DB.updateHook((update) => {
this.iterateListeners((cb) => {
let opType: RowUpdateType;
switch (update.operation) {
case 'INSERT':
opType = RowUpdateType.SQLITE_INSERT;
break;
case 'DELETE':
opType = RowUpdateType.SQLITE_DELETE;
break;
case 'UPDATE':
opType = RowUpdateType.SQLITE_UPDATE;
break;
}
cb.tablesUpdated?.({
table: update.table,
opType,
rowId: update.rowId
});
});
});
}
close() {
return this.DB.close();
}
async execute(query: string, params?: any[]): Promise<QueryResult> {
const res = await this.DB.execute(query, params);
return {
insertId: res.insertId,
rowsAffected: res.rowsAffected,
rows: {
_array: res.rows ?? [],
length: res.rows?.length ?? 0,
item: (index: number) => res.rows?.[index]
}
};
}
async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
const tuple: SQLBatchTuple[] = [[query, params[0]]];
params.slice(1).forEach((p) => tuple.push([query, p]));
const result = await this.DB.executeBatch(tuple);
return {
rowsAffected: result.rowsAffected ?? 0
};
}
async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
const result = await this.DB.execute(sql, parameters);
return result.rows ?? [];
}
async getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
const result = await this.DB.execute(sql, parameters);
return result.rows?.[0] ?? null;
}
async get<T>(sql: string, parameters?: any[]): Promise<T> {
const result = await this.getOptional(sql, parameters);
if (!result) {
throw new Error('Result set is empty');
}
return result as T;
}
async refreshSchema() {
await this.get("PRAGMA table_info('sqlite_master')");
}
}