From bb5f6e5092cd3f5f8f9d83a8f0ed0355d11ddb95 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Mon, 24 Feb 2025 21:15:49 +0100 Subject: [PATCH] Adjusted projection definition --- ...EventStoreConsumer.projections.int.spec.ts | 135 +++++++++++------- .../consumers/postgreSQLProcessor.ts | 27 ++-- .../src/eventStore/projections/index.ts | 8 +- .../projections/pongo/projections.ts | 6 +- .../postgresProjection.multi.int.spec.ts | 3 +- .../projections/postgresProjectionSpec.ts | 6 +- 6 files changed, 111 insertions(+), 74 deletions(-) diff --git a/src/packages/emmett-postgresql/src/eventStore/consumers/postgreSQLEventStoreConsumer.projections.int.spec.ts b/src/packages/emmett-postgresql/src/eventStore/consumers/postgreSQLEventStoreConsumer.projections.int.spec.ts index 00236536..5f475239 100644 --- a/src/packages/emmett-postgresql/src/eventStore/consumers/postgreSQLEventStoreConsumer.projections.int.spec.ts +++ b/src/packages/emmett-postgresql/src/eventStore/consumers/postgreSQLEventStoreConsumer.projections.int.spec.ts @@ -1,14 +1,17 @@ -import { assertThatArray, type Event } from '@event-driven-io/emmett'; +import { assertThatArray, type ReadEvent } from '@event-driven-io/emmett'; import { PostgreSqlContainer, StartedPostgreSqlContainer, } from '@testcontainers/postgresql'; import { after, before, describe, it } from 'node:test'; import { v4 as uuid } from 'uuid'; +import type { ShoppingCartConfirmed } from '../../testing/shoppingCart.domain'; import { getPostgreSQLEventStore, type PostgresEventStore, } from '../postgreSQLEventStore'; +import { pongoMultiStreamProjection } from '../projections'; +import type { ProductItemAdded } from '../projections/postgresProjection.customid.int.spec'; import { postgreSQLEventStoreConsumer } from './postgreSQLEventStoreConsumer'; import type { PostgreSQLProcessorOptions } from './postgreSQLProcessor'; @@ -43,7 +46,7 @@ void describe('PostgreSQL event store started consumer', () => { // Given const guestId = uuid(); const streamName = `guestStay-${guestId}`; - const events: GuestStayEvent[] = [ + const events: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId } }, { type: 'GuestCheckedOut', data: { guestId } }, ]; @@ -52,20 +55,18 @@ void describe('PostgreSQL event store started consumer', () => { events, ); - const result: GuestStayEvent[] = []; + const result: ShoppingCartSummaryEvent[] = []; // When const consumer = postgreSQLEventStoreConsumer({ connectionString, }); - consumer.processor({ + consumer.processor({ processorId: uuid(), + projection: shoppingCartsSummaryProjection, stopAfter: (event) => event.metadata.globalPosition === appendResult.lastEventGlobalPosition, - eachMessage: (event) => { - result.push(event); - }, }); try { @@ -84,25 +85,23 @@ void describe('PostgreSQL event store started consumer', () => { async () => { // Given - const result: GuestStayEvent[] = []; + const result: ShoppingCartSummaryEvent[] = []; let stopAfterPosition: bigint | undefined = undefined; // When const consumer = postgreSQLEventStoreConsumer({ connectionString, }); - consumer.processor({ + consumer.processor({ processorId: uuid(), + projection: shoppingCartsSummaryProjection, stopAfter: (event) => event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, }); const guestId = uuid(); const streamName = `guestStay-${guestId}`; - const events: GuestStayEvent[] = [ + const events: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId } }, { type: 'GuestCheckedOut', data: { guestId } }, ]; @@ -134,33 +133,31 @@ void describe('PostgreSQL event store started consumer', () => { const otherGuestId = uuid(); const streamName = `guestStay-${guestId}`; - const initialEvents: GuestStayEvent[] = [ + const initialEvents: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId } }, { type: 'GuestCheckedOut', data: { guestId } }, ]; const { lastEventGlobalPosition: startPosition } = await eventStore.appendToStream(streamName, initialEvents); - const events: GuestStayEvent[] = [ + const events: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, ]; - const result: GuestStayEvent[] = []; + const result: ShoppingCartSummaryEvent[] = []; let stopAfterPosition: bigint | undefined = undefined; // When const consumer = postgreSQLEventStoreConsumer({ connectionString, }); - consumer.processor({ + consumer.processor({ processorId: uuid(), + projection: shoppingCartsSummaryProjection, startFrom: { globalPosition: startPosition }, stopAfter: (event) => event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, }); try { @@ -190,33 +187,31 @@ void describe('PostgreSQL event store started consumer', () => { const otherGuestId = uuid(); const streamName = `guestStay-${guestId}`; - const initialEvents: GuestStayEvent[] = [ + const initialEvents: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId } }, { type: 'GuestCheckedOut', data: { guestId } }, ]; await eventStore.appendToStream(streamName, initialEvents); - const events: GuestStayEvent[] = [ + const events: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, ]; - const result: GuestStayEvent[] = []; + const result: ShoppingCartSummaryEvent[] = []; let stopAfterPosition: bigint | undefined = undefined; // When const consumer = postgreSQLEventStoreConsumer({ connectionString, }); - consumer.processor({ + consumer.processor({ processorId: uuid(), + projection: shoppingCartsSummaryProjection, startFrom: 'CURRENT', stopAfter: (event) => event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, }); try { @@ -249,7 +244,7 @@ void describe('PostgreSQL event store started consumer', () => { const otherGuestId = uuid(); const streamName = `guestStay-${guestId}`; - const initialEvents: GuestStayEvent[] = [ + const initialEvents: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId } }, { type: 'GuestCheckedOut', data: { guestId } }, ]; @@ -258,26 +253,24 @@ void describe('PostgreSQL event store started consumer', () => { initialEvents, ); - const events: GuestStayEvent[] = [ + const events: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, ]; - let result: GuestStayEvent[] = []; + let result: ShoppingCartSummaryEvent[] = []; let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; // When const consumer = postgreSQLEventStoreConsumer({ connectionString, }); - consumer.processor({ + consumer.processor({ processorId: uuid(), + projection: shoppingCartsSummaryProjection, startFrom: 'CURRENT', stopAfter: (event) => event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, }); await consumer.start(); @@ -314,7 +307,7 @@ void describe('PostgreSQL event store started consumer', () => { const otherGuestId = uuid(); const streamName = `guestStay-${guestId}`; - const initialEvents: GuestStayEvent[] = [ + const initialEvents: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId } }, { type: 'GuestCheckedOut', data: { guestId } }, ]; @@ -323,30 +316,29 @@ void describe('PostgreSQL event store started consumer', () => { initialEvents, ); - const events: GuestStayEvent[] = [ + const events: ShoppingCartSummaryEvent[] = [ { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, ]; - let result: GuestStayEvent[] = []; + let result: ShoppingCartSummaryEvent[] = []; let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; - const processorOptions: PostgreSQLProcessorOptions = { - processorId: uuid(), - startFrom: 'CURRENT', - stopAfter: (event) => - event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, - }; + const processorOptions: PostgreSQLProcessorOptions = + { + processorId: uuid(), + projection: shoppingCartsSummaryProjection, + startFrom: 'CURRENT', + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + }; // When const consumer = postgreSQLEventStoreConsumer({ connectionString, }); try { - consumer.processor(processorOptions); + consumer.processor(processorOptions); await consumer.start(); } finally { @@ -360,7 +352,7 @@ void describe('PostgreSQL event store started consumer', () => { const newConsumer = postgreSQLEventStoreConsumer({ connectionString, }); - newConsumer.processor(processorOptions); + newConsumer.processor(processorOptions); try { const consumerPromise = newConsumer.start(); @@ -382,7 +374,48 @@ void describe('PostgreSQL event store started consumer', () => { }); }); -type GuestCheckedIn = Event<'GuestCheckedIn', { guestId: string }>; -type GuestCheckedOut = Event<'GuestCheckedOut', { guestId: string }>; +type ShoppingCartSummary = { + _id?: string; + activeCount: number; + activeShopingCarts: string[]; +}; + +const shoppingCartsSummaryCollectionName = 'shoppingCartsSummary'; -type GuestStayEvent = GuestCheckedIn | GuestCheckedOut; +export type ShoppingCartSummaryEvent = ProductItemAdded | ShoppingCartConfirmed; + +const evolve = ( + document: ShoppingCartSummary, + { type, metadata: { streamName } }: ReadEvent, +): ShoppingCartSummary => { + switch (type) { + case 'ProductItemAdded': { + if (!document.activeShopingCarts.includes(streamName)) { + document.activeShopingCarts.push(streamName); + document.activeCount++; + } + + return document; + } + case 'ShoppingCartConfirmed': + document.activeShopingCarts = document.activeShopingCarts.filter( + (item) => item !== streamName, + ); + document.activeCount--; + + return document; + default: + return document; + } +}; + +const shoppingCartsSummaryProjection = pongoMultiStreamProjection({ + getDocumentId: (event) => event.metadata.streamName.split(':')[1]!, + collectionName: shoppingCartsSummaryCollectionName, + evolve, + canHandle: ['ProductItemAdded', 'ShoppingCartConfirmed'], + initialState: () => ({ + activeCount: 0, + activeShopingCarts: [], + }), +}); diff --git a/src/packages/emmett-postgresql/src/eventStore/consumers/postgreSQLProcessor.ts b/src/packages/emmett-postgresql/src/eventStore/consumers/postgreSQLProcessor.ts index b2899708..f69b8d64 100644 --- a/src/packages/emmett-postgresql/src/eventStore/consumers/postgreSQLProcessor.ts +++ b/src/packages/emmett-postgresql/src/eventStore/consumers/postgreSQLProcessor.ts @@ -90,13 +90,16 @@ export type GenericPostgreSQLProcessorOptions = export type PostgreSQLProjectionProcessorOptions< EventType extends Event = Event, -> = { type: 'projection' } & PostgreSQLProjectionDefinition & { - partition?: string; - startFrom?: PostgreSQLProcessorStartFrom; - stopAfter?: ( - message: ReadEvent, - ) => boolean; - }; +> = { + processorId?: string; + version?: number; + projection: PostgreSQLProjectionDefinition; + partition?: string; + startFrom?: PostgreSQLProcessorStartFrom; + stopAfter?: ( + message: ReadEvent, + ) => boolean; +}; export type PostgreSQLProcessorOptions = | GenericPostgreSQLProcessorOptions @@ -202,12 +205,14 @@ const genericPostgreSQLProcessor = ( export const postgreSQLProjectionProcessor = ( options: PostgreSQLProjectionProcessorOptions, ): PostgreSQLProcessor => { + const projection = options.projection; + return genericPostgreSQLProcessor({ - processorId: `projection:${options.name}`, + processorId: options.processorId ?? `projection:${projection.name}`, eachMessage: async (event, context) => { - if (!options.canHandle.includes(event.type)) return; + if (!projection.canHandle.includes(event.type)) return; - await options.handle([event], context); + await projection.handle([event], context); }, ...options, }); @@ -216,7 +221,7 @@ export const postgreSQLProjectionProcessor = ( export const postgreSQLProcessor = ( options: PostgreSQLProcessorOptions, ): PostgreSQLProcessor => { - if ('type' in options) { + if ('projection' in options) { return postgreSQLProjectionProcessor(options); } diff --git a/src/packages/emmett-postgresql/src/eventStore/projections/index.ts b/src/packages/emmett-postgresql/src/eventStore/projections/index.ts index 7b73a99f..6c2f7d59 100644 --- a/src/packages/emmett-postgresql/src/eventStore/projections/index.ts +++ b/src/packages/emmett-postgresql/src/eventStore/projections/index.ts @@ -86,13 +86,13 @@ export const handleProjections = async ( export const postgreSQLProjection = ( definition: PostgreSQLProjectionDefinition, -): PostgreSQLProjectionDefinition => +): PostgreSQLProjectionDefinition => projection< EventType, PostgresReadEventMetadata, PostgreSQLProjectionHandlerContext, PostgreSQLProjectionDefinition - >(definition) as PostgreSQLProjectionDefinition; + >(definition); export const postgreSQLRawBatchSQLProjection = ( handle: ( @@ -100,7 +100,7 @@ export const postgreSQLRawBatchSQLProjection = ( context: PostgreSQLProjectionHandlerContext, ) => Promise | SQL[], ...canHandle: CanHandle -): PostgreSQLProjectionDefinition => +): PostgreSQLProjectionDefinition => postgreSQLProjection({ canHandle, handle: async (events, context) => { @@ -116,7 +116,7 @@ export const postgreSQLRawSQLProjection = ( context: PostgreSQLProjectionHandlerContext, ) => Promise | SQL, ...canHandle: CanHandle -): PostgreSQLProjectionDefinition => +): PostgreSQLProjectionDefinition => postgreSQLRawBatchSQLProjection( async (events, context) => { const sqls: SQL[] = []; diff --git a/src/packages/emmett-postgresql/src/eventStore/projections/pongo/projections.ts b/src/packages/emmett-postgresql/src/eventStore/projections/pongo/projections.ts index 4c951165..68aad923 100644 --- a/src/packages/emmett-postgresql/src/eventStore/projections/pongo/projections.ts +++ b/src/packages/emmett-postgresql/src/eventStore/projections/pongo/projections.ts @@ -70,7 +70,7 @@ export type PongoProjectionOptions = { export const pongoProjection = ({ handle, canHandle, -}: PongoProjectionOptions): PostgreSQLProjectionDefinition => +}: PongoProjectionOptions): PostgreSQLProjectionDefinition => postgreSQLProjection({ canHandle, handle: async (events, context) => { @@ -126,7 +126,7 @@ export const pongoMultiStreamProjection = < EventType, EventMetaDataType >, -): PostgreSQLProjectionDefinition => { +): PostgreSQLProjectionDefinition => { const { collectionName, getDocumentId, canHandle } = options; return pongoProjection({ @@ -190,7 +190,7 @@ export const pongoSingleStreamProjection = < EventType, EventMetaDataType >, -): PostgreSQLProjectionDefinition => { +): PostgreSQLProjectionDefinition => { return pongoMultiStreamProjection({ ...options, getDocumentId: diff --git a/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjection.multi.int.spec.ts b/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjection.multi.int.spec.ts index 747bddbd..b3d1eedc 100644 --- a/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjection.multi.int.spec.ts +++ b/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjection.multi.int.spec.ts @@ -15,7 +15,6 @@ import { PostgreSQLProjectionSpec, } from '.'; import { - type DiscountApplied, type ProductItemAdded, type ShoppingCartConfirmed, } from '../../testing/shoppingCart.domain'; @@ -23,7 +22,7 @@ import { void describe('Postgres Projections', () => { let postgres: StartedPostgreSqlContainer; let connectionString: string; - let given: PostgreSQLProjectionSpec; + let given: PostgreSQLProjectionSpec; let shoppingCartId: string; let clientId: string; diff --git a/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjectionSpec.ts b/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjectionSpec.ts index ff7c19f8..ed23b6e1 100644 --- a/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjectionSpec.ts +++ b/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjectionSpec.ts @@ -52,13 +52,13 @@ export type PostgreSQLProjectionAssert = (options: { connectionString: string; }) => Promise; -export type PostgreSQLProjectionSpecOptions = { - projection: PostgreSQLProjectionDefinition; +export type PostgreSQLProjectionSpecOptions = { + projection: PostgreSQLProjectionDefinition; } & DumboOptions; export const PostgreSQLProjectionSpec = { for: ( - options: PostgreSQLProjectionSpecOptions, + options: PostgreSQLProjectionSpecOptions, ): PostgreSQLProjectionSpec => { { const { projection, ...dumoOptions } = options;