From cc0fd382b6cd4b64c3626de6b1e48ed41dddffb0 Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 2 Jan 2025 20:09:11 +0000 Subject: [PATCH] Added extra check for too high of a expected stream position and moved global positioning to be auto increment in sqlite --- .../schema/appendToStream.int.spec.ts | 37 +++++ .../src/eventStore/schema/appendToStream.ts | 128 ++++++++++++------ .../src/eventStore/schema/index.ts | 6 +- .../src/eventStore/schema/tables.ts | 8 +- 4 files changed, 127 insertions(+), 52 deletions(-) diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts index 4051c32d..b00a6a01 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts @@ -98,6 +98,43 @@ void describe('appendEvent', () => { assertTrue(result.success); }); + void it('should handle stream position if expected version is too high', async () => { + // Given + const streamId = uuid(); + + const firstResult = await appendToStream( + db, + streamId, + 'shopping_cart', + events, + { + expectedStreamVersion: 0n, + }, + ); + assertTrue(firstResult.success); + + // When + const secondResult = await appendToStream( + db, + streamId, + 'shopping_cart', + events, + { + expectedStreamVersion: 4n, + }, + ); + + // Then + assertFalse(secondResult.success); + + const resultEvents = await db.query( + 'SELECT * FROM emt_events WHERE stream_id = $1', + [streamId], + ); + + assertEqual(events.length, resultEvents.length); + }); + void it('should handle stream position conflict correctly when two streams are created', async () => { // Given const streamId = uuid(); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts index 29bd1fe1..1267c414 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts @@ -103,6 +103,7 @@ const appendEventsRaw = async ( ): Promise => { let streamPosition; let globalPosition; + try { let expectedStreamVersion = options?.expectedStreamVersion ?? null; @@ -114,56 +115,33 @@ const appendEventsRaw = async ( ); } - const buildQuery = `INSERT INTO ${eventsTable.name} (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, is_archived) VALUES `; - - const query = events.reduce( - ( - queryBuilder: { - sql: string[]; - values: Parameters[]; - }, - e: ReadEvent, - ) => { - const streamPosition = - e.metadata.streamPosition + expectedStreamVersion; - - queryBuilder.sql.push(`(?,?,?,?,?,?,?,?,?)`); - queryBuilder.values.push( - streamId, - streamPosition.toString(), - options?.partition?.toString() ?? defaultTag, - JSONParser.stringify(e.data), - JSONParser.stringify({ streamType: streamType, ...e.metadata }), - expectedStreamVersion?.toString() ?? 0, - e.type, - e.metadata.eventId, - false, - ); - - return queryBuilder; - }, - { - sql: [], - values: [], - }, + const { sqlString, values } = buildEventInsertQuery( + events, + expectedStreamVersion, + streamId, + streamType, + options?.partition?.toString() ?? defaultTag, ); - const sqlString = buildQuery + query.sql.join(', '); + const returningId = await db.querySingle<{ global_position: string }>( + sqlString, + values, + ); - await db.command(sqlString, query.values); + if (returningId?.global_position == null) { + throw new Error('Could not find global position'); + } + + globalPosition = BigInt(returningId.global_position); const positions = await db.querySingle<{ stream_position: string; - global_position: string; } | null>( ` SELECT - CAST(stream_position AS VARCHAR) AS stream_position, - CAST(global_position AS VARCHAR) AS global_position - FROM ${eventsTable.name} - WHERE stream_id = ? - ORDER BY stream_position DESC - LIMIT 1`, + CAST(stream_position AS VARCHAR) AS stream_position + FROM ${streamsTable.name} + WHERE stream_id = ?`, [streamId], ); @@ -172,7 +150,16 @@ const appendEventsRaw = async ( } streamPosition = BigInt(positions.stream_position); - globalPosition = BigInt(positions.global_position); + + if (expectedStreamVersion != null) { + const expectedStreamPositionAfterSave = + BigInt(expectedStreamVersion) + BigInt(events.length); + if (streamPosition !== expectedStreamPositionAfterSave) { + return { + success: false, + }; + } + } } catch (err: unknown) { if (isSQLiteError(err) && isOptimisticConcurrencyError(err)) { return { @@ -211,3 +198,60 @@ async function getLastStreamPosition( } return expectedStreamVersion; } + +const buildEventInsertQuery = ( + events: Event[], + expectedStreamVersion: bigint | null, + streamId: string, + streamType: string, + partition: string | null | undefined, +) => { + const query = events.reduce( + ( + queryBuilder: { + parameterMarkers: string[]; + values: Parameters[]; + }, + e: ReadEvent, + ) => { + const streamPosition = e.metadata.streamPosition + expectedStreamVersion; + + queryBuilder.parameterMarkers.push(`(?,?,?,?,?,?,?,?,?)`); + queryBuilder.values.push( + streamId, + streamPosition.toString(), + partition ?? defaultTag, + JSONParser.stringify(e.data), + JSONParser.stringify({ streamType: streamType, ...e.metadata }), + expectedStreamVersion?.toString() ?? 0, + e.type, + e.metadata.eventId, + false, + ); + + return queryBuilder; + }, + { + parameterMarkers: [], + values: [], + }, + ); + + const sqlString = ` + INSERT INTO ${eventsTable.name} ( + stream_id, + stream_position, + partition, + event_data, + event_metadata, + event_schema_version, + event_type, + event_id, + is_archived + ) + VALUES ${query.parameterMarkers.join(', ')} + RETURNING + CAST(global_position as VARCHAR) AS global_position + `; + return { sqlString, values: query.values }; +}; diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/index.ts b/src/packages/emmett-sqlite/src/eventStore/schema/index.ts index 21f639c9..bbbd95b7 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/index.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/index.ts @@ -15,10 +15,8 @@ export const createEventStoreSchema = async ( for (const sql of schemaSQL) { try { await db.command(sql); - } catch (error) { - console.log(error); - - return; + } catch (err: unknown) { + throw err; } } }; diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts index f60eb640..d602c1e6 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts @@ -26,9 +26,9 @@ export const eventsTableSQL = sql( event_type TEXT NOT NULL, event_id TEXT NOT NULL, is_archived BOOLEAN NOT NULL DEFAULT FALSE, - global_position BIGINT , + global_position INTEGER PRIMARY KEY, created DATETIME DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (stream_id, stream_position, partition, is_archived) + UNIQUE (stream_id, stream_position, partition, is_archived) ); `, ); @@ -51,9 +51,5 @@ export const eventStreamTrigger = sql( ) ON CONFLICT(stream_id, partition, is_archived) DO UPDATE SET stream_position=stream_position + 1; - - UPDATE ${eventsTable.name} - SET global_position = IFNULL((SELECT MAX(global_position) from ${eventsTable.name})+1, 1) - WHERE (stream_id, stream_position, partition, is_archived) = (NEW.stream_id, NEW.stream_position, NEW.partition, NEW.is_archived); END;`, );