From 242e3489fe6a55ec519d0f2b45e6c37f876c2f9b Mon Sep 17 00:00:00 2001 From: oliver-oloughlin Date: Sun, 26 Nov 2023 17:15:20 +0100 Subject: [PATCH 1/2] feat: protect against re-initializing same loop/interval --- README.md | 2 +- src/constants.ts | 4 ++ src/kvdex.ts | 91 +++++++++++++++++++++++++----------- tests/db/loop.test.ts | 40 ++++++++++++++-- tests/db/setInterval.test.ts | 41 ++++++++++++++-- 5 files changed, 142 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index e02ce9b..ca7bef2 100644 --- a/README.md +++ b/README.md @@ -820,7 +820,7 @@ previous task finishes. // Prints "Hello World!" 10 times, with 1 second delay db.loop(() => console.log("Hello World!"), { delay: 1_000, - exitOn: ({ count }) => count >= 9, + exitOn: ({ count }) => count === 10, }) ``` diff --git a/src/constants.ts b/src/constants.ts index e5abf06..3cdf7c1 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -13,6 +13,10 @@ export const SEGMENT_KEY_PREFIX = "__segment__" export const UNDELIVERED_KEY_PREFIX = "__undelivered__" +export const INTERVAL_KEY_PREFIX = "__interval__" + +export const LOOP_KEY_PREFIX = "__loop__" + // Fixed limits export const ATOMIC_OPERATION_MUTATION_LIMIT = 1_000 diff --git a/src/kvdex.ts b/src/kvdex.ts index f34764b..c9104e4 100644 --- a/src/kvdex.ts +++ b/src/kvdex.ts @@ -33,6 +33,7 @@ import { DEFAULT_INTERVAL_RETRY, DEFAULT_LOOP_RETRY, KVDEX_KEY_PREFIX, + LOOP_KEY_PREFIX, UNDELIVERED_KEY_PREFIX, } from "./constants.ts" import { model } from "./model.ts" @@ -337,10 +338,10 @@ export class KvDex> { * @example * ```ts * // Will repeat indeefinitely with 1 hour interval - * db.setInterval(() => console.log("Hello World!")) + * db.setInterval("greeting", () => console.log("Hello World!")) * * // First callback starts after a 10 second delay, after that there is a 5 second delay between callbacks - * db.setInterval(() => console.log("I terminate after the 10th run"), { + * db.setInterval("terminator", () => console.log("I terminate after the 10th run"), { * // 10 second delay before the first job callback invoked * startDelay: 10_000, * @@ -355,16 +356,32 @@ export class KvDex> { * }) * ``` * + * @param name - Interval identifier. * @param fn - Callback function. * @param options - Set interval options. - * @returns Listener promise. + * @returns A listener promise. */ async setInterval( + name: string, fn: (msg: IntervalMessage) => unknown, options?: SetIntervalOptions, ) { - // Create interval handler id - const id = crypto.randomUUID() + // Check if loop has already been initialized + const nameKey = extendKey([KVDEX_KEY_PREFIX, LOOP_KEY_PREFIX], name) + const nameEntry = await this.kv.get(nameKey) + + if (nameEntry.value) { + return await this.idempotentListener() + } + + // Persist interval key + const cr = await this.kv.set(nameKey, true) + if (!cr.ok) { + throw Error(`Failed to initialize interval "${name}"`) + } + + // Set id + const nameId = `interval-${name}` // Create interval enqueuer const enqueue = async ( @@ -374,20 +391,20 @@ export class KvDex> { // Try enqueuing until delivered on number of retries is exhausted for (let i = 0; i <= (options?.retry ?? DEFAULT_INTERVAL_RETRY); i++) { await this.enqueue(msg, { - idsIfUndelivered: [id], + idsIfUndelivered: [nameId], delay, - topic: id, + topic: nameId, }) // Check if message was delivered, break for-loop if successful - const doc = await this.findUndelivered(id) + const doc = await this.findUndelivered(nameId) if (doc === null) { break } // Delete undelivered entry before retrying - await this.deleteUndelivered(id) + await this.deleteUndelivered(nameId) } } @@ -399,12 +416,13 @@ export class KvDex> { exit = await options?.exitOn?.(msg) ?? false } catch (e) { console.error( - `An error was caught while running exitOn task for interval {ID = ${id}}`, + `An error was caught while running exitOn task for interval "${name}"`, e, ) } if (exit) { + await this.kv.delete(nameKey) await options?.onExit?.(msg) return } @@ -419,7 +437,7 @@ export class KvDex> { } } catch (e) { console.error( - `An error was caught while setting the next callback delay for interval {ID = ${id}}`, + `An error was caught while setting the next callback delay for interval "${name}`, e, ) } @@ -436,7 +454,7 @@ export class KvDex> { // Invoke callback function fn(msg), ]) - }, { topic: id }) + }, { topic: nameId }) // Enqueue first task await enqueue({ @@ -447,7 +465,7 @@ export class KvDex> { }, options?.startDelay) // Return listener - return listener + return await listener } /** @@ -456,22 +474,38 @@ export class KvDex> { * @example * ```ts * // Prints "Hello World!" 10 times, with 1 second delay - * db.loop(() => console.log("Hello World!"), { + * db.loop("greeting", () => console.log("Hello World!"), { * delay: 1_000, - * exitOn: ({ count }) => count <= 9, + * exitOn: ({ count }) => count === 10, * }) * ``` * - * @param fn - * @param options - * @returns + * @param name - Loop identifier. + * @param fn - Callback function. + * @param options - Loop options. + * @returns - A listener promise. */ async loop( + name: string, fn: (msg: LoopMessage>) => T1 | Promise, options?: LoopOptions>, ) { - // Create loop handler id - const id = crypto.randomUUID() + // Check if loop has already been initialized + const nameKey = extendKey([KVDEX_KEY_PREFIX, LOOP_KEY_PREFIX], name) + const nameEntry = await this.kv.get(nameKey) + + if (nameEntry.value) { + return await this.idempotentListener() + } + + // Persist loop key + const cr = await this.kv.set(nameKey, true) + if (!cr.ok) { + throw Error(`Failed to initialize loop "${name}"`) + } + + // Set id + const nameId = `loop-${name}` // Create loop enqueuer const enqueue = async ( @@ -481,20 +515,20 @@ export class KvDex> { // Try enqueuing until delivered on number of retries is exhausted for (let i = 0; i <= (options?.retry ?? DEFAULT_LOOP_RETRY); i++) { await this.enqueue(msg, { - idsIfUndelivered: [id], + idsIfUndelivered: [nameId], delay, - topic: id, + topic: nameId, }) // Check if message was delivered, break for-loop if successful - const doc = await this.findUndelivered(id) + const doc = await this.findUndelivered(nameId) if (doc === null) { break } // Delete undelivered entry before retrying - await this.deleteUndelivered(id) + await this.deleteUndelivered(nameId) } } @@ -506,12 +540,13 @@ export class KvDex> { exit = await options?.exitOn?.(msg) ?? false } catch (e) { console.error( - `An error was caught while running exitOn task for loop {ID = ${id}}`, + `An error was caught while running exitOn task for loop "${name}"`, e, ) } if (exit) { + await this.kv.delete(nameKey) await options?.onExit?.(msg) return } @@ -526,7 +561,7 @@ export class KvDex> { } } catch (e) { console.error( - `An error was caught while setting the next callback delay for loop {ID = ${id}}`, + `An error was caught while setting the next callback delay for loop "${name}"`, e, ) } @@ -542,7 +577,7 @@ export class KvDex> { timestamp: new Date(), first: false, }, delay) - }, { topic: id }) + }, { topic: nameId }) // Enqueue first task await enqueue({ @@ -554,7 +589,7 @@ export class KvDex> { }, options?.startDelay) // Return listener - return listener + return await listener } } diff --git a/tests/db/loop.test.ts b/tests/db/loop.test.ts index 879fbe4..3c1ffd0 100644 --- a/tests/db/loop.test.ts +++ b/tests/db/loop.test.ts @@ -1,6 +1,6 @@ import { kvdex } from "../../mod.ts" import { assert } from "../deps.ts" -import { createResolver, useDb, useKv } from "../utils.ts" +import { createResolver, sleep, useDb, useKv } from "../utils.ts" Deno.test("db - loop", async (t) => { await t.step( @@ -12,12 +12,12 @@ Deno.test("db - loop", async (t) => { let count1 = 0 let count2 = 0 - const listener1 = db.loop(() => count1++, { + const listener1 = db.loop("l1", () => count1++, { exitOn: ({ first }) => first, onExit: () => sleeper1.resolve(), }) - const listener2 = db.loop(() => count2++, { + const listener2 = db.loop("l2", () => count2++, { exitOn: (msg) => msg.count < 1, onExit: () => sleeper2.resolve(), }) @@ -42,6 +42,7 @@ Deno.test("db - loop", async (t) => { let count = 0 const listener = db.loop( + "l1", ({ first, result }) => { count++ return first ? 1 : result + 1 @@ -62,4 +63,37 @@ Deno.test("db - loop", async (t) => { }) }, ) + + await t.step( + "Should not initialize second loop with identical name", + async () => { + await useDb(async (db) => { + const sleeper = createResolver() + + let count1 = 0 + let count2 = 0 + + const listener1 = db.loop("l1", () => count1++, { + exitOn: ({ count }) => count === 10, + onExit: () => sleeper.resolve(), + delay: 50, + }) + + await sleep(10) + + const listener2 = db.loop("l1", () => count2++, { + exitOn: ({ count }) => count === 10, + onExit: () => sleeper.resolve(), + delay: 25, + }) + + await sleeper.promise + + assert(count1 === 10) + assert(count2 === 0) + + return async () => await Promise.all([listener1, listener2]) + }) + }, + ) }) diff --git a/tests/db/setInterval.test.ts b/tests/db/setInterval.test.ts index 2694127..5c5f5f1 100644 --- a/tests/db/setInterval.test.ts +++ b/tests/db/setInterval.test.ts @@ -1,5 +1,5 @@ import { assert } from "../deps.ts" -import { createResolver, useDb } from "../utils.ts" +import { createResolver, sleep, useDb } from "../utils.ts" Deno.test("db - setInterval", async (t) => { await t.step( @@ -14,19 +14,19 @@ Deno.test("db - setInterval", async (t) => { const sleeper2 = createResolver() const sleeper3 = createResolver() - const l1 = db.setInterval(() => count1++, { + const l1 = db.setInterval("i1", () => count1++, { interval: 10, exitOn: ({ count }) => count === 2, onExit: sleeper1.resolve, }) - const l2 = db.setInterval(() => count2++, { + const l2 = db.setInterval("i2", () => count2++, { interval: () => Math.random() * 20, exitOn: ({ first }) => first, onExit: sleeper2.resolve, }) - const l3 = db.setInterval(() => count3++, { + const l3 = db.setInterval("i3", () => count3++, { interval: 10, exitOn: ({ interval }) => interval > 0, onExit: sleeper3.resolve, @@ -44,4 +44,37 @@ Deno.test("db - setInterval", async (t) => { }) }, ) + + await t.step( + "Should not initialize second interval with identical name", + async () => { + await useDb(async (db) => { + const sleeper = createResolver() + + let count1 = 0 + let count2 = 0 + + const listener1 = db.setInterval("i1", () => count1++, { + exitOn: ({ count }) => count === 10, + onExit: () => sleeper.resolve(), + interval: 50, + }) + + await sleep(10) + + const listener2 = db.setInterval("i1", () => count2++, { + exitOn: ({ count }) => count === 10, + onExit: () => sleeper.resolve(), + interval: 25, + }) + + await sleeper.promise + + assert(count1 === 10) + assert(count2 === 0) + + return async () => await Promise.all([listener1, listener2]) + }) + }, + ) }) From 32e6e54a2ed1532ff28e8e7d76f2e8d99f4b7858 Mon Sep 17 00:00:00 2001 From: oliver-oloughlin Date: Sun, 26 Nov 2023 17:25:49 +0100 Subject: [PATCH 2/2] chore: updated readme --- README.md | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index ca7bef2..4833ed8 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ through additional features such as indexing, strongly typed collections and serialization/compression, while maintaining as much of the native functionality as possible, like atomic operations and queue listeners. -_Supported Deno verisons:_ **^1.37.0** +_Supported Deno verisons:_ **^1.37.2** ## Highlights @@ -172,7 +172,10 @@ const db = kvdex(kv, { The schema definition contains collection builders, or nested schema definitions. Collections can hold any type adhering to KvValue. Indexing can be specified for collections of objects, while a custom id generator and -serialization can be set for all collections. +serialization can be set for all collections. Note that the default id setter +for documents is `crypto.randomUUID()`, which means documents are ordered at +random by default as KV orders values by their key. To store documents ordered +by insertion timestamp, set a custom idGenerator, such as `ulid()`. ## Collection Methods @@ -791,10 +794,10 @@ condition is met. Interval defaults to 1 hour if not set. ```ts // Will repeat indefinitely with 1 hour interval -db.setInterval(() => console.log("Hello World!")) +db.setInterval("greeting", () => console.log("Hello World!")) // First callback is invoked after a 10 second delay, after that there is a 5 second delay between callbacks -db.setInterval(() => console.log("I terminate after running 10 times"), { +db.setInterval("terminator", () => console.log("I terminate after running 10 times"), { // Delay before the first callback is invoked startDelay: 10_000, @@ -818,7 +821,7 @@ previous task finishes. ```ts // Prints "Hello World!" 10 times, with 1 second delay -db.loop(() => console.log("Hello World!"), { +db.loop("greeting", () => console.log("Hello World!"), { delay: 1_000, exitOn: ({ count }) => count === 10, })