Skip to content

Commit

Permalink
Merge pull request #128 from oliver-oloughlin/feature/persistence-pro…
Browse files Browse the repository at this point in the history
…tected-loops-and-intervals

feat: protect against re-initializing same loop/interval
  • Loading branch information
oliver-oloughlin authored Nov 26, 2023
2 parents 3ca648b + 32e6e54 commit 739094a
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 41 deletions.
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ through additional features such as indexing, strongly typed collections and
serialization/compression, while maintaining as much of the native functionality
as possible, like atomic operations and queue listeners.

_Supported Deno verisons:_ **^1.37.0**
_Supported Deno verisons:_ **^1.37.2**

## Highlights

Expand Down Expand Up @@ -172,7 +172,10 @@ const db = kvdex(kv, {
The schema definition contains collection builders, or nested schema
definitions. Collections can hold any type adhering to KvValue. Indexing can be
specified for collections of objects, while a custom id generator and
serialization can be set for all collections.
serialization can be set for all collections. Note that the default id setter
for documents is `crypto.randomUUID()`, which means documents are ordered at
random by default as KV orders values by their key. To store documents ordered
by insertion timestamp, set a custom idGenerator, such as `ulid()`.

## Collection Methods

Expand Down Expand Up @@ -791,10 +794,10 @@ condition is met. Interval defaults to 1 hour if not set.

```ts
// Will repeat indefinitely with 1 hour interval
db.setInterval(() => console.log("Hello World!"))
db.setInterval("greeting", () => console.log("Hello World!"))

// First callback is invoked after a 10 second delay, after that there is a 5 second delay between callbacks
db.setInterval(() => console.log("I terminate after running 10 times"), {
db.setInterval("terminator", () => console.log("I terminate after running 10 times"), {
// Delay before the first callback is invoked
startDelay: 10_000,

Expand All @@ -818,9 +821,9 @@ previous task finishes.

```ts
// Prints "Hello World!" 10 times, with 1 second delay
db.loop(() => console.log("Hello World!"), {
db.loop("greeting", () => console.log("Hello World!"), {
delay: 1_000,
exitOn: ({ count }) => count >= 9,
exitOn: ({ count }) => count === 10,
})
```

Expand Down
4 changes: 4 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ export const SEGMENT_KEY_PREFIX = "__segment__"

export const UNDELIVERED_KEY_PREFIX = "__undelivered__"

export const INTERVAL_KEY_PREFIX = "__interval__"

export const LOOP_KEY_PREFIX = "__loop__"

// Fixed limits
export const ATOMIC_OPERATION_MUTATION_LIMIT = 1_000

Expand Down
91 changes: 63 additions & 28 deletions src/kvdex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
DEFAULT_INTERVAL_RETRY,
DEFAULT_LOOP_RETRY,
KVDEX_KEY_PREFIX,
LOOP_KEY_PREFIX,
UNDELIVERED_KEY_PREFIX,
} from "./constants.ts"
import { model } from "./model.ts"
Expand Down Expand Up @@ -337,10 +338,10 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
* @example
* ```ts
* // Will repeat indeefinitely with 1 hour interval
* db.setInterval(() => console.log("Hello World!"))
* db.setInterval("greeting", () => console.log("Hello World!"))
*
* // First callback starts after a 10 second delay, after that there is a 5 second delay between callbacks
* db.setInterval(() => console.log("I terminate after the 10th run"), {
* db.setInterval("terminator", () => console.log("I terminate after the 10th run"), {
* // 10 second delay before the first job callback invoked
* startDelay: 10_000,
*
Expand All @@ -355,16 +356,32 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
* })
* ```
*
* @param name - Interval identifier.
* @param fn - Callback function.
* @param options - Set interval options.
* @returns Listener promise.
* @returns A listener promise.
*/
async setInterval(
name: string,
fn: (msg: IntervalMessage) => unknown,
options?: SetIntervalOptions,
) {
// Create interval handler id
const id = crypto.randomUUID()
// Check if loop has already been initialized
const nameKey = extendKey([KVDEX_KEY_PREFIX, LOOP_KEY_PREFIX], name)
const nameEntry = await this.kv.get<true>(nameKey)

if (nameEntry.value) {
return await this.idempotentListener()
}

// Persist interval key
const cr = await this.kv.set(nameKey, true)
if (!cr.ok) {
throw Error(`Failed to initialize interval "${name}"`)
}

// Set id
const nameId = `interval-${name}`

// Create interval enqueuer
const enqueue = async (
Expand All @@ -374,20 +391,20 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
// Try enqueuing until delivered on number of retries is exhausted
for (let i = 0; i <= (options?.retry ?? DEFAULT_INTERVAL_RETRY); i++) {
await this.enqueue(msg, {
idsIfUndelivered: [id],
idsIfUndelivered: [nameId],
delay,
topic: id,
topic: nameId,
})

// Check if message was delivered, break for-loop if successful
const doc = await this.findUndelivered(id)
const doc = await this.findUndelivered(nameId)

if (doc === null) {
break
}

// Delete undelivered entry before retrying
await this.deleteUndelivered(id)
await this.deleteUndelivered(nameId)
}
}

Expand All @@ -399,12 +416,13 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
exit = await options?.exitOn?.(msg) ?? false
} catch (e) {
console.error(
`An error was caught while running exitOn task for interval {ID = ${id}}`,
`An error was caught while running exitOn task for interval "${name}"`,
e,
)
}

if (exit) {
await this.kv.delete(nameKey)
await options?.onExit?.(msg)
return
}
Expand All @@ -419,7 +437,7 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
}
} catch (e) {
console.error(
`An error was caught while setting the next callback delay for interval {ID = ${id}}`,
`An error was caught while setting the next callback delay for interval "${name}`,
e,
)
}
Expand All @@ -436,7 +454,7 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
// Invoke callback function
fn(msg),
])
}, { topic: id })
}, { topic: nameId })

