Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renamed events to messages in the PostgreSQL and SQLite schemas and added message kind column #184

Merged
merged 4 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Check also my blog articles on Emmett:
- [Testing Event Sourcing, Emmett edition](https://event-driven.io/en/testing_event_sourcing_emmett_edition/)
- [Event Sourcing on PostgreSQL in Node.js just became possible with Emmett](https://event-driven.io/en/emmett_postgresql_event_store/)
- [Writing and testing event-driven projections with Emmett, Pongo and PostgreSQL](https://event-driven.io/en/emmett_projections_testing/)
- [Using event metadata in event-driven projections](https://event-driven.io/en/projections_and_event_metadata/)=
- [Using event metadata in event-driven projections](https://event-driven.io/en/projections_and_event_metadata/)

## FAQ

Expand Down
2 changes: 0 additions & 2 deletions samples/webApi/expressjs-with-postgresql/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3.5'

services:
app:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ void describe('appendEvent', () => {
assertFalse(secondResult.success);

const resultEvents = await pool.execute.query(
sql(`SELECT * FROM emt_events WHERE stream_id = %L`, streamId),
sql(`SELECT * FROM emt_messages WHERE stream_id = %L`, streamId),
);

assertEqual(events.length, resultEvents.rows.length);
Expand Down Expand Up @@ -178,7 +178,7 @@ void describe('appendEvent', () => {
assertFalse(secondResult.success);

const resultEvents = await pool.execute.query(
sql(`SELECT * FROM emt_events WHERE stream_id = %L`, streamId),
sql(`SELECT * FROM emt_messages WHERE stream_id = %L`, streamId),
);

assertEqual(events.length * 2, resultEvents.rows.length);
Expand Down Expand Up @@ -215,7 +215,7 @@ void describe('appendEvent', () => {
assertTrue(secondResult.success);

const resultEvents = await pool.execute.query(
sql(`SELECT * FROM emt_events WHERE stream_id = %L`, streamId),
sql(`SELECT * FROM emt_messages WHERE stream_id = %L`, streamId),
);

assertEqual(events.length * 2, resultEvents.rows.length);
Expand Down
128 changes: 67 additions & 61 deletions src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ import {
STREAM_DOES_NOT_EXIST,
STREAM_EXISTS,
type AppendToStreamOptions,
type Event,
type ExpectedStreamVersion,
type ReadEvent,
type Message,
type RecordedMessage,
} from '@event-driven-io/emmett';
import { v4 as uuid } from 'uuid';
import { defaultTag, eventsTable, streamsTable } from './typing';
import { defaultTag, messagesTable, streamsTable } from './typing';

export const appendEventsSQL = rawSql(
`CREATE OR REPLACE FUNCTION emt_append_event(
v_event_ids text[],
v_events_data jsonb[],
v_events_metadata jsonb[],
v_event_schema_versions text[],
v_event_types text[],
export const appendToStreamSQL = rawSql(
`CREATE OR REPLACE FUNCTION emt_append_to_stream(
v_message_ids text[],
v_messages_data jsonb[],
v_messages_metadata jsonb[],
v_message_schema_versions text[],
v_message_types text[],
v_message_kinds text[],
v_stream_id text,
v_stream_type text,
v_expected_stream_position bigint DEFAULT NULL,
Expand All @@ -44,42 +45,15 @@ export const appendEventsSQL = rawSql(
v_transaction_id xid8;
v_last_global_position bigint;
BEGIN
v_transaction_id := pg_current_xact_id();

IF v_expected_stream_position IS NULL THEN
SELECT COALESCE(max(stream_position), 0) INTO v_expected_stream_position
FROM ${streamsTable.name}
WHERE stream_id = v_stream_id AND partition = v_partition;
END IF;

v_next_stream_position := v_expected_stream_position + array_upper(v_events_data, 1);
v_transaction_id := pg_current_xact_id();

WITH ev AS (
SELECT row_number() OVER () + v_expected_stream_position AS stream_position,
event_data,
event_metadata,
schema_version,
event_id,
event_type
FROM (
SELECT *
FROM
unnest(v_event_ids, v_events_data, v_events_metadata, v_event_schema_versions, v_event_types)
AS event(event_id, event_data, event_metadata, schema_version, event_type)
) AS event
),
all_events_insert AS (
INSERT INTO ${eventsTable.name}
(stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, transaction_id)
SELECT
v_stream_id, ev.stream_position, v_partition, ev.event_data, ev.event_metadata, ev.schema_version, ev.event_type, ev.event_id, v_transaction_id
FROM ev
RETURNING global_position
)
SELECT
max(global_position) INTO v_last_global_position
FROM
all_events_insert;

v_next_stream_position := v_expected_stream_position + array_upper(v_messages_data, 1);

IF v_expected_stream_position = 0 THEN
INSERT INTO ${streamsTable.name}
Expand All @@ -99,13 +73,41 @@ export const appendEventsSQL = rawSql(
END IF;
END IF;

WITH ev AS (
SELECT row_number() OVER () + v_expected_stream_position AS stream_position,
message_data,
message_metadata,
schema_version,
message_id,
message_type,
message_kind
FROM (
SELECT *
FROM
unnest(v_message_ids, v_messages_data, v_messages_metadata, v_message_schema_versions, v_message_types, v_message_kinds)
AS message(message_id, message_data, message_metadata, schema_version, message_type, message_kind)
) AS message
),
all_messages_insert AS (
INSERT INTO ${messagesTable.name}
(stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_kind, message_id, transaction_id)
SELECT
v_stream_id, ev.stream_position, v_partition, ev.message_data, ev.message_metadata, ev.schema_version, ev.message_type, ev.message_kind, ev.message_id, v_transaction_id
FROM ev
RETURNING global_position
)
SELECT
max(global_position) INTO v_last_global_position
FROM
all_messages_insert;

RETURN QUERY SELECT TRUE, v_next_stream_position, v_last_global_position, v_transaction_id;
END;
$$;
`,
);

type AppendEventResult =
type AppendToStreamResult =
| {
success: true;
nextStreamPosition: bigint;
Expand All @@ -115,7 +117,7 @@ type AppendEventResult =
| { success: false };

export type AppendToStreamPreCommitHook = (
events: ReadEvent[],
messages: RecordedMessage[],
context: {
transaction: NodePostgresTransaction;
},
Expand All @@ -125,26 +127,26 @@ export const appendToStream = (
pool: NodePostgresPool,
streamName: string,
streamType: string,
events: Event[],
messages: Message[],
options?: AppendToStreamOptions & {
partition?: string;
preCommitHook?: AppendToStreamPreCommitHook;
},
): Promise<AppendEventResult> =>
pool.withTransaction<AppendEventResult>(async (transaction) => {
): Promise<AppendToStreamResult> =>
pool.withTransaction<AppendToStreamResult>(async (transaction) => {
const { execute } = transaction;

if (events.length === 0)
if (messages.length === 0)
return { success: false, result: { success: false } };

let appendResult: AppendEventSqlResult;
let appendResult: AppendToStreamSqlResult;

try {
const expectedStreamVersion = toExpectedVersion(
options?.expectedStreamVersion,
);

const eventsToAppend: ReadEvent[] = events.map((e, i) => ({
const messagesToAppend: RecordedMessage[] = messages.map((e, i) => ({
...e,
kind: e.kind ?? 'Event',
metadata: {
Expand All @@ -153,21 +155,21 @@ export const appendToStream = (
streamPosition: BigInt(i),
...('metadata' in e ? (e.metadata ?? {}) : {}),
},
}));
})) as RecordedMessage[];

// TODO: return global positions from append raw and other generated data
appendResult = await appendEventsRaw(
execute,
streamName,
streamType,
eventsToAppend,
messagesToAppend,
{
expectedStreamVersion,
},
);

if (options?.preCommitHook)
await options.preCommitHook(eventsToAppend, { transaction });
await options.preCommitHook(messagesToAppend, { transaction });
} catch (error) {
if (!isOptimisticConcurrencyError(error)) throw error;

Expand Down Expand Up @@ -222,7 +224,7 @@ const toExpectedVersion = (
const isOptimisticConcurrencyError = (error: unknown): boolean =>
error instanceof Error && 'code' in error && error.code === '23505';

type AppendEventSqlResult = {
type AppendToStreamSqlResult = {
success: boolean;
next_stream_position: string | null;
last_global_position: string | null;
Expand All @@ -233,33 +235,37 @@ const appendEventsRaw = (
execute: SQLExecutor,
streamId: string,
streamType: string,
events: ReadEvent[],
messages: RecordedMessage[],
options?: {
expectedStreamVersion: bigint | null;
partition?: string;
},
): Promise<AppendEventSqlResult> =>
): Promise<AppendToStreamSqlResult> =>
single(
execute.command<AppendEventSqlResult>(
execute.command<AppendToStreamSqlResult>(
sql(
`SELECT * FROM emt_append_event(
`SELECT * FROM emt_append_to_stream(
ARRAY[%s]::text[],
ARRAY[%s]::jsonb[],
ARRAY[%s]::jsonb[],
ARRAY[%s]::text[],
ARRAY[%s]::text[],
ARRAY[%s]::text[],
%L::text,
%L::text,
%s::bigint,
%L::text
)`,
events.map((e) => sql('%L', e.metadata.messageId)).join(','),
events.map((e) => sql('%L', JSONParser.stringify(e.data))).join(','),
events
messages.map((e) => sql('%L', e.metadata.messageId)).join(','),
messages.map((e) => sql('%L', JSONParser.stringify(e.data))).join(','),
messages
.map((e) => sql('%L', JSONParser.stringify(e.metadata ?? {})))
.join(','),
events.map(() => `'1'`).join(','),
events.map((e) => sql('%L', e.type)).join(','),
messages.map(() => `'1'`).join(','),
messages.map((e) => sql('%L', e.type)).join(','),
messages
.map((e) => sql('%L', e.kind === 'Event' ? 'E' : 'C'))
.join(','),
streamId,
streamType,
options?.expectedStreamVersion ?? 'NULL',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,29 @@ void describe('createEventStoreSchema', () => {
});

void it('creates the events table', async () => {
assert.ok(await tableExists(pool, 'emt_events'));
assert.ok(await tableExists(pool, 'emt_messages'));
});

void it('creates the subscriptions table', async () => {
assert.ok(await tableExists(pool, 'emt_subscriptions'));
});

void it('creates the events default partition', async () => {
assert.ok(await tableExists(pool, 'emt_events_emt_default'));
assert.ok(await tableExists(pool, 'emt_messages_emt_default'));
});

void it('creates the events secondary level active partition', async () => {
assert.ok(await tableExists(pool, 'emt_events_emt_default_active'));
assert.ok(await tableExists(pool, 'emt_messages_emt_default_active'));
});

void it('creates the events secondary level archived partition', async () => {
assert.ok(await tableExists(pool, 'emt_events_emt_default_archived'));
assert.ok(await tableExists(pool, 'emt_messages_emt_default_archived'));
});
});

void describe('creates functions', () => {
void it('creates the append_event function', async () => {
assert.ok(await functionExists(pool, 'emt_append_event'));
assert.ok(await functionExists(pool, 'emt_append_to_stream'));
});

void it('creates the emt_add_partition function', async () => {
Expand Down Expand Up @@ -96,7 +96,7 @@ void describe('createEventStoreSchema', () => {
rawSql(`
SELECT EXISTS (
SELECT FROM pg_tables
WHERE tablename = 'emt_events_test_module__global'
WHERE tablename = 'emt_messages_test_module__global'
) AS exists;
`),
),
Expand All @@ -115,7 +115,7 @@ void describe('createEventStoreSchema', () => {
rawSql(`
SELECT EXISTS (
SELECT FROM pg_tables
WHERE tablename = 'emt_events_test_module__test_tenant'
WHERE tablename = 'emt_messages_test_module__test_tenant'
) AS exists;`),
),
);
Expand All @@ -126,15 +126,15 @@ void describe('createEventStoreSchema', () => {
// void it('should allow adding a module for all tenants', async () => {
// await createEventStoreSchema(pool);

// await pool.query(`INSERT INTO emt_events (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, transaction_id)
// await pool.query(`INSERT INTO emt_messages (stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_id, transaction_id)
// VALUES ('test_stream', 0, 'global__global', '{}', '{}', '1.0', 'test', '${uuid()}', pg_current_xact_id())`);

// await pool.query(`SELECT add_module_for_all_tenants('new_module')`);

// const res = await pool.query(`
// SELECT EXISTS (
// SELECT FROM pg_tables
// WHERE tablename = 'emt_events_new_module__existing_tenant'
// WHERE tablename = 'emt_messages_new_module__existing_tenant'
// ) AS exists;
// `);

Expand All @@ -149,15 +149,15 @@ void describe('createEventStoreSchema', () => {
// void it('should allow adding a tenant for all modules', async () => {
// await createEventStoreSchema(pool);

// await pool.query(`INSERT INTO emt_events (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, transaction_id)
// await pool.query(`INSERT INTO emt_messages (stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_id, transaction_id)
// VALUES ('test_stream', 0, '${emmettPrefix}:partition:existing_module:existing_tenant', '{}', '{}', '1.0', 'test', '${uuid()}', 0)`);

// await pool.query(`SELECT add_tenant_for_all_modules('new_tenant')`);

// const res = await pool.query(`
// SELECT EXISTS (
// SELECT FROM pg_tables
// WHERE tablename = 'emt_events_existing_module_new_tenant'
// WHERE tablename = 'emt_messages_existing_module_new_tenant'
// ) AS exists;
// `);

Expand Down
Loading