Skip to content

IndexedDB flushing #208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 26 additions & 35 deletions packages/powersync/lib/src/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ class BucketStorage {
return rows.first['client_id'] as String;
}

Future<void> streamOp(String op) async {
await writeTransaction((tx) async {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES(?, ?)',
['stream', op]);
});
}

Future<void> saveSyncData(SyncDataBatch batch) async {
var count = 0;

Expand All @@ -65,7 +57,10 @@ class BucketStorage {
'buckets': [b]
}));
}
});
// No need to flush - the data is not directly visible to the user either way.
// We get major initial sync performance improvements with IndexedDB by
// not flushing here.
}, flush: false);
_compactCounter += count;
}

Expand All @@ -85,7 +80,8 @@ class BucketStorage {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES(?, ?)',
['delete_bucket', bucket]);
});
// No need to flush - not directly visible to the user
}, flush: false);

_pendingBucketDeletes = true;
}
Expand Down Expand Up @@ -125,7 +121,8 @@ class BucketStorage {
"UPDATE ps_buckets SET last_op = ? WHERE name = '\$local'",
[checkpoint.writeCheckpoint]);
}
});
// Not flushing here - the flush will happen in the next step
}, flush: false);

final valid = await updateObjectsFromBuckets(checkpoint);
if (!valid) {
Expand All @@ -150,7 +147,10 @@ class BucketStorage {
// can_update_local(db) == false
return false;
}
});
// Important to flush here.
// After this step, the synced data will be visible to the user,
// and we don't want that to be reverted.
}, flush: true);
}

Future<SyncLocalDatabaseResult> validateChecksums(
Expand All @@ -176,39 +176,25 @@ class BucketStorage {
}

Future<void> autoCompact() async {
// This is a no-op since powersync-sqlite-core v0.3.0

// 1. Delete buckets
await _deletePendingBuckets();

// 2. Clear REMOVE operations, only keeping PUT ones
await _clearRemoveOps();

// await _compactWal();
}

// ignore: unused_element
Future<void> _compactWal() async {
try {
await writeTransaction((tx) async {
await tx.execute('PRAGMA wal_checkpoint(TRUNCATE)');
});
} on SqliteException catch (e) {
// Ignore SQLITE_BUSY
if (e.resultCode == 5) {
// Ignore
} else if (e.resultCode == 6) {
// Ignore
}
}
}

Future<void> _deletePendingBuckets() async {
// This is a no-op since powersync-sqlite-core v0.3.0
if (_pendingBucketDeletes) {
// Executed once after start-up, and again when there are pending deletes.
await writeTransaction((tx) async {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
['delete_pending_buckets', '']);
});
// No need to flush - not directly visible to the user
}, flush: false);
_pendingBucketDeletes = false;
}
}
Expand All @@ -218,11 +204,13 @@ class BucketStorage {
return;
}

// This is a no-op since powersync-sqlite-core v0.3.0
await writeTransaction((tx) async {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
['clear_remove_ops', '']);
});
// No need to flush - not directly visible to the user
}, flush: false);
_compactCounter = 0;
}

Expand Down Expand Up @@ -267,7 +255,8 @@ class BucketStorage {
[opId]);

return true;
});
// Flush here - don't want to lose the write checkpoint updates.
}, flush: true);
}

Future<CrudEntry?> nextCrudItem() async {
Expand Down Expand Up @@ -313,7 +302,8 @@ class BucketStorage {
await tx.execute(
'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\'');
}
});
// Flush here - don't want to lose the write checkpoint updates.
}, flush: true);
});
}

Expand All @@ -323,7 +313,8 @@ class BucketStorage {
/// concurrently.
Future<T> writeTransaction<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout}) async {
{Duration? lockTimeout,
required bool flush}) async {
return _internalDb.writeTransaction(callback, lockTimeout: lockTimeout);
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/powersync/lib/src/web/sync_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import 'package:powersync/src/streaming_sync.dart';
import 'package:sqlite_async/web.dart';
import 'package:web/web.dart' hide RequestMode;

import '../bucket_storage.dart';
import 'sync_worker_protocol.dart';
import 'web_bucket_storage.dart';

final _logger = autoLogger;

Expand Down Expand Up @@ -258,7 +258,7 @@ class _SyncRunner {
: jsonDecode(syncParamsEncoded!) as Map<String, dynamic>;

sync = StreamingSyncImplementation(
adapter: BucketStorage(database),
adapter: WebBucketStorage(database),
credentialsCallback: client.channel.credentialsCallback,
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
uploadCrud: client.channel.uploadCrud,
Expand Down
20 changes: 20 additions & 0 deletions packages/powersync/lib/src/web/web_bucket_storage.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import 'package:powersync/sqlite_async.dart';
import 'package:powersync/src/bucket_storage.dart';
import 'package:sqlite_async/web.dart';

class WebBucketStorage extends BucketStorage {
final WebSqliteConnection _webDb;

WebBucketStorage(this._webDb) : super(_webDb);

@override

/// Override to implement the flush parameter for web.
Future<T> writeTransaction<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout,
required bool flush}) async {
return _webDb.writeTransaction(callback,
lockTimeout: lockTimeout, flush: flush);
}
}
4 changes: 2 additions & 2 deletions scripts/compile_webworker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Future<void> main() async {

if (dbWorkerProcess.exitCode != 0) {
throw Exception(
'Could not compile db worker: ${dbWorkerProcess.stdout.toString()}');
'Could not compile db worker.\nstdout: ${dbWorkerProcess.stdout.toString()}\nstderr: ${dbWorkerProcess.stderr.toString()}');
}

final syncWorkerFilename = 'powersync_sync.worker.js';
Expand All @@ -54,7 +54,7 @@ Future<void> main() async {

if (syncWorkerProcess.exitCode != 0) {
throw Exception(
'Could not compile sync worker: ${dbWorkerProcess.stdout.toString()}');
'Could not compile sync worker:\nstdout: ${syncWorkerProcess.stdout.toString()}\nstderr: ${syncWorkerProcess.stderr.toString()}');
}

// Copy this to all demo apps web folders
Expand Down
Loading