Skip to content

Commit bea5c58

Browse files
authored
Merge pull request #38 from powersync-ja/connectionpool-performance
Improve connection pool read performance
2 parents 662e578 + 88d042b commit bea5c58

File tree

5 files changed

+272
-97
lines changed

5 files changed

+272
-97
lines changed

Diff for: lib/src/connection_pool.dart

+131-79
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:collection';
23

34
import 'mutex.dart';
45
import 'port_channel.dart';
@@ -12,7 +13,9 @@ import 'update_notification.dart';
1213
class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
1314
SqliteConnection? _writeConnection;
1415

15-
final List<SqliteConnectionImpl> _readConnections = [];
16+
final Set<SqliteConnectionImpl> _allReadConnections = {};
17+
final Queue<SqliteConnectionImpl> _availableReadConnections = Queue();
18+
final Queue<_PendingItem> _queue = Queue();
1619

1720
final SqliteOpenFactory _factory;
1821
final SerializedPortClient _upstreamPort;
@@ -53,72 +56,84 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
5356
@override
5457
Future<bool> getAutoCommit() async {
5558
if (_writeConnection == null) {
56-
throw AssertionError('Closed');
59+
throw ClosedException();
5760
}
5861
return await _writeConnection!.getAutoCommit();
5962
}
6063

