Skip to content

Refactor connection locks #267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/powersync_core/lib/src/abort_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class AbortController {
return _abortRequested.future;
}

Future<void> get onCompletion {
return _abortCompleter.future;
}

/// Abort, and wait until aborting is complete.
Future<void> abort() async {
aborted = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import 'package:powersync_core/src/log_internal.dart';
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
import 'package:powersync_core/src/open_factory/native/native_open_factory.dart';
import 'package:powersync_core/src/schema.dart';
import 'package:powersync_core/src/schema_logic.dart';
import 'package:powersync_core/src/streaming_sync.dart';
import 'package:powersync_core/src/sync_status.dart';
import 'package:sqlite_async/sqlite3_common.dart';
Expand Down Expand Up @@ -109,42 +108,55 @@ class PowerSyncDatabaseImpl
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.s
PowerSyncDatabaseImpl.withDatabase(
{required this.schema, required this.database, Logger? logger}) {
if (logger != null) {
this.logger = logger;
} else {
this.logger = autoLogger;
}
this.logger = logger ?? autoLogger;
isInitialized = baseInit();
}

@override
@internal

/// Connect to the PowerSync service, and keep the databases in sync.
///
/// The connection is automatically re-opened if it fails for any reason.
///
/// Status changes are reported on [statusStream].
baseConnect(
{required PowerSyncBackendConnector connector,

/// Throttle time between CRUD operations
/// Defaults to 10 milliseconds.
required Duration crudThrottleTime,
required Future<void> Function() reconnect,
Map<String, dynamic>? params}) async {
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required AbortController abort,
Map<String, dynamic>? params,
}) async {
await initialize();

// Disconnect if connected
await disconnect();
final disconnector = AbortController();
disconnecter = disconnector;

await isInitialized;
final dbRef = database.isolateConnectionFactory();
ReceivePort rPort = ReceivePort();

bool triedSpawningIsolate = false;
StreamSubscription<UpdateNotification>? crudUpdateSubscription;
rPort.listen((data) async {
final receiveMessages = ReceivePort();
final receiveUnhandledErrors = ReceivePort();
final receiveExit = ReceivePort();

SendPort? initPort;
final hasInitPort = Completer<void>();
final receivedIsolateExit = Completer<void>();

Future<void> waitForShutdown() async {
// Only complete the abortion signal after the isolate shuts down. This
// ensures absolutely no trace of this sync iteration remains.
if (triedSpawningIsolate) {
await receivedIsolateExit.future;
}

// Cleanup
crudUpdateSubscription?.cancel();
receiveMessages.close();
receiveUnhandledErrors.close();
receiveExit.close();

// Clear status apart from lastSyncedAt
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
abort.completeAbort();
}

Future<void> close() async {
initPort?.send(['close']);
await waitForShutdown();
}

receiveMessages.listen((data) async {
if (data is List) {
String action = data[0] as String;
if (action == "getCredentials") {
Expand All @@ -159,27 +171,20 @@ class PowerSyncDatabaseImpl
await connector.prefetchCredentials();
});
} else if (action == 'init') {
SendPort port = data[1] as SendPort;
final port = initPort = data[1] as SendPort;
hasInitPort.complete();
var crudStream =
database.onChange(['ps_crud'], throttle: crudThrottleTime);
crudUpdateSubscription = crudStream.listen((event) {
port.send(['update']);
});
disconnector.onAbort.then((_) {
port.send(['close']);
}).ignore();
} else if (action == 'uploadCrud') {
await (data[1] as PortCompleter).handle(() async {
await connector.uploadData(this);
});
} else if (action == 'status') {
final SyncStatus status = data[1] as SyncStatus;
setStatus(status);
} else if (action == 'close') {
// Clear status apart from lastSyncedAt
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
rPort.close();
crudUpdateSubscription?.cancel();
} else if (action == 'log') {
LogRecord record = data[1] as LogRecord;
logger.log(
Expand All @@ -188,8 +193,7 @@ class PowerSyncDatabaseImpl
}
});

var errorPort = ReceivePort();
errorPort.listen((message) async {
receiveUnhandledErrors.listen((message) async {
// Sample error:
// flutter: [PowerSync] WARNING: 2023-06-28 16:34:11.566122: Sync Isolate error
// flutter: [Connection closed while receiving data, #0 IOClient.send.<anonymous closure> (package:http/src/io_client.dart:76:13)
Expand All @@ -200,38 +204,38 @@ class PowerSyncDatabaseImpl
// ...
logger.severe('Sync Isolate error', message);

// Reconnect
// Use the param like this instead of directly calling connect(), to avoid recursive
// locks in some edge cases.
reconnect();
// Fatal errors are enabled, so the isolate will exit soon, causing us to
// complete the abort controller which will make the db mixin reconnect if
// necessary. There's no need to reconnect manually.
});

disconnected() {
disconnector.completeAbort();
disconnecter = null;
rPort.close();
// Clear status apart from lastSyncedAt
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
// Don't spawn isolate if this operation was cancelled already.
if (abort.aborted) {
return waitForShutdown();
}

var exitPort = ReceivePort();
exitPort.listen((message) {
receiveExit.listen((message) {
logger.fine('Sync Isolate exit');
disconnected();
receivedIsolateExit.complete();
});

if (disconnecter?.aborted == true) {
disconnected();
return;
}

Isolate.spawn(
_powerSyncDatabaseIsolate,
_PowerSyncDatabaseIsolateArgs(
rPort.sendPort, dbRef, retryDelay, clientParams),
debugName: 'PowerSyncDatabase',
onError: errorPort.sendPort,
onExit: exitPort.sendPort);
// Spawning the isolate can't be interrupted
triedSpawningIsolate = true;
await Isolate.spawn(
_syncIsolate,
_PowerSyncDatabaseIsolateArgs(
receiveMessages.sendPort, dbRef, retryDelay, clientParams),
debugName: 'Sync ${database.openFactory.path}',
onError: receiveUnhandledErrors.sendPort,
errorsAreFatal: true,
onExit: receiveExit.sendPort,
);
await hasInitPort.future;

abort.onAbort.whenComplete(close);

// Automatically complete the abort controller once the isolate exits.
unawaited(waitForShutdown());
}

/// Takes a read lock, without starting a transaction.
Expand All @@ -255,16 +259,6 @@ class PowerSyncDatabaseImpl
return database.writeLock(callback,
debugContext: debugContext, lockTimeout: lockTimeout);
}

@override
Future<void> updateSchema(Schema schema) {
if (disconnecter != null) {
throw AssertionError('Cannot update schema while connected');
}
schema.validate();
this.schema = schema;
return updateSchemaInIsolate(database, schema);
}
}

class _PowerSyncDatabaseIsolateArgs {
Expand All @@ -277,64 +271,73 @@ class _PowerSyncDatabaseIsolateArgs {
this.sPort, this.dbRef, this.retryDelay, this.parameters);
}

Future<void> _powerSyncDatabaseIsolate(
_PowerSyncDatabaseIsolateArgs args) async {
Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
final sPort = args.sPort;
ReceivePort rPort = ReceivePort();
final rPort = ReceivePort();
StreamController<String> crudUpdateController = StreamController.broadcast();
final upstreamDbClient = args.dbRef.upstreamPort.open();

CommonDatabase? db;
final Mutex mutex = args.dbRef.mutex.open();
StreamingSyncImplementation? openedStreamingSync;
StreamSubscription<void>? localUpdatesSubscription;

Future<void> shutdown() async {
await openedStreamingSync?.abort();

localUpdatesSubscription?.cancel();
db?.dispose();
crudUpdateController.close();
upstreamDbClient.close();

// The SyncSqliteConnection uses this mutex
// It needs to be closed before killing the isolate
// in order to free the mutex for other operations.
await mutex.close();
rPort.close();

// TODO: If we closed our resources properly, this wouldn't be necessary...
Isolate.current.kill();
}

rPort.listen((message) async {
if (message is List) {
String action = message[0] as String;
if (action == 'update') {
crudUpdateController.add('update');
if (!crudUpdateController.isClosed) {
crudUpdateController.add('update');
}
} else if (action == 'close') {
// The SyncSqliteConnection uses this mutex
// It needs to be closed before killing the isolate
// in order to free the mutex for other operations.
await mutex.close();
db?.dispose();
crudUpdateController.close();
upstreamDbClient.close();
// Abort any open http requests, and wait for it to be closed properly
await openedStreamingSync?.abort();
// No kill the Isolate
Isolate.current.kill();
await shutdown();
}
}
});
Isolate.current.addOnExitListener(sPort, response: const ['close']);
sPort.send(["init", rPort.sendPort]);
sPort.send(['init', rPort.sendPort]);

// Is there a way to avoid the overhead if logging is not enabled?
// This only takes effect in this isolate.
isolateLogger.level = Level.ALL;
isolateLogger.onRecord.listen((record) {
var copy = LogRecord(record.level, record.message, record.loggerName,
record.error, record.stackTrace);
sPort.send(["log", copy]);
sPort.send(['log', copy]);
});

Future<PowerSyncCredentials?> loadCredentials() async {
final r = IsolateResult<PowerSyncCredentials?>();
sPort.send(["getCredentials", r.completer]);
sPort.send(['getCredentials', r.completer]);
return r.future;
}

Future<void> invalidateCredentials() async {
final r = IsolateResult<void>();
sPort.send(["invalidateCredentials", r.completer]);
sPort.send(['invalidateCredentials', r.completer]);
return r.future;
}

Future<void> uploadCrud() async {
final r = IsolateResult<void>();
sPort.send(["uploadCrud", r.completer]);
sPort.send(['uploadCrud', r.completer]);
return r.future;
}

Expand Down Expand Up @@ -372,7 +375,7 @@ Future<void> _powerSyncDatabaseIsolate(
}
}

db!.updates.listen((event) {
localUpdatesSubscription = db!.updates.listen((event) {
updatedTables.add(event.tableName);

updateDebouncer ??=
Expand All @@ -383,7 +386,7 @@ Future<void> _powerSyncDatabaseIsolate(
// Unfortunately, this does not handle disposing while the database is opening.
// This should be rare - any uncaught error is a bug. And in most cases,
// it should occur after the database is already open.
db?.dispose();
shutdown();
throw error;
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:async';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:powersync_core/sqlite_async.dart';
import 'package:powersync_core/src/abort_controller.dart';
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
import 'powersync_database.dart';
Expand All @@ -24,6 +25,9 @@ class PowerSyncDatabaseImpl
@override
Schema get schema => throw UnimplementedError();

@override
set schema(Schema s) => throw UnimplementedError();

@override
SqliteDatabase get database => throw UnimplementedError();

Expand Down Expand Up @@ -101,20 +105,15 @@ class PowerSyncDatabaseImpl
throw UnimplementedError();
}

@override
Future<void> updateSchema(Schema schema) {
throw UnimplementedError();
}

@override
Logger get logger => throw UnimplementedError();

@override
@internal
Future<void> baseConnect(
Future<void> connectInternal(
{required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required Future<void> Function() reconnect,
required AbortController abort,
Map<String, dynamic>? params}) {
throw UnimplementedError();
}
Expand Down
Loading
Loading