Skip to content

Commit

Permalink
Improve async lock (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-oloughlin authored Feb 16, 2025
1 parent bf0e6b8 commit 2ddf597
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 18 deletions.
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@olli/kvdex",
"version": "3.1.3",
"version": "3.1.4",
"exports": {
".": "./mod.ts",
"./zod": "./src/ext/zod/mod.ts",
Expand Down
61 changes: 46 additions & 15 deletions src/ext/kv/async_lock.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,64 @@
import type { TaskResult } from "./types.ts";

/**
* Lock that handles sequentially running asynchrounous tasks using a queue based startegy.
*/
export class AsyncLock {
private queue: PromiseWithResolvers<void>[];
private queue: PromiseWithResolvers<boolean>[];

constructor() {
this.queue = [];
}

async run<T>(fn: () => Promise<T>): Promise<T> {
await this.lock();
const result = await fn();
this.release();
return result;
/**
* Run a task asynchronously using the lock.
* Places the task in the task queue and runs it as soon as any prior tasks have completed.
*
* @param fn - Task callback function.
* @returns A promise resolving to the awaited return of the given callback function.
*/
async run<const T>(fn: () => T): Promise<TaskResult<Awaited<T>>> {
try {
const acquiredLock = await this.acquireLock();
if (!acquiredLock) {
return {
status: "cancelled",
};
}

const value = await fn();
this.releaseLock();
return {
status: "fulfilled",
value: value,
};
} catch (err) {
this.releaseLock();
return {
status: "rejected",
error: err,
};
}
}

async close(): Promise<void> {
for (const lock of this.queue) {
lock.resolve();
await lock.promise;
/** Cancel and remove any queued tasks. */
cancel(): void {
let lock = this.queue.shift();
while (lock) {
lock.resolve(false);
lock = this.queue.shift();
}
}

private async lock(): Promise<void> {
private async acquireLock(): Promise<boolean> {
const prev = this.queue.at(-1);
const next = Promise.withResolvers<void>();
const next = Promise.withResolvers<boolean>();
this.queue.push(next);
await prev?.promise;
return await prev?.promise ?? true;
}

private release(): void {
private releaseLock(): void {
const lock = this.queue.shift();
lock?.resolve();
lock?.resolve(true);
}
}
4 changes: 3 additions & 1 deletion src/ext/kv/atomic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
}

async commit(): Promise<DenoKvCommitError | DenoKvCommitResult> {
return await this.lock.run(async () => {
const taskResult = await this.lock.run(async () => {
const checks = await Promise.allSettled(
this.checks.map((check) => check()),
);
Expand All @@ -147,5 +147,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
versionstamp,
};
});

return taskResult.status === "fulfilled" ? taskResult.value : { ok: false };
}
}
2 changes: 1 addition & 1 deletion src/ext/kv/map_kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export class MapKv implements DenoKv {
async close(): Promise<void> {
this.watchers.forEach((w) => w.cancel());
this.listener?.resolve();
await this.atomicLock.close();
this.atomicLock.cancel();
if (this.clearOnClose) await this.map.clear();
}

Expand Down
20 changes: 20 additions & 0 deletions src/ext/kv/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,23 @@ export type MapKvOptions = {
*/
clearOnClose?: boolean;
};

/**
* Result object of a queued task.
*
* `status` - indicates the state of the task result.
* Is "fulfilled" if completed successfully, "rejected" if an error was thrown during the task running, or "cancelled" if task was cancelled before running.
*
* `value` - Awaited return value of successful task.
*
* `error` - Captured error of rejected task.
*/
export type TaskResult<T> = {
status: "fulfilled";
value: T;
} | {
status: "rejected";
error: unknown;
} | {
status: "cancelled";
};

0 comments on commit 2ddf597

Please sign in to comment.