-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathconnection_pool.dart
246 lines (217 loc) · 7.98 KB
/
connection_pool.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import 'dart:async';
import 'dart:collection';
import 'mutex.dart';
import 'port_channel.dart';
import 'sqlite_connection.dart';
import 'sqlite_connection_impl.dart';
import 'sqlite_open_factory.dart';
import 'sqlite_queries.dart';
import 'update_notification.dart';
/// A connection pool with a single write connection and multiple read connections.
///
/// Read requests are queued and handed to idle read connections in FIFO
/// order. Read connections are opened on demand, up to [maxReaders]. Write
/// requests are forwarded to one write connection, which is opened lazily by
/// [writeLock] unless one was supplied to the constructor.
class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
  /// The write connection, or null if none has been supplied or opened yet.
  SqliteConnection? _writeConnection;

  /// Read connections created by this pool that have not been discarded.
  final Set<SqliteConnectionImpl> _allReadConnections = {};

  /// Read connections that are currently not serving a request.
  final Queue<SqliteConnectionImpl> _availableReadConnections = Queue();

  /// Read requests waiting for a connection, oldest first.
  ///
  /// Requests whose lock timeout fired remain in this queue with an already
  /// completed completer until [_nextRead] discards them.
  final Queue<_PendingItem> _queue = Queue();

  /// Factory passed to every connection this pool opens.
  final SqliteOpenFactory _factory;

  /// Port client passed to every connection this pool opens.
  final SerializedPortClient _upstreamPort;

  /// Update stream passed to every connection this pool opens.
  @override
  final Stream<UpdateNotification>? updates;

  /// The maximum number of read connections kept by this pool.
  final int maxReaders;

  /// Optional prefix used to derive debug names for opened connections.
  final String? debugName;

  /// Mutex passed to every connection this pool opens.
  final Mutex mutex;

  /// Whether [close] has been called.
  @override
  bool closed = false;

  /// Open a new connection pool.
  ///
  /// The provided factory is used to open connections on demand. Connections
  /// are only opened when requested for the first time.
  ///
  /// [maxReaders] specifies the maximum number of read connections.
  /// A maximum of one write connection will be opened.
  ///
  /// Read connections are opened in read-only mode, and will reject any statements
  /// that modify the database.
  SqliteConnectionPool(this._factory,
      {this.updates,
      this.maxReaders = 5,
      SqliteConnection? writeConnection,
      this.debugName,
      required this.mutex,
      required SerializedPortClient upstreamPort})
      : _writeConnection = writeConnection,
        _upstreamPort = upstreamPort;

  /// Returns true if the _write_ connection is currently in autocommit mode.
  ///
  /// Throws a [ClosedException] if there is no write connection, which is
  /// also the case before the first [writeLock] call when no write connection
  /// was passed to the constructor.
  @override
  Future<bool> getAutoCommit() async {
    final writer = _writeConnection;
    if (writer == null) {
      throw const ClosedException();
    }
    return await writer.getAutoCommit();
  }

  /// Dispatches the oldest pending read request, if a connection can serve it.
  ///
  /// Called whenever a request is queued and, via [Timer.run], after each
  /// dispatched request finishes.
  void _nextRead() {
    if (_queue.isEmpty) {
      // Wait for queue item
      return;
    }

    if (closed) {
      // Fail everything that is still waiting. Requests whose lock timeout
      // already fired are still queued with a completed completer; completing
      // those a second time would throw a StateError, so they are skipped.
      while (_queue.isNotEmpty) {
        final pending = _queue.removeFirst();
        pending.lockTimer?.cancel();
        if (!pending.completer.isCompleted) {
          pending.completer.completeError(const ClosedException());
        }
      }
      return;
    }

    // Remove connections that may have errored
    while (_availableReadConnections.isNotEmpty &&
        _availableReadConnections.last.closed) {
      final stale = _availableReadConnections.removeLast();
      _allReadConnections.remove(stale);
    }

    if (_availableReadConnections.isEmpty &&
        _allReadConnections.length == maxReaders) {
      // Wait for available connection
      return;
    }

    var candidate = _queue.removeFirst();
    while (candidate.completer.isCompleted) {
      // This item already timed out - try the next one if available
      if (_queue.isEmpty) {
        return;
      }
      candidate = _queue.removeFirst();
    }
    final item = candidate;
    item.lockTimer?.cancel();

    item.completer.complete(Future.sync(() async {
      SqliteConnectionImpl? connection;
      try {
        // Acquiring the connection happens inside the try block so that a
        // failure to open a new connection still schedules the next dispatch.
        connection = _availableReadConnections.isEmpty
            ? await _expandPool()
            : _availableReadConnections.removeLast();
        // At this point the connection is expected to be available immediately.
        // No need to calculate a new lockTimeout here.
        return await connection.readLock(item.callback);
      } finally {
        if (connection != null) {
          _availableReadConnections.add(connection);
        }
        Timer.run(_nextRead);
      }
    }));
  }

  /// Queues [callback] to run on a read connection and returns its result.
  ///
  /// If [lockTimeout] is given and no connection has been assigned before it
  /// elapses, the returned future fails with a [TimeoutException]. The
  /// timeout covers only the wait for a connection, not the callback itself.
  ///
  /// Fails with a [ClosedException] if the pool is closed, and with a
  /// [LockError] when called from within another lock callback of this pool.
  @override
  Future<T> readLock<T>(ReadCallback<T> callback,
      {Duration? lockTimeout, String? debugContext}) async {
    if (closed) {
      throw const ClosedException();
    }

    final zone = _getZone(debugContext: debugContext ?? 'get*()');
    final item = _PendingItem((ctx) {
      // Run the callback in the forked zone so nested locks are detected.
      return zone.runUnary(callback, ctx);
    }, lockTimeout: lockTimeout);
    _queue.add(item);
    _nextRead();

    return (await item.future) as T;
  }

  /// Runs [callback] on the write connection, opening that connection first
  /// if there is none or the previous one has been closed.
  ///
  /// This method is not `async`: a [ClosedException] (pool closed) or a
  /// [LockError] (recursive lock) is thrown synchronously rather than
  /// delivered through the returned future.
  @override
  Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
      {Duration? lockTimeout, String? debugContext}) {
    if (closed) {
      throw const ClosedException();
    }
    if (_writeConnection?.closed == true) {
      _writeConnection = null;
    }
    final writer = _writeConnection ??= SqliteConnectionImpl(
        upstreamPort: _upstreamPort,
        primary: false,
        updates: updates,
        debugName: debugName != null ? '$debugName-writer' : null,
        mutex: mutex,
        readOnly: false,
        openFactory: _factory);
    return _runZoned(() {
      return writer.writeLock(callback,
          lockTimeout: lockTimeout, debugContext: debugContext);
    }, debugContext: debugContext ?? 'execute()');
  }

  /// The [Mutex] on individual connections already errors on recursive locks.
  ///
  /// We duplicate the same check here, to:
  /// 1. Also error when the recursive transaction is handled by a different
  ///    connection (with a different lock).
  /// 2. Give a more specific error message when it happens.
  T _runZoned<T>(T Function() callback, {required String debugContext}) {
    return _getZone(debugContext: debugContext).run(callback);
  }

  /// Returns a fork of the current zone that is marked as holding a lock on
  /// this pool.
  ///
  /// Throws a [LockError] if the current zone already carries that mark, i.e.
  /// the caller is running inside another lock callback of this pool.
  Zone _getZone({required String debugContext}) {
    if (Zone.current[this] != null) {
      throw LockError(
          'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.');
    }
    return Zone.current.fork(zoneValues: {this: true});
  }

  /// Opens an additional read-only connection and waits until it is ready.
  ///
  /// The connection is registered in [_allReadConnections] before it is
  /// ready. If opening fails it is removed again, so the failed connection
  /// does not permanently occupy one of the [maxReaders] slots.
  Future<SqliteConnectionImpl> _expandPool() async {
    final name = debugName == null
        ? null
        : '$debugName-${_allReadConnections.length + 1}';
    final connection = SqliteConnectionImpl(
        upstreamPort: _upstreamPort,
        primary: false,
        updates: updates,
        debugName: name,
        mutex: mutex,
        readOnly: true,
        openFactory: _factory);
    _allReadConnections.add(connection);

    // Edge case:
    // If we don't await here, there is a chance that a different connection
    // is used for the transaction, and that it finishes and deletes the database
    // while this one is still opening. This is specifically triggered in tests.
    // To avoid that, we wait for the connection to be ready.
    try {
      await connection.ready;
    } catch (_) {
      // Opening failed: release the slot before propagating the error.
      _allReadConnections.remove(connection);
      rethrow;
    }
    return connection;
  }

  /// Marks the pool as closed, then closes all read connections followed by
  /// the write connection.
  @override
  Future<void> close() async {
    closed = true;

    // It is possible that `readLock()` removes connections from the pool while we're
    // closing connections, but not possible for new connections to be added.
    // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration"
    final toClose = _allReadConnections.toList();
    for (final connection in toClose) {
      // Wait for connection initialization, so that any existing readLock()
      // requests go through before closing.
      await connection.ready;
      await connection.close();
    }

    // Closing the write connection cleans up the journal files (-shm and -wal files).
    // It can only do that if there are no other open connections, so we close the
    // read-only connections first.
    await _writeConnection?.close();
  }
}
/// Signature of the callback accepted by [SqliteConnectionPool.readLock].
///
/// The callback receives a [SqliteReadContext] and returns its result as a
/// future.
typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);
/// A queued read request: the callback to run, the completer that delivers
/// its outcome, and an optional timer limiting the wait for a connection.
class _PendingItem {
  /// The callback to run once a read connection has been assigned.
  ReadCallback<dynamic> callback;

