Skip to content

Commit

Permalink
Merge pull request #142 from oliver-oloughlin/feature/temporary-history
Browse files Browse the repository at this point in the history
fix: set history in atomic operations
  • Loading branch information
oliver-oloughlin authored Dec 3, 2023
2 parents ec2507c + 7559196 commit b7d646a
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 154 deletions.
242 changes: 103 additions & 139 deletions src/atomic_builder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Collection } from "./collection.ts"
import { ulid } from "./deps.ts"
import { InvalidCollectionError } from "./errors.ts"
import type {
AtomicCheck,
Expand All @@ -7,6 +8,7 @@ import type {
CollectionOptions,
CollectionSelector,
EnqueueOptions,
HistoryEntry,
KvId,
KvObject,
KvValue,
Expand All @@ -20,7 +22,6 @@ import {
allFulfilled,
deleteIndices,
extendKey,
getDocumentId,
keyEq,
prepareEnqueue,
setIndices,
Expand Down Expand Up @@ -63,9 +64,10 @@ export class AtomicBuilder<
)
}

// Set kv and schema
// Set kv, schema and collection context
this.kv = kv
this.schema = schema
this.collection = collection

// Initiate operations or set from given operations
this.operations = operations ?? {
Expand All @@ -74,9 +76,6 @@ export class AtomicBuilder<
indexDeleteCollectionKeys: [],
indexAddCollectionKeys: [],
}

// Set colection context
this.collection = collection
}

/**
Expand Down Expand Up @@ -122,8 +121,7 @@ export class AtomicBuilder<
* @returns Current AtomicBuilder instance.
*/
add(value: ParseInputType<TInput, TOutput>, options?: AtomicSetOptions) {
// Perform set operation with generated id.
return this.set(null, value, options)
return this.setDocument(null, value, options)
}

/**
Expand All @@ -145,43 +143,11 @@ export class AtomicBuilder<
* @returns Current AtomicBuilder instance.
*/
set(
id: KvId | null,
id: KvId,
value: ParseInputType<TInput, TOutput>,
options?: AtomicSetOptions,
) {
// Create id key from collection id key and id
const collection = this.collection
const parsed = collection._model.parse(value as TInput)
const docId = id ?? collection._idGenerator(parsed)
const idKey = extendKey(collection._keys.id, docId)

// Add set operation
this.operations.atomic.check({ key: idKey, versionstamp: null }).set(
idKey,
parsed,
options,
)

if (collection._isIndexable) {
// Set data as KvObject type
const _data = parsed as KvObject

// Add collection id key for collision detection
this.operations.indexAddCollectionKeys.push(collection._keys.base)

// Add indexing operations
setIndices(
docId,
_data,
_data,
this.operations.atomic,
this.collection,
options,
)
}

// Return current AtomicBuilder
return this
return this.setDocument(id, value, options)
}

/**
Expand Down Expand Up @@ -221,6 +187,18 @@ export class AtomicBuilder<
})
}

// Set history entry if keeps history
if (this.collection._keepsHistory) {
const historyKey = extendKey(this.collection._keys.history, id, ulid())

const historyEntry: HistoryEntry<TOutput> = {
type: "delete",
timestamp: new Date(),
}

this.operations.atomic.set(historyKey, historyEntry)
}

// Return current AtomicBuilder
return this
}
Expand Down Expand Up @@ -276,13 +254,8 @@ export class AtomicBuilder<
* @returns Current AtomicBuilder instance.
*/
sum(id: KvId, value: TOutput extends Deno.KvU64 ? bigint : never) {
// Create id key from id and collection id key
const idKey = extendKey(this.collection._keys.id, id)

// Add sum operation to atomic ops list
this.operations.atomic.sum(idKey, value)

// Return current AtomicBuilder
return this
}

Expand All @@ -303,13 +276,8 @@ export class AtomicBuilder<
* @returns Current AtomicBuilder instance.
*/
min(id: KvId, value: TOutput extends Deno.KvU64 ? bigint : never) {
// Create id key from id and collection id key
const idKey = extendKey(this.collection._keys.id, id)

// Add min operation to atomic ops list
this.operations.atomic.min(idKey, value)

// Return current AtomicBuilder
return this
}

Expand All @@ -330,13 +298,8 @@ export class AtomicBuilder<
* @returns Current AtomicBuilder instance.
*/
max(id: KvId, value: TOutput extends Deno.KvU64 ? bigint : never) {
// Create id key from id and collection id key
const idKey = extendKey(this.collection._keys.id, id)

// Add max operation to atomic ops list
this.operations.atomic.max(idKey, value)

// Return current AtomicBuilder
return this
}

