Skip to content

Commit 68972d9

Browse files
chore: migrate locks to async-mutex
1 parent 1887a3d commit 68972d9

File tree

15 files changed

+212
-145
lines changed

15 files changed

+212
-145
lines changed

.changeset/beige-pigs-occur.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/op-sqlite': minor
3+
'@powersync/capacitor': minor
4+
---
5+
6+
Removed `async-lock` dependency in favor of `async-mutex`.

.changeset/cool-crews-hug.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/op-sqlite': patch
3+
'@powersync/capacitor': patch
4+
---
5+
6+
Fixed potential issue where extreme amounts of concurrent calls to `writeLock` could reject with the error "Too many pending tasks in queue"

.changeset/little-dancers-beam.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': minor
3+
---
4+
5+
Exported common Mutex helper functions

demos/example-capacitor/ios/App/Podfile.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ PODS:
99
- CapacitorSplashScreen (7.0.0):
1010
- Capacitor
1111
- powersync-sqlite-core (0.4.10)
12-
- PowersyncCapacitor (0.2.0):
12+
- PowersyncCapacitor (0.3.0):
1313
- Capacitor
1414
- powersync-sqlite-core (~> 0.4.10)
1515
- SQLCipher (~> 4.0)
@@ -51,7 +51,7 @@ SPEC CHECKSUMS:
5151
CapacitorCordova: bf648a636f3c153f652d312ae145fb508b6ffced
5252
CapacitorSplashScreen: f4e58cc02aafd91c7cbaf32a3d1b44d02a115125
5353
powersync-sqlite-core: b30017e077c91915d53faebc5f7245384df78275
54-
PowersyncCapacitor: 4fe9ac76191de407fd5cc9b74b9792d41e99bf24
54+
PowersyncCapacitor: a966fb4c3544e20bfdcc7ec7aad9d364b753e884
5555
SQLCipher: eb79c64049cb002b4e9fcb30edb7979bf4706dfc
5656
ZIPFoundation: dfd3d681c4053ff7e2f7350bc4e53b5dba3f5351
5757

packages/capacitor/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
"@ionic/eslint-config": "^0.4.0",
7676
"@ionic/prettier-config": "^4.0.0",
7777
"@ionic/swiftlint-config": "^2.0.0",
78-
"@types/async-lock": "catalog:",
7978
"eslint": "^8.57.0",
8079
"prettier": "catalog:",
8180
"prettier-plugin-java": "^2.6.6",
@@ -100,6 +99,6 @@
10099
}
101100
},
102101
"dependencies": {
103-
"async-lock": "catalog:"
102+
"async-mutex": "catalog:"
104103
}
105104
}

packages/capacitor/src/adapter/CapacitorSQLiteAdapter.ts

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ import {
88
DBAdapterListener,
99
DBLockOptions,
1010
LockContext,
11+
mutexRunExclusive,
1112
QueryResult,
1213
Transaction
1314
} from '@powersync/web';
14-
import Lock from 'async-lock';
15+
import { Mutex } from 'async-mutex';
1516
import { PowerSyncCore } from '../plugin/PowerSyncCore.js';
1617
import { messageForErrorCode } from '../plugin/PowerSyncPlugin.js';
1718
import { CapacitorSQLiteOpenFactoryOptions, DEFAULT_SQLITE_OPTIONS } from './CapacitorSQLiteOpenFactory.js';
@@ -39,13 +40,15 @@ export class CapacitorSQLiteAdapter extends BaseObserver<DBAdapterListener> impl
3940
protected _writeConnection: SQLiteDBConnection | null;
4041
protected _readConnection: SQLiteDBConnection | null;
4142
protected initializedPromise: Promise<void>;
42-
protected lock: Lock;
43+
protected writeMutex: Mutex;
44+
protected readMutex: Mutex;
4345

4446
constructor(protected options: CapacitorSQLiteOpenFactoryOptions) {
4547
super();
4648
this._writeConnection = null;
4749
this._readConnection = null;
48-
this.lock = new Lock();
50+
this.writeMutex = new Mutex();
51+
this.readMutex = new Mutex();
4952
this.initializedPromise = this.init();
5053
}
5154

@@ -237,10 +240,14 @@ export class CapacitorSQLiteAdapter extends BaseObserver<DBAdapterListener> impl
237240
}
238241

239242
readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
240-
return this.lock.acquire('read_lock', async () => {
241-
await this.initializedPromise;
242-
return await fn(this.generateLockContext(this.readConnection));
243-
});
243+
return mutexRunExclusive(
244+
this.readMutex,
245+
async () => {
246+
await this.initializedPromise;
247+
return await fn(this.generateLockContext(this.readConnection));
248+
},
249+
options
250+
);
244251
}
245252

246253
readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
@@ -250,24 +257,28 @@ export class CapacitorSQLiteAdapter extends BaseObserver<DBAdapterListener> impl
250257
}
251258

