diff --git a/src/packages/emmett-postgresql/src/eventStore/projections/pongo/projections.ts b/src/packages/emmett-postgresql/src/eventStore/projections/pongo/projections.ts index 4c2a22db..87ca4fc3 100644 --- a/src/packages/emmett-postgresql/src/eventStore/projections/pongo/projections.ts +++ b/src/packages/emmett-postgresql/src/eventStore/projections/pongo/projections.ts @@ -167,6 +167,7 @@ export type PongoSingleStreamProjectionOptions< PostgresReadEventMetadata = PostgresReadEventMetadata, > = { canHandle: CanHandle; + getDocumentId?: (event: ReadEvent) => string; collectionName: string; } & ( @@ -201,6 +202,7 @@ export const pongoSingleStreamProjection = < ): PostgreSQLProjectionDefinition => { return pongoMultiStreamProjection({ ...options, - getDocumentId: (event) => event.metadata.streamName, + getDocumentId: + options.getDocumentId ?? ((event) => event.metadata.streamName), }); }; diff --git a/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjection.customid.int.spec.ts b/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjection.customid.int.spec.ts new file mode 100644 index 00000000..7eef501d --- /dev/null +++ b/src/packages/emmett-postgresql/src/eventStore/projections/postgresProjection.customid.int.spec.ts @@ -0,0 +1,118 @@ +import type { Event } from '@event-driven-io/emmett'; +import { + PostgreSqlContainer, + StartedPostgreSqlContainer, +} from '@testcontainers/postgresql'; +import { after, before, beforeEach, describe, it } from 'node:test'; +import { v4 as uuid } from 'uuid'; +import { + expectPongoDocuments, + pongoSingleStreamProjection, + PostgreSQLProjectionSpec, +} from '.'; +import type { + DiscountApplied, + PricedProductItem, +} from '../../testing/shoppingCart.domain'; + +export type ProductItemAdded = Event< + 'ProductItemAdded', + { productItem: PricedProductItem; shoppingCartId: string } +>; + +void describe('Postgres Projections', () => { + let postgres: StartedPostgreSqlContainer; + let connectionString: string; + let given: PostgreSQLProjectionSpec; + let shoppingCartId: string; + let streamName: string; + + before(async () => { + postgres = await new PostgreSqlContainer().start(); + connectionString = postgres.getConnectionUri(); + + given = PostgreSQLProjectionSpec.for({ + projection: shoppingCartShortInfoProjection, + connectionString, + }); + }); + + beforeEach(() => { + shoppingCartId = uuid(); + streamName = `shoppingCart:${shoppingCartId}`; + }); + + after(async () => { + try { + await postgres.stop(); + } catch (error) { + console.log(error); + } + }); + + void it('uses custom document id instead of stream name assigned in projection evolve', () => + given([]) + .when([ + { + type: 'ProductItemAdded', + data: { + productItem: { price: 100, productId: 'shoes', quantity: 100 }, + shoppingCartId, + }, + metadata: { + streamName, + }, + }, + ]) + .then( + expectPongoDocuments + .fromCollection( + shoppingCartShortInfoCollectionName, + ) + .withId(shoppingCartId) + .toBeEqual({ + _id: shoppingCartId, + productItemsCount: 100, + totalAmount: 10000, + }), + )); +}); + +type ShoppingCartShortInfo = { + _id?: string; + productItemsCount: number; + totalAmount: number; +}; + +const shoppingCartShortInfoCollectionName = 'shoppingCartShortInfo'; + +const evolve = ( + document: ShoppingCartShortInfo, + { type, data: event }: ProductItemAdded, +): ShoppingCartShortInfo => { + switch (type) { + case 'ProductItemAdded': + return { + ...document, + _id: event.shoppingCartId, + totalAmount: + document.totalAmount + + event.productItem.price * event.productItem.quantity, + productItemsCount: + document.productItemsCount + event.productItem.quantity, + }; + default: + return document; + } +}; + +const shoppingCartShortInfoProjection = pongoSingleStreamProjection({ + collectionName: shoppingCartShortInfoCollectionName, + evolve, + getDocumentId: (event) => event.data.shoppingCartId, + canHandle: ['ProductItemAdded'], + initialState: () => ({ + productItemsCount: 0, + totalAmount: 0, + }), +});