Skip to content

Commit d5b48d2

Browse files
committed
Selectively configure flushing in initial sync.
1 parent 84b9708 commit d5b48d2

File tree

3 files changed

+48
-37
lines changed

3 files changed

+48
-37
lines changed

packages/powersync/lib/src/bucket_storage.dart

+26-35
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,6 @@ class BucketStorage {
4545
return rows.first['client_id'] as String;
4646
}
4747

48-
Future<void> streamOp(String op) async {
49-
await writeTransaction((tx) async {
50-
await tx.execute(
51-
'INSERT INTO powersync_operations(op, data) VALUES(?, ?)',
52-
['stream', op]);
53-
});
54-
}
55-
5648
Future<void> saveSyncData(SyncDataBatch batch) async {
5749
var count = 0;
5850

@@ -65,7 +57,10 @@ class BucketStorage {
6557
'buckets': [b]
6658
}));
6759
}
68-
});
60+
// No need to flush - the data is not directly visible to the user either way.
61+
// We get major initial sync performance improvements with IndexedDB by
62+
// not flushing here.
63+
}, flush: false);
6964
_compactCounter += count;
7065
}
7166

@@ -85,7 +80,8 @@ class BucketStorage {
8580
await tx.execute(
8681
'INSERT INTO powersync_operations(op, data) VALUES(?, ?)',
8782
['delete_bucket', bucket]);
88-
});
83+
// No need to flush - not directly visible to the user
84+
}, flush: false);
8985

9086
_pendingBucketDeletes = true;
9187
}
@@ -125,7 +121,8 @@ class BucketStorage {
125121
"UPDATE ps_buckets SET last_op = ? WHERE name = '\$local'",
126122
[checkpoint.writeCheckpoint]);
127123
}
128-
});
124+
// Not flushing here - the flush will happen in the next step
125+
}, flush: false);
129126

130127
final valid = await updateObjectsFromBuckets(checkpoint);
131128
if (!valid) {
@@ -150,7 +147,10 @@ class BucketStorage {
150147
// can_update_local(db) == false
151148
return false;
152149
}
153-
});
150+
// Important to flush here.
151+
// After this step, the synced data will be visible to the user,
152+
// and we don't want that to be reverted.
153+
}, flush: true);
154154
}
155155

156156
Future<SyncLocalDatabaseResult> validateChecksums(
@@ -176,39 +176,25 @@ class BucketStorage {
176176
}
177177

178178
Future<void> autoCompact() async {
179+
// This is a no-op since powersync-sqlite-core v0.3.0
180+
179181
// 1. Delete buckets
180182
await _deletePendingBuckets();
181183

182184
// 2. Clear REMOVE operations, only keeping PUT ones
183185
await _clearRemoveOps();
184-
185-
// await _compactWal();
186-
}
187-
188-
// ignore: unused_element
189-
Future<void> _compactWal() async {
190-
try {
191-
await writeTransaction((tx) async {
192-
await tx.execute('PRAGMA wal_checkpoint(TRUNCATE)');
193-
});
194-
} on SqliteException catch (e) {
195-
// Ignore SQLITE_BUSY
196-
if (e.resultCode == 5) {
197-
// Ignore
198-
} else if (e.resultCode == 6) {
199-
// Ignore
200-
}
201-
}
202186
}
203187

204188
Future<void> _deletePendingBuckets() async {
189+
// This is a no-op since powersync-sqlite-core v0.3.0
205190
if (_pendingBucketDeletes) {
206191
// Executed once after start-up, and again when there are pending deletes.
207192
await writeTransaction((tx) async {
208193
await tx.execute(
209194
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
210195
['delete_pending_buckets', '']);
211-
});
196+
// No need to flush - not directly visible to the user
197+
}, flush: false);
212198
_pendingBucketDeletes = false;
213199
}
214200
}
@@ -218,11 +204,13 @@ class BucketStorage {
218204
return;
219205
}
220206

207+
// This is a no-op since powersync-sqlite-core v0.3.0
221208
await writeTransaction((tx) async {
222209
await tx.execute(
223210
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
224211
['clear_remove_ops', '']);
225-
});
212+
// No need to flush - not directly visible to the user
213+
}, flush: false);
226214
_compactCounter = 0;
227215
}
228216

@@ -267,7 +255,8 @@ class BucketStorage {
267255
[opId]);
268256

269257
return true;
270-
});
258+
// Flush here - don't want to lose the write checkpoint updates.
259+
}, flush: true);
271260
}
272261

273262
Future<CrudEntry?> nextCrudItem() async {
@@ -313,7 +302,8 @@ class BucketStorage {
313302
await tx.execute(
314303
'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\'');
315304
}
316-
});
305+
// Flush here - don't want to lose the write checkpoint updates.
306+
}, flush: true);
317307
});
318308
}
319309

@@ -323,7 +313,8 @@ class BucketStorage {
323313
/// concurrently.
324314
Future<T> writeTransaction<T>(
325315
Future<T> Function(SqliteWriteContext tx) callback,
326-
{Duration? lockTimeout}) async {
316+
{Duration? lockTimeout,
317+
required bool flush}) async {
327318
return _internalDb.writeTransaction(callback, lockTimeout: lockTimeout);
328319
}
329320
}

packages/powersync/lib/src/web/sync_worker.dart

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import 'package:powersync/src/streaming_sync.dart';
1616
import 'package:sqlite_async/web.dart';
1717
import 'package:web/web.dart' hide RequestMode;
1818

19-
import '../bucket_storage.dart';
2019
import 'sync_worker_protocol.dart';
20+
import 'web_bucket_storage.dart';
2121

2222
final _logger = autoLogger;
2323

@@ -258,7 +258,7 @@ class _SyncRunner {
258258
: jsonDecode(syncParamsEncoded!) as Map<String, dynamic>;
259259

260260
sync = StreamingSyncImplementation(
261-
adapter: BucketStorage(database),
261+
adapter: WebBucketStorage(database),
262262
credentialsCallback: client.channel.credentialsCallback,
263263
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
264264
uploadCrud: client.channel.uploadCrud,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import 'package:powersync/sqlite_async.dart';
2+
import 'package:powersync/src/bucket_storage.dart';
3+
import 'package:sqlite_async/web.dart';
4+
5+
class WebBucketStorage extends BucketStorage {
6+
final WebSqliteConnection _webDb;
7+
8+
WebBucketStorage(this._webDb) : super(_webDb);
9+
10+
@override
11+
12+
/// Override to implement the flush parameter for web.
13+
Future<T> writeTransaction<T>(
14+
Future<T> Function(SqliteWriteContext tx) callback,
15+
{Duration? lockTimeout,
16+
required bool flush}) async {
17+
return _webDb.writeTransaction(callback,
18+
lockTimeout: lockTimeout, flush: flush);
19+
}
20+
}

0 commit comments

Comments
 (0)