Skip to content

Commit

Permalink
Updated PostgreSQL appendToStream function to store message kind
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Feb 11, 2025
1 parent fd7d492 commit 64a6603
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 61 deletions.
2 changes: 0 additions & 2 deletions samples/webApi/expressjs-with-postgresql/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3.5'

services:
app:
build:
Expand Down
116 changes: 60 additions & 56 deletions src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ import {
STREAM_DOES_NOT_EXIST,
STREAM_EXISTS,
type AppendToStreamOptions,
type Event,
type ExpectedStreamVersion,
type ReadEvent,
type Message,
type RecordedMessage,
} from '@event-driven-io/emmett';
import { v4 as uuid } from 'uuid';
import { defaultTag, messagesTable, streamsTable } from './typing';

export const appendEventsSQL = rawSql(
`CREATE OR REPLACE FUNCTION emt_append_event(
export const appendToStreamSQL = rawSql(
`CREATE OR REPLACE FUNCTION emt_append_to_stream(
v_message_ids text[],
v_events_data jsonb[],
v_events_metadata jsonb[],
v_messages_data jsonb[],
v_messages_metadata jsonb[],
v_message_schema_versions text[],
v_message_types text[],
v_message_kinds text[],
v_stream_id text,
v_stream_type text,
v_expected_stream_position bigint DEFAULT NULL,
Expand All @@ -44,68 +45,69 @@ export const appendEventsSQL = rawSql(
v_transaction_id xid8;
v_last_global_position bigint;
BEGIN
v_transaction_id := pg_current_xact_id();
IF v_expected_stream_position IS NULL THEN
SELECT COALESCE(max(stream_position), 0) INTO v_expected_stream_position
FROM ${streamsTable.name}
WHERE stream_id = v_stream_id AND partition = v_partition;
END IF;
v_next_stream_position := v_expected_stream_position + array_upper(v_events_data, 1);
v_transaction_id := pg_current_xact_id();
v_next_stream_position := v_expected_stream_position + array_upper(v_messages_data, 1);
IF v_expected_stream_position = 0 THEN
INSERT INTO ${streamsTable.name}
(stream_id, stream_position, partition, stream_type, stream_metadata, is_archived)
VALUES
(v_stream_id, v_next_stream_position, v_partition, v_stream_type, '{}', FALSE);
ELSE
UPDATE ${streamsTable.name} as s
SET stream_position = v_next_stream_position
WHERE stream_id = v_stream_id AND stream_position = v_expected_stream_position AND partition = v_partition AND is_archived = FALSE;
get diagnostics v_updated_rows = row_count;
IF v_updated_rows = 0 THEN
RETURN QUERY SELECT FALSE, NULL::bigint, NULL::bigint, NULL::xid8;
RETURN;
END IF;
END IF;
WITH ev AS (
SELECT row_number() OVER () + v_expected_stream_position AS stream_position,
message_data,
message_metadata,
schema_version,
message_id,
message_type
message_type,
message_kind
FROM (
SELECT *
FROM
unnest(v_message_ids, v_events_data, v_events_metadata, v_message_schema_versions, v_message_types)
AS event(message_id, message_data, message_metadata, schema_version, message_type)
) AS event
unnest(v_message_ids, v_messages_data, v_messages_metadata, v_message_schema_versions, v_message_types, v_message_kinds)
AS message(message_id, message_data, message_metadata, schema_version, message_type, message_kind)
) AS message
),
all_events_insert AS (
all_messages_insert AS (
INSERT INTO ${messagesTable.name}
(stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_id, transaction_id)
(stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_kind, message_id, transaction_id)
SELECT
v_stream_id, ev.stream_position, v_partition, ev.message_data, ev.message_metadata, ev.schema_version, ev.message_type, ev.message_id, v_transaction_id
v_stream_id, ev.stream_position, v_partition, ev.message_data, ev.message_metadata, ev.schema_version, ev.message_type, ev.message_kind, ev.message_id, v_transaction_id
FROM ev
RETURNING global_position
)
SELECT
max(global_position) INTO v_last_global_position
FROM
all_events_insert;
IF v_expected_stream_position = 0 THEN
INSERT INTO ${streamsTable.name}
(stream_id, stream_position, partition, stream_type, stream_metadata, is_archived)
VALUES
(v_stream_id, v_next_stream_position, v_partition, v_stream_type, '{}', FALSE);
ELSE
UPDATE ${streamsTable.name} as s
SET stream_position = v_next_stream_position
WHERE stream_id = v_stream_id AND stream_position = v_expected_stream_position AND partition = v_partition AND is_archived = FALSE;
get diagnostics v_updated_rows = row_count;
IF v_updated_rows = 0 THEN
RETURN QUERY SELECT FALSE, NULL::bigint, NULL::bigint, NULL::xid8;
RETURN;
END IF;
END IF;
all_messages_insert;
RETURN QUERY SELECT TRUE, v_next_stream_position, v_last_global_position, v_transaction_id;
END;
$$;
`,
);

type AppendEventResult =
type AppendToStreamResult =
| {
success: true;
nextStreamPosition: bigint;
Expand All @@ -115,7 +117,7 @@ type AppendEventResult =
| { success: false };

export type AppendToStreamPreCommitHook = (
events: ReadEvent[],
messages: RecordedMessage[],
context: {
transaction: NodePostgresTransaction;
},
Expand All @@ -125,26 +127,26 @@ export const appendToStream = (
pool: NodePostgresPool,
streamName: string,
streamType: string,
events: Event[],
messages: Message[],
options?: AppendToStreamOptions & {
partition?: string;
preCommitHook?: AppendToStreamPreCommitHook;
},
): Promise<AppendEventResult> =>
pool.withTransaction<AppendEventResult>(async (transaction) => {
): Promise<AppendToStreamResult> =>
pool.withTransaction<AppendToStreamResult>(async (transaction) => {
const { execute } = transaction;

if (events.length === 0)
if (messages.length === 0)
return { success: false, result: { success: false } };

let appendResult: AppendEventSqlResult;
let appendResult: AppendToStreamSqlResult;

try {
const expectedStreamVersion = toExpectedVersion(
options?.expectedStreamVersion,
);

const eventsToAppend: ReadEvent[] = events.map((e, i) => ({
const messagesToAppend: RecordedMessage[] = messages.map((e, i) => ({
...e,
kind: e.kind ?? 'Event',
metadata: {
Expand All @@ -153,21 +155,21 @@ export const appendToStream = (
streamPosition: BigInt(i),
...('metadata' in e ? (e.metadata ?? {}) : {}),
},
}));
})) as RecordedMessage[];

// TODO: return global positions from append raw and other generated data
appendResult = await appendEventsRaw(
execute,
streamName,
streamType,
eventsToAppend,
messagesToAppend,
{
expectedStreamVersion,
},
);

if (options?.preCommitHook)
await options.preCommitHook(eventsToAppend, { transaction });
await options.preCommitHook(messagesToAppend, { transaction });
} catch (error) {
if (!isOptimisticConcurrencyError(error)) throw error;

Expand Down Expand Up @@ -222,7 +224,7 @@ const toExpectedVersion = (
const isOptimisticConcurrencyError = (error: unknown): boolean =>
error instanceof Error && 'code' in error && error.code === '23505';

type AppendEventSqlResult = {
type AppendToStreamSqlResult = {
success: boolean;
next_stream_position: string | null;
last_global_position: string | null;
Expand All @@ -233,33 +235,35 @@ const appendEventsRaw = (
execute: SQLExecutor,
streamId: string,
streamType: string,
events: ReadEvent[],
messages: RecordedMessage[],
options?: {
expectedStreamVersion: bigint | null;
partition?: string;
},
): Promise<AppendEventSqlResult> =>
): Promise<AppendToStreamSqlResult> =>
single(
execute.command<AppendEventSqlResult>(
execute.command<AppendToStreamSqlResult>(
sql(
`SELECT * FROM emt_append_event(
`SELECT * FROM emt_append_to_stream(
ARRAY[%s]::text[],
ARRAY[%s]::jsonb[],
ARRAY[%s]::jsonb[],
ARRAY[%s]::text[],
ARRAY[%s]::text[],
ARRAY[%s]::text[],
%L::text,
%L::text,
%s::bigint,
%L::text
)`,
events.map((e) => sql('%L', e.metadata.messageId)).join(','),
events.map((e) => sql('%L', JSONParser.stringify(e.data))).join(','),
events
messages.map((e) => sql('%L', e.metadata.messageId)).join(','),
messages.map((e) => sql('%L', JSONParser.stringify(e.data))).join(','),
messages
.map((e) => sql('%L', JSONParser.stringify(e.metadata ?? {})))
.join(','),
events.map(() => `'1'`).join(','),
events.map((e) => sql('%L', e.type)).join(','),
messages.map(() => `'1'`).join(','),
messages.map((e) => sql('%L', e.type)).join(','),
messages.map((e) => sql('%L', e.kind)).join(','),
streamId,
streamType,
options?.expectedStreamVersion ?? 'NULL',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void describe('createEventStoreSchema', () => {

void describe('creates functions', () => {
void it('creates the append_event function', async () => {
assert.ok(await functionExists(pool, 'emt_append_event'));
assert.ok(await functionExists(pool, 'emt_append_to_stream'));
});

void it('creates the emt_add_partition function', async () => {
Expand Down
4 changes: 2 additions & 2 deletions src/packages/emmett-postgresql/src/eventStore/schema/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { type NodePostgresPool, type SQL } from '@event-driven-io/dumbo';
import { appendEventsSQL } from './appendToStream';
import { appendToStreamSQL } from './appendToStream';
import { storeSubscriptionCheckpointSQL } from './storeSubscriptionCheckpoint';
import {
addDefaultPartitionSQL,
Expand Down Expand Up @@ -37,7 +37,7 @@ export const schemaSQL: SQL[] = [
addTenantSQL,
addModuleForAllTenantsSQL,
addTenantForAllModulesSQL,
appendEventsSQL,
appendToStreamSQL,
addDefaultPartitionSQL,
storeSubscriptionCheckpointSQL,
];
Expand Down

0 comments on commit 64a6603

Please sign in to comment.