Skip to content

Commit

Permalink
Adjusted projection definition
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Feb 24, 2025
1 parent 16d434b commit bb5f6e5
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import { assertThatArray, type Event } from '@event-driven-io/emmett';
import { assertThatArray, type ReadEvent } from '@event-driven-io/emmett';
import {
PostgreSqlContainer,
StartedPostgreSqlContainer,
} from '@testcontainers/postgresql';
import { after, before, describe, it } from 'node:test';
import { v4 as uuid } from 'uuid';
import type { ShoppingCartConfirmed } from '../../testing/shoppingCart.domain';
import {
getPostgreSQLEventStore,
type PostgresEventStore,
} from '../postgreSQLEventStore';
import { pongoMultiStreamProjection } from '../projections';
import type { ProductItemAdded } from '../projections/postgresProjection.customid.int.spec';
import { postgreSQLEventStoreConsumer } from './postgreSQLEventStoreConsumer';
import type { PostgreSQLProcessorOptions } from './postgreSQLProcessor';

Expand Down Expand Up @@ -43,7 +46,7 @@ void describe('PostgreSQL event store started consumer', () => {
// Given
const guestId = uuid();
const streamName = `guestStay-${guestId}`;
const events: GuestStayEvent[] = [
const events: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId } },
{ type: 'GuestCheckedOut', data: { guestId } },
];
Expand All @@ -52,20 +55,18 @@ void describe('PostgreSQL event store started consumer', () => {
events,
);

const result: GuestStayEvent[] = [];
const result: ShoppingCartSummaryEvent[] = [];

// When
const consumer = postgreSQLEventStoreConsumer({
connectionString,
});
consumer.processor<GuestStayEvent>({
consumer.processor({
processorId: uuid(),
projection: shoppingCartsSummaryProjection,
stopAfter: (event) =>
event.metadata.globalPosition ===
appendResult.lastEventGlobalPosition,
eachMessage: (event) => {
result.push(event);
},
});

try {
Expand All @@ -84,25 +85,23 @@ void describe('PostgreSQL event store started consumer', () => {
async () => {
// Given

const result: GuestStayEvent[] = [];
const result: ShoppingCartSummaryEvent[] = [];
let stopAfterPosition: bigint | undefined = undefined;

// When
const consumer = postgreSQLEventStoreConsumer({
connectionString,
});
consumer.processor<GuestStayEvent>({
consumer.processor({
processorId: uuid(),
projection: shoppingCartsSummaryProjection,
stopAfter: (event) =>
event.metadata.globalPosition === stopAfterPosition,
eachMessage: (event) => {
result.push(event);
},
});

const guestId = uuid();
const streamName = `guestStay-${guestId}`;
const events: GuestStayEvent[] = [
const events: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId } },
{ type: 'GuestCheckedOut', data: { guestId } },
];
Expand Down Expand Up @@ -134,33 +133,31 @@ void describe('PostgreSQL event store started consumer', () => {
const otherGuestId = uuid();
const streamName = `guestStay-${guestId}`;

const initialEvents: GuestStayEvent[] = [
const initialEvents: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId } },
{ type: 'GuestCheckedOut', data: { guestId } },
];
const { lastEventGlobalPosition: startPosition } =
await eventStore.appendToStream(streamName, initialEvents);

const events: GuestStayEvent[] = [
const events: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId: otherGuestId } },
{ type: 'GuestCheckedOut', data: { guestId: otherGuestId } },
];

const result: GuestStayEvent[] = [];
const result: ShoppingCartSummaryEvent[] = [];
let stopAfterPosition: bigint | undefined = undefined;

// When
const consumer = postgreSQLEventStoreConsumer({
connectionString,
});
consumer.processor<GuestStayEvent>({
consumer.processor({
processorId: uuid(),
projection: shoppingCartsSummaryProjection,
startFrom: { globalPosition: startPosition },
stopAfter: (event) =>
event.metadata.globalPosition === stopAfterPosition,
eachMessage: (event) => {
result.push(event);
},
});

try {
Expand Down Expand Up @@ -190,33 +187,31 @@ void describe('PostgreSQL event store started consumer', () => {
const otherGuestId = uuid();
const streamName = `guestStay-${guestId}`;

const initialEvents: GuestStayEvent[] = [
const initialEvents: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId } },
{ type: 'GuestCheckedOut', data: { guestId } },
];

await eventStore.appendToStream(streamName, initialEvents);

const events: GuestStayEvent[] = [
const events: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId: otherGuestId } },
{ type: 'GuestCheckedOut', data: { guestId: otherGuestId } },
];

const result: GuestStayEvent[] = [];
const result: ShoppingCartSummaryEvent[] = [];
let stopAfterPosition: bigint | undefined = undefined;

// When
const consumer = postgreSQLEventStoreConsumer({
connectionString,
});
consumer.processor<GuestStayEvent>({
consumer.processor({
processorId: uuid(),
projection: shoppingCartsSummaryProjection,
startFrom: 'CURRENT',
stopAfter: (event) =>
event.metadata.globalPosition === stopAfterPosition,
eachMessage: (event) => {
result.push(event);
},
});

try {
Expand Down Expand Up @@ -249,7 +244,7 @@ void describe('PostgreSQL event store started consumer', () => {
const otherGuestId = uuid();
const streamName = `guestStay-${guestId}`;

const initialEvents: GuestStayEvent[] = [
const initialEvents: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId } },
{ type: 'GuestCheckedOut', data: { guestId } },
];
Expand All @@ -258,26 +253,24 @@ void describe('PostgreSQL event store started consumer', () => {
initialEvents,
);

const events: GuestStayEvent[] = [
const events: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId: otherGuestId } },
{ type: 'GuestCheckedOut', data: { guestId: otherGuestId } },
];

let result: GuestStayEvent[] = [];
let result: ShoppingCartSummaryEvent[] = [];
let stopAfterPosition: bigint | undefined = lastEventGlobalPosition;

// When
const consumer = postgreSQLEventStoreConsumer({
connectionString,
});
consumer.processor<GuestStayEvent>({
consumer.processor({
processorId: uuid(),
projection: shoppingCartsSummaryProjection,
startFrom: 'CURRENT',
stopAfter: (event) =>
event.metadata.globalPosition === stopAfterPosition,
eachMessage: (event) => {
result.push(event);
},
});

await consumer.start();
Expand Down Expand Up @@ -314,7 +307,7 @@ void describe('PostgreSQL event store started consumer', () => {
const otherGuestId = uuid();
const streamName = `guestStay-${guestId}`;

const initialEvents: GuestStayEvent[] = [
const initialEvents: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId } },
{ type: 'GuestCheckedOut', data: { guestId } },
];
Expand All @@ -323,30 +316,29 @@ void describe('PostgreSQL event store started consumer', () => {
initialEvents,
);

const events: GuestStayEvent[] = [
const events: ShoppingCartSummaryEvent[] = [
{ type: 'GuestCheckedIn', data: { guestId: otherGuestId } },
{ type: 'GuestCheckedOut', data: { guestId: otherGuestId } },
];

let result: GuestStayEvent[] = [];
let result: ShoppingCartSummaryEvent[] = [];
let stopAfterPosition: bigint | undefined = lastEventGlobalPosition;

const processorOptions: PostgreSQLProcessorOptions<GuestStayEvent> = {
processorId: uuid(),
startFrom: 'CURRENT',
stopAfter: (event) =>
event.metadata.globalPosition === stopAfterPosition,
eachMessage: (event) => {
result.push(event);
},
};
const processorOptions: PostgreSQLProcessorOptions<ShoppingCartSummaryEvent> =
{
processorId: uuid(),
projection: shoppingCartsSummaryProjection,
startFrom: 'CURRENT',
stopAfter: (event) =>
event.metadata.globalPosition === stopAfterPosition,
};

// When
const consumer = postgreSQLEventStoreConsumer({
connectionString,
});
try {
consumer.processor<GuestStayEvent>(processorOptions);
consumer.processor<ShoppingCartSummaryEvent>(processorOptions);

await consumer.start();
} finally {
Expand All @@ -360,7 +352,7 @@ void describe('PostgreSQL event store started consumer', () => {
const newConsumer = postgreSQLEventStoreConsumer({
connectionString,
});
newConsumer.processor<GuestStayEvent>(processorOptions);
newConsumer.processor<ShoppingCartSummaryEvent>(processorOptions);

try {
const consumerPromise = newConsumer.start();
Expand All @@ -382,7 +374,48 @@ void describe('PostgreSQL event store started consumer', () => {
});
});

type GuestCheckedIn = Event<'GuestCheckedIn', { guestId: string }>;
type GuestCheckedOut = Event<'GuestCheckedOut', { guestId: string }>;
type ShoppingCartSummary = {
_id?: string;
activeCount: number;
activeShopingCarts: string[];
};

const shoppingCartsSummaryCollectionName = 'shoppingCartsSummary';

type GuestStayEvent = GuestCheckedIn | GuestCheckedOut;
export type ShoppingCartSummaryEvent = ProductItemAdded | ShoppingCartConfirmed;

const evolve = (
document: ShoppingCartSummary,
{ type, metadata: { streamName } }: ReadEvent<ShoppingCartSummaryEvent>,
): ShoppingCartSummary => {
switch (type) {
case 'ProductItemAdded': {
if (!document.activeShopingCarts.includes(streamName)) {
document.activeShopingCarts.push(streamName);
document.activeCount++;
}

return document;
}
case 'ShoppingCartConfirmed':
document.activeShopingCarts = document.activeShopingCarts.filter(
(item) => item !== streamName,
);
document.activeCount--;

return document;
default:
return document;
}
};

const shoppingCartsSummaryProjection = pongoMultiStreamProjection({
getDocumentId: (event) => event.metadata.streamName.split(':')[1]!,
collectionName: shoppingCartsSummaryCollectionName,
evolve,
canHandle: ['ProductItemAdded', 'ShoppingCartConfirmed'],
initialState: () => ({
activeCount: 0,
activeShopingCarts: [],
}),
});
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,16 @@ export type GenericPostgreSQLProcessorOptions<EventType extends Event = Event> =

export type PostgreSQLProjectionProcessorOptions<
EventType extends Event = Event,
> = { type: 'projection' } & PostgreSQLProjectionDefinition<EventType> & {
partition?: string;
startFrom?: PostgreSQLProcessorStartFrom;
stopAfter?: (
message: ReadEvent<EventType, ReadEventMetadataWithGlobalPosition>,
) => boolean;
};
> = {
processorId?: string;
version?: number;
projection: PostgreSQLProjectionDefinition<EventType>;
partition?: string;
startFrom?: PostgreSQLProcessorStartFrom;
stopAfter?: (
message: ReadEvent<EventType, ReadEventMetadataWithGlobalPosition>,
) => boolean;
};

export type PostgreSQLProcessorOptions<EventType extends Event = Event> =
| GenericPostgreSQLProcessorOptions<EventType>
Expand Down Expand Up @@ -202,12 +205,14 @@ const genericPostgreSQLProcessor = <EventType extends Event = Event>(
export const postgreSQLProjectionProcessor = <EventType extends Event = Event>(
options: PostgreSQLProjectionProcessorOptions<EventType>,
): PostgreSQLProcessor => {
const projection = options.projection;

return genericPostgreSQLProcessor<EventType>({
processorId: `projection:${options.name}`,
processorId: options.processorId ?? `projection:${projection.name}`,
eachMessage: async (event, context) => {
if (!options.canHandle.includes(event.type)) return;
if (!projection.canHandle.includes(event.type)) return;

await options.handle([event], context);
await projection.handle([event], context);
},
...options,
});
Expand All @@ -216,7 +221,7 @@ export const postgreSQLProjectionProcessor = <EventType extends Event = Event>(
export const postgreSQLProcessor = <EventType extends Event = Event>(
options: PostgreSQLProcessorOptions<EventType>,
): PostgreSQLProcessor => {
if ('type' in options) {
if ('projection' in options) {
return postgreSQLProjectionProcessor(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,21 @@ export const handleProjections = async <EventType extends Event = Event>(

export const postgreSQLProjection = <EventType extends Event>(
definition: PostgreSQLProjectionDefinition<EventType>,
): PostgreSQLProjectionDefinition =>
): PostgreSQLProjectionDefinition<EventType> =>
projection<
EventType,
PostgresReadEventMetadata,
PostgreSQLProjectionHandlerContext,
PostgreSQLProjectionDefinition<EventType>
>(definition) as PostgreSQLProjectionDefinition;
>(definition);

export const postgreSQLRawBatchSQLProjection = <EventType extends Event>(
handle: (
events: EventType[],
context: PostgreSQLProjectionHandlerContext,
) => Promise<SQL[]> | SQL[],
...canHandle: CanHandle<EventType>
): PostgreSQLProjectionDefinition =>
): PostgreSQLProjectionDefinition<EventType> =>
postgreSQLProjection<EventType>({
canHandle,
handle: async (events, context) => {
Expand All @@ -116,7 +116,7 @@ export const postgreSQLRawSQLProjection = <EventType extends Event>(
context: PostgreSQLProjectionHandlerContext,
) => Promise<SQL> | SQL,
...canHandle: CanHandle<EventType>
): PostgreSQLProjectionDefinition =>
): PostgreSQLProjectionDefinition<EventType> =>
postgreSQLRawBatchSQLProjection<EventType>(
async (events, context) => {
const sqls: SQL[] = [];
Expand Down
Loading

0 comments on commit bb5f6e5

Please sign in to comment.