-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathconnection_pool.dart
316 lines (272 loc) · 10.3 KB
/
connection_pool.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
import 'dart:async';
import 'dart:collection';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart';
import 'package:sqlite_async/src/native/native_isolate_mutex.dart';
/// A connection pool with a single write connection and multiple read connections.
class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
final StreamController<UpdateNotification> updatesController =
StreamController.broadcast();
@override
/// The write connection might be recreated if it's closed
/// This will allow the update stream remain constant even
/// after using a new write connection.
late final Stream<UpdateNotification> updates = updatesController.stream;
SqliteConnectionImpl? _writeConnection;
final Set<SqliteConnectionImpl> _allReadConnections = {};
final Queue<SqliteConnectionImpl> _availableReadConnections = Queue();
final Queue<_PendingItem> _queue = Queue();
final AbstractDefaultSqliteOpenFactory _factory;
final int maxReaders;
final String? debugName;
final MutexImpl mutex;
@override
bool closed = false;
/// Open a new connection pool.
///
/// The provided factory is used to open connections on demand. Connections
/// are only opened when requested for the first time.
///
/// [maxReaders] specifies the maximum number of read connections.
/// A maximum of one write connection will be opened.
///
/// Read connections are opened in read-only mode, and will reject any statements
/// that modify the database.
SqliteConnectionPool(this._factory,
{this.maxReaders = 5,
SqliteConnectionImpl? writeConnection,
this.debugName,
required this.mutex})
: _writeConnection = writeConnection {
// Use the write connection's updates
_writeConnection?.updates?.forEach(updatesController.add);
}
/// Executes a provided callback function exclusively across all read and
/// write connections in the pool.
///
/// This function first locks all read and write connections, collecting their
/// contexts. It then executes the provided [callback] function on each of these
/// contexts. After the [callback] completes for each context, the locks are released.
///
/// Example usage:
/// ```dart
/// await runExclusive((ctx) async {
/// // Perform some database operation with the ctx
/// await ctx.execute('PRAGMA schema_version');
/// });
/// ```
exclusiveLock<T>(
Future<T> Function(SqliteReadContext tx) callback,
) async {
final List<Completer<SqliteReadContext>> completers = [];
final List<Completer<void>> releasers = [];
for (final read in _allReadConnections) {
final completer = Completer<SqliteReadContext>();
completers.add(completer);
read.readLock((ctx) async {
completer.complete(ctx);
final releaser = Completer();
releasers.add(releaser);
// Keep this active, close the context when finished
await releaser.future;
});
}
final writeCompleter = Completer<SqliteReadContext>();
completers.add(writeCompleter);
_writeConnection?.writeLock((ctx) async {
writeCompleter.complete(ctx);
final releaser = Completer();
releasers.add(releaser);
await releaser.future;
});
// Get all the connection contexts and execute the callback on each of them
final List<SqliteReadContext> contexts = [];
for (final completer in completers) {
contexts.add(await completer.future);
}
for (final c in contexts) {
await callback(c);
}
// Release all the releasers
for (final r in releasers) {
r.complete();
}
}
/// Returns true if the _write_ connection is currently in autocommit mode.
@override
Future<bool> getAutoCommit() async {
if (_writeConnection == null) {
throw ClosedException();
}
return await _writeConnection!.getAutoCommit();
}
void _nextRead() {
if (_queue.isEmpty) {
// Wait for queue item
return;
} else if (closed) {
while (_queue.isNotEmpty) {
final nextItem = _queue.removeFirst();
nextItem.completer.completeError(const ClosedException());
}
return;
}
while (_availableReadConnections.isNotEmpty &&
_availableReadConnections.last.closed) {
// Remove connections that may have errored
final connection = _availableReadConnections.removeLast();
_allReadConnections.remove(connection);
}
if (_availableReadConnections.isEmpty &&
_allReadConnections.length == maxReaders) {
// Wait for available connection
return;
}
var nextItem = _queue.removeFirst();
while (nextItem.completer.isCompleted) {
// This item already timed out - try the next one if available
if (_queue.isEmpty) {
return;
}
nextItem = _queue.removeFirst();
}
nextItem.lockTimer?.cancel();
nextItem.completer.complete(Future.sync(() async {
final nextConnection = _availableReadConnections.isEmpty
? await _expandPool()
: _availableReadConnections.removeLast();
try {
// At this point the connection is expected to be available immediately.
// No need to calculate a new lockTimeout here.
final result = await nextConnection.readLock(nextItem.callback);
return result;
} finally {
_availableReadConnections.add(nextConnection);
Timer.run(_nextRead);
}
}));
}
@override
Future<T> readLock<T>(ReadCallback<T> callback,
{Duration? lockTimeout, String? debugContext}) async {
if (closed) {
throw ClosedException();
}
final zone = _getZone(debugContext: debugContext ?? 'get*()');
final item = _PendingItem((ctx) {
return zone.runUnary(callback, ctx);
}, lockTimeout: lockTimeout);
_queue.add(item);
_nextRead();
return (await item.future) as T;
}
@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
if (closed) {
throw ClosedException();
}
if (_writeConnection?.closed == true) {
_writeConnection = null;
}
if (_writeConnection == null) {
_writeConnection = (await _factory.openConnection(SqliteOpenOptions(
primaryConnection: true,
debugName: debugName != null ? '$debugName-writer' : null,
mutex: mutex,
readOnly: false))) as SqliteConnectionImpl;
// Expose the new updates on the connection pool
_writeConnection!.updates?.forEach(updatesController.add);
}
return _runZoned(() {
return _writeConnection!.writeLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext);
}, debugContext: debugContext ?? 'execute()');
}
/// The [Mutex] on individual connections do already error in recursive locks.
///
/// We duplicate the same check here, to:
/// 1. Also error when the recursive transaction is handled by a different
/// connection (with a different lock).
/// 2. Give a more specific error message when it happens.
T _runZoned<T>(T Function() callback, {required String debugContext}) {
return _getZone(debugContext: debugContext).run(callback);
}
Zone _getZone({required String debugContext}) {
if (Zone.current[this] != null) {
throw LockError(
'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.');
}
return Zone.current.fork(zoneValues: {this: true});
}
Future<SqliteConnectionImpl> _expandPool() async {
var name = debugName == null
? null
: '$debugName-${_allReadConnections.length + 1}';
var connection = SqliteConnectionImpl(
upstreamPort: upstreamPort,
primary: false,
updates: updates,
debugName: name,
mutex: mutex,
readOnly: true,
openFactory: _factory);
_allReadConnections.add(connection);
// Edge case:
// If we don't await here, there is a chance that a different connection
// is used for the transaction, and that it finishes and deletes the database
// while this one is still opening. This is specifically triggered in tests.
// To avoid that, we wait for the connection to be ready.
await connection.ready;
return connection;
}
SerializedPortClient? get upstreamPort {
return _writeConnection?.upstreamPort;
}
@override
Future<void> close() async {
closed = true;
// It is possible that `readLock()` removes connections from the pool while we're
// closing connections, but not possible for new connections to be added.
// Create a copy of the list, to avoid this triggering "Concurrent modification during iteration"
final toClose = _allReadConnections.toList();
for (var connection in toClose) {
// Wait for connection initialization, so that any existing readLock()
// requests go through before closing.
await connection.ready;
await connection.close();
}
// Closing the write connection cleans up the journal files (-shm and -wal files).
// It can only do that if there are no other open connections, so we close the
// read-only connections first.
await _writeConnection?.close();
}
}
typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);
class _PendingItem {
ReadCallback<dynamic> callback;
Completer<dynamic> completer = Completer.sync();
late Future<dynamic> future = completer.future;
DateTime? deadline;
final Duration? lockTimeout;
late final Timer? lockTimer;
_PendingItem(this.callback, {this.lockTimeout}) {
if (lockTimeout != null) {
deadline = DateTime.now().add(lockTimeout!);
lockTimer = Timer(lockTimeout!, () {
// Note: isCompleted is true when `nextItem.completer.complete` is called, not when the result is available.
// This matches the behavior we need for a timeout on the lock, but not the entire operation.
if (!completer.isCompleted) {
// completer.completeError(
// TimeoutException('Failed to get a read connection', lockTimeout));
completer.complete(Future.sync(() async {
throw TimeoutException(
'Failed to get a read connection', lockTimeout);
}));
}
});
} else {
lockTimer = null;
}
}
}