252259
writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
253-
return this.lock.acquire('write_lock', async () => {
254-
await this.initializedPromise;
255-
const result = await fn(this.generateLockContext(this.writeConnection));
256-
257-
// Fetch table updates
258-
const updates = await this.writeConnection.query("SELECT powersync_update_hooks('get') AS table_name");
259-
const jsonUpdates = updates.values?.[0];
260-
if (!jsonUpdates || !jsonUpdates.table_name) {
261-
throw new Error('Could not fetch table updates');
262-
}
263-
const notification: BatchedUpdateNotification = {
264-
rawUpdates: [],
265-
tables: JSON.parse(jsonUpdates.table_name),
266-
groupedUpdates: {}
267-
};
268-
this.iterateListeners((l) => l.tablesUpdated?.(notification));
269-
return result;
270-
});
260+
return mutexRunExclusive(
261+
this.writeMutex,
262+
async () => {
263+
await this.initializedPromise;
264+
const result = await fn(this.generateLockContext(this.writeConnection));
265+
266+
// Fetch table updates
267+
const updates = await this.writeConnection.query("SELECT powersync_update_hooks('get') AS table_name");
268+
const jsonUpdates = updates.values?.[0];
269+
if (!jsonUpdates || !jsonUpdates.table_name) {
270+
throw new Error('Could not fetch table updates');
271+
}
272+
const notification: BatchedUpdateNotification = {
273+
rawUpdates: [],
274+
tables: JSON.parse(jsonUpdates.table_name),
275+
groupedUpdates: {}
276+
};
277+
this.iterateListeners((l) => l.tablesUpdated?.(notification));
278+
return result;
279+
},
280+
options
281+
);
271282
}
272283

273284
writeTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {

packages/capacitor/src/sync/CapacitorSyncImplementation.ts

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,60 @@
1-
import { AbstractStreamingSyncImplementation, LockOptions } from '@powersync/web';
2-
import Lock from 'async-lock';
1+
import { AbstractStreamingSyncImplementation, LockOptions, LockType, mutexRunExclusive } from '@powersync/web';
2+
import { Mutex } from 'async-mutex';
3+
4+
type MutexMap = {
5+
/**
6+
* Used to track the consumers of this Mutex.
7+
* It should be safe to dispose the Mutex if this is empty.
8+
*/
9+
tracking: Set<number>;
10+
locks: {
11+
[Type in LockType]: Mutex;
12+
};
13+
};
14+
15+
const GLOBAL_MUTEX_STORE: Map<string, MutexMap> = new Map();
16+
17+
/**
18+
* Used to identify multiple instances of CapacitorStreamingSyncImplementation
19+
*/
20+
let _CAPACITOR_STREAMING_SYNC_SEQUENCE = 0;
321

422
export class CapacitorStreamingSyncImplementation extends AbstractStreamingSyncImplementation {
5-
static GLOBAL_LOCK = new Lock();
23+
// A unique ID for tacking this specific instance of CapacitorStreamingSyncImplementation
24+
protected instanceId = _CAPACITOR_STREAMING_SYNC_SEQUENCE++;
25+
26+
async dispose(): Promise<void> {
27+
await super.dispose();
28+
29+
// Clear up any global mutexes which aren't used anymore
30+
for (const mutexEntry of GLOBAL_MUTEX_STORE.entries()) {
31+
const [identifier, mutex] = mutexEntry;
32+
if (!mutex.tracking.has(this.instanceId)) {
33+
continue;
34+
}
35+
mutex.tracking.delete(this.instanceId);
36+
if (mutex.tracking.size == 0) {
37+
GLOBAL_MUTEX_STORE.delete(identifier);
38+
}
39+
}
40+
}
641

742
async obtainLock<T>(lockOptions: LockOptions<T>): Promise<T> {
8-
const identifier = `streaming-sync-${lockOptions.type}-${this.options.identifier}`;
9-
return CapacitorStreamingSyncImplementation.GLOBAL_LOCK.acquire(identifier, async () => {
43+
// If we don't have an identifier for some reason (should not happen), we use a shared Mutex
44+
const { identifier: baseIdentifier = 'DEFAULT' } = this.options;
45+
if (!GLOBAL_MUTEX_STORE.has(baseIdentifier)) {
46+
GLOBAL_MUTEX_STORE.set(baseIdentifier, {
47+
tracking: new Set([this.instanceId]),
48+
locks: {
49+
[LockType.CRUD]: new Mutex(),
50+
[LockType.SYNC]: new Mutex()
51+
}
52+
});
53+
}
54+
55+
const mutex = GLOBAL_MUTEX_STORE.get(baseIdentifier)!.locks[lockOptions.type];
56+
57+
return mutexRunExclusive(mutex, async () => {
1058
if (lockOptions.signal?.aborted) {
1159
throw new Error('Aborted');
1260
}

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ export * from './utils/BaseObserver.js';
6161
export * from './utils/ControlledExecutor.js';
6262
export * from './utils/DataStream.js';
6363
export * from './utils/Logger.js';
64+
export * from './utils/mutex.js';
6465
export * from './utils/parseQuery.js';
6566

6667
export * from './types/types.js';

packages/common/src/utils/mutex.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { Mutex } from 'async-mutex';
66
export async function mutexRunExclusive<T>(
77
mutex: Mutex,
88
callback: () => Promise<T>,
9-
options?: { timeoutMs: number }
9+
options?: { timeoutMs?: number }
1010
): Promise<T> {
1111
return new Promise((resolve, reject) => {
1212
const timeout = options?.timeoutMs;

packages/powersync-op-sqlite/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,11 @@
7272
},
7373
"dependencies": {
7474
"@powersync/common": "workspace:*",
75-
"async-lock": "catalog:"
75+
"async-mutex": "catalog:"
7676
},
7777
"devDependencies": {
7878
"@op-engineering/op-sqlite": "catalog:",
7979
"@react-native/eslint-config": "^0.83.1",
80-
"@types/async-lock": "catalog:",
8180
"@types/react": "^19.1.1",
8281
"del-cli": "^7.0.0",
8382
"eslint": "catalog:",

0 commit comments

Comments
 (0)