From 78d30e1ef126b91316ff97a320a4a21339e4ca32 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Thu, 27 Feb 2025 20:55:20 +0100 Subject: [PATCH] Refactored SQLite AppendToStream to use messages instead of events --- .../src/eventStore/schema/tables.ts | 6 +- .../schema/appendToStream.int.spec.ts | 7 +- .../src/eventStore/schema/appendToStream.ts | 85 ++++++++++--------- 3 files changed, 54 insertions(+), 44 deletions(-) diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts b/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts index 1ebe58b6..5f7d5145 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts @@ -22,7 +22,7 @@ export const streamsTableSQL = rawSql( export const messagesTableSQL = rawSql( ` - CREATE SEQUENCE IF NOT EXISTS emt_global_event_position; + CREATE SEQUENCE IF NOT EXISTS emt_global_message_position; CREATE TABLE IF NOT EXISTS ${messagesTable.name}( stream_id TEXT NOT NULL, @@ -35,7 +35,7 @@ export const messagesTableSQL = rawSql( message_type TEXT NOT NULL, message_id TEXT NOT NULL, is_archived BOOLEAN NOT NULL DEFAULT FALSE, - global_position BIGINT DEFAULT nextval('emt_global_event_position'), + global_position BIGINT DEFAULT nextval('emt_global_message_position'), transaction_id XID8 NOT NULL, created TIMESTAMPTZ NOT NULL DEFAULT now(), PRIMARY KEY (stream_id, stream_position, partition, is_archived) @@ -347,6 +347,8 @@ BEGIN RENAME COLUMN event_type TO message_type; ALTER TABLE emt_messages RENAME COLUMN event_id TO message_id; + ALTER TABLE emt_messages + ADD COLUMN message_kind CHAR(1) NOT NULL DEFAULT 'E'; -- Rename sequence if it exists IF EXISTS (SELECT 1 FROM pg_sequences WHERE sequencename = 'emt_global_event_position') THEN diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts index 0f5911e4..f655add2 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts @@ -4,6 +4,7 @@ import { assertIsNotNull, assertTrue, type Event, + type RecordedMessage, } from '@event-driven-io/emmett'; import { after, before, describe, it } from 'node:test'; import { v4 as uuid } from 'uuid'; @@ -268,8 +269,8 @@ void describe('appendEvent', () => { let grabbedEvents: Event[] = []; await appendToStream(db, streamId, 'shopping_cart', events, { - preCommitHook: (events: Event[]): void => { - grabbedEvents = events; + preCommitHook: (messages: RecordedMessage[]): void => { + grabbedEvents = messages.filter((m) => m.kind === 'Event'); }, }); @@ -281,7 +282,7 @@ void describe('appendEvent', () => { try { await appendToStream(db, streamId, 'shopping_cart', events, { - preCommitHook: (_: Event[]): void => { + preCommitHook: (_: RecordedMessage[]): void => { throw new Error('fake error'); }, }); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts index bf907d76..247978d1 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts @@ -4,9 +4,9 @@ import { STREAM_DOES_NOT_EXIST, STREAM_EXISTS, type AppendToStreamOptions, - type Event, type ExpectedStreamVersion, - type ReadEvent, + type Event as Message, + type RecordedMessage, } from '@event-driven-io/emmett'; import { v4 as uuid } from 'uuid'; import { @@ -29,29 +29,30 @@ export const appendToStream = async ( db: SQLiteConnection, streamName: string, streamType: string, - events: Event[], + messages: Message[], options?: AppendToStreamOptions & { partition?: string; - preCommitHook?: (events: Event[]) => void; + preCommitHook?: (events: RecordedMessage[]) => void; }, ): Promise => { - if (events.length === 0) return { success: false }; + if (messages.length === 0) return { success: false }; const expectedStreamVersion = toExpectedVersion( options?.expectedStreamVersion, ); - const eventsToAppend: ReadEvent[] = events.map( - (e: Event, i: number): ReadEvent => ({ - ...e, - kind: e.kind ?? 'Event', - metadata: { - streamName, - messageId: uuid(), - streamPosition: BigInt(i + 1), - ...('metadata' in e ? (e.metadata ?? {}) : {}), - }, - }), + const messagesToAppend: RecordedMessage[] = messages.map( + (m: Message, i: number): RecordedMessage => + ({ + ...m, + kind: m.kind ?? 'Event', + metadata: { + streamName, + messageId: uuid(), + streamPosition: BigInt(i + 1), + ...('metadata' in m ? (m.metadata ?? {}) : {}), + }, + }) as RecordedMessage, ); let result: AppendEventResult; @@ -59,11 +60,17 @@ export const appendToStream = async ( await db.command(`BEGIN TRANSACTION`); try { - result = await appendEventsRaw(db, streamName, streamType, eventsToAppend, { - expectedStreamVersion, - }); + result = await appendToStreamRaw( + db, + streamName, + streamType, + messagesToAppend, + { + expectedStreamVersion, + }, + ); - if (options?.preCommitHook) options.preCommitHook(eventsToAppend); + if (options?.preCommitHook) options.preCommitHook(messagesToAppend); } catch (err: unknown) { await db.command(`ROLLBACK`); throw err; @@ -95,11 +102,11 @@ const toExpectedVersion = ( return expected as bigint; }; -const appendEventsRaw = async ( +const appendToStreamRaw = async ( db: SQLiteConnection, streamId: string, streamType: string, - events: ReadEvent[], + messages: RecordedMessage[], options?: { expectedStreamVersion: bigint | null; partition?: string; @@ -139,7 +146,7 @@ const appendEventsRaw = async ( `, [ streamId, - events.length, + messages.length, options?.partition ?? streamsTable.columns.partition, streamType, ], @@ -156,7 +163,7 @@ const appendEventsRaw = async ( RETURNING stream_position; `, [ - events.length, + messages.length, streamId, options?.partition ?? streamsTable.columns.partition, ], @@ -171,7 +178,7 @@ const appendEventsRaw = async ( if (expectedStreamVersion != null) { const expectedStreamPositionAfterSave = - BigInt(expectedStreamVersion) + BigInt(events.length); + BigInt(expectedStreamVersion) + BigInt(messages.length); if (streamPosition !== expectedStreamPositionAfterSave) { return { success: false, @@ -179,8 +186,8 @@ const appendEventsRaw = async ( } } - const { sqlString, values } = buildEventInsertQuery( - events, + const { sqlString, values } = buildMessageInsertQuery( + messages, expectedStreamVersion, streamId, options?.partition?.toString() ?? defaultTag, @@ -234,8 +241,8 @@ async function getLastStreamPosition( return expectedStreamVersion; } -const buildEventInsertQuery = ( - events: ReadEvent[], +const buildMessageInsertQuery = ( + messages: RecordedMessage[], expectedStreamVersion: bigint, streamId: string, partition: string | null | undefined, @@ -243,32 +250,32 @@ const buildEventInsertQuery = ( sqlString: string; values: Parameters[]; } => { - const query = events.reduce( + const query = messages.reduce( ( queryBuilder: { parameterMarkers: string[]; values: Parameters[] }, - event: ReadEvent, + message: RecordedMessage, ) => { if ( - event.metadata?.streamPosition == null || - typeof event.metadata.streamPosition !== 'bigint' + message.metadata?.streamPosition == null || + typeof message.metadata.streamPosition !== 'bigint' ) { throw new Error('Stream position is required'); } const streamPosition = - BigInt(event.metadata.streamPosition) + BigInt(expectedStreamVersion); + BigInt(message.metadata.streamPosition) + BigInt(expectedStreamVersion); queryBuilder.parameterMarkers.push(`(?,?,?,?,?,?,?,?,?,?)`); queryBuilder.values.push( streamId, streamPosition.toString() ?? 0, partition ?? defaultTag, - event.kind === 'Event' ? 'E' : 'C', - JSONParser.stringify(event.data), - JSONParser.stringify(event.metadata), + message.kind === 'Event' ? 'E' : 'C', + JSONParser.stringify(message.data), + JSONParser.stringify(message.metadata), expectedStreamVersion?.toString() ?? 0, - event.type, - event.metadata.messageId, + message.type, + message.metadata.messageId, false, );