  /// Completed either with the future of the running callback or with a
  /// failing future when the lock timeout fires first.
  Completer<dynamic> completer = Completer.sync();

  /// The future of [completer], resolved lazily on first access.
  late Future<dynamic> future = completer.future;

  /// Wall-clock time at which the lock timeout expires, or null when no
  /// timeout was requested.
  DateTime? deadline;

  /// How long to wait for a connection, or null to wait indefinitely.
  final Duration? lockTimeout;

  /// Timer enforcing [lockTimeout]; null when no timeout was requested.
  late final Timer? lockTimer;

  /// Creates a pending request and, when [lockTimeout] is given, starts its
  /// timeout timer immediately.
  _PendingItem(this.callback, {this.lockTimeout}) {
    final timeout = lockTimeout;
    if (timeout == null) {
      lockTimer = null;
    } else {
      deadline = DateTime.now().add(timeout);
      lockTimer = Timer(timeout, _onLockTimeout);
    }
  }

  /// Fails the request with a [TimeoutException] unless it was already
  /// assigned a connection.
  void _onLockTimeout() {
    // Note: isCompleted is true when `nextItem.completer.complete` is called, not when the result is available.
    // This matches the behavior we need for a timeout on the lock, but not the entire operation.
    if (completer.isCompleted) {
      return;
    }
    // The error is delivered through a failing future rather than through
    // completer.completeError.
    // NOTE(review): the reason for that choice is not visible in this file.
    completer.complete(Future.sync(() async {
      throw TimeoutException('Failed to get a read connection', lockTimeout);
    }));
  }
}