diff --git a/packages/powersync_core/lib/src/abort_controller.dart b/packages/powersync_core/lib/src/abort_controller.dart index 3cacc39b..96699c68 100644 --- a/packages/powersync_core/lib/src/abort_controller.dart +++ b/packages/powersync_core/lib/src/abort_controller.dart @@ -14,6 +14,10 @@ class AbortController { return _abortRequested.future; } + Future get onCompletion { + return _abortCompleter.future; + } + /// Abort, and wait until aborting is complete. Future abort() async { aborted = true; diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 0b61898c..a70d5090 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -15,7 +15,6 @@ import 'package:powersync_core/src/log_internal.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; import 'package:powersync_core/src/open_factory/native/native_open_factory.dart'; import 'package:powersync_core/src/schema.dart'; -import 'package:powersync_core/src/schema_logic.dart'; import 'package:powersync_core/src/streaming_sync.dart'; import 'package:powersync_core/src/sync_status.dart'; import 'package:sqlite_async/sqlite3_common.dart'; @@ -109,42 +108,55 @@ class PowerSyncDatabaseImpl /// [logger] defaults to [autoLogger], which logs to the console in debug builds.s PowerSyncDatabaseImpl.withDatabase( {required this.schema, required this.database, Logger? logger}) { - if (logger != null) { - this.logger = logger; - } else { - this.logger = autoLogger; - } + this.logger = logger ?? autoLogger; isInitialized = baseInit(); } @override @internal - - /// Connect to the PowerSync service, and keep the databases in sync. - /// - /// The connection is automatically re-opened if it fails for any reason. - /// - /// Status changes are reported on [statusStream]. - baseConnect( - {required PowerSyncBackendConnector connector, - - /// Throttle time between CRUD operations - /// Defaults to 10 milliseconds. - required Duration crudThrottleTime, - required Future Function() reconnect, - Map? params}) async { + Future connectInternal({ + required PowerSyncBackendConnector connector, + required Duration crudThrottleTime, + required AbortController abort, + Map? params, + }) async { await initialize(); - - // Disconnect if connected - await disconnect(); - final disconnector = AbortController(); - disconnecter = disconnector; - - await isInitialized; final dbRef = database.isolateConnectionFactory(); - ReceivePort rPort = ReceivePort(); + + bool triedSpawningIsolate = false; StreamSubscription? crudUpdateSubscription; - rPort.listen((data) async { + final receiveMessages = ReceivePort(); + final receiveUnhandledErrors = ReceivePort(); + final receiveExit = ReceivePort(); + + SendPort? initPort; + final hasInitPort = Completer(); + final receivedIsolateExit = Completer(); + + Future waitForShutdown() async { + // Only complete the abortion signal after the isolate shuts down. This + // ensures absolutely no trace of this sync iteration remains. + if (triedSpawningIsolate) { + await receivedIsolateExit.future; + } + + // Cleanup + crudUpdateSubscription?.cancel(); + receiveMessages.close(); + receiveUnhandledErrors.close(); + receiveExit.close(); + + // Clear status apart from lastSyncedAt + setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt)); + abort.completeAbort(); + } + + Future close() async { + initPort?.send(['close']); + await waitForShutdown(); + } + + receiveMessages.listen((data) async { if (data is List) { String action = data[0] as String; if (action == "getCredentials") { @@ -159,15 +171,13 @@ class PowerSyncDatabaseImpl await connector.prefetchCredentials(); }); } else if (action == 'init') { - SendPort port = data[1] as SendPort; + final port = initPort = data[1] as SendPort; + hasInitPort.complete(); var crudStream = database.onChange(['ps_crud'], throttle: crudThrottleTime); crudUpdateSubscription = crudStream.listen((event) { port.send(['update']); }); - disconnector.onAbort.then((_) { - port.send(['close']); - }).ignore(); } else if (action == 'uploadCrud') { await (data[1] as PortCompleter).handle(() async { await connector.uploadData(this); @@ -175,11 +185,6 @@ class PowerSyncDatabaseImpl } else if (action == 'status') { final SyncStatus status = data[1] as SyncStatus; setStatus(status); - } else if (action == 'close') { - // Clear status apart from lastSyncedAt - setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt)); - rPort.close(); - crudUpdateSubscription?.cancel(); } else if (action == 'log') { LogRecord record = data[1] as LogRecord; logger.log( @@ -188,8 +193,7 @@ class PowerSyncDatabaseImpl } }); - var errorPort = ReceivePort(); - errorPort.listen((message) async { + receiveUnhandledErrors.listen((message) async { // Sample error: // flutter: [PowerSync] WARNING: 2023-06-28 16:34:11.566122: Sync Isolate error // flutter: [Connection closed while receiving data, #0 IOClient.send. (package:http/src/io_client.dart:76:13) @@ -200,38 +204,38 @@ class PowerSyncDatabaseImpl // ... logger.severe('Sync Isolate error', message); - // Reconnect - // Use the param like this instead of directly calling connect(), to avoid recursive - // locks in some edge cases. - reconnect(); + // Fatal errors are enabled, so the isolate will exit soon, causing us to + // complete the abort controller which will make the db mixin reconnect if + // necessary. There's no need to reconnect manually. }); - disconnected() { - disconnector.completeAbort(); - disconnecter = null; - rPort.close(); - // Clear status apart from lastSyncedAt - setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt)); + // Don't spawn isolate if this operation was cancelled already. + if (abort.aborted) { + return waitForShutdown(); } - var exitPort = ReceivePort(); - exitPort.listen((message) { + receiveExit.listen((message) { logger.fine('Sync Isolate exit'); - disconnected(); + receivedIsolateExit.complete(); }); - if (disconnecter?.aborted == true) { - disconnected(); - return; - } - - Isolate.spawn( - _powerSyncDatabaseIsolate, - _PowerSyncDatabaseIsolateArgs( - rPort.sendPort, dbRef, retryDelay, clientParams), - debugName: 'PowerSyncDatabase', - onError: errorPort.sendPort, - onExit: exitPort.sendPort); + // Spawning the isolate can't be interrupted + triedSpawningIsolate = true; + await Isolate.spawn( + _syncIsolate, + _PowerSyncDatabaseIsolateArgs( + receiveMessages.sendPort, dbRef, retryDelay, clientParams), + debugName: 'Sync ${database.openFactory.path}', + onError: receiveUnhandledErrors.sendPort, + errorsAreFatal: true, + onExit: receiveExit.sendPort, + ); + await hasInitPort.future; + + abort.onAbort.whenComplete(close); + + // Automatically complete the abort controller once the isolate exits. + unawaited(waitForShutdown()); } /// Takes a read lock, without starting a transaction. @@ -255,16 +259,6 @@ class PowerSyncDatabaseImpl return database.writeLock(callback, debugContext: debugContext, lockTimeout: lockTimeout); } - - @override - Future updateSchema(Schema schema) { - if (disconnecter != null) { - throw AssertionError('Cannot update schema while connected'); - } - schema.validate(); - this.schema = schema; - return updateSchemaInIsolate(database, schema); - } } class _PowerSyncDatabaseIsolateArgs { @@ -277,39 +271,48 @@ class _PowerSyncDatabaseIsolateArgs { this.sPort, this.dbRef, this.retryDelay, this.parameters); } -Future _powerSyncDatabaseIsolate( - _PowerSyncDatabaseIsolateArgs args) async { +Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { final sPort = args.sPort; - ReceivePort rPort = ReceivePort(); + final rPort = ReceivePort(); StreamController crudUpdateController = StreamController.broadcast(); final upstreamDbClient = args.dbRef.upstreamPort.open(); CommonDatabase? db; final Mutex mutex = args.dbRef.mutex.open(); StreamingSyncImplementation? openedStreamingSync; + StreamSubscription? localUpdatesSubscription; + + Future shutdown() async { + await openedStreamingSync?.abort(); + + localUpdatesSubscription?.cancel(); + db?.dispose(); + crudUpdateController.close(); + upstreamDbClient.close(); + + // The SyncSqliteConnection uses this mutex + // It needs to be closed before killing the isolate + // in order to free the mutex for other operations. + await mutex.close(); + rPort.close(); + + // TODO: If we closed our resources properly, this wouldn't be necessary... + Isolate.current.kill(); + } rPort.listen((message) async { if (message is List) { String action = message[0] as String; if (action == 'update') { - crudUpdateController.add('update'); + if (!crudUpdateController.isClosed) { + crudUpdateController.add('update'); + } } else if (action == 'close') { - // The SyncSqliteConnection uses this mutex - // It needs to be closed before killing the isolate - // in order to free the mutex for other operations. - await mutex.close(); - db?.dispose(); - crudUpdateController.close(); - upstreamDbClient.close(); - // Abort any open http requests, and wait for it to be closed properly - await openedStreamingSync?.abort(); - // No kill the Isolate - Isolate.current.kill(); + await shutdown(); } } }); - Isolate.current.addOnExitListener(sPort, response: const ['close']); - sPort.send(["init", rPort.sendPort]); + sPort.send(['init', rPort.sendPort]); // Is there a way to avoid the overhead if logging is not enabled? // This only takes effect in this isolate. @@ -317,24 +320,24 @@ Future _powerSyncDatabaseIsolate( isolateLogger.onRecord.listen((record) { var copy = LogRecord(record.level, record.message, record.loggerName, record.error, record.stackTrace); - sPort.send(["log", copy]); + sPort.send(['log', copy]); }); Future loadCredentials() async { final r = IsolateResult(); - sPort.send(["getCredentials", r.completer]); + sPort.send(['getCredentials', r.completer]); return r.future; } Future invalidateCredentials() async { final r = IsolateResult(); - sPort.send(["invalidateCredentials", r.completer]); + sPort.send(['invalidateCredentials', r.completer]); return r.future; } Future uploadCrud() async { final r = IsolateResult(); - sPort.send(["uploadCrud", r.completer]); + sPort.send(['uploadCrud', r.completer]); return r.future; } @@ -372,7 +375,7 @@ Future _powerSyncDatabaseIsolate( } } - db!.updates.listen((event) { + localUpdatesSubscription = db!.updates.listen((event) { updatedTables.add(event.tableName); updateDebouncer ??= @@ -383,7 +386,7 @@ Future _powerSyncDatabaseIsolate( // Unfortunately, this does not handle disposing while the database is opening. // This should be rare - any uncaught error is a bug. And in most cases, // it should occur after the database is already open. - db?.dispose(); + shutdown(); throw error; }); } diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index 5f60ef16..8966f368 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:powersync_core/sqlite_async.dart'; +import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; import 'powersync_database.dart'; @@ -24,6 +25,9 @@ class PowerSyncDatabaseImpl @override Schema get schema => throw UnimplementedError(); + @override + set schema(Schema s) => throw UnimplementedError(); + @override SqliteDatabase get database => throw UnimplementedError(); @@ -101,20 +105,15 @@ class PowerSyncDatabaseImpl throw UnimplementedError(); } - @override - Future updateSchema(Schema schema) { - throw UnimplementedError(); - } - @override Logger get logger => throw UnimplementedError(); @override @internal - Future baseConnect( + Future connectInternal( {required PowerSyncBackendConnector connector, required Duration crudThrottleTime, - required Future Function() reconnect, + required AbortController abort, Map? params}) { throw UnimplementedError(); } diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 63792c89..647967fe 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -12,12 +12,16 @@ import 'package:powersync_core/src/database/core_version.dart'; import 'package:powersync_core/src/powersync_update_notification.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/schema_logic.dart'; +import 'package:powersync_core/src/schema_logic.dart' as schema_logic; import 'package:powersync_core/src/sync_status.dart'; mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Schema used for the local database. Schema get schema; + @internal + set schema(Schema schema); + /// The underlying database. /// /// For the most part, behavior is the same whether querying on the underlying @@ -65,9 +69,14 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @protected Future get isInitialized; - /// null when disconnected, present when connecting or connected + /// The abort controller for the current sync iteration. + /// + /// null when disconnected, present when connecting or connected. + /// + /// The controller must only be accessed from within a critical section of the + /// sync mutex. @protected - AbortController? disconnecter; + AbortController? _abortActiveSync; @protected Future baseInit() async { @@ -242,61 +251,106 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// The connection is automatically re-opened if it fails for any reason. /// /// Status changes are reported on [statusStream]. - Future connect( - {required PowerSyncBackendConnector connector, - - /// Throttle time between CRUD operations - /// Defaults to 10 milliseconds. - Duration crudThrottleTime = const Duration(milliseconds: 10), - Map? params}) async { + Future connect({ + required PowerSyncBackendConnector connector, + Duration crudThrottleTime = const Duration(milliseconds: 10), + Map? params, + }) async { clientParams = params; - Zone current = Zone.current; - - Future reconnect() { - return _activeGroup.syncConnectMutex.lock(() => baseConnect( - connector: connector, - crudThrottleTime: crudThrottleTime, - // The reconnect function needs to run in the original zone, - // to avoid recursive lock errors. - reconnect: current.bindCallback(reconnect), - params: params)); + var thisConnectAborter = AbortController(); + + late void Function() retryHandler; + + Future connectWithSyncLock() async { + // Ensure there has not been a subsequent connect() call installing a new + // sync client. + assert(identical(_abortActiveSync, thisConnectAborter)); + assert(!thisConnectAborter.aborted); + + await connectInternal( + connector: connector, + crudThrottleTime: crudThrottleTime, + params: params, + abort: thisConnectAborter, + ); + + thisConnectAborter.onCompletion.whenComplete(retryHandler); } - await reconnect(); + // If the sync encounters a failure without being aborted, retry + retryHandler = Zone.current.bindCallback(() async { + _activeGroup.syncConnectMutex.lock(() async { + // Is this still supposed to be active? (abort is only called within + // mutex) + if (!thisConnectAborter.aborted) { + // We only change _abortActiveSync after disconnecting, which resets + // the abort controller. + assert(identical(_abortActiveSync, thisConnectAborter)); + + // We need a new abort controller for this attempt + _abortActiveSync = thisConnectAborter = AbortController(); + + logger.warning('Sync client failed, retrying...'); + await connectWithSyncLock(); + } + }); + }); + + await _activeGroup.syncConnectMutex.lock(() async { + // Disconnect a previous sync client, if one is active. + await _abortCurrentSync(); + assert(_abortActiveSync == null); + + // Install the abort controller for this particular connect call, allowing + // it to be disconnected. + _abortActiveSync = thisConnectAborter; + await connectWithSyncLock(); + }); } - /// Abstract connection method to be implemented by platform specific - /// classes. This is wrapped inside an exclusive mutex in the [connect] - /// method. + /// Internal method to establish a sync client connection. + /// + /// This method will always be wrapped in an exclusive mutex through the + /// [connect] method and should not be called elsewhere. + /// This method will only be called internally when no other sync client is + /// active, so the method should not call [disconnect] itself. + /// + /// The [crudThrottleTime] is the throttle time between CRUD operations, it + /// defaults to 10 milliseconds in [connect]. @protected @internal - Future baseConnect( - {required PowerSyncBackendConnector connector, - - /// Throttle time between CRUD operations - /// Defaults to 10 milliseconds. - required Duration crudThrottleTime, - required Future Function() reconnect, - Map? params}); + Future connectInternal({ + required PowerSyncBackendConnector connector, + required Duration crudThrottleTime, + required AbortController abort, + Map? params, + }); /// Close the sync connection. /// /// Use [connect] to connect again. Future disconnect() async { - if (disconnecter != null) { + // Also wrap this in the sync mutex to ensure there's no race between us + // connecting and disconnecting. + await _activeGroup.syncConnectMutex.lock(_abortCurrentSync); + + setStatus( + SyncStatus(connected: false, lastSyncedAt: currentStatus.lastSyncedAt)); + } + + Future _abortCurrentSync() async { + if (_abortActiveSync case final disconnector?) { /// Checking `disconnecter.aborted` prevents race conditions /// where multiple calls to `disconnect` can attempt to abort /// the controller more than once before it has finished aborting. - if (disconnecter!.aborted == false) { - await disconnecter!.abort(); - disconnecter = null; + if (disconnector.aborted == false) { + await disconnector.abort(); + _abortActiveSync = null; } else { /// Wait for the abort to complete. Continue updating the sync status after completed - await disconnecter!.onAbort; + await disconnector.onAbort; } } - setStatus( - SyncStatus(connected: false, lastSyncedAt: currentStatus.lastSyncedAt)); } /// Disconnect and clear the database. @@ -333,7 +387,18 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// specified once in the constructor. /// /// Cannot be used while connected - this should only be called before [connect]. - Future updateSchema(Schema schema); + Future updateSchema(Schema schema) async { + schema.validate(); + + await _activeGroup.syncConnectMutex.lock(() async { + if (_abortActiveSync != null) { + throw AssertionError('Cannot update schema while connected'); + } + + this.schema = schema; + await database.writeLock((tx) => schema_logic.updateSchema(tx, schema)); + }); + } /// A connection factory that can be passed to different isolates. /// diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index e159392d..18b88d40 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -13,7 +13,6 @@ import 'package:powersync_core/src/open_factory/web/web_open_factory.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/streaming_sync.dart'; import 'package:sqlite_async/sqlite_async.dart'; -import 'package:powersync_core/src/schema_logic.dart' as schema_logic; import '../../web/sync_controller.dart'; @@ -113,29 +112,14 @@ class PowerSyncDatabaseImpl @override @internal - - /// Connect to the PowerSync service, and keep the databases in sync. - /// - /// The connection is automatically re-opened if it fails for any reason. - /// - /// Status changes are reported on [statusStream]. - baseConnect({ + Future connectInternal({ required PowerSyncBackendConnector connector, - - /// Throttle time between CRUD operations - /// Defaults to 10 milliseconds. required Duration crudThrottleTime, - required Future Function() reconnect, + required AbortController abort, Map? params, }) async { await initialize(); - // Disconnect if connected - await disconnect(); - disconnecter = AbortController(); - - await isInitialized; - final crudStream = database.onChange(['ps_crud'], throttle: crudThrottleTime); @@ -145,11 +129,12 @@ class PowerSyncDatabaseImpl // duplicating work across tabs. try { sync = await SyncWorkerHandle.start( - database: this, - connector: connector, - crudThrottleTimeMs: crudThrottleTime.inMilliseconds, - workerUri: Uri.base.resolve('/powersync_sync.worker.js'), - syncParams: params); + database: this, + connector: connector, + crudThrottleTimeMs: crudThrottleTime.inMilliseconds, + workerUri: Uri.base.resolve('/powersync_sync.worker.js'), + syncParams: params, + ); } catch (e) { logger.warning( 'Could not use shared worker for synchronization, falling back to locks.', @@ -175,9 +160,10 @@ class PowerSyncDatabaseImpl setStatus(event); }); sync.streamingSync(); - disconnecter?.onAbort.then((_) async { + + abort.onAbort.then((_) async { await sync.abort(); - disconnecter?.completeAbort(); + abort.completeAbort(); }).ignore(); } @@ -223,14 +209,4 @@ class PowerSyncDatabaseImpl await isInitialized; return database.writeTransaction(callback, lockTimeout: lockTimeout); } - - @override - Future updateSchema(Schema schema) { - if (disconnecter != null) { - throw AssertionError('Cannot update schema while connected'); - } - schema.validate(); - this.schema = schema; - return database.writeLock((tx) => schema_logic.updateSchema(tx, schema)); - } } diff --git a/packages/powersync_core/lib/src/isolate_completer.dart b/packages/powersync_core/lib/src/isolate_completer.dart index 7192a59c..12a5c543 100644 --- a/packages/powersync_core/lib/src/isolate_completer.dart +++ b/packages/powersync_core/lib/src/isolate_completer.dart @@ -46,13 +46,10 @@ class PortCompleter { sendPort.send(PortResult.error(error, stackTrace)); } - void addExitHandler() { - Isolate.current.addOnExitListener(sendPort, response: abortedResponse); - } - Future handle(FutureOr Function() callback, {bool ignoreStackTrace = false}) async { - addExitHandler(); + Isolate.current.addOnExitListener(sendPort, response: abortedResponse); + try { final result = await callback(); complete(result); @@ -62,6 +59,8 @@ class PortCompleter { } else { completeError(error, stacktrace); } + } finally { + Isolate.current.removeOnExitListener(sendPort); } } } diff --git a/packages/powersync_core/test/schema_test.dart b/packages/powersync_core/test/schema_test.dart index 7c8b8f33..1a3df3e0 100644 --- a/packages/powersync_core/test/schema_test.dart +++ b/packages/powersync_core/test/schema_test.dart @@ -176,14 +176,11 @@ void main() { ]), ]); - try { - powersync.updateSchema(schema2); - } catch (e) { - expect( - e, - isA().having((e) => e.message, 'message', - 'Invalid characters in table name: #notworking')); - } + await expectLater( + () => powersync.updateSchema(schema2), + throwsA(isA().having((e) => e.message, 'message', + 'Invalid characters in table name: #notworking')), + ); }); }); diff --git a/packages/powersync_core/test/streaming_sync_test.dart b/packages/powersync_core/test/streaming_sync_test.dart index ed528869..b509c5f5 100644 --- a/packages/powersync_core/test/streaming_sync_test.dart +++ b/packages/powersync_core/test/streaming_sync_test.dart @@ -5,6 +5,7 @@ library; import 'dart:async'; import 'dart:math'; +import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:test/test.dart'; @@ -27,6 +28,39 @@ void main() { await testUtils.cleanDb(path: path); }); + test('repeated connect and disconnect calls', () async { + final random = Random(); + final server = await createServer(); + final ignoreLogger = Logger.detached('powersync.test'); + + final pdb = + await testUtils.setupPowerSync(path: path, logger: ignoreLogger); + pdb.retryDelay = Duration(milliseconds: 5000); + final connector = TestConnector(() async { + return PowerSyncCredentials(endpoint: server.endpoint, token: 'token'); + }); + + Duration nextDelay() { + return Duration(milliseconds: random.nextInt(100)); + } + + Future connectAndDisconnect() async { + for (var i = 0; i < 10; i++) { + await Future.delayed(nextDelay()); + await pdb.connect(connector: connector); + + await Future.delayed(nextDelay()); + await pdb.disconnect(); + } + } + + // Create a bunch of tasks calling connect and disconnect() concurrently. + await Future.wait([for (var i = 0; i < 10; i++) connectAndDisconnect()]); + + expect(server.maxConnectionCount, lessThanOrEqualTo(1)); + server.close(); + }); + test('full powersync reconnect', () async { // Test repeatedly creating new PowerSync connections, then disconnect // and close the connection.