Skip to content

Commit

Permalink
Added optional getDocumentId selector for single stream projection
Browse files Browse the repository at this point in the history
Thanks to that, one can still use custom id based on the event data
  • Loading branch information
oskardudycz committed Feb 11, 2025
1 parent 3eda928 commit eb6f89a
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ export type PongoSingleStreamProjectionOptions<
PostgresReadEventMetadata = PostgresReadEventMetadata,
> = {
canHandle: CanHandle<EventType>;
getDocumentId?: (event: ReadEvent<EventType>) => string;

collectionName: string;
} & (
Expand Down Expand Up @@ -201,6 +202,7 @@ export const pongoSingleStreamProjection = <
): PostgreSQLProjectionDefinition => {
return pongoMultiStreamProjection<Document, EventType, EventMetaDataType>({
...options,
getDocumentId: (event) => event.metadata.streamName,
getDocumentId:
options.getDocumentId ?? ((event) => event.metadata.streamName),
});
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import type { Event } from '@event-driven-io/emmett';
import {
PostgreSqlContainer,
StartedPostgreSqlContainer,
} from '@testcontainers/postgresql';
import { after, before, beforeEach, describe, it } from 'node:test';
import { v4 as uuid } from 'uuid';
import {
expectPongoDocuments,
pongoSingleStreamProjection,
PostgreSQLProjectionSpec,
} from '.';
import type {
DiscountApplied,
PricedProductItem,
} from '../../testing/shoppingCart.domain';

export type ProductItemAdded = Event<
'ProductItemAdded',
{ productItem: PricedProductItem; shoppingCartId: string }
>;

void describe('Postgres Projections', () => {
let postgres: StartedPostgreSqlContainer;
let connectionString: string;
let given: PostgreSQLProjectionSpec<ProductItemAdded | DiscountApplied>;
let shoppingCartId: string;
let streamName: string;

before(async () => {
postgres = await new PostgreSqlContainer().start();
connectionString = postgres.getConnectionUri();

given = PostgreSQLProjectionSpec.for({
projection: shoppingCartShortInfoProjection,
connectionString,
});
});

beforeEach(() => {
shoppingCartId = uuid();
streamName = `shoppingCart:${shoppingCartId}`;
});

after(async () => {
try {
await postgres.stop();
} catch (error) {
console.log(error);
}
});

void it('uses custom document id instead of stream name assigned in projection evolve', () =>
given([])
.when([
{
type: 'ProductItemAdded',
data: {
productItem: { price: 100, productId: 'shoes', quantity: 100 },
shoppingCartId,
},
metadata: {
streamName,
},
},
])
.then(
expectPongoDocuments
.fromCollection<ShoppingCartShortInfo>(
shoppingCartShortInfoCollectionName,
)
.withId(shoppingCartId)
.toBeEqual({
_id: shoppingCartId,
productItemsCount: 100,
totalAmount: 10000,
}),
));
});

type ShoppingCartShortInfo = {
_id?: string;
productItemsCount: number;
totalAmount: number;
};

const shoppingCartShortInfoCollectionName = 'shoppingCartShortInfo';

const evolve = (
document: ShoppingCartShortInfo,
{ type, data: event }: ProductItemAdded,
): ShoppingCartShortInfo => {
switch (type) {
case 'ProductItemAdded':
return {
...document,
_id: event.shoppingCartId,
totalAmount:
document.totalAmount +
event.productItem.price * event.productItem.quantity,
productItemsCount:
document.productItemsCount + event.productItem.quantity,
};
default:
return document;
}
};

const shoppingCartShortInfoProjection = pongoSingleStreamProjection({
collectionName: shoppingCartShortInfoCollectionName,
evolve,
getDocumentId: (event) => event.data.shoppingCartId,
canHandle: ['ProductItemAdded'],
initialState: () => ({
productItemsCount: 0,
totalAmount: 0,
}),
});

0 comments on commit eb6f89a

Please sign in to comment.