Skip to content

Commit

Permalink
Refactored SQLite AppendToStream to use messages instead of events
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Feb 27, 2025
1 parent 2c42eec commit b603a9d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ BEGIN
RENAME COLUMN event_type TO message_type;
ALTER TABLE emt_messages
RENAME COLUMN event_id TO message_id;
ALTER TABLE emt_messages
ADD COLUMN message_kind CHAR(1) NOT NULL DEFAULT 'E';
-- Rename sequence if it exists
IF EXISTS (SELECT 1 FROM pg_sequences WHERE sequencename = 'emt_global_event_position') THEN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
assertIsNotNull,
assertTrue,
type Event,
type RecordedMessage,
} from '@event-driven-io/emmett';
import { after, before, describe, it } from 'node:test';
import { v4 as uuid } from 'uuid';
Expand Down Expand Up @@ -268,8 +269,8 @@ void describe('appendEvent', () => {
let grabbedEvents: Event[] = [];

await appendToStream(db, streamId, 'shopping_cart', events, {
preCommitHook: (events: Event[]): void => {
grabbedEvents = events;
preCommitHook: (messages: RecordedMessage[]): void => {
grabbedEvents = messages.filter((m) => m.kind === 'Event');
},
});

Expand All @@ -281,7 +282,7 @@ void describe('appendEvent', () => {

try {
await appendToStream(db, streamId, 'shopping_cart', events, {
preCommitHook: (_: Event[]): void => {
preCommitHook: (_: RecordedMessage[]): void => {
throw new Error('fake error');
},
});
Expand Down
85 changes: 46 additions & 39 deletions src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import {
STREAM_DOES_NOT_EXIST,
STREAM_EXISTS,
type AppendToStreamOptions,
type Event,
type ExpectedStreamVersion,
type ReadEvent,
type Event as Message,
type RecordedMessage,
} from '@event-driven-io/emmett';
import { v4 as uuid } from 'uuid';
import {
Expand All @@ -29,41 +29,48 @@ export const appendToStream = async (
db: SQLiteConnection,
streamName: string,
streamType: string,
events: Event[],
messages: Message[],
options?: AppendToStreamOptions & {
partition?: string;
preCommitHook?: (events: Event[]) => void;
preCommitHook?: (events: RecordedMessage[]) => void;
},
): Promise<AppendEventResult> => {
if (events.length === 0) return { success: false };
if (messages.length === 0) return { success: false };

const expectedStreamVersion = toExpectedVersion(
options?.expectedStreamVersion,
);

const eventsToAppend: ReadEvent[] = events.map(
(e: Event, i: number): ReadEvent => ({
...e,
kind: e.kind ?? 'Event',
metadata: {
streamName,
messageId: uuid(),
streamPosition: BigInt(i + 1),
...('metadata' in e ? (e.metadata ?? {}) : {}),
},
}),
const messagesToAppend: RecordedMessage[] = messages.map(
(m: Message, i: number): RecordedMessage =>
({
...m,
kind: m.kind ?? 'Event',
metadata: {
streamName,
messageId: uuid(),
streamPosition: BigInt(i + 1),
...('metadata' in m ? (m.metadata ?? {}) : {}),
},
}) as RecordedMessage,
);

let result: AppendEventResult;

await db.command(`BEGIN TRANSACTION`);

try {
result = await appendEventsRaw(db, streamName, streamType, eventsToAppend, {
expectedStreamVersion,
});
result = await appendToStreamRaw(
db,
streamName,
streamType,
messagesToAppend,
{
expectedStreamVersion,
},
);

if (options?.preCommitHook) options.preCommitHook(eventsToAppend);
if (options?.preCommitHook) options.preCommitHook(messagesToAppend);
} catch (err: unknown) {
await db.command(`ROLLBACK`);
throw err;
Expand Down Expand Up @@ -95,11 +102,11 @@ const toExpectedVersion = (
return expected as bigint;
};

const appendEventsRaw = async (
const appendToStreamRaw = async (
db: SQLiteConnection,
streamId: string,
streamType: string,
events: ReadEvent[],
messages: RecordedMessage[],
options?: {
expectedStreamVersion: bigint | null;
partition?: string;
Expand Down Expand Up @@ -139,7 +146,7 @@ const appendEventsRaw = async (
`,
[
streamId,
events.length,
messages.length,
options?.partition ?? streamsTable.columns.partition,
streamType,
],
Expand All @@ -156,7 +163,7 @@ const appendEventsRaw = async (
RETURNING stream_position;
`,
[
events.length,
messages.length,
streamId,
options?.partition ?? streamsTable.columns.partition,
],
Expand All @@ -171,16 +178,16 @@ const appendEventsRaw = async (

if (expectedStreamVersion != null) {
const expectedStreamPositionAfterSave =
BigInt(expectedStreamVersion) + BigInt(events.length);
BigInt(expectedStreamVersion) + BigInt(messages.length);
if (streamPosition !== expectedStreamPositionAfterSave) {
return {
success: false,
};
}
}

const { sqlString, values } = buildEventInsertQuery(
events,
const { sqlString, values } = buildMessageInsertQuery(
messages,
expectedStreamVersion,
streamId,
options?.partition?.toString() ?? defaultTag,
Expand Down Expand Up @@ -234,41 +241,41 @@ async function getLastStreamPosition(
return expectedStreamVersion;
}

const buildEventInsertQuery = (
events: ReadEvent[],
const buildMessageInsertQuery = (
messages: RecordedMessage[],
expectedStreamVersion: bigint,
streamId: string,
partition: string | null | undefined,
): {
sqlString: string;
values: Parameters[];
} => {
const query = events.reduce(
const query = messages.reduce(
(
queryBuilder: { parameterMarkers: string[]; values: Parameters[] },
event: ReadEvent,
message: RecordedMessage,
) => {
if (
event.metadata?.streamPosition == null ||
typeof event.metadata.streamPosition !== 'bigint'
message.metadata?.streamPosition == null ||
typeof message.metadata.streamPosition !== 'bigint'
) {
throw new Error('Stream position is required');
}

const streamPosition =
BigInt(event.metadata.streamPosition) + BigInt(expectedStreamVersion);
BigInt(message.metadata.streamPosition) + BigInt(expectedStreamVersion);

queryBuilder.parameterMarkers.push(`(?,?,?,?,?,?,?,?,?,?)`);
queryBuilder.values.push(
streamId,
streamPosition.toString() ?? 0,
partition ?? defaultTag,
event.kind === 'Event' ? 'E' : 'C',
JSONParser.stringify(event.data),
JSONParser.stringify(event.metadata),
message.kind === 'Event' ? 'E' : 'C',
JSONParser.stringify(message.data),
JSONParser.stringify(message.metadata),
expectedStreamVersion?.toString() ?? 0,
event.type,
event.metadata.messageId,
message.type,
message.metadata.messageId,
false,
);

Expand Down

0 comments on commit b603a9d

Please sign in to comment.