diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts new file mode 100644 index 00000000..4051c32d --- /dev/null +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts @@ -0,0 +1,228 @@ +import { + assertEqual, + assertFalse, + assertIsNotNull, + assertTrue, + type Event, +} from '@event-driven-io/emmett'; +import { after, before, describe, it } from 'node:test'; +import sqlite3 from 'sqlite3'; +import { v4 as uuid } from 'uuid'; +import { createEventStoreSchema } from '.'; +import { dbConn, type SQLiteConnection } from '../../sqliteConnection'; +import { appendToStream } from './appendToStream'; + +export type PricedProductItem = { + productId: string; + quantity: number; + price: number; +}; + +export type ShoppingCart = { + productItems: PricedProductItem[]; + totalAmount: number; +}; + +export type ProductItemAdded = Event< + 'ProductItemAdded', + { productItem: PricedProductItem } +>; +export type DiscountApplied = Event<'DiscountApplied', { percent: number }>; + +export type ShoppingCartEvent = ProductItemAdded | DiscountApplied; + +void describe('appendEvent', () => { + let db: SQLiteConnection; + let conn: sqlite3.Database; + + before(async () => { + conn = new sqlite3.Database(':memory:'); + + db = dbConn(conn); + await createEventStoreSchema(db); + }); + + after(() => { + conn.close(); + }); + + const events: ShoppingCartEvent[] = [ + { + type: 'ProductItemAdded', + data: { productItem: { productId: '1', quantity: 2, price: 30 } }, + metadata: { meta: 'data1' }, + }, + { + type: 'DiscountApplied', + data: { percent: 10 }, + metadata: { meta: 'data2' }, + }, + ]; + + void it('should append events correctly', async () => { + const result = await appendToStream(db, uuid(), 'shopping_cart', events, { + expectedStreamVersion: 0n, + }); + + assertTrue(result.success); + assertEqual(result.nextStreamPosition, 2n); + assertIsNotNull(result.lastGlobalPosition); + assertTrue(result.lastGlobalPosition > 0n); + }); + + void it('should append events correctly without expected stream position', async () => { + const result = await appendToStream( + db, + uuid(), + 'shopping_cart', + events, + {}, + ); + + assertTrue(result.success); + assertEqual(result.nextStreamPosition, 2n); + assertIsNotNull(result.lastGlobalPosition); + assertTrue(result.lastGlobalPosition > 0n); + }); + + void it('should append events correctly without optimistic concurrency', async () => { + const streamId = uuid(); + await appendToStream(db, streamId, 'shopping_cart', events); + const result = await appendToStream(db, streamId, 'shopping_cart', events); + const resultEvents = await db.query( + 'SELECT * FROM emt_events WHERE stream_id = $1', + [streamId], + ); + + assertEqual(4, resultEvents.length); + assertTrue(result.success); + }); + + void it('should handle stream position conflict correctly when two streams are created', async () => { + // Given + const streamId = uuid(); + + const firstResult = await appendToStream( + db, + streamId, + 'shopping_cart', + events, + { + expectedStreamVersion: 0n, + }, + ); + assertTrue(firstResult.success); + + // When + const secondResult = await appendToStream( + db, + streamId, + 'shopping_cart', + events, + { + expectedStreamVersion: 0n, + }, + ); + + // Then + assertFalse(secondResult.success); + + const resultEvents = await db.query( + 'SELECT * FROM emt_events WHERE stream_id = $1', + [streamId], + ); + + assertEqual(events.length, resultEvents.length); + }); + + void it('should handle stream position conflict correctly when version mismatches', async () => { + // Given + const streamId = uuid(); + + const creationResult = await appendToStream( + db, + streamId, + 'shopping_cart', + events, + ); + assertTrue(creationResult.success); + const expectedStreamVersion = creationResult.nextStreamPosition; + + const firstResult = await appendToStream( + db, + streamId, + 'shopping_cart', + events, + { + expectedStreamVersion, + }, + ); + + assertTrue(firstResult.success); + + // When + const secondResult = await appendToStream( + db, + streamId, + 'shopping_cart', + events, + { + expectedStreamVersion, + }, + ); + + // Then + assertFalse(secondResult.success); + + const resultEvents = await db.query( + 'SELECT * FROM emt_events WHERE stream_id = $1', + [streamId], + ); + + assertEqual(events.length * 2, resultEvents.length); + }); + + void it('should not have stream position conflict when version matches', async () => { + // Given + const streamId = uuid(); + const expectedStreamVersion = 0n; + + const firstResult = await appendToStream( + db, + streamId, + 'shopping_cart', + events, + { + expectedStreamVersion, + }, + ); + assertTrue(firstResult.success); + + // When + const secondResult = await appendToStream( + db, + streamId, + 'shopping_cart', + events, + { + expectedStreamVersion: firstResult.nextStreamPosition, + }, + ); + + // Then + assertTrue(secondResult.success); + + const resultEvents = await db.query( + 'SELECT * FROM emt_events WHERE stream_id = $1', + [streamId], + ); + + assertEqual(events.length * 2, resultEvents.length); + }); + + void it('should handle appending an empty events array gracefully', async () => { + const result = await appendToStream(db, uuid(), 'shopping_cart', []); + + assertFalse(result.success); + }); +}); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts new file mode 100644 index 00000000..29bd1fe1 --- /dev/null +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts @@ -0,0 +1,213 @@ +import { + JSONParser, + NO_CONCURRENCY_CHECK, + STREAM_DOES_NOT_EXIST, + STREAM_EXISTS, + type AppendToStreamOptions, + type Event, + type ExpectedStreamVersion, + type ReadEvent, +} from '@event-driven-io/emmett'; +import { v4 as uuid } from 'uuid'; +import { + isSQLiteError, + SQLiteError, + type Parameters, + type SQLiteConnection, +} from '../../sqliteConnection'; +import { defaultTag, eventsTable, streamsTable } from './typing'; + +export type AppendEventResult = + | { + success: true; + nextStreamPosition: bigint; + lastGlobalPosition: bigint; + } + | { success: false }; + +export const appendToStream = async ( + db: SQLiteConnection, + streamName: string, + streamType: string, + events: Event[], + options?: AppendToStreamOptions & { + partition?: string; + }, +): Promise => { + if (events.length === 0) return { success: false }; + + const expectedStreamVersion = toExpectedVersion( + options?.expectedStreamVersion, + ); + + const eventsToAppend: ReadEvent[] = events.map( + (e: Event, i: number): ReadEvent => ({ + ...e, + metadata: { + streamName, + eventId: uuid(), + streamPosition: BigInt(i + 1), + ...e.metadata, + }, + }), + ); + + let result: AppendEventResult; + + await db.command(`BEGIN TRANSACTION`); + + try { + result = await appendEventsRaw(db, streamName, streamType, eventsToAppend, { + expectedStreamVersion, + }); + } catch (err: unknown) { + await db.command(`ROLLBACK`); + throw err; + } + + if (result.success == null || !result.success) { + await db.command(`ROLLBACK`); + return result; + } + + await db.command(`COMMIT`); + + return result; +}; + +const toExpectedVersion = ( + expected: ExpectedStreamVersion | undefined, +): bigint | null => { + if (expected === undefined) return null; + + if (expected === NO_CONCURRENCY_CHECK) return null; + + // TODO: this needs to be fixed + if (expected == STREAM_DOES_NOT_EXIST) return null; + + // TODO: this needs to be fixed + if (expected == STREAM_EXISTS) return null; + + return expected as bigint; +}; + +const appendEventsRaw = async ( + db: SQLiteConnection, + streamId: string, + streamType: string, + events: ReadEvent[], + options?: { + expectedStreamVersion: bigint | null; + partition?: string; + }, +): Promise => { + let streamPosition; + let globalPosition; + try { + let expectedStreamVersion = options?.expectedStreamVersion ?? null; + + if (expectedStreamVersion == null) { + expectedStreamVersion = await getLastStreamPosition( + db, + streamId, + expectedStreamVersion, + ); + } + + const buildQuery = `INSERT INTO ${eventsTable.name} (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, is_archived) VALUES `; + + const query = events.reduce( + ( + queryBuilder: { + sql: string[]; + values: Parameters[]; + }, + e: ReadEvent, + ) => { + const streamPosition = + e.metadata.streamPosition + expectedStreamVersion; + + queryBuilder.sql.push(`(?,?,?,?,?,?,?,?,?)`); + queryBuilder.values.push( + streamId, + streamPosition.toString(), + options?.partition?.toString() ?? defaultTag, + JSONParser.stringify(e.data), + JSONParser.stringify({ streamType: streamType, ...e.metadata }), + expectedStreamVersion?.toString() ?? 0, + e.type, + e.metadata.eventId, + false, + ); + + return queryBuilder; + }, + { + sql: [], + values: [], + }, + ); + + const sqlString = buildQuery + query.sql.join(', '); + + await db.command(sqlString, query.values); + + const positions = await db.querySingle<{ + stream_position: string; + global_position: string; + } | null>( + ` + SELECT + CAST(stream_position AS VARCHAR) AS stream_position, + CAST(global_position AS VARCHAR) AS global_position + FROM ${eventsTable.name} + WHERE stream_id = ? + ORDER BY stream_position DESC + LIMIT 1`, + [streamId], + ); + + if (positions == null) { + throw new Error('Could not find stream positions'); + } + + streamPosition = BigInt(positions.stream_position); + globalPosition = BigInt(positions.global_position); + } catch (err: unknown) { + if (isSQLiteError(err) && isOptimisticConcurrencyError(err)) { + return { + success: false, + }; + } + + throw err; + } + + return { + success: true, + nextStreamPosition: streamPosition, + lastGlobalPosition: globalPosition, + }; +}; + +const isOptimisticConcurrencyError = (error: SQLiteError): boolean => { + return error?.errno !== undefined && error.errno === 19; +}; + +async function getLastStreamPosition( + db: SQLiteConnection, + streamId: string, + expectedStreamVersion: bigint | null, +) { + const result = await db.querySingle<{ stream_position: string } | null>( + `SELECT CAST(MAX(stream_position) AS VARCHAR) AS stream_position FROM ${streamsTable.name} WHERE stream_id = ?`, + [streamId], + ); + + if (result?.stream_position == null) { + expectedStreamVersion = 0n; + } else { + expectedStreamVersion = BigInt(result.stream_position); + } + return expectedStreamVersion; +} diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/index.ts b/src/packages/emmett-sqlite/src/eventStore/schema/index.ts index 8f6ddea0..21f639c9 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/index.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/index.ts @@ -1,9 +1,13 @@ import { type SQLiteConnection } from '../../sqliteConnection'; -import { eventsTableSQL, streamsTableSQL } from './tables'; +import { eventsTableSQL, eventStreamTrigger, streamsTableSQL } from './tables'; export * from './tables'; -export const schemaSQL: string[] = [streamsTableSQL, eventsTableSQL]; +export const schemaSQL: string[] = [ + streamsTableSQL, + eventsTableSQL, + eventStreamTrigger, +]; export const createEventStoreSchema = async ( db: SQLiteConnection, @@ -13,6 +17,8 @@ export const createEventStoreSchema = async ( await db.command(sql); } catch (error) { console.log(error); + + return; } } }; diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts index 1523f412..f60eb640 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts @@ -32,3 +32,28 @@ export const eventsTableSQL = sql( ); `, ); + +export const eventStreamTrigger = sql( + ` + CREATE TRIGGER emt_global_event_position + AFTER INSERT ON ${eventsTable.name} + FOR EACH ROW + BEGIN + INSERT INTO ${streamsTable.name} + (stream_id, stream_position, partition, stream_type, stream_metadata, is_archived) + VALUES ( + NEW.stream_id, + 1, + NEW.partition, + json_extract(NEW.event_metadata, '$.streamType'), + '[]', + NEW.is_archived + ) + ON CONFLICT(stream_id, partition, is_archived) + DO UPDATE SET stream_position=stream_position + 1; + + UPDATE ${eventsTable.name} + SET global_position = IFNULL((SELECT MAX(global_position) from ${eventsTable.name})+1, 1) + WHERE (stream_id, stream_position, partition, is_archived) = (NEW.stream_id, NEW.stream_position, NEW.partition, NEW.is_archived); + END;`, +); diff --git a/src/packages/emmett-sqlite/src/sqliteConnection.ts b/src/packages/emmett-sqlite/src/sqliteConnection.ts index 0a95d1b6..b48125e8 100644 --- a/src/packages/emmett-sqlite/src/sqliteConnection.ts +++ b/src/packages/emmett-sqlite/src/sqliteConnection.ts @@ -1,6 +1,6 @@ import type sqlite3 from 'sqlite3'; -type Parameters = object | string | number | null; +export type Parameters = object | string | bigint | number | boolean | null; export type SQLiteConnection = { command: (sql: string, values?: Parameters[]) => Promise; @@ -8,6 +8,18 @@ export type SQLiteConnection = { querySingle: (sql: string, values?: Parameters[]) => Promise; }; +export interface SQLiteError extends Error { + errno: number; +} + +export const isSQLiteError = (error: unknown): error is SQLiteError => { + if (error instanceof Error && 'code' in error) { + return true; + } + + return false; +}; + export const dbConn = (conn: sqlite3.Database): SQLiteConnection => { const db = conn;