Skip to content

Commit b665748

Browse files
authored
Merge pull request #31 from powersync-ja/fix-mutex-reuse
Fix issue with re-using a shared Mutex or isolateConnectionFactory
2 parents f994e16 + 5c25fcd commit b665748

File tree

3 files changed

+25
-20
lines changed

3 files changed

+25
-20
lines changed

lib/src/isolate_connection_factory.dart

+4-4
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ class IsolateConnectionFactory {
3737
readOnly: readOnly,
3838
debugName: debugName,
3939
updates: updates.stream,
40-
closeFunction: () {
41-
openMutex.close();
40+
closeFunction: () async {
41+
await openMutex.close();
4242
updates.close();
4343
});
4444
}
@@ -89,7 +89,7 @@ class _IsolateUpdateListener {
8989
}
9090

9191
class _IsolateSqliteConnection extends SqliteConnectionImpl {
92-
final void Function() closeFunction;
92+
final Future<void> Function() closeFunction;
9393

9494
_IsolateSqliteConnection(
9595
{required super.openFactory,
@@ -103,6 +103,6 @@ class _IsolateSqliteConnection extends SqliteConnectionImpl {
103103
@override
104104
Future<void> close() async {
105105
await super.close();
106-
closeFunction();
106+
await closeFunction();
107107
}
108108
}

lib/src/mutex.dart

+2-16
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ class SharedMutex implements Mutex {
188188
closed = true;
189189
// Wait for any existing locks to complete, then prevent any further locks from being taken out.
190190
await _acquire();
191-
client.fire(const _CloseMessage());
191+
// Release the lock
192+
_unlock();
192193
// Close client immediately after _unlock(),
193194
// so that we're sure no further locks are acquired.
194195
// This also cancels any lock request in process.
@@ -201,7 +202,6 @@ class _SharedMutexServer {
201202
Completer? unlock;
202203
late final SerializedMutex serialized;
203204
final Mutex mutex;
204-
bool closed = false;
205205

206206
late final PortServer server;
207207

@@ -216,11 +216,6 @@ class _SharedMutexServer {
216216
if (arg is _AcquireMessage) {
217217
var lock = Completer.sync();
218218
mutex.lock(() async {
219-
if (closed) {
220-
// The client will error already - we just need to ensure
221-
// we don't take out another lock.
222-
return;
223-
}
224219
assert(unlock == null);
225220
unlock = Completer.sync();
226221
lock.complete();
@@ -231,10 +226,6 @@ class _SharedMutexServer {
231226
} else if (arg is _UnlockMessage) {
232227
assert(unlock != null);
233228
unlock!.complete();
234-
} else if (arg is _CloseMessage) {
235-
// Unlock and close (from client side)
236-
closed = true;
237-
unlock?.complete();
238229
}
239230
}
240231

@@ -251,11 +242,6 @@ class _UnlockMessage {
251242
const _UnlockMessage();
252243
}
253244

254-
/// Unlock and close
255-
class _CloseMessage {
256-
const _CloseMessage();
257-
}
258-
259245
class LockError extends Error {
260246
final String message;
261247

test/mutex_test.dart

+19
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,25 @@ void main() {
2222
expect(result, equals(5));
2323
}
2424
});
25+
26+
test('Re-use after closing', () async {
27+
// Test that shared locks can be opened and closed multiple times.
28+
final mutex = SimpleMutex();
29+
final serialized = mutex.shared;
30+
31+
final result = await Isolate.run(() async {
32+
return _lockInIsolate(serialized);
33+
});
34+
35+
final result2 = await Isolate.run(() async {
36+
return _lockInIsolate(serialized);
37+
});
38+
39+
await mutex.lock(() async {});
40+
41+
expect(result, equals(5));
42+
expect(result2, equals(5));
43+
});
2544
}, timeout: const Timeout(Duration(milliseconds: 5000)));
2645
}
2746

0 commit comments

Comments
 (0)