// Enqueue first task
await enqueue({
Expand All @@ -447,7 +465,7 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
}, options?.startDelay)

// Return listener
return listener
return await listener
}

/**
Expand All @@ -456,22 +474,38 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
* @example
* ```ts
* // Prints "Hello World!" 10 times, with 1 second delay
* db.loop(() => console.log("Hello World!"), {
* db.loop("greeting", () => console.log("Hello World!"), {
* delay: 1_000,
* exitOn: ({ count }) => count <= 9,
* exitOn: ({ count }) => count === 10,
* })
* ```
*
* @param fn
* @param options
* @returns
* @param name - Loop identifier.
* @param fn - Callback function.
* @param options - Loop options.
* @returns - A listener promise.
*/
async loop<const T1 extends QueueValue>(
name: string,
fn: (msg: LoopMessage<Awaited<T1>>) => T1 | Promise<T1>,
options?: LoopOptions<Awaited<T1>>,
) {
// Create loop handler id
const id = crypto.randomUUID()
// Check if loop has already been initialized
const nameKey = extendKey([KVDEX_KEY_PREFIX, LOOP_KEY_PREFIX], name)
const nameEntry = await this.kv.get<true>(nameKey)

if (nameEntry.value) {
return await this.idempotentListener()
}

// Persist loop key
const cr = await this.kv.set(nameKey, true)
if (!cr.ok) {
throw Error(`Failed to initialize loop "${name}"`)
}

// Set id
const nameId = `loop-${name}`

// Create loop enqueuer
const enqueue = async (
Expand All @@ -481,20 +515,20 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
// Try enqueuing until delivered on number of retries is exhausted
for (let i = 0; i <= (options?.retry ?? DEFAULT_LOOP_RETRY); i++) {
await this.enqueue(msg, {
idsIfUndelivered: [id],
idsIfUndelivered: [nameId],
delay,
topic: id,
topic: nameId,
})

// Check if message was delivered, break for-loop if successful
const doc = await this.findUndelivered(id)
const doc = await this.findUndelivered(nameId)

if (doc === null) {
break
}

// Delete undelivered entry before retrying
await this.deleteUndelivered(id)
await this.deleteUndelivered(nameId)
}
}

Expand All @@ -506,12 +540,13 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
exit = await options?.exitOn?.(msg) ?? false
} catch (e) {
console.error(
`An error was caught while running exitOn task for loop {ID = ${id}}`,
`An error was caught while running exitOn task for loop "${name}"`,
e,
)
}

if (exit) {
await this.kv.delete(nameKey)
await options?.onExit?.(msg)
return
}
Expand All @@ -526,7 +561,7 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
}
} catch (e) {
console.error(
`An error was caught while setting the next callback delay for loop {ID = ${id}}`,
`An error was caught while setting the next callback delay for loop "${name}"`,
e,
)
}
Expand All @@ -542,7 +577,7 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
timestamp: new Date(),
first: false,
}, delay)
}, { topic: id })
}, { topic: nameId })

// Enqueue first task
await enqueue({
Expand All @@ -554,7 +589,7 @@ export class KvDex<const TSchema extends Schema<SchemaDefinition>> {
}, options?.startDelay)

// Return listener
return listener
return await listener
}
}

Expand Down
40 changes: 37 additions & 3 deletions tests/db/loop.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { kvdex } from "../../mod.ts"
import { assert } from "../deps.ts"
import { createResolver, useDb, useKv } from "../utils.ts"
import { createResolver, sleep, useDb, useKv } from "../utils.ts"

Deno.test("db - loop", async (t) => {
await t.step(
Expand All @@ -12,12 +12,12 @@ Deno.test("db - loop", async (t) => {
let count1 = 0
let count2 = 0

const listener1 = db.loop(() => count1++, {
const listener1 = db.loop("l1", () => count1++, {
exitOn: ({ first }) => first,
onExit: () => sleeper1.resolve(),
})

const listener2 = db.loop(() => count2++, {
const listener2 = db.loop("l2", () => count2++, {
exitOn: (msg) => msg.count < 1,
onExit: () => sleeper2.resolve(),
})
Expand All @@ -42,6 +42,7 @@ Deno.test("db - loop", async (t) => {
let count = 0

const listener = db.loop<number>(
"l1",
({ first, result }) => {
count++
return first ? 1 : result + 1
Expand All @@ -62,4 +63,37 @@ Deno.test("db - loop", async (t) => {
})
},
)

await t.step(
"Should not initialize second loop with identical name",
async () => {
await useDb(async (db) => {
const sleeper = createResolver()

let count1 = 0
let count2 = 0

const listener1 = db.loop("l1", () => count1++, {
exitOn: ({ count }) => count === 10,
onExit: () => sleeper.resolve(),
delay: 50,
})

await sleep(10)

const listener2 = db.loop("l1", () => count2++, {
exitOn: ({ count }) => count === 10,
onExit: () => sleeper.resolve(),
delay: 25,
})

await sleeper.promise

assert(count1 === 10)
assert(count2 === 0)

return async () => await Promise.all([listener1, listener2])
})
},
)
})
Loading

0 comments on commit 739094a

Please sign in to comment.