Skip to content

Commit d0907b0

Browse files
Merge pull request #82 from oliver-oloughlin/feature/cron-jobs
added cron jobs and queue topics
2 parents 758d94e + 9e29abd commit d0907b0

19 files changed

+713
-254
lines changed

README.md

+49-22
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
# kvdex
22

3-
`kvdex` is an ORM-like wrapper for Deno KV with zero third-party dependencies.
4-
It's purpose is to enhance the experience of using Deno's KV store through
5-
additional features such as indexing and typed collections, while maintaining as
6-
much of the native functionality as possible, such as atomic operations.
3+
`kvdex` is a high level abstraction layer for Deno KV with zero third-party
4+
dependencies. It's purpose is to enhance the experience of using Deno's KV store
5+
through additional features such as indexing, and strongly typed collections,
6+
and cron jobs, while maintaining as much of the native functionality as
7+
possible, like atomic operations and queue listeners.
78

89
## Highlights
910

1011
- CRUD operations for selected and ranged documents with strong typing.
1112
- Primary (unique) and secondary (non-unique) indexing.
1213
- Segmented storage for large objects that exceed the native size limit.
1314
- Support for pagination and filtering.
14-
- Message queues at database and collection level.
15+
- Repeating cron jobs.
16+
- Message queues at database and collection level with topics.
1517
- Support for atomic operations.
1618