Expand All @@ -363,89 +326,28 @@ export class AtomicBuilder<
* @param mutations - Atomic mutations to be performed.
* @returns Current AtomicBuilder instance.
*/
mutate(...mutations: AtomicMutation<TOutput>[]) {
// Get collection ref
const collection = this.collection

// Map from atomic mutations to kv mutations
const kvMutations: Deno.KvMutation[] = mutations.map(({ id, ...rest }) => {
const idKey = extendKey(collection._keys.id, id)

if (rest.type === "delete") {
return {
key: idKey,
...rest,
}
}

const { value: _, ...resTOutput } = rest
// deno-lint-ignore no-explicit-any
const parsed = collection._model.parse(rest.value as any)

return {
key: idKey,
value: parsed,
...resTOutput,
} as Deno.KvMutation
})

// Add mutation operation to atomic ops list
this.operations.atomic.mutate(...kvMutations)

// Addtional checks
kvMutations.forEach((mut) => {
// If mutation type is "set", add check operation
if (mut.type === "set") {
this.operations.atomic.check({
key: mut.key,
versionstamp: null,
})
}

// If collection is indexable, handle indexing
if (collection._isIndexable) {
// Get document id from mutation key
const id = getDocumentId(mut.key)

// If id is undefined, continue to next mutation
if (typeof id === "undefined") {
return
}

// If mutation type is "set", handle setting of indices
if (mut.type === "set") {
// Add collection key for collision detection
this.operations.indexAddCollectionKeys.push(collection._keys.base)

// Add indexing operations to atomic ops list
setIndices(
id,
mut.value as KvObject,
mut.value as KvObject,
this.operations.atomic,
this.collection,
{
...mut,
},
)
}

// If mutation type is "delete", create and add delete preperation function
if (mut.type === "delete") {
// Add collection key for collision detection
this.operations.indexDeleteCollectionKeys.push(
collection._keys.base,
)

// Add delete preperation function to delete preperation functions list
this.operations.prepareDeleteFns.push(async (kv) => {
const doc = await kv.get<KvObject>(mut.key)
return {
id,
data: doc.value ?? {},
}
})
}
mutate(...mutations: AtomicMutation<ParseInputType<TInput, TOutput>>[]) {
// Add each atomic mutation by case
mutations.forEach(({ id, ...rest }) => {
switch (rest.type) {
case "delete":
this.delete(id)
break
case "set":
this.set(id, rest.value, { expireIn: rest.expireIn })
break
case "add":
this.add(rest.value, { expireIn: rest.expireIn })
break
case "max":
this.max(id, rest.value as any)
break
case "min":
this.min(id, rest.value as any)
break
case "sum":
this.sum(id, rest.value as any)
break
}
})

Expand Down Expand Up @@ -555,4 +457,66 @@ export class AtomicBuilder<
// Return commit result
return commitResult
}

/***********************/
/* */
/* PRIVATE METHODS */
/* */
/***********************/

/**
* Set a new document entry
*
* @param id
* @param value
* @param options
* @returns
*/
private setDocument(
id: KvId | null,
value: ParseInputType<TInput, TOutput>,
options?: AtomicSetOptions,
) {
// Create id key from collection id key and id
const collection = this.collection
const parsed = collection._model.parse(value as TInput)
const docId = id ?? collection._idGenerator(parsed)
const idKey = extendKey(collection._keys.id, docId)

// Add set operation
this.operations.atomic
.check({ key: idKey, versionstamp: null })
.set(idKey, parsed, options)

if (collection._isIndexable) {
// Add collection id key for collision detection
this.operations.indexAddCollectionKeys.push(collection._keys.base)

// Add indexing operations
setIndices(
docId,
parsed as KvObject,
parsed as KvObject,
this.operations.atomic,
this.collection,
options,
)
}

// Set history entry if keeps history
if (this.collection._keepsHistory) {
const historyKey = extendKey(this.collection._keys.history, docId, ulid())

const historyEntry: HistoryEntry<TOutput> = {
type: "write",
timestamp: new Date(),
value: parsed,
}

this.operations.atomic.set(historyKey, historyEntry)
}

// Return current AtomicBuilder
return this
}
}
13 changes: 9 additions & 4 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export type AtomicCheck<T extends KvValue> = {
versionstamp: Document<T>["versionstamp"]
}

export type AtomicMutation<T extends KvValue> =
export type AtomicMutation<T> =
& {
id: KvId
}
Expand All @@ -199,17 +199,22 @@ export type AtomicMutation<T extends KvValue> =
value: T
expireIn?: number
}
| {
type: "add"
value: T
expireIn?: number
}
| {
type: "sum"
value: T extends Deno.KvU64 ? T : never
value: T extends Deno.KvU64 ? bigint : never
}
| {
type: "min"
value: T extends Deno.KvU64 ? T : never
value: T extends Deno.KvU64 ? bigint : never
}
| {
type: "max"
value: T extends Deno.KvU64 ? T : never
value: T extends Deno.KvU64 ? bigint : never
}
| {
type: "delete"
Expand Down
Loading

0 comments on commit b7d646a

Please sign in to comment.