multiple_instances.test.ts
import {
AbstractPowerSyncDatabase,
createBaseLogger,
createLogger,
SqliteBucketStorage,
SyncStatus
} from '@powersync/common';
import {
SharedWebStreamingSyncImplementation,
SharedWebStreamingSyncImplementationOptions,
WebRemote
} from '@powersync/web';
import { Mutex } from 'async-mutex';
import { beforeAll, describe, expect, it, vi } from 'vitest';
import { WebDBAdapter } from '../src/db/adapters/WebDBAdapter';
import { TestConnector } from './utils/MockStreamOpenFactory';
import { generateTestDb, testSchema } from './utils/testDb';
describe('Multiple Instances', { sequential: true }, () => {
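// Opens a PowerSync database against the same dbFilename, so every instance created here
// shares the same underlying storage (and the same shared worker).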
const openDatabase = () =>
generateTestDb({
database: {
dbFilename: `test-multiple-instances.db`
},
schema: testSchema
});
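// Enable the default console handlers on the base logger so test output is visible.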
beforeAll(() => createBaseLogger().useDefaults());
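// Inserts a single asset row on the given database instance.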
function createAsset(powersync: AbstractPowerSyncDatabase) {
return powersync.execute('INSERT INTO assets(id, description) VALUES(uuid(), ?)', ['test']);
}
it('should share data between instances', async () => {
const powersync = openDatabase();
// Create an asset on the first connection
await createAsset(powersync);
// Create a new connection and verify it can read existing assets
const db2 = openDatabase();
const assets = await db2.getAll('SELECT * FROM assets');
expect(assets.length).equals(1);
});
it('should broadcast logs from shared sync worker', { timeout: 20000 }, async () => {
const logger = createLogger('test-logger');
const spiedErrorLogger = vi.spyOn(logger, 'error');
const spiedDebugLogger = vi.spyOn(logger, 'debug');
const powersync = generateTestDb({
logger,
database: {
dbFilename: 'broadcast-logger-test.sqlite'
},
schema: testSchema
});
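// Connect to an endpoint that does not exist. The shared sync worker performs the
// connection attempts, and its debug/error logs should be broadcast back to this client's logger.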
powersync.connect({
fetchCredentials: async () => {
return {
endpoint: 'http://localhost/does-not-exist',
token: 'none'
};
},
uploadData: async (db) => {}
});
// Should log that a connection attempt has been made
const message = 'Streaming sync iteration started';
await vi.waitFor(
() =>
expect(
spiedDebugLogger.mock.calls
.flat(1)
.find((argument) => typeof argument == 'string' && argument.includes(message))
).exist,
{ timeout: 2000 }
);
// The connection should fail with an error
await vi.waitFor(() => expect(spiedErrorLogger.mock.calls.length).gt(0), { timeout: 2000 });
// Note: this test seems to take quite a while, waiting for the disconnect call.
});
it('should maintain DB connections if instances call close', async () => {
/**
* The shared webworker should use the same DB connection for both instances.
* The shared connection should only be closed if all PowerSync clients
* close themselves.
*/
const powersync1 = openDatabase();
const powersync2 = openDatabase();
await powersync1.close();
// The shared connection should remain open: the second instance can still write after the first closes
await createAsset(powersync2);
});
it('should watch table changes between instances', async () => {
const db1 = openDatabase();
const db2 = openDatabase();
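// Watch the assets table on the second instance; an insert made through the first
// instance should be reported here, since both share the same underlying database.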
const watchedPromise = new Promise<void>(async (resolve) => {
const controller = new AbortController();
for await (const result of db2.watch('SELECT * FROM assets', [], { signal: controller.signal })) {
if (result.rows?.length) {
resolve();
controller.abort();
}
}
});
await createAsset(db1);
await watchedPromise;
});
it('should share sync updates', async () => {
// Generate the first streaming sync implementation
const connector1 = new TestConnector();
const db = openDatabase();
await db.init();
// Both sync implementations need the same identifier in order to share a single sync worker.
const identifier = 'streaming-sync-shared';
const syncOptions1: SharedWebStreamingSyncImplementationOptions = {
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
await connector1.uploadData(db);
},
identifier,
db: db.database as WebDBAdapter
};
const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1);
// Generate the second streaming sync implementation
const connector2 = new TestConnector();
const syncOptions2: SharedWebStreamingSyncImplementationOptions = {
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
await connector2.uploadData(db);
},
identifier,
db: db.database as WebDBAdapter
};
const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2);
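// Resolves once stream2 reports a connected status. The status is only set directly on
// stream1 below, so it has to reach stream2 via the shared sync worker.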
const stream2UpdatedPromise = new Promise<void>((resolve, reject) => {
const l = stream2.registerListener({
statusChanged: (status) => {
if (status.connected) {
resolve();
l();
}
}
});
});
// Test hack: force stream1's status to connected.
(stream1 as any)['_testUpdateStatus'](new SyncStatus({ connected: true }));
await stream2UpdatedPromise;
expect(stream2.isConnected).true;
await stream1.dispose();
await stream2.dispose();
});
it('should trigger uploads from last connected clients', async () => {
// Generate the first streaming sync implementation
const connector1 = new TestConnector();
const spy1 = vi.spyOn(connector1, 'uploadData');
const db = openDatabase();
await db.init();
// Both sync implementations need the same identifier in order to share a single sync worker.
const identifier = db.database.name;
// Resolves once the first connector has been called to upload data
let triggerUpload1: () => void;
const upload1TriggeredPromise = new Promise<void>((resolve) => {
triggerUpload1 = resolve;
});
// Create the first streaming client
const stream1 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload1();
connector1.uploadData(db);
},
db: db.database as WebDBAdapter,
identifier,
retryDelayMs: 100,
flags: {
broadcastLogs: true
}
});
// Generate the second streaming sync implementation
const connector2 = new TestConnector();
// The second connector will be asked to upload first; we don't want it to actually upload.
// Because the CRUD queue then remains unchanged, subsequent sync uploads are delayed.
const spy2 = vi.spyOn(connector2, 'uploadData').mockImplementation(async () => {});
let triggerUpload2: () => void;
const upload2TriggeredPromise = new Promise<void>((resolve) => {
triggerUpload2 = resolve;
});
const stream2 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload2();
connector2.uploadData(db);
},
identifier,
retryDelayMs: 100,
flags: {
broadcastLogs: true
},
db: db.database as WebDBAdapter
});
// Waits for the stream to be marked as connected
const stream2UpdatedPromise = new Promise<void>((resolve, reject) => {
const l = stream2.registerListener({
statusChanged: (status) => {
if (status.connected) {
resolve();
l();
}
}
});
});
// Test hack: force stream1's status to connected.
(stream1 as any)['_testUpdateStatus'](new SyncStatus({ connected: true }));
// The status in the second stream client should be updated
await stream2UpdatedPromise;
expect(stream2.isConnected).true;
// Insert a row so that a CRUD entry is queued for upload.
await db.execute('INSERT INTO customers (id, name, email) VALUES (uuid(), ?, ?)', [
'steven',
'steven@example.com'
]);
// Trigger the upload manually, since these tests don't fully configure watches on ps_crud.
stream1.triggerCrudUpload();
// The second connector should be called to upload
await upload2TriggeredPromise;
// The upload should be routed to the most recently connected client.
expect(spy2).toHaveBeenCalledOnce();
// Close the second client, leaving only the first one
await stream2.dispose();
stream1.triggerCrudUpload();
// It should now upload from the first client
await upload1TriggeredPromise;
expect(spy1).toHaveBeenCalledOnce();
await stream1.dispose();
});
});