-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathsqlite_queries.dart
145 lines (132 loc) · 4.87 KB
/
sqlite_queries.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import 'package:sqlite3/common.dart' as sqlite;
import 'utils/shared_utils.dart';
import 'sqlite_connection.dart';
import 'update_notification.dart';
/// Mixin to provide default query functionality.
///
/// Classes using this need to implement [SqliteConnection.readLock]
/// and [SqliteConnection.writeLock].
mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
@override
Future<sqlite.ResultSet> execute(String sql,
[List<Object?> parameters = const []]) async {
return writeLock((ctx) async {
return ctx.execute(sql, parameters);
}, debugContext: 'execute()');
}
@override
Future<sqlite.ResultSet> getAll(String sql,
[List<Object?> parameters = const []]) {
return readLock((ctx) async {
return ctx.getAll(sql, parameters);
}, debugContext: 'getAll()');
}
@override
Future<sqlite.Row> get(String sql, [List<Object?> parameters = const []]) {
return readLock((ctx) async {
return ctx.get(sql, parameters);
}, debugContext: 'get()');
}
@override
Future<sqlite.Row?> getOptional(String sql,
[List<Object?> parameters = const []]) {
return readLock((ctx) async {
return ctx.getOptional(sql, parameters);
}, debugContext: 'getOptional()');
}
@override
Stream<sqlite.ResultSet> watch(String sql,
{List<Object?> parameters = const [],
Duration throttle = const Duration(milliseconds: 30),
Iterable<String>? triggerOnTables}) async* {
assert(updates != null,
'updates stream must be provided to allow query watching');
final tables =
triggerOnTables ?? await getSourceTables(this, sql, parameters);
final filteredStream =
updates!.transform(UpdateNotification.filterTablesTransformer(tables));
final throttledStream = UpdateNotification.throttleStream(
filteredStream, throttle,
addOne: UpdateNotification.empty());
// FIXME:
// When the subscription is cancelled, this performs a final query on the next
// update.
// The loop only stops once the "yield" is reached.
// Using asyncMap instead of a generator would solve it, but then the body
// here can't be async for getSourceTables().
await for (var _ in throttledStream) {
yield await getAll(sql, parameters);
}
}
/// Create a Stream of changes to any of the specified tables.
///
/// Example to get the same effect as [watch]:
///
/// ```dart
/// var subscription = db.onChange({'mytable'}).asyncMap((event) async {
/// var data = await db.getAll('SELECT * FROM mytable');
/// return data;
/// }).listen((data) {
/// // Do something with the data here
/// });
/// ```
///
/// This is preferred over [watch] when multiple queries need to be performed
/// together when data is changed.
Stream<UpdateNotification> onChange(Iterable<String>? tables,
{Duration throttle = const Duration(milliseconds: 30),
bool triggerImmediately = true}) {
assert(updates != null,
'updates stream must be provided to allow query watching');
final filteredStream = tables != null
? updates!.transform(UpdateNotification.filterTablesTransformer(tables))
: updates!;
final throttledStream = UpdateNotification.throttleStream(
filteredStream, throttle,
addOne: triggerImmediately ? UpdateNotification.empty() : null);
return throttledStream;
}
@override
Future<T> readTransaction<T>(
Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout}) async {
return readLock((ctx) async {
return await internalReadTransaction(ctx, callback);
}, lockTimeout: lockTimeout, debugContext: 'readTransaction()');
}
@override
Future<T> writeTransaction<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout}) async {
return writeLock((ctx) async {
return await internalWriteTransaction(ctx, callback);
}, lockTimeout: lockTimeout, debugContext: 'writeTransaction()');
}
/// See [SqliteReadContext.computeWithDatabase].
///
/// When called here directly on the connection, the call is wrapped in a
/// write transaction.
@override
Future<T> computeWithDatabase<T>(
Future<T> Function(sqlite.CommonDatabase db) compute) {
return writeTransaction((tx) async {
return tx.computeWithDatabase(compute);
});
}
/// Execute a write query (INSERT, UPDATE, DELETE) multiple times with each
/// parameter set. This is more faster than executing separately with each
/// parameter set.
///
/// When called here directly on the connection, the batch is wrapped in a
/// write transaction.
@override
Future<void> executeBatch(String sql, List<List<Object?>> parameterSets) {
return writeTransaction((tx) async {
return tx.executeBatch(sql, parameterSets);
});
}
@override
Future<void> refreshSchema() {
return get("PRAGMA table_info('sqlite_master')");
}
}