61-
@override
62-
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
63-
{Duration? lockTimeout, String? debugContext}) async {
64-
await _expandPool();
65-
66-
return _runZoned(() async {
67-
bool haveLock = false;
68-
var completer = Completer<T>();
64+
void _nextRead() {
65+
if (_queue.isEmpty) {
66+
// Wait for queue item
67+
return;
68+
} else if (closed) {
69+
while (_queue.isNotEmpty) {
70+
final nextItem = _queue.removeFirst();
71+
nextItem.completer.completeError(const ClosedException());
72+
}
73+
return;
74+
}
6975

70-
var futures = _readConnections.sublist(0).map((connection) async {
71-
if (connection.closed) {
72-
_readConnections.remove(connection);
73-
}
74-
try {
75-
return await connection.readLock((ctx) async {
76-
if (haveLock) {
77-
// Already have a different lock - release this one.
78-
return false;
79-
}
80-
haveLock = true;
81-
82-
var future = callback(ctx);
83-
completer.complete(future);
84-
85-
// We have to wait for the future to complete before we can release the
86-
// lock.
87-
try {
88-
await future;
89-
} catch (_) {
90-
// Ignore
91-
}
92-
93-
return true;
94-
}, lockTimeout: lockTimeout, debugContext: debugContext);
95-
} on TimeoutException {
96-
return false;
97-
}
98-
});
76+
while (_availableReadConnections.isNotEmpty &&
77+
_availableReadConnections.last.closed) {
78+
// Remove connections that may have errored
79+
final connection = _availableReadConnections.removeLast();
80+
_allReadConnections.remove(connection);
81+
}
9982

100-
final stream = Stream<bool>.fromFutures(futures);
101-
var gotAny = await stream.any((element) => element);
83+
if (_availableReadConnections.isEmpty &&
84+
_allReadConnections.length == maxReaders) {
85+
// Wait for available connection
86+
return;
87+
}
10288

103-
if (!gotAny) {
104-
// All TimeoutExceptions
105-
throw TimeoutException('Failed to get a read connection', lockTimeout);
89+
var nextItem = _queue.removeFirst();
90+
while (nextItem.completer.isCompleted) {
91+
// This item already timed out - try the next one if available
92+
if (_queue.isEmpty) {
93+
return;
10694
}
95+
nextItem = _queue.removeFirst();
96+
}
97+
98+
nextItem.lockTimer?.cancel();
10799

100+
nextItem.completer.complete(Future.sync(() async {
101+
final nextConnection = _availableReadConnections.isEmpty
102+
? await _expandPool()
103+
: _availableReadConnections.removeLast();
108104
try {
109-
return await completer.future;
110-
} catch (e) {
111-
// throw e;
112-
rethrow;
105+
// At this point the connection is expected to be available immediately.
106+
// No need to calculate a new lockTimeout here.
107+
final result = await nextConnection.readLock(nextItem.callback);
108+
return result;
109+
} finally {
110+
_availableReadConnections.add(nextConnection);
111+
Timer.run(_nextRead);
113112
}
114-
}, debugContext: debugContext ?? 'get*()');
113+
}));
114+
}
115+
116+
@override
117+
Future<T> readLock<T>(ReadCallback<T> callback,
118+
{Duration? lockTimeout, String? debugContext}) async {
119+
if (closed) {
120+
throw ClosedException();
121+
}
122+
final zone = _getZone(debugContext: debugContext ?? 'get*()');
123+
final item = _PendingItem((ctx) {
124+
return zone.runUnary(callback, ctx);
125+
}, lockTimeout: lockTimeout);
126+
_queue.add(item);
127+
_nextRead();
128+
129+
return (await item.future) as T;
115130
}
116131

117132
@override
118133
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
119134
{Duration? lockTimeout, String? debugContext}) {
120135
if (closed) {
121-
throw AssertionError('Closed');
136+
throw ClosedException();
122137
}
123138
if (_writeConnection?.closed == true) {
124139
_writeConnection = null;
@@ -144,46 +159,52 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
144159
/// connection (with a different lock).
145160
/// 2. Give a more specific error message when it happens.
146161
T _runZoned<T>(T Function() callback, {required String debugContext}) {
162+
return _getZone(debugContext: debugContext).run(callback);
163+
}
164+
165+
Zone _getZone({required String debugContext}) {
147166
if (Zone.current[this] != null) {
148167
throw LockError(
149168
'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.');
150169
}
151-
var zone = Zone.current.fork(zoneValues: {this: true});
152-
return zone.run(callback);
170+
return Zone.current.fork(zoneValues: {this: true});
153171
}
154172

155-
Future<void> _expandPool() async {
156-
if (closed || _readConnections.length >= maxReaders) {
157-
return;
158-
}
159-
bool hasCapacity = _readConnections.any((connection) => !connection.locked);
160-
if (!hasCapacity) {
161-
var name = debugName == null
162-
? null
163-
: '$debugName-${_readConnections.length + 1}';
164-
var connection = SqliteConnectionImpl(
165-
upstreamPort: _upstreamPort,
166-
primary: false,
167-
updates: updates,
168-
debugName: name,
169-
mutex: mutex,
170-
readOnly: true,
171-
openFactory: _factory);
172-
_readConnections.add(connection);
173-
174-
// Edge case:
175-
// If we don't await here, there is a chance that a different connection
176-
// is used for the transaction, and that it finishes and deletes the database
177-
// while this one is still opening. This is specifically triggered in tests.
178-
// To avoid that, we wait for the connection to be ready.
179-
await connection.ready;
180-
}
173+
Future<SqliteConnectionImpl> _expandPool() async {
174+
var name = debugName == null
175+
? null
176+
: '$debugName-${_allReadConnections.length + 1}';
177+
var connection = SqliteConnectionImpl(
178+
upstreamPort: _upstreamPort,
179+
primary: false,
180+
updates: updates,
181+
debugName: name,
182+
mutex: mutex,
183+
readOnly: true,
184+
openFactory: _factory);
185+
_allReadConnections.add(connection);
186+
187+
// Edge case:
188+
// If we don't await here, there is a chance that a different connection
189+
// is used for the transaction, and that it finishes and deletes the database
190+
// while this one is still opening. This is specifically triggered in tests.
191+
// To avoid that, we wait for the connection to be ready.
192+
await connection.ready;
193+
return connection;
181194
}
182195

183196
@override
184197
Future<void> close() async {
185198
closed = true;
186-
for (var connection in _readConnections) {
199+
200+
// It is possible that `readLock()` removes connections from the pool while we're
201+
// closing connections, but not possible for new connections to be added.
202+
// Create a copy of the list, to avoid this triggering "Concurrent modification during iteration"
203+
final toClose = _allReadConnections.toList();
204+
for (var connection in toClose) {
205+
// Wait for connection initialization, so that any existing readLock()
206+
// requests go through before closing.
207+
await connection.ready;
187208
await connection.close();
188209
}
189210
// Closing the write connection cleans up the journal files (-shm and -wal files).
@@ -192,3 +213,34 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
192213
await _writeConnection?.close();
193214
}
194215
}
216+
217+
typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);
218+
219+
class _PendingItem {
220+
ReadCallback<dynamic> callback;
221+
Completer<dynamic> completer = Completer.sync();
222+
late Future<dynamic> future = completer.future;
223+
DateTime? deadline;
224+
final Duration? lockTimeout;
225+
late final Timer? lockTimer;
226+
227+
_PendingItem(this.callback, {this.lockTimeout}) {
228+
if (lockTimeout != null) {
229+
deadline = DateTime.now().add(lockTimeout!);
230+
lockTimer = Timer(lockTimeout!, () {
231+
// Note: isCompleted is true when `nextItem.completer.complete` is called, not when the result is available.
232+
// This matches the behavior we need for a timeout on the lock, but not the entire operation.
233+
if (!completer.isCompleted) {
234+
// completer.completeError(
235+
// TimeoutException('Failed to get a read connection', lockTimeout));
236+
completer.complete(Future.sync(() async {
237+
throw TimeoutException(
238+
'Failed to get a read connection', lockTimeout);
239+
}));
240+
}
241+
});
242+
} else {
243+
lockTimer = null;
244+
}
245+
}
246+
}

0 commit comments

Comments
 (0)