diff --git a/deno.json b/deno.json index 48a809c..06f46b8 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@olli/kvdex", - "version": "3.1.3", + "version": "3.1.4", "exports": { ".": "./mod.ts", "./zod": "./src/ext/zod/mod.ts", diff --git a/src/ext/kv/async_lock.ts b/src/ext/kv/async_lock.ts index d905b22..9017073 100644 --- a/src/ext/kv/async_lock.ts +++ b/src/ext/kv/async_lock.ts @@ -1,33 +1,64 @@ +import type { TaskResult } from "./types.ts"; + +/** + * Lock that handles sequentially running asynchrounous tasks using a queue based startegy. + */ export class AsyncLock { - private queue: PromiseWithResolvers[]; + private queue: PromiseWithResolvers[]; constructor() { this.queue = []; } - async run(fn: () => Promise): Promise { - await this.lock(); - const result = await fn(); - this.release(); - return result; + /** + * Run a task asynchronously using the lock. + * Places the task in the task queue and runs it as soon as any prior tasks have completed. + * + * @param fn - Task callback function. + * @returns A promise resolving to the awaited return of the given callback function. + */ + async run(fn: () => T): Promise>> { + try { + const acquiredLock = await this.acquireLock(); + if (!acquiredLock) { + return { + status: "cancelled", + }; + } + + const value = await fn(); + this.releaseLock(); + return { + status: "fulfilled", + value: value, + }; + } catch (err) { + this.releaseLock(); + return { + status: "rejected", + error: err, + }; + } } - async close(): Promise { - for (const lock of this.queue) { - lock.resolve(); - await lock.promise; + /** Cancel and remove any queued tasks. */ + cancel(): void { + let lock = this.queue.shift(); + while (lock) { + lock.resolve(false); + lock = this.queue.shift(); } } - private async lock(): Promise { + private async acquireLock(): Promise { const prev = this.queue.at(-1); - const next = Promise.withResolvers(); + const next = Promise.withResolvers(); this.queue.push(next); - await prev?.promise; + return await prev?.promise ?? true; } - private release(): void { + private releaseLock(): void { const lock = this.queue.shift(); - lock?.resolve(); + lock?.resolve(true); } } diff --git a/src/ext/kv/atomic.ts b/src/ext/kv/atomic.ts index 492c979..87b5380 100644 --- a/src/ext/kv/atomic.ts +++ b/src/ext/kv/atomic.ts @@ -124,7 +124,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation { } async commit(): Promise { - return await this.lock.run(async () => { + const taskResult = await this.lock.run(async () => { const checks = await Promise.allSettled( this.checks.map((check) => check()), ); @@ -147,5 +147,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation { versionstamp, }; }); + + return taskResult.status === "fulfilled" ? taskResult.value : { ok: false }; } } diff --git a/src/ext/kv/map_kv.ts b/src/ext/kv/map_kv.ts index 6d56ee5..a8eb7ba 100644 --- a/src/ext/kv/map_kv.ts +++ b/src/ext/kv/map_kv.ts @@ -78,7 +78,7 @@ export class MapKv implements DenoKv { async close(): Promise { this.watchers.forEach((w) => w.cancel()); this.listener?.resolve(); - await this.atomicLock.close(); + this.atomicLock.cancel(); if (this.clearOnClose) await this.map.clear(); } diff --git a/src/ext/kv/types.ts b/src/ext/kv/types.ts index f76fc84..bfdf457 100644 --- a/src/ext/kv/types.ts +++ b/src/ext/kv/types.ts @@ -57,3 +57,23 @@ export type MapKvOptions = { */ clearOnClose?: boolean; }; + +/** + * Result object of a queued task. + * + * `status` - indicates the state of the task result. + * Is "fulfilled" if completed successfully, "rejected" if an error was thrown during the task running, or "cancelled" if task was cancelled before running. + * + * `value` - Awaited return value of successful task. + * + * `error` - Captured error of rejected task. + */ +export type TaskResult = { + status: "fulfilled"; + value: T; +} | { + status: "rejected"; + error: unknown; +} | { + status: "cancelled"; +};