From fd7d492549d26c52df4141e9e49063066946b9e9 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Sat, 8 Feb 2025 19:29:50 +0100 Subject: [PATCH] Renamed events to messages in the PostgreSQL and SQLite schemas and added message kind column --- README.md | 2 +- .../schema/appendToStream.int.spec.ts | 6 +- .../src/eventStore/schema/appendToStream.ts | 26 ++--- .../schema/createEventStoreSchema.int.spec.ts | 20 ++-- .../src/eventStore/schema/index.ts | 14 ++- .../schema/readLastMessageGlobalPosition.ts | 4 +- .../eventStore/schema/readMessagesBatch.ts | 24 ++-- .../src/eventStore/schema/readStream.ts | 24 ++-- .../src/eventStore/schema/tables.ts | 109 +++++++++++++----- .../src/eventStore/schema/typing.ts | 4 +- .../schema/appendToStream.int.spec.ts | 12 +- .../src/eventStore/schema/appendToStream.ts | 14 +-- .../schema/createEventStoreSchema.int.spec.ts | 2 +- .../src/eventStore/schema/index.ts | 4 +- .../src/eventStore/schema/readStream.ts | 24 ++-- .../src/eventStore/schema/tables.ts | 19 +-- .../src/eventStore/schema/typing.ts | 4 +- 17 files changed, 181 insertions(+), 131 deletions(-) diff --git a/README.md b/README.md index aab11c3e..f5c8bd66 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Check also my blog articles on Emmett: - [Testing Event Sourcing, Emmett edition](https://event-driven.io/en/testing_event_sourcing_emmett_edition/) - [Event Sourcing on PostgreSQL in Node.js just became possible with Emmett](https://event-driven.io/en/emmett_postgresql_event_store/) - [Writing and testing event-driven projections with Emmett, Pongo and PostgreSQL](https://event-driven.io/en/emmett_projections_testing/) -- [Using event metadata in event-driven projections](https://event-driven.io/en/projections_and_event_metadata/)= +- [Using event metadata in event-driven projections](https://event-driven.io/en/projections_and_event_metadata/) ## FAQ diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.int.spec.ts b/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.int.spec.ts index d4384000..ba60d777 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.int.spec.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.int.spec.ts @@ -133,7 +133,7 @@ void describe('appendEvent', () => { assertFalse(secondResult.success); const resultEvents = await pool.execute.query( - sql(`SELECT * FROM emt_events WHERE stream_id = %L`, streamId), + sql(`SELECT * FROM emt_messages WHERE stream_id = %L`, streamId), ); assertEqual(events.length, resultEvents.rows.length); @@ -178,7 +178,7 @@ void describe('appendEvent', () => { assertFalse(secondResult.success); const resultEvents = await pool.execute.query( - sql(`SELECT * FROM emt_events WHERE stream_id = %L`, streamId), + sql(`SELECT * FROM emt_messages WHERE stream_id = %L`, streamId), ); assertEqual(events.length * 2, resultEvents.rows.length); @@ -215,7 +215,7 @@ void describe('appendEvent', () => { assertTrue(secondResult.success); const resultEvents = await pool.execute.query( - sql(`SELECT * FROM emt_events WHERE stream_id = %L`, streamId), + sql(`SELECT * FROM emt_messages WHERE stream_id = %L`, streamId), ); assertEqual(events.length * 2, resultEvents.rows.length); diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts b/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts index 4cbd68b7..39e74034 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts @@ -17,15 +17,15 @@ import { type ReadEvent, } from '@event-driven-io/emmett'; import { v4 as uuid } from 'uuid'; -import { defaultTag, eventsTable, streamsTable } from './typing'; +import { defaultTag, messagesTable, streamsTable } from './typing'; export const appendEventsSQL = rawSql( `CREATE OR REPLACE FUNCTION emt_append_event( - v_event_ids text[], + v_message_ids text[], v_events_data jsonb[], v_events_metadata jsonb[], - v_event_schema_versions text[], - v_event_types text[], + v_message_schema_versions text[], + v_message_types text[], v_stream_id text, v_stream_type text, v_expected_stream_position bigint DEFAULT NULL, @@ -55,23 +55,23 @@ export const appendEventsSQL = rawSql( WITH ev AS ( SELECT row_number() OVER () + v_expected_stream_position AS stream_position, - event_data, - event_metadata, + message_data, + message_metadata, schema_version, - event_id, - event_type + message_id, + message_type FROM ( SELECT * FROM - unnest(v_event_ids, v_events_data, v_events_metadata, v_event_schema_versions, v_event_types) - AS event(event_id, event_data, event_metadata, schema_version, event_type) + unnest(v_message_ids, v_events_data, v_events_metadata, v_message_schema_versions, v_message_types) + AS event(message_id, message_data, message_metadata, schema_version, message_type) ) AS event ), all_events_insert AS ( - INSERT INTO ${eventsTable.name} - (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, transaction_id) + INSERT INTO ${messagesTable.name} + (stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_id, transaction_id) SELECT - v_stream_id, ev.stream_position, v_partition, ev.event_data, ev.event_metadata, ev.schema_version, ev.event_type, ev.event_id, v_transaction_id + v_stream_id, ev.stream_position, v_partition, ev.message_data, ev.message_metadata, ev.schema_version, ev.message_type, ev.message_id, v_transaction_id FROM ev RETURNING global_position ) diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/createEventStoreSchema.int.spec.ts b/src/packages/emmett-postgresql/src/eventStore/schema/createEventStoreSchema.int.spec.ts index 43d93a7d..5d4ab6f9 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/createEventStoreSchema.int.spec.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/createEventStoreSchema.int.spec.ts @@ -42,7 +42,7 @@ void describe('createEventStoreSchema', () => { }); void it('creates the events table', async () => { - assert.ok(await tableExists(pool, 'emt_events')); + assert.ok(await tableExists(pool, 'emt_messages')); }); void it('creates the subscriptions table', async () => { @@ -50,15 +50,15 @@ void describe('createEventStoreSchema', () => { }); void it('creates the events default partition', async () => { - assert.ok(await tableExists(pool, 'emt_events_emt_default')); + assert.ok(await tableExists(pool, 'emt_messages_emt_default')); }); void it('creates the events secondary level active partition', async () => { - assert.ok(await tableExists(pool, 'emt_events_emt_default_active')); + assert.ok(await tableExists(pool, 'emt_messages_emt_default_active')); }); void it('creates the events secondary level archived partition', async () => { - assert.ok(await tableExists(pool, 'emt_events_emt_default_archived')); + assert.ok(await tableExists(pool, 'emt_messages_emt_default_archived')); }); }); @@ -96,7 +96,7 @@ void describe('createEventStoreSchema', () => { rawSql(` SELECT EXISTS ( SELECT FROM pg_tables - WHERE tablename = 'emt_events_test_module__global' + WHERE tablename = 'emt_messages_test_module__global' ) AS exists; `), ), @@ -115,7 +115,7 @@ void describe('createEventStoreSchema', () => { rawSql(` SELECT EXISTS ( SELECT FROM pg_tables - WHERE tablename = 'emt_events_test_module__test_tenant' + WHERE tablename = 'emt_messages_test_module__test_tenant' ) AS exists;`), ), ); @@ -126,7 +126,7 @@ void describe('createEventStoreSchema', () => { // void it('should allow adding a module for all tenants', async () => { // await createEventStoreSchema(pool); - // await pool.query(`INSERT INTO emt_events (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, transaction_id) + // await pool.query(`INSERT INTO emt_messages (stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_id, transaction_id) // VALUES ('test_stream', 0, 'global__global', '{}', '{}', '1.0', 'test', '${uuid()}', pg_current_xact_id())`); // await pool.query(`SELECT add_module_for_all_tenants('new_module')`); @@ -134,7 +134,7 @@ void describe('createEventStoreSchema', () => { // const res = await pool.query(` // SELECT EXISTS ( // SELECT FROM pg_tables - // WHERE tablename = 'emt_events_new_module__existing_tenant' + // WHERE tablename = 'emt_messages_new_module__existing_tenant' // ) AS exists; // `); @@ -149,7 +149,7 @@ void describe('createEventStoreSchema', () => { // void it('should allow adding a tenant for all modules', async () => { // await createEventStoreSchema(pool); - // await pool.query(`INSERT INTO emt_events (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, transaction_id) + // await pool.query(`INSERT INTO emt_messages (stream_id, stream_position, partition, message_data, message_metadata, message_schema_version, message_type, message_id, transaction_id) // VALUES ('test_stream', 0, '${emmettPrefix}:partition:existing_module:existing_tenant', '{}', '{}', '1.0', 'test', '${uuid()}', 0)`); // await pool.query(`SELECT add_tenant_for_all_modules('new_tenant')`); @@ -157,7 +157,7 @@ void describe('createEventStoreSchema', () => { // const res = await pool.query(` // SELECT EXISTS ( // SELECT FROM pg_tables - // WHERE tablename = 'emt_events_existing_module_new_tenant' + // WHERE tablename = 'emt_messages_existing_module_new_tenant' // ) AS exists; // `); diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/index.ts b/src/packages/emmett-postgresql/src/eventStore/schema/index.ts index cf9cb79c..9f222be4 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/index.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/index.ts @@ -2,14 +2,15 @@ import { type NodePostgresPool, type SQL } from '@event-driven-io/dumbo'; import { appendEventsSQL } from './appendToStream'; import { storeSubscriptionCheckpointSQL } from './storeSubscriptionCheckpoint'; import { - addDefaultPartition, - addEventsPartitions, + addDefaultPartitionSQL, addModuleForAllTenantsSQL, addModuleSQL, + addPartitionSQL, addTablePartitions, addTenantForAllModulesSQL, addTenantSQL, - eventsTableSQL, + messagesTableSQL, + migrationFromEventsToMessagesSQL, sanitizeNameSQL, streamsTableSQL, subscriptionsTableSQL, @@ -25,18 +26,19 @@ export * from './tables'; export * from './typing'; export const schemaSQL: SQL[] = [ + migrationFromEventsToMessagesSQL, streamsTableSQL, - eventsTableSQL, + messagesTableSQL, subscriptionsTableSQL, sanitizeNameSQL, addTablePartitions, - addEventsPartitions, + addPartitionSQL, addModuleSQL, addTenantSQL, addModuleForAllTenantsSQL, addTenantForAllModulesSQL, appendEventsSQL, - addDefaultPartition, + addDefaultPartitionSQL, storeSubscriptionCheckpointSQL, ]; diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/readLastMessageGlobalPosition.ts b/src/packages/emmett-postgresql/src/eventStore/schema/readLastMessageGlobalPosition.ts index bd737b82..348de0b9 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/readLastMessageGlobalPosition.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/readLastMessageGlobalPosition.ts @@ -1,5 +1,5 @@ import { singleOrNull, sql, type SQLExecutor } from '@event-driven-io/dumbo'; -import { defaultTag, eventsTable } from './typing'; +import { defaultTag, messagesTable } from './typing'; type ReadLastMessageGlobalPositionSqlResult = { global_position: string; @@ -17,7 +17,7 @@ export const readLastMessageGlobalPosition = async ( execute.query( sql( `SELECT global_position - FROM ${eventsTable.name} + FROM ${messagesTable.name} WHERE partition = %L AND is_archived = FALSE AND transaction_id < pg_snapshot_xmin(pg_current_snapshot()) ORDER BY transaction_id, global_position LIMIT 1`, diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/readMessagesBatch.ts b/src/packages/emmett-postgresql/src/eventStore/schema/readMessagesBatch.ts index 25fcffc5..77e63dcf 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/readMessagesBatch.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/readMessagesBatch.ts @@ -9,16 +9,16 @@ import { type ReadEventMetadata, type ReadEventMetadataWithGlobalPosition, } from '@event-driven-io/emmett'; -import { defaultTag, eventsTable } from './typing'; +import { defaultTag, messagesTable } from './typing'; type ReadMessagesBatchSqlResult = { stream_position: string; stream_id: string; - event_data: EventDataOf; - event_metadata: EventMetaDataOf; - event_schema_version: string; - event_type: EventTypeOf; - event_id: string; + message_data: EventDataOf; + message_metadata: EventMetaDataOf; + message_schema_version: string; + message_type: EventTypeOf; + message_id: string; global_position: string; transaction_id: string; created: string; @@ -76,8 +76,8 @@ export const readMessagesBatch = async < const events: ReadEvent[] = await mapRows( execute.query>( sql( - `SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id - FROM ${eventsTable.name} + `SELECT stream_id, stream_position, global_position, message_data, message_metadata, message_schema_version, message_type, message_id + FROM ${messagesTable.name} WHERE partition = %L AND is_archived = FALSE AND transaction_id < pg_snapshot_xmin(pg_current_snapshot()) ${fromCondition} ${toCondition} ORDER BY transaction_id, global_position ${limitCondition}`, @@ -86,14 +86,14 @@ export const readMessagesBatch = async < ), (row) => { const rawEvent = { - type: row.event_type, - data: row.event_data, - metadata: row.event_metadata, + type: row.message_type, + data: row.message_data, + metadata: row.message_metadata, } as unknown as MessageType; const metadata: ReadEventMetadataWithGlobalPosition = { ...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}), - messageId: row.event_id, + messageId: row.message_id, streamName: row.stream_id, streamPosition: BigInt(row.stream_position), globalPosition: BigInt(row.global_position), diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/readStream.ts b/src/packages/emmett-postgresql/src/eventStore/schema/readStream.ts index bbfee3a3..faf92b5c 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/readStream.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/readStream.ts @@ -11,15 +11,15 @@ import { type ReadStreamResult, } from '@event-driven-io/emmett'; import { PostgreSQLEventStoreDefaultStreamVersion } from '../postgreSQLEventStore'; -import { defaultTag, eventsTable } from './typing'; +import { defaultTag, messagesTable } from './typing'; type ReadStreamSqlResult = { stream_position: string; - event_data: EventDataOf; - event_metadata: EventMetaDataOf; - event_schema_version: string; - event_type: EventTypeOf; - event_id: string; + message_data: EventDataOf; + message_metadata: EventMetaDataOf; + message_schema_version: string; + message_type: EventTypeOf; + message_id: string; global_position: string; transaction_id: string; created: string; @@ -51,8 +51,8 @@ export const readStream = async ( await mapRows( execute.query>( sql( - `SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id - FROM ${eventsTable.name} + `SELECT stream_id, stream_position, global_position, message_data, message_metadata, message_schema_version, message_type, message_id + FROM ${messagesTable.name} WHERE stream_id = %L AND partition = %L AND is_archived = FALSE ${fromCondition} ${toCondition}`, streamId, options?.partition ?? defaultTag, @@ -60,14 +60,14 @@ export const readStream = async ( ), (row) => { const rawEvent = { - type: row.event_type, - data: row.event_data, - metadata: row.event_metadata, + type: row.message_type, + data: row.message_data, + metadata: row.message_metadata, } as unknown as EventType; const metadata: ReadEventMetadataWithGlobalPosition = { ...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}), - messageId: row.event_id, + messageId: row.message_id, streamName: streamId, streamPosition: BigInt(row.stream_position), globalPosition: BigInt(row.global_position), diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts b/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts index b40ed29d..11c1932b 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/tables.ts @@ -1,8 +1,8 @@ import { rawSql } from '@event-driven-io/dumbo'; import { defaultTag, - eventsTable, globalTag, + messagesTable, streamsTable, subscriptionsTable, } from './typing'; @@ -11,7 +11,7 @@ export const streamsTableSQL = rawSql( `CREATE TABLE IF NOT EXISTS ${streamsTable.name}( stream_id TEXT NOT NULL, stream_position BIGINT NOT NULL, - partition TEXT NOT NULL DEFAULT '${globalTag}__${globalTag}', + partition TEXT NOT NULL DEFAULT '${globalTag}', stream_type TEXT NOT NULL, stream_metadata JSONB NOT NULL, is_archived BOOLEAN NOT NULL DEFAULT FALSE, @@ -20,19 +20,20 @@ export const streamsTableSQL = rawSql( ) PARTITION BY LIST (partition);`, ); -export const eventsTableSQL = rawSql( +export const messagesTableSQL = rawSql( ` CREATE SEQUENCE IF NOT EXISTS emt_global_event_position; - CREATE TABLE IF NOT EXISTS ${eventsTable.name}( + CREATE TABLE IF NOT EXISTS ${messagesTable.name}( stream_id TEXT NOT NULL, stream_position BIGINT NOT NULL, partition TEXT NOT NULL DEFAULT '${globalTag}', - event_data JSONB NOT NULL, - event_metadata JSONB NOT NULL, - event_schema_version TEXT NOT NULL, - event_type TEXT NOT NULL, - event_id TEXT NOT NULL, + message_kind TEXT NOT NULL DEFAULT 'Event', + message_data JSONB NOT NULL, + message_metadata JSONB NOT NULL, + message_schema_version TEXT NOT NULL, + message_type TEXT NOT NULL, + message_id TEXT NOT NULL, is_archived BOOLEAN NOT NULL DEFAULT FALSE, global_position BIGINT DEFAULT nextval('emt_global_event_position'), transaction_id XID8 NOT NULL, @@ -46,7 +47,7 @@ export const subscriptionsTableSQL = rawSql( CREATE TABLE IF NOT EXISTS ${subscriptionsTable.name}( subscription_id TEXT NOT NULL, version INT NOT NULL DEFAULT 1, - partition TEXT NOT NULL DEFAULT '${globalTag}__${globalTag}', + partition TEXT NOT NULL DEFAULT '${globalTag}', last_processed_position BIGINT NOT NULL, last_processed_transaction_id XID8 NOT NULL, PRIMARY KEY (subscription_id, partition, version) @@ -97,11 +98,11 @@ export const addTablePartitions = rawSql( $$ LANGUAGE plpgsql;`, ); -export const addEventsPartitions = rawSql( +export const addPartitionSQL = rawSql( ` CREATE OR REPLACE FUNCTION emt_add_partition(partition_name TEXT) RETURNS void AS $$ BEGIN - PERFORM emt_add_table_partition('${eventsTable.name}', partition_name); + PERFORM emt_add_table_partition('${messagesTable.name}', partition_name); PERFORM emt_add_table_partition('${streamsTable.name}', partition_name); EXECUTE format(' @@ -117,23 +118,23 @@ export const addModuleSQL = rawSql( ` CREATE OR REPLACE FUNCTION add_module(new_module TEXT) RETURNS void AS $$ BEGIN - -- For ${eventsTable.name} table + -- For ${messagesTable.name} table EXECUTE format(' CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES IN (emt_sanitize_name(%L || ''__'' || %L)) PARTITION BY LIST (is_archived);', - emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || '${globalTag}'), '${eventsTable.name}', new_module, '${globalTag}' + emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || '${globalTag}'), '${messagesTable.name}', new_module, '${globalTag}' ); EXECUTE format(' CREATE TABLE IF NOT EXISTS %I_active PARTITION OF %I FOR VALUES IN (FALSE);', - emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || '${globalTag}' || '_active'), emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || '${globalTag}') + emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || '${globalTag}' || '_active'), emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || '${globalTag}') ); EXECUTE format(' CREATE TABLE IF NOT EXISTS %I_archived PARTITION OF %I FOR VALUES IN (TRUE);', - emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || '${globalTag}' || '_archived'), emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || '${globalTag}') + emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || '${globalTag}' || '_archived'), emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || '${globalTag}') ); -- For ${streamsTable.name} table @@ -163,23 +164,23 @@ export const addTenantSQL = rawSql( ` CREATE OR REPLACE FUNCTION add_tenant(new_module TEXT, new_tenant TEXT) RETURNS void AS $$ BEGIN - -- For ${eventsTable.name} table + -- For ${messagesTable.name} table EXECUTE format(' CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES IN (emt_sanitize_name(''%s__%s'')) PARTITION BY LIST (is_archived);', - emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || new_tenant), '${eventsTable.name}', new_module, new_tenant + emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || new_tenant), '${messagesTable.name}', new_module, new_tenant ); EXECUTE format(' CREATE TABLE IF NOT EXISTS %I_active PARTITION OF %I FOR VALUES IN (FALSE);', - emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || new_tenant || '_active'), emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || new_tenant) + emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || new_tenant || '_active'), emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || new_tenant) ); EXECUTE format(' CREATE TABLE IF NOT EXISTS %I_archived PARTITION OF %I FOR VALUES IN (TRUE);', - emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || new_tenant || '_archived'), emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || new_tenant) + emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || new_tenant || '_archived'), emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || new_tenant) ); -- For ${streamsTable.name} table @@ -213,25 +214,25 @@ export const addModuleForAllTenantsSQL = rawSql( BEGIN PERFORM add_module(new_module); - FOR tenant_record IN SELECT DISTINCT tenant FROM ${eventsTable.name} + FOR tenant_record IN SELECT DISTINCT tenant FROM ${messagesTable.name} LOOP - -- For ${eventsTable.name} table + -- For ${messagesTable.name} table EXECUTE format(' CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES IN (emt_sanitize_name(''%s__%s'')) PARTITION BY LIST (is_archived);', - emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || tenant_record.tenant), '${eventsTable.name}', new_module, tenant_record.tenant + emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || tenant_record.tenant), '${messagesTable.name}', new_module, tenant_record.tenant ); EXECUTE format(' CREATE TABLE IF NOT EXISTS %I_active PARTITION OF %I FOR VALUES IN (FALSE);', - emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || tenant_record.tenant || '_active'), emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || tenant_record.tenant) + emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || tenant_record.tenant || '_active'), emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || tenant_record.tenant) ); EXECUTE format(' CREATE TABLE IF NOT EXISTS %I_archived PARTITION OF %I FOR VALUES IN (TRUE);', - emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || tenant_record.tenant || '_archived'), emt_sanitize_name('${eventsTable.name}_' || new_module || '__' || tenant_record.tenant) + emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || tenant_record.tenant || '_archived'), emt_sanitize_name('${messagesTable.name}_' || new_module || '__' || tenant_record.tenant) ); -- For ${streamsTable.name} table @@ -264,25 +265,25 @@ export const addTenantForAllModulesSQL = rawSql( DECLARE module_record RECORD; BEGIN - FOR module_record IN SELECT DISTINCT partitionname FROM pg_partman.part_config WHERE parent_table = '${eventsTable.name}' + FOR module_record IN SELECT DISTINCT partitionname FROM pg_partman.part_config WHERE parent_table = '${messagesTable.name}' LOOP - -- For ${eventsTable.name} table + -- For ${messagesTable.name} table EXECUTE format(' CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES IN (emt_sanitize_name(''%s__%s'')) PARTITION BY LIST (is_archived);', - emt_sanitize_name('${eventsTable.name}_' || module_record.partitionname || '__' || new_tenant), '${eventsTable.name}', module_record.partitionname, new_tenant + emt_sanitize_name('${messagesTable.name}_' || module_record.partitionname || '__' || new_tenant), '${messagesTable.name}', module_record.partitionname, new_tenant ); EXECUTE format(' CREATE TABLE IF NOT EXISTS %I_active PARTITION OF %I FOR VALUES IN (FALSE);', - emt_sanitize_name('${eventsTable.name}_' || module_record.partitionname || '__' || new_tenant || '_active'), emt_sanitize_name('${eventsTable.name}_' || module_record.partitionname || '__' || new_tenant) + emt_sanitize_name('${messagesTable.name}_' || module_record.partitionname || '__' || new_tenant || '_active'), emt_sanitize_name('${messagesTable.name}_' || module_record.partitionname || '__' || new_tenant) ); EXECUTE format(' CREATE TABLE IF NOT EXISTS %I_archived PARTITION OF %I FOR VALUES IN (TRUE);', - emt_sanitize_name('${eventsTable.name}_' || module_record.partitionname || '__' || new_tenant || '_archived'), emt_sanitize_name('${eventsTable.name}_' || module_record.partitionname || '__' || new_tenant) + emt_sanitize_name('${messagesTable.name}_' || module_record.partitionname || '__' || new_tenant || '_archived'), emt_sanitize_name('${messagesTable.name}_' || module_record.partitionname || '__' || new_tenant) ); -- For ${streamsTable.name} table @@ -309,6 +310,52 @@ export const addTenantForAllModulesSQL = rawSql( `, ); -export const addDefaultPartition = rawSql( +export const addDefaultPartitionSQL = rawSql( `SELECT emt_add_partition('${defaultTag}');`, ); + +export const migrationFromEventsToMessagesSQL = rawSql(` +DO $$ +DECLARE + partition_record RECORD; +BEGIN + -- Rename the main table and its columns if it exists + IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = 'emt_events') THEN + -- Rename all partitions first + FOR partition_record IN + SELECT tablename + FROM pg_tables + WHERE tablename LIKE 'emt_events_%' + ORDER BY tablename DESC -- to handle child partitions first + LOOP + EXECUTE format('ALTER TABLE %I RENAME TO %I', + partition_record.tablename, + REPLACE(partition_record.tablename, 'events', 'messages')); + END LOOP; + + -- Rename the main table + ALTER TABLE emt_events RENAME TO emt_messages; + + -- Rename columns + ALTER TABLE emt_messages + RENAME COLUMN event_data TO message_data; + ALTER TABLE emt_messages + RENAME COLUMN event_metadata TO message_metadata; + ALTER TABLE emt_messages + RENAME COLUMN event_schema_version TO message_schema_version; + ALTER TABLE emt_messages + RENAME COLUMN event_type TO message_type; + ALTER TABLE emt_messages + RENAME COLUMN event_id TO message_id; + + -- Rename sequence if it exists + IF EXISTS (SELECT 1 FROM pg_sequences WHERE sequencename = 'emt_global_event_position') THEN + ALTER SEQUENCE emt_global_event_position + RENAME TO emt_global_message_position; + + ALTER TABLE emt_messages + ALTER COLUMN global_position + SET DEFAULT nextval('emt_global_message_position'); + END IF; + END IF; +END $$;`); diff --git a/src/packages/emmett-postgresql/src/eventStore/schema/typing.ts b/src/packages/emmett-postgresql/src/eventStore/schema/typing.ts index 8a70f228..ab26826d 100644 --- a/src/packages/emmett-postgresql/src/eventStore/schema/typing.ts +++ b/src/packages/emmett-postgresql/src/eventStore/schema/typing.ts @@ -23,8 +23,8 @@ export const streamsTable = { }, }; -export const eventsTable = { - name: `${emmettPrefix}_events`, +export const messagesTable = { + name: `${emmettPrefix}_messages`, columns: { partition: columns.partition, isArchived: columns.isArchived, diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts index 510cfdc0..9899196b 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts @@ -93,7 +93,7 @@ void describe('appendEvent', () => { await appendToStream(db, streamId, 'shopping_cart', events); const result = await appendToStream(db, streamId, 'shopping_cart', events); const resultEvents = await db.query( - 'SELECT * FROM emt_events WHERE stream_id = $1', + 'SELECT * FROM emt_messages WHERE stream_id = $1', [streamId], ); @@ -131,7 +131,7 @@ void describe('appendEvent', () => { assertFalse(secondResult.success); const resultEvents = await db.query( - 'SELECT * FROM emt_events WHERE stream_id = $1', + 'SELECT * FROM emt_messages WHERE stream_id = $1', [streamId], ); @@ -168,7 +168,7 @@ void describe('appendEvent', () => { assertFalse(secondResult.success); const resultEvents = await db.query( - 'SELECT * FROM emt_events WHERE stream_id = $1', + 'SELECT * FROM emt_messages WHERE stream_id = $1', [streamId], ); @@ -215,7 +215,7 @@ void describe('appendEvent', () => { assertFalse(secondResult.success); const resultEvents = await db.query( - 'SELECT * FROM emt_events WHERE stream_id = $1', + 'SELECT * FROM emt_messages WHERE stream_id = $1', [streamId], ); @@ -253,7 +253,7 @@ void describe('appendEvent', () => { assertTrue(secondResult.success); const resultEvents = await db.query( - 'SELECT * FROM emt_events WHERE stream_id = $1', + 'SELECT * FROM emt_messages WHERE stream_id = $1', [streamId], ); @@ -288,7 +288,7 @@ void describe('appendEvent', () => { } const resultEvents = await db.query( - 'SELECT * FROM emt_events WHERE stream_id = $1', + 'SELECT * FROM emt_messages WHERE stream_id = $1', [streamId], ); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts index 5790c301..c372f9a7 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts @@ -15,7 +15,7 @@ import { type SQLiteConnection, type SQLiteError, } from '../../sqliteConnection'; -import { defaultTag, eventsTable, streamsTable } from './typing'; +import { defaultTag, messagesTable, streamsTable } from './typing'; export type AppendEventResult = | { @@ -282,15 +282,15 @@ const buildEventInsertQuery = ( ); const sqlString = ` - INSERT INTO ${eventsTable.name} ( + INSERT INTO ${messagesTable.name} ( stream_id, stream_position, partition, - event_data, - event_metadata, - event_schema_version, - event_type, - event_id, + message_data, + message_metadata, + message_schema_version, + message_type, + message_id, is_archived ) VALUES ${query.parameterMarkers.join(', ')} diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts index 30c9c76a..bf55b048 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts @@ -44,7 +44,7 @@ void describe('createEventStoreSchema', () => { }); void it('creates the events table', async () => { - assert.ok(await tableExists(db, 'emt_events')); + assert.ok(await tableExists(db, 'emt_messages')); }); }); }); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/index.ts b/src/packages/emmett-sqlite/src/eventStore/schema/index.ts index d7068d6c..5f672ed3 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/index.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/index.ts @@ -1,9 +1,9 @@ import { type SQLiteConnection } from '../../sqliteConnection'; -import { eventsTableSQL, streamsTableSQL } from './tables'; +import { messagesTableSQL, streamsTableSQL } from './tables'; export * from './tables'; -export const schemaSQL: string[] = [streamsTableSQL, eventsTableSQL]; +export const schemaSQL: string[] = [streamsTableSQL, messagesTableSQL]; export const createEventStoreSchema = async ( db: SQLiteConnection, diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts index 43d18cfc..b0b98051 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts @@ -12,15 +12,15 @@ import { } from '@event-driven-io/emmett'; import { type SQLiteConnection } from '../../sqliteConnection'; import { SQLiteEventStoreDefaultStreamVersion } from '../SQLiteEventStore'; -import { defaultTag, eventsTable } from './typing'; +import { defaultTag, messagesTable } from './typing'; type ReadStreamSqlResult = { stream_position: string; - event_data: EventDataOf; - event_metadata: EventMetaDataOf; - event_schema_version: string; - event_type: EventTypeOf; - event_id: string; + message_data: EventDataOf; + message_metadata: EventMetaDataOf; + message_schema_version: string; + message_type: EventTypeOf; + message_id: string; global_position: string; created: string; }; @@ -48,8 +48,8 @@ export const readStream = async ( const toCondition = !isNaN(to) ? `AND stream_position <= ${to}` : ''; const results = await db.query>( - `SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id - FROM ${eventsTable.name} + `SELECT stream_id, stream_position, global_position, message_data, message_metadata, message_schema_version, message_type, message_id + FROM ${messagesTable.name} WHERE stream_id = ? AND partition = ? AND is_archived = FALSE ${fromCondition} ${toCondition}`, [streamId, options?.partition ?? defaultTag], ); @@ -57,16 +57,16 @@ export const readStream = async ( const events: ReadEvent[] = results.map((row) => { const rawEvent = event( - row.event_type, - JSONParser.parse(row.event_data), - JSONParser.parse(row.event_metadata), + row.message_type, + JSONParser.parse(row.message_data), + JSONParser.parse(row.message_metadata), ); return { ...rawEvent, metadata: { ...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}), - eventId: row.event_id, + eventId: row.message_id, streamName: streamId, streamPosition: BigInt(row.stream_position), globalPosition: BigInt(row.global_position), diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts index 888e55fb..74fbf7f7 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts @@ -1,4 +1,4 @@ -import { eventsTable, globalTag, streamsTable } from './typing'; +import { globalTag, messagesTable, streamsTable } from './typing'; export const sql = (sql: string) => sql; @@ -6,7 +6,7 @@ export const streamsTableSQL = sql( `CREATE TABLE IF NOT EXISTS ${streamsTable.name}( stream_id TEXT NOT NULL, stream_position BIGINT NOT NULL DEFAULT 0, - partition TEXT NOT NULL DEFAULT '${globalTag}__${globalTag}', + partition TEXT NOT NULL DEFAULT '${globalTag}', stream_type TEXT NOT NULL, stream_metadata JSONB NOT NULL, is_archived BOOLEAN NOT NULL DEFAULT FALSE, @@ -15,16 +15,17 @@ export const streamsTableSQL = sql( );`, ); -export const eventsTableSQL = sql( - `CREATE TABLE IF NOT EXISTS ${eventsTable.name}( +export const messagesTableSQL = sql( + `CREATE TABLE IF NOT EXISTS ${messagesTable.name}( stream_id TEXT NOT NULL, stream_position BIGINT NOT NULL, partition TEXT NOT NULL DEFAULT '${globalTag}', - event_data JSONB NOT NULL, - event_metadata JSONB NOT NULL, - event_schema_version TEXT NOT NULL, - event_type TEXT NOT NULL, - event_id TEXT NOT NULL, + message_kind TEXT NOT NULL DEFAULT 'Event', + message_data JSONB NOT NULL, + message_metadata JSONB NOT NULL, + message_schema_version TEXT NOT NULL, + message_type TEXT NOT NULL, + message_id TEXT NOT NULL, is_archived BOOLEAN NOT NULL DEFAULT FALSE, global_position INTEGER PRIMARY KEY, created DATETIME DEFAULT CURRENT_TIMESTAMP, diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/typing.ts b/src/packages/emmett-sqlite/src/eventStore/schema/typing.ts index d1b13a9a..2608a9f9 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/typing.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/typing.ts @@ -22,8 +22,8 @@ export const streamsTable = { }, }; -export const eventsTable = { - name: `${emmettPrefix}_events`, +export const messagesTable = { + name: `${emmettPrefix}_messages`, columns: { partition: columns.partition, isArchived: columns.isArchived,