diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts b/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts index c75d8b9f..194b90c7 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts @@ -263,7 +263,9 @@ const appendEventsRaw = ( .join(','), messages.map(() => `'1'`).join(','), messages.map((e) => sql('%L', e.type)).join(','), - messages.map((e) => sql('%L', e.kind)).join(','), + messages + .map((e) => sql('%L', e.kind === 'Event' ? 'E' : 'C')) + .join(','), streamId, streamType, options?.expectedStreamVersion ?? 'NULL', diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts b/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts index 11c1932b..1ebe58b6 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts @@ -28,7 +28,7 @@ export const messagesTableSQL = rawSql( stream_id TEXT NOT NULL, stream_position BIGINT NOT NULL, partition TEXT NOT NULL DEFAULT '${globalTag}', - message_kind TEXT NOT NULL DEFAULT 'Event', + message_kind CHAR(1) NOT NULL DEFAULT 'E', message_data JSONB NOT NULL, message_metadata JSONB NOT NULL, message_schema_version TEXT NOT NULL, diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts index c372f9a7..bf907d76 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts @@ -183,7 +183,6 @@ const appendEventsRaw = async ( events, expectedStreamVersion, streamId, - streamType, options?.partition?.toString() ?? defaultTag, ); @@ -239,7 +238,6 @@ const buildEventInsertQuery = ( events: ReadEvent[], expectedStreamVersion: bigint, streamId: string, - streamType: string, partition: string | null | undefined, ): { sqlString: string; @@ -260,11 +258,12 @@ const buildEventInsertQuery = ( const streamPosition = BigInt(event.metadata.streamPosition) + BigInt(expectedStreamVersion); - queryBuilder.parameterMarkers.push(`(?,?,?,?,?,?,?,?,?)`); + queryBuilder.parameterMarkers.push(`(?,?,?,?,?,?,?,?,?,?)`); queryBuilder.values.push( streamId, streamPosition.toString() ?? 0, partition ?? defaultTag, + event.kind === 'Event' ? 'E' : 'C', JSONParser.stringify(event.data), JSONParser.stringify(event.metadata), expectedStreamVersion?.toString() ?? 0, @@ -286,6 +285,7 @@ const buildEventInsertQuery = ( stream_id, stream_position, partition, + message_kind, message_data, message_metadata, message_schema_version, diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts index 74fbf7f7..18d10afe 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts @@ -20,7 +20,7 @@ export const messagesTableSQL = sql( stream_id TEXT NOT NULL, stream_position BIGINT NOT NULL, partition TEXT NOT NULL DEFAULT '${globalTag}', - message_kind TEXT NOT NULL DEFAULT 'Event', + message_kind CHAR(1) NOT NULL DEFAULT 'E', message_data JSONB NOT NULL, message_metadata JSONB NOT NULL, message_schema_version TEXT NOT NULL,