1719
## Table of Contents
@@ -53,6 +55,7 @@ much of the native functionality as possible, such as atomic operations.
5355
- [enqueue()](#enqueue-1)
5456
- [listenQueue()](#listenqueue-1)
5557
- [findUndelivered()](#findundelivered-1)
58+
- [cron()](#cron)
5659
- [atomic()](#atomic)
5760
- [Atomic Operations](#atomic-operations)
5861
- [Without checking](#without-checking)
@@ -432,26 +435,28 @@ const count = await db.users.count({
432435

433436
### enqueue()
434437

435-
Add data (of any type) to the collection queue to be delivered to the queue
436-
listener via `db.collection.listenQueue()`. The data will only be received by
437-
queue listeners on the specified collection. The method takes an optional
438-
options argument that can be used to set a delivery delay.
438+
Add data to the collection queue to be delivered to the queue listener via
439+
`db.collection.listenQueue()`. The data will only be received by queue listeners
440+
on the specified collection and topic. The method takes an optional options
441+
argument that can be used to set a delivery delay and topic.
439442

440443
```ts
441444
// Immediate delivery
442445
await db.users.enqueue("some data")
443446

444447
// Delay of 2 seconds before delivery
445-
await db.users.enqueue("some data", {
448+
await db.users.enqueue("cake", {
446449
delay: 2_000,
450+
topic: "food",
447451
})
448452
```
449453

450454
### listenQueue()
451455

452456
Listen for data from the collection queue that was enqueued with
453457
`db.collection.enqueue()`. Will only receive data that was enqueued to the
454-
specific collection queue. Takes a handler function as argument.
458+
specific collection queue and topic. Expects a handler function as argument, as
459+
well as optional options that can be used to set the topic.
455460

456461
```ts
457462
// Prints the data to console when recevied
@@ -463,11 +468,11 @@ db.users.listenQueue(async (data) => {
463468

464469
const res = await fetch("...", {
465470
method: "POST",
466-
body: dataBody,
471+
body: data,
467472
})
468473

469474
console.log("POSTED:", dataBody, res.ok)
470-
})
475+
}, { topic: "posts" })
471476
```
472477

473478
## Indexable Collection Methods
@@ -594,26 +599,28 @@ await db.deleteAll()
594599

595600
### enqueue()
596601

597-
Add data (of any type) to the database queue to be delivered to the queue
598-
listener via `db.listenQueue()`. The data will only be received by queue
599-
listeners on the database queue. The method takes an optional options argument
600-
that can be used to set a delivery delay.
602+
Add data to the database queue to be delivered to the queue listener via
603+
`db.listenQueue()`. The data will only be received by queue listeners on the
604+
database queue and specified topic. The method takes an optional options
605+
argument that can be used to set a delivery delay and topic.
601606

602607
```ts
603608
// Immediate delivery
604609
await db.enqueue("some data")
605610

606611
// Delay of 2 seconds before delivery
607-
await db.enqueue("some data", {
612+
await db.enqueue("cake", {
608613
delay: 2_000,
614+
topic: "food",
609615
})
610616
```
611617

612618
### listenQueue()
613619

614620
Listen for data from the database queue that was enqueued with `db.enqueue()`.
615-
Will only receive data that was enqueued to the database queue. Takes a handler
616-
function as argument.
621+
Will only receive data that was enqueued to the database queue and specified
622+
topic. Expects a handler function as argument, as well as optional options that
623+
can be used to set the topic.
617624

618625
```ts
619626
// Prints the data to console when recevied
@@ -625,11 +632,11 @@ db.listenQueue(async (data) => {
625632

626633
const res = await fetch("...", {
627634
method: "POST",
628-
body: dataBody,
635+
body: data,
629636
})
630637

631638
console.log("POSTED:", dataBody, res.ok)
632-
})
639+
}, { topic: "posts" })
633640
```
634641

635642
### findUndelivered()
@@ -646,6 +653,26 @@ const doc2 = await db.findUndelivered("undelivered_id", {
646653
})
647654
```
648655

656+
### cron()
657+
658+
Create a cron job that will run on interval, either indefinitely or until an
659+
exit condition is met. If no interval is set, the next job will run immediately
660+
after the previous has finished. Like with queue listeners, there can be
661+
multiple cron jobs defined.
662+
663+
```ts
664+
// Will repeat indeefinitely without delay
665+
db.cron(() => console.log("Hello World!"))
666+
667+
// First job starts with a 10 second delay, after that there is a 5 second delay between jobs
668+
// Will terminate after the 10th run (count starts at 0), or if the job returns n < 0.25
669+
db.cron(() => Math.random(), {
670+
startDelay: 10_000,
671+
interval: 5_000,
672+
exit: ({ count, result }) => count >= 10 || result < 0.25,
673+
})
674+
```
675+
649676
### atomic()
650677

651678
Initiate an atomic operation. The method takes a selection function as argument

src/atomic_builder.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import type {
1414
KvValue,
1515
Model,
1616
Operations,
17+
QueueValue,
1718
Schema,
1819
SchemaDefinition,
1920
} from "./types.ts"
@@ -437,9 +438,14 @@ export class AtomicBuilder<
437438
return this
438439
}
439440

440-
enqueue(data: KvValue, options?: EnqueueOptions) {
441+
enqueue(data: QueueValue, options?: EnqueueOptions) {
441442
// Prepare and add enqueue operation
442-
const prep = prepareEnqueue(this.collection._keys.baseKey, data, options)
443+
const prep = prepareEnqueue(
444+
this.collection._keys.baseKey,
445+
data,
446+
options,
447+
)
448+
443449
this.operations.atomic.enqueue(prep.msg, prep.options)
444450

445451
// Return current AtomicBuilder

src/collection.ts

+39-27
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,22 @@ import type {
1717
KvObject,
1818
KvValue,
1919
ListOptions,
20+
QueueListenerOptions,
2021
QueueMessageHandler,
22+
QueueValue,
2123
SetOptions,
2224
UpdateData,
2325
UpdateManyOptions,
2426
} from "./types.ts"
2527
import {
2628
allFulfilled,
29+
createHandlerId,
2730
createListSelector,
2831
extendKey,
2932
generateId,
3033
getDocumentId,
3134
isKvObject,
32-
keyEq,
3335
kvGetMany,
34-
parseQueueMessage,
3536
prepareEnqueue,
3637
} from "./utils.ts"
3738
import { Document } from "./document.ts"
@@ -45,7 +46,11 @@ export class Collection<
4546
const T1 extends KvValue,
4647
const T2 extends CollectionOptions<T1>,
4748
> {
49+
private queueHandlers: Map<string, QueueMessageHandler<QueueValue>[]>
50+
private idempotentListener: () => void
51+
4852
protected kv: Deno.Kv
53+
4954
readonly _idGenerator: IdGenerator<KvValue>
5055
readonly _keys: CollectionKeys
5156

@@ -60,7 +65,17 @@ export class Collection<
6065
*
6166
* @param options - Collection options.
6267
*/
63-
constructor(kv: Deno.Kv, key: KvKey, options?: T2) {
68+
constructor(
69+
kv: Deno.Kv,
70+
key: KvKey,
71+
queueHandlers: Map<string, QueueMessageHandler<QueueValue>[]>,
72+
idempotentListener: () => void,
73+
options?: T2,
74+
) {
75+
// Set reference to queue handlers and idempotent listener
76+
this.queueHandlers = queueHandlers
77+
this.idempotentListener = idempotentListener
78+
6479
// Set the KV instance
6580
this.kv = kv
6681

@@ -535,9 +550,15 @@ export class Collection<
535550
* @param options - Enqueue options, optional.
536551
* @returns - Promise resolving to Deno.KvCommitResult.
537552
*/
538-
async enqueue(data: KvValue, options?: EnqueueOptions) {
539-
// Prepare and perform enqueue operation
540-
const prep = prepareEnqueue(this._keys.baseKey, data, options)
553+
async enqueue(data: QueueValue, options?: EnqueueOptions) {
554+
// Prepare message and options for enqueue
555+
const prep = prepareEnqueue(
556+
this._keys.baseKey,
557+
data,
558+
options,
559+
)
560+
561+
// Enqueue message with options
541562
return await this.kv.enqueue(prep.msg, prep.options)
542563
}
543564

@@ -565,32 +586,23 @@ export class Collection<
565586
* ```
566587
*
567588
* @param handler - Message handler function.
589+
* @param options - Queue listener options.
590+
* @returns void.
568591
*/
569-
async listenQueue<T extends KvValue = KvValue>(
592+
listenQueue<T extends QueueValue = QueueValue>(
570593
handler: QueueMessageHandler<T>,
594+
options?: QueueListenerOptions,
571595
) {
572-
// Listen for kv queue messages
573-
await this.kv.listenQueue(async (msg) => {
574-
// Parse queue message
575-
const parsed = parseQueueMessage<T>(msg)
576-
577-
// If failed parse, ignore
578-
if (!parsed.ok) {
579-
return
580-
}
596+
// Create handler id
597+
const handlerId = createHandlerId(this._keys.baseKey, options?.topic)
581598

582-
// Destruct queue message
583-
const { collectionKey, data } = parsed.msg
599+
// Add new handler to specified handlers
600+
const handlers = this.queueHandlers.get(handlerId) ?? []
601+
handlers.push(handler as QueueMessageHandler<QueueValue>)
602+
this.queueHandlers.set(handlerId, handlers)
584603

585-
// Check that collection key is set and matches current collection context
586-
if (
587-
Array.isArray(collectionKey) &&
588-
keyEq(collectionKey, this._keys.baseKey)
589-
) {
590-
// Invoke data handler
591-
await handler(data)
592-
}
593-
})
604+
// Activate idempotent listener
605+
this.idempotentListener()
594606
}
595607

596608
/**

src/collection_builder.ts

+26-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import type {
99
LargeCollectionOptions,
1010
LargeKvValue,
1111
Model,
12+
QueueMessageHandler,
13+
QueueValue,
1214
} from "./types.ts"
1315

1416
/**
@@ -51,10 +53,17 @@ class CollectionBuilder<const T extends KvValue> {
5153
build(
5254
options?: CollectionOptions<T>,
5355
) {
54-
return (kv: Deno.Kv, key: KvKey) =>
56+
return (
57+
kv: Deno.Kv,
58+
key: KvKey,
59+
queueHandlers: Map<string, QueueMessageHandler<QueueValue>[]>,
60+
idempotentListener: () => void,
61+
) =>
5562
new Collection<T, CollectionOptions<T>>(
5663
kv,
5764
key,
65+
queueHandlers,
66+
idempotentListener,
5867
options,
5968
)
6069
}
@@ -73,10 +82,17 @@ class IndexableCollectionBuilder<const T extends Model> {
7382
build<const T2 extends IndexableCollectionOptions<T>>(
7483
options: T2,
7584
) {
76-
return (kv: Deno.Kv, key: KvKey) =>
85+
return (
86+
kv: Deno.Kv,
87+
key: KvKey,
88+
queueHandlers: Map<string, QueueMessageHandler<QueueValue>[]>,
89+
idempotentListener: () => void,
90+
) =>
7791
new IndexableCollection<T, T2>(
7892
kv,
7993
key,
94+
queueHandlers,
95+
idempotentListener,
8096
options,
8197
)
8298
}
@@ -93,10 +109,17 @@ class LargeCollectionBuilder<const T extends LargeKvValue> {
93109
* @returns A LargeCollection instance.
94110
*/
95111
build(options?: LargeCollectionOptions<T>) {
96-
return (kv: Deno.Kv, key: KvKey) =>
112+
return (
113+
kv: Deno.Kv,
114+
key: KvKey,
115+
queueHandlers: Map<string, QueueMessageHandler<QueueValue>[]>,
116+
idempotentListener: () => void,
117+
) =>
97118
new LargeCollection<T, LargeCollectionOptions<T>>(
98119
kv,
99120
key,
121+
queueHandlers,
122+
idempotentListener,
100123
options,
101124
)
102125
}

src/indexable_collection.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import type {
1818
ListOptions,
1919
Model,
2020
PrimaryIndexKeys,
21+
QueueMessageHandler,
22+
QueueValue,
2123
SecondaryIndexKeys,
2224
SetOptions,
2325
UpdateData,
@@ -64,9 +66,15 @@ export class IndexableCollection<
6466
*
6567
* @param options - Indexable Collection options.
6668
*/
67-
constructor(kv: Deno.Kv, key: KvKey, options: T2) {
69+
constructor(
70+
kv: Deno.Kv,
71+
key: KvKey,
72+
queueHandlers: Map<string, QueueMessageHandler<QueueValue>[]>,
73+
idempotentListener: () => void,
74+
options: T2,
75+
) {
6876
// Invoke super constructor
69-
super(kv, key, options)
77+
super(kv, key, queueHandlers, idempotentListener, options)
7078

7179
// Set indexable collection keys
7280
this._keys = {

0 commit comments

Comments
 (0)