Skip to content

Commit

Permalink
feat: Add functions to the Kv API in order to query ranges of keys an…
Browse files Browse the repository at this point in the history
…d/or values #345 - check previous transaction finished
  • Loading branch information
Tadeuchi committed Apr 14, 2023
1 parent e978de7 commit cc47e63
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/cache/SortKeyCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export interface SortKeyCache<V> {

close(): Promise<void>;

begin(): void;
begin(): Promise<void>;

rollback(): void;

Expand Down
72 changes: 58 additions & 14 deletions src/cache/impl/LevelDbCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import { AbstractKeyIteratorOptions } from 'abstract-level/types/abstract-iterat
*/

class ClientValueWrapper<V> {
constructor(readonly value: V, readonly tombstone: boolean = false) {}
constructor(readonly value: V, readonly tomb: boolean = false) {}
}

export class LevelDbCache<V> implements SortKeyCache<V> {
private readonly ongoingTransactionMark = '$$warp-internal-transaction$$';

private readonly logger = LoggerFactory.INST.create('LevelDbCache');
private readonly subLevelSeparator: string;
private readonly subLevelOptions: AbstractSublevelOptions<string, ClientValueWrapper<V>>;
Expand Down Expand Up @@ -79,7 +81,12 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
await contractCache.open();
try {
const result: ClientValueWrapper<V> = await contractCache.get(cacheKey.sortKey);
const resultValue = result.tombstone ? null : result.value;
let resultValue: V;
if (result.tomb === undefined && result.value === undefined) {
resultValue = result as V;
} else {
resultValue = result.tomb ? null : result.value;
}
return new SortKeyCacheResult<V>(cacheKey.sortKey, resultValue);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
Expand All @@ -98,7 +105,10 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
const keys = await contractCache.keys({ reverse: true, limit: 1 }).all();
if (keys.length) {
const lastValueWrap = await contractCache.get(keys[0]);
if (!lastValueWrap.tombstone) {
if (lastValueWrap.tomb === undefined && lastValueWrap.value === undefined) {
return new SortKeyCacheResult<V>(keys[0], lastValueWrap as V);
}
if (!lastValueWrap.tomb) {
return new SortKeyCacheResult<V>(keys[0], lastValueWrap.value);
}
}
Expand All @@ -112,7 +122,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
const keys = await contractCache.keys({ reverse: true, lte: sortKey, limit: 1 }).all();
if (keys.length) {
const cachedVal = await contractCache.get(keys[0]);
if (!cachedVal.tombstone) {
if (!cachedVal.tomb) {
return new SortKeyCacheResult<V>(keys[0], cachedVal.value);
}
}
Expand All @@ -124,7 +134,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
}

/**
* Delete operation under the hood is a write operation with setting tombstone flag to true.
* Delete operation under the hood is a write operation with setting tomb flag to true.
* The idea behind is based on Cassandra Tombstone
* https://www.instaclustr.com/support/documentation/cassandra/using-cassandra/managing-tombstones-in-cassandra/
* There is a couple of benefits to this approach:
Expand All @@ -142,10 +152,9 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
// manually opening to fix https://github.com/Level/level/issues/221
await contractCache.open();
await contractCache.put(stateCacheKey.sortKey, valueWrapper);
if (!this._rollbackBatch) {
this.begin();
if (this._rollbackBatch) {
this._rollbackBatch.del(stateCacheKey.sortKey, { sublevel: contractCache });
}
this._rollbackBatch.del(stateCacheKey.sortKey, { sublevel: contractCache });
}

async delete(key: string): Promise<void> {
Expand All @@ -166,7 +175,6 @@ export class LevelDbCache<V> implements SortKeyCache<V> {

async open(): Promise<void> {
await this.db.open();
await this.begin();
}

async close(): Promise<void> {
Expand All @@ -175,20 +183,53 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
}
}

begin() {
this._rollbackBatch = this.db.batch();
async begin(): Promise<void> {
await this.initRollbackBatch();
}

async rollback() {
if (this._rollbackBatch && this._rollbackBatch.length > 0) {
if (this._rollbackBatch) {
this._rollbackBatch.del(this.ongoingTransactionMark);
await this._rollbackBatch.write();
await this._rollbackBatch.close();
}
this._rollbackBatch = null;
}

private async initRollbackBatch(): Promise<
AbstractChainedBatch<MemoryLevel<string, ClientValueWrapper<V>>, string, ClientValueWrapper<V>>
> {
if (this._rollbackBatch == null) {
await this.checkPreviousTransactionFinished();
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
await this.db.put(this.ongoingTransactionMark, 'ongoing');

this._rollbackBatch = this.db.batch();
}
return this._rollbackBatch;
}

private async checkPreviousTransactionFinished() {
let transactionMarkValue;

try {
transactionMarkValue = await this.db.get(this.ongoingTransactionMark);
// eslint-disable-next-line no-empty
} catch (error) {}

if (transactionMarkValue == 'ongoing') {
throw new Error(`Database seems to be in inconsistent state. The previous transaction has not finished.`);
}
}

async commit() {
if (this._rollbackBatch) {
await this._rollbackBatch.clear().close();
await this._rollbackBatch.clear();
await this.db.del(this.ongoingTransactionMark);
await this._rollbackBatch.close();
}
this._rollbackBatch = null;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -207,7 +248,7 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
for (const joinedKey of keys) {
// default joined key format used by sub-levels:
// <separator><contract_tx_id (43 chars)><separator><sort_key>
const sortKey = joinedKey.substring(45);
const sortKey = joinedKey.split(this.subLevelSeparator)[1];
if (sortKey.localeCompare(lastSortKey) > 0) {
lastSortKey = sortKey;
}
Expand All @@ -221,6 +262,9 @@ export class LevelDbCache<V> implements SortKeyCache<V> {
}

validateKey(key: string) {
if (key.includes(this.ongoingTransactionMark)) {
throw new Error(`Validation error: Key ${key} for internal use only`);
}
if (key.includes(this.subLevelSeparator)) {
throw new Error(`Validation error: key ${key} contains db separator ${this.subLevelSeparator}`);
}
Expand Down
3 changes: 2 additions & 1 deletion src/contract/states/ContractInteractionState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ export class ContractInteractionState implements InteractionState {

async commit(interaction: GQLNodeInterface): Promise<void> {
if (interaction.dry) {
return await this.rollbackKVs();
await this.rollbackKVs();
return this.reset();
}
try {
await this.doStoreJson(this._json, interaction);
Expand Down
1 change: 1 addition & 0 deletions src/core/modules/impl/handler/JsHandlerApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ export class JsHandlerApi<State> extends AbstractContractHandler<State> {

try {
await this.swGlobal.kv.open();
await this.swGlobal.kv.begin();

const handlerResult = await Promise.race([timeoutPromise, this.contractFunction(stateClone, interaction)]);

Expand Down
14 changes: 9 additions & 5 deletions src/core/modules/impl/handler/WasmHandlerApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,16 @@ export class WasmHandlerApi<State> extends AbstractContractHandler<State> {
this.assignWrite(executionContext);

await this.swGlobal.kv.open();
await this.swGlobal.kv.begin();
const handlerResult = await this.doHandle(interaction);
await this.swGlobal.kv.commit();

if (interactionData.interaction.interactionType === 'view') {
// view calls are not allowed to perform any KV modifications
await this.swGlobal.kv.rollback();
} else {
await this.swGlobal.kv.commit();
}

return {
type: 'ok',
result: handlerResult,
Expand All @@ -62,10 +70,6 @@ export class WasmHandlerApi<State> extends AbstractContractHandler<State> {
};
}
} finally {
if (interactionData.interaction.interactionType === 'view') {
// view calls are not allowed to perform any KV modifications
await this.swGlobal.kv.rollback();
}
await this.swGlobal.kv.close();
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/legacy/smartweave-global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ export class KV {
return this._storage.kvMap(sortKey, options);
}

async begin() {
if (this._storage) {
return this._storage.begin();
}
}

async commit(): Promise<void> {
if (this._storage) {
if (this._transaction.dryRun) {
Expand Down

0 comments on commit cc47e63

Please sign in to comment.