Renamed events to messages in the PostgreSQL and SQLite schemas and added message kind column
oskardudycz committed Feb 8, 2025
1 parent 5296777 commit fc7cfcf
Showing 17 changed files with 182 additions and 131 deletions.
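The rename ships as schema SQL rather than as a one-off script: `schemaSQL` in index.ts gains a (still commented-out) `migrationFromEventsToMessagesSQL` entry, and the table and column names change from `emt_events`/`event_*` to `emt_messages`/`message_*`. As rough orientation, a hypothetical in-place migration covering the same rename plus the new message kind column could look like the sketch below; the column name `message_kind` and its default are assumptions for illustration, not SQL taken from this commit.

```sql
-- Hypothetical migration sketch (illustration only; not the SQL in this commit).
-- Rename the table and its event_* columns to the message_* naming...
ALTER TABLE emt_events RENAME TO emt_messages;
ALTER TABLE emt_messages RENAME COLUMN event_data TO message_data;
ALTER TABLE emt_messages RENAME COLUMN event_metadata TO message_metadata;
ALTER TABLE emt_messages RENAME COLUMN event_schema_version TO message_schema_version;
ALTER TABLE emt_messages RENAME COLUMN event_type TO message_type;
ALTER TABLE emt_messages RENAME COLUMN event_id TO message_id;
-- ...then add a discriminator for the message kind; name and default are assumed.
ALTER TABLE emt_messages ADD COLUMN message_kind CHAR(1) NOT NULL DEFAULT 'E';
```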
2 changes: 1 addition & 1 deletion README.md
@@ -20,7 +20,7 @@ Check also my blog articles on Emmett:
- [Testing Event Sourcing, Emmett edition](https://event-driven.io/en/testing_event_sourcing_emmett_edition/)
- [Event Sourcing on PostgreSQL in Node.js just became possible with Emmett](https://event-driven.io/en/emmett_postgresql_event_store/)
- [Writing and testing event-driven projections with Emmett, Pongo and PostgreSQL](https://event-driven.io/en/emmett_projections_testing/)
- - [Using event metadata in event-driven projections](https://event-driven.io/en/projections_and_event_metadata/)=
+ - [Using event metadata in event-driven projections](https://event-driven.io/en/projections_and_event_metadata/)

## FAQ

@@ -133,7 +133,7 @@ void describe('appendEvent', () => {
assertFalse(secondResult.success);

const resultEvents = await pool.execute.query(
- sql(`SELECT * FROM emt_events WHERE stream_id = %L`, streamId),
+ sql(`SELECT * FROM emt_messages WHERE stream_id = %L`, streamId),
);

assertEqual(events.length, resultEvents.rows.length);
@@ -178,7 +178,7 @@ void describe('appendEvent', () => {
assertFalse(secondResult.success);

const resultEvents = await pool.execute.query(
- sql(`SELECT * FROM emt_events WHERE stream_id = %L`, streamId),
+ sql(`SELECT * FROM emt_messages WHERE stream_id = %L`, streamId),
);

assertEqual(events.length * 2, resultEvents.rows.length);
@@ -215,7 +215,7 @@ void describe('appendEvent', () => {
assertTrue(secondResult.success);

const resultEvents = await pool.execute.query(
- sql(`SELECT * FROM emt_events WHERE stream_id = %L`, streamId),
+ sql(`SELECT * FROM emt_messages WHERE stream_id = %L`, streamId),
);

assertEqual(events.length * 2, resultEvents.rows.length);
@@ -17,15 +17,15 @@ import {
type ReadEvent,
} from '@event-driven-io/emmett';
import { v4 as uuid } from 'uuid';
- import { defaultTag, eventsTable, streamsTable } from './typing';
+ import { defaultTag, messagesTable, streamsTable } from './typing';

export const appendEventsSQL = rawSql(
`CREATE OR REPLACE FUNCTION emt_append_event(
- v_event_ids text[],
+ v_message_ids text[],
v_events_data jsonb[],
v_events_metadata jsonb[],
- v_event_schema_versions text[],
- v_event_types text[],
+ v_message_schema_versions text[],
+ v_message_types text[],
v_stream_id text,
v_stream_type text,
v_expected_stream_position bigint DEFAULT NULL,
@@ -55,23 +55,23 @@ export const appendEventsSQL = rawSql(
WITH ev AS (
SELECT row_number() OVER () + v_expected_stream_position AS stream_position,
- event_data,
- event_metadata,
+ message_data,
+ message_metadata,
schema_version,
- event_id,
- event_type
+ message_id,
+ message_type
FROM (
SELECT *
FROM
- unnest(v_event_ids, v_events_data, v_events_metadata, v_event_schema_versions, v_event_types)
- AS event(event_id, event_data, event_metadata, schema_version, event_type)
+ unnest(v_message_ids, v_events_data, v_events_metadata, v_message_schema_versions, v_message_types)
+ AS event(message_id, message_data, message_metadata, schema_version, message_type)
) AS event
),
all_events_insert AS (
- INSERT INTO ${eventsTable.name}
- (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, transaction_id)
+ INSERT INTO ${messagesTable.name}
+ (stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_id, transaction_id)
SELECT
- v_stream_id, ev.stream_position, v_partition, ev.event_data, ev.event_metadata, ev.schema_version, ev.event_type, ev.event_id, v_transaction_id
+ v_stream_id, ev.stream_position, v_partition, ev.message_data, ev.message_metadata, ev.schema_version, ev.message_type, ev.message_id, v_transaction_id
FROM ev
RETURNING global_position
)
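The `emt_append_event` function above relies on a common PostgreSQL batching idiom: the caller passes one array per column, `unnest` zips the parallel arrays back into rows, and `row_number() OVER ()` offsets each row from the expected stream position so the whole batch is written in a single `INSERT ... SELECT`. A minimal, self-contained sketch of that idiom against a hypothetical `messages` table (not Emmett's actual function) looks like this:

```sql
-- Append two messages to stream 'stream-123' whose current position is 5.
-- The parallel arrays stand in for the v_message_ids / v_events_data /
-- v_message_types parameters of the function above.
WITH ev AS (
  SELECT row_number() OVER () + 5 AS stream_position,  -- 5 = expected stream position
         message_id, message_data, message_type
  FROM unnest(
         ARRAY['id-1', 'id-2'],                                -- message ids
         ARRAY['{"amount": 10}', '{"amount": 20}']::jsonb[],   -- payloads
         ARRAY['SomethingHappened', 'SomethingElseHappened']   -- types
       ) AS msg(message_id, message_data, message_type)
)
INSERT INTO messages (stream_id, stream_position, message_id, message_data, message_type)
SELECT 'stream-123', stream_position, message_id, message_data, message_type
FROM ev
RETURNING stream_position;
```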
@@ -42,23 +42,23 @@ void describe('createEventStoreSchema', () => {
});

void it('creates the events table', async () => {
- assert.ok(await tableExists(pool, 'emt_events'));
+ assert.ok(await tableExists(pool, 'emt_messages'));
});

void it('creates the subscriptions table', async () => {
assert.ok(await tableExists(pool, 'emt_subscriptions'));
});

void it('creates the events default partition', async () => {
- assert.ok(await tableExists(pool, 'emt_events_emt_default'));
+ assert.ok(await tableExists(pool, 'emt_messages_emt_default'));
});

void it('creates the events secondary level active partition', async () => {
- assert.ok(await tableExists(pool, 'emt_events_emt_default_active'));
+ assert.ok(await tableExists(pool, 'emt_messages_emt_default_active'));
});

void it('creates the events secondary level archived partition', async () => {
- assert.ok(await tableExists(pool, 'emt_events_emt_default_archived'));
+ assert.ok(await tableExists(pool, 'emt_messages_emt_default_archived'));
});
});
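These assertions reveal the partition layout: `emt_messages` is list-partitioned by the partition tag (module/tenant), and each of those partitions is split again into an active and an archived leaf, which is how the default partition ends up as `emt_messages_emt_default` with `_active` and `_archived` children. Below is a simplified sketch of such a two-level layout; the column list, partition values, and names are illustrative, not Emmett's actual DDL.

```sql
-- Two-level partitioning sketch: first by partition tag, then by archival flag.
CREATE TABLE messages (
  partition    TEXT    NOT NULL,
  is_archived  BOOLEAN NOT NULL DEFAULT FALSE,
  stream_id    TEXT    NOT NULL,
  message_data JSONB   NOT NULL
) PARTITION BY LIST (partition);

-- Default partition for streams without an explicit module/tenant...
CREATE TABLE messages_default
  PARTITION OF messages FOR VALUES IN ('emt:default')  -- tag value is illustrative
  PARTITION BY LIST (is_archived);

-- ...split into active and archived leaves.
CREATE TABLE messages_default_active
  PARTITION OF messages_default FOR VALUES IN (FALSE);
CREATE TABLE messages_default_archived
  PARTITION OF messages_default FOR VALUES IN (TRUE);
```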

@@ -96,7 +96,7 @@ void describe('createEventStoreSchema', () => {
rawSql(`
SELECT EXISTS (
SELECT FROM pg_tables
- WHERE tablename = 'emt_events_test_module__global'
+ WHERE tablename = 'emt_messages_test_module__global'
) AS exists;
`),
),
@@ -115,7 +115,7 @@ void describe('createEventStoreSchema', () => {
rawSql(`
SELECT EXISTS (
SELECT FROM pg_tables
- WHERE tablename = 'emt_events_test_module__test_tenant'
+ WHERE tablename = 'emt_messages_test_module__test_tenant'
) AS exists;`),
),
);
@@ -126,15 +126,15 @@ void describe('createEventStoreSchema', () => {
// void it('should allow adding a module for all tenants', async () => {
// await createEventStoreSchema(pool);

- // await pool.query(`INSERT INTO emt_events (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, transaction_id)
+ // await pool.query(`INSERT INTO emt_messages (stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_id, transaction_id)
// VALUES ('test_stream', 0, 'global__global', '{}', '{}', '1.0', 'test', '${uuid()}', pg_current_xact_id())`);

// await pool.query(`SELECT add_module_for_all_tenants('new_module')`);

// const res = await pool.query(`
// SELECT EXISTS (
// SELECT FROM pg_tables
- // WHERE tablename = 'emt_events_new_module__existing_tenant'
+ // WHERE tablename = 'emt_messages_new_module__existing_tenant'
// ) AS exists;
// `);

@@ -149,15 +149,15 @@ void describe('createEventStoreSchema', () => {
// void it('should allow adding a tenant for all modules', async () => {
// await createEventStoreSchema(pool);

- // await pool.query(`INSERT INTO emt_events (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, transaction_id)
+ // await pool.query(`INSERT INTO emt_messages (stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_id, transaction_id)
// VALUES ('test_stream', 0, '${emmettPrefix}:partition:existing_module:existing_tenant', '{}', '{}', '1.0', 'test', '${uuid()}', 0)`);

// await pool.query(`SELECT add_tenant_for_all_modules('new_tenant')`);

// const res = await pool.query(`
// SELECT EXISTS (
// SELECT FROM pg_tables
- // WHERE tablename = 'emt_events_existing_module_new_tenant'
+ // WHERE tablename = 'emt_messages_existing_module_new_tenant'
// ) AS exists;
// `);

13 changes: 7 additions & 6 deletions src/packages/emmett-postgresql/src/eventStore/schema/index.ts
@@ -2,14 +2,14 @@ import { type NodePostgresPool, type SQL } from '@event-driven-io/dumbo';
import { appendEventsSQL } from './appendToStream';
import { storeSubscriptionCheckpointSQL } from './storeSubscriptionCheckpoint';
import {
- addDefaultPartition,
- addEventsPartitions,
+ addDefaultPartitionSQL,
addModuleForAllTenantsSQL,
addModuleSQL,
addPartitionSQL,
+ addTablePartitions,
addTenantForAllModulesSQL,
addTenantSQL,
- eventsTableSQL,
+ messagesTableSQL,
sanitizeNameSQL,
streamsTableSQL,
subscriptionsTableSQL,
@@ -25,18 +25,19 @@ export * from './tables';
export * from './typing';

export const schemaSQL: SQL[] = [
+ //migrationFromEventsToMessagesSQL,
streamsTableSQL,
- eventsTableSQL,
+ messagesTableSQL,
subscriptionsTableSQL,
sanitizeNameSQL,
- addEventsPartitions,
+ addTablePartitions,
addPartitionSQL,
addModuleSQL,
addTenantSQL,
addModuleForAllTenantsSQL,
addTenantForAllModulesSQL,
appendEventsSQL,
- addDefaultPartition,
+ addDefaultPartitionSQL,
storeSubscriptionCheckpointSQL,
];

@@ -1,5 +1,5 @@
import { singleOrNull, sql, type SQLExecutor } from '@event-driven-io/dumbo';
- import { defaultTag, eventsTable } from './typing';
+ import { defaultTag, messagesTable } from './typing';

type ReadLastMessageGlobalPositionSqlResult = {
global_position: string;
@@ -17,7 +17,7 @@ export const readLastMessageGlobalPosition = async (
execute.query<ReadLastMessageGlobalPositionSqlResult>(
sql(
`SELECT global_position
- FROM ${eventsTable.name}
+ FROM ${messagesTable.name}
WHERE partition = %L AND is_archived = FALSE AND transaction_id < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY transaction_id, global_position
LIMIT 1`,
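The `transaction_id < pg_snapshot_xmin(pg_current_snapshot())` guard in this query keeps catch-up readers from racing in-flight writers: `pg_snapshot_xmin` is the oldest transaction ID still in progress, so only rows written by already-finished transactions qualify, and a long-running transaction cannot later commit a row "behind" a position the reader has already passed. A quick way to inspect those values, using standard PostgreSQL 13+ functions and independent of Emmett:

```sql
-- The current snapshot lists in-flight transaction ids; its xmin is the oldest.
-- Rows with transaction_id below that xmin come from finished transactions only.
SELECT pg_current_snapshot()                   AS snapshot,
       pg_snapshot_xmin(pg_current_snapshot()) AS oldest_in_flight_xid;
```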
@@ -9,16 +9,16 @@ import {
type ReadEventMetadata,
type ReadEventMetadataWithGlobalPosition,
} from '@event-driven-io/emmett';
- import { defaultTag, eventsTable } from './typing';
+ import { defaultTag, messagesTable } from './typing';

type ReadMessagesBatchSqlResult<EventType extends Event> = {
stream_position: string;
stream_id: string;
- event_data: EventDataOf<EventType>;
- event_metadata: EventMetaDataOf<EventType>;
- event_schema_version: string;
- event_type: EventTypeOf<EventType>;
- event_id: string;
+ message_data: EventDataOf<EventType>;
+ message_metadata: EventMetaDataOf<EventType>;
+ message_schema_version: string;
+ message_type: EventTypeOf<EventType>;
+ message_id: string;
global_position: string;
transaction_id: string;
created: string;
@@ -76,8 +76,8 @@ export const readMessagesBatch = async <
const events: ReadEvent<MessageType, ReadEventMetadataType>[] = await mapRows(
execute.query<ReadMessagesBatchSqlResult<MessageType>>(
sql(
- `SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id
- FROM ${eventsTable.name}
+ `SELECT stream_id, stream_position, global_position, message_data, message_metadata, message_schema_version, message_type, message_id
+ FROM ${messagesTable.name}
WHERE partition = %L AND is_archived = FALSE AND transaction_id < pg_snapshot_xmin(pg_current_snapshot()) ${fromCondition} ${toCondition}
ORDER BY transaction_id, global_position
${limitCondition}`,
@@ -86,14 +86,14 @@
),
(row) => {
const rawEvent = {
- type: row.event_type,
- data: row.event_data,
- metadata: row.event_metadata,
+ type: row.message_type,
+ data: row.message_data,
+ metadata: row.message_metadata,
} as unknown as MessageType;

const metadata: ReadEventMetadataWithGlobalPosition = {
...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}),
- messageId: row.event_id,
+ messageId: row.message_id,
streamName: row.stream_id,
streamPosition: BigInt(row.stream_position),
globalPosition: BigInt(row.global_position),
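The excerpt above truncates the `fromCondition`, `toCondition`, and `limitCondition` placeholders of `readMessagesBatch`. Once assembled, the batch read presumably resolves to something along the lines of the sketch below, with illustrative values; the exact predicates Emmett builds are not visible in this diff:

```sql
-- Illustrative expansion of the batched read (values and predicates assumed).
SELECT stream_id, stream_position, global_position,
       message_data, message_metadata, message_schema_version,
       message_type, message_id
FROM emt_messages
WHERE partition = 'emt:default'
  AND is_archived = FALSE
  AND transaction_id < pg_snapshot_xmin(pg_current_snapshot())
  AND global_position > 0        -- fromCondition: resume after the last checkpoint
  AND global_position <= 1000    -- toCondition: optional upper bound
ORDER BY transaction_id, global_position
LIMIT 100;                       -- limitCondition: batch size
```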
24 changes: 12 additions & 12 deletions src/packages/emmett-postgresql/src/eventStore/schema/readStream.ts
@@ -11,15 +11,15 @@ import {
type ReadStreamResult,
} from '@event-driven-io/emmett';
import { PostgreSQLEventStoreDefaultStreamVersion } from '../postgreSQLEventStore';
- import { defaultTag, eventsTable } from './typing';
+ import { defaultTag, messagesTable } from './typing';

type ReadStreamSqlResult<EventType extends Event> = {
stream_position: string;
- event_data: EventDataOf<EventType>;
- event_metadata: EventMetaDataOf<EventType>;
- event_schema_version: string;
- event_type: EventTypeOf<EventType>;
- event_id: string;
+ message_data: EventDataOf<EventType>;
+ message_metadata: EventMetaDataOf<EventType>;
+ message_schema_version: string;
+ message_type: EventTypeOf<EventType>;
+ message_id: string;
global_position: string;
transaction_id: string;
created: string;
@@ -51,23 +51,23 @@ export const readStream = async <EventType extends Event>(
await mapRows(
execute.query<ReadStreamSqlResult<EventType>>(
sql(
- `SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id
- FROM ${eventsTable.name}
+ `SELECT stream_id, stream_position, global_position, message_data, message_metadata, message_schema_version, message_type, message_id
+ FROM ${messagesTable.name}
WHERE stream_id = %L AND partition = %L AND is_archived = FALSE ${fromCondition} ${toCondition}`,
streamId,
options?.partition ?? defaultTag,
),
),
(row) => {
const rawEvent = {
- type: row.event_type,
- data: row.event_data,
- metadata: row.event_metadata,
+ type: row.message_type,
+ data: row.message_data,
+ metadata: row.message_metadata,
} as unknown as EventType;

const metadata: ReadEventMetadataWithGlobalPosition = {
...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}),
- messageId: row.event_id,
+ messageId: row.message_id,
streamName: streamId,
streamPosition: BigInt(row.stream_position),
globalPosition: BigInt(row.global_position),