diff --git a/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts b/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts index 8f62a8df..36367910 100644 --- a/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts +++ b/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts @@ -24,6 +24,7 @@ import { StreamNotFoundError, WrongExpectedVersionError, jsonEvent, + type AllStreamJSONRecordedEvent, type AppendExpectedRevision, type ReadStreamOptions as ESDBReadStreamOptions, type JSONRecordedEvent, @@ -54,7 +55,14 @@ export type EventStoreDBReadEvent = ReadEvent< EventStoreDBReadEventMetadata >; -export type EventStoreDBEventStore = EventStore; +export interface EventStoreDBEventStore + extends EventStore { + appendToStream( + streamName: string, + events: EventType[], + options?: AppendToStreamOptions, + ): Promise; +} export const getEventStoreDBEventStore = ( eventStore: EventStoreDBClient, @@ -199,8 +207,8 @@ export const getEventStoreDBEventStore = ( }; }; -const mapFromESDBEvent = ( - event: JSONRecordedEvent, +export const mapFromESDBEvent = ( + event: JSONRecordedEvent | AllStreamJSONRecordedEvent, ): ReadEvent => { return >{ type: event.type, diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.ts new file mode 100644 index 00000000..a3a84949 --- /dev/null +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.ts @@ -0,0 +1,358 @@ +import { assertThatArray, type Event } from '@event-driven-io/emmett'; +import { + EventStoreDBContainer, + StartedEventStoreDBContainer, +} from '@event-driven-io/emmett-testcontainers'; +import { after, before, describe, it } from 'node:test'; +import { v4 as uuid } from 'uuid'; +import { + type EventStoreDBEventStore, + getEventStoreDBEventStore, +} from '../eventstoreDBEventStore'; +import { eventStoreDBEventStoreConsumer } from './eventStoreDBEventStoreConsumer'; +import type { EventStoreDBEventStoreSubscriptionOptions } from './eventStoreDBEventStoreSubscription'; + +void describe('EventStoreDB event store started consumer', () => { + let eventStoreDB: StartedEventStoreDBContainer; + let connectionString: string; + let eventStore: EventStoreDBEventStore; + + before(async () => { + eventStoreDB = await new EventStoreDBContainer().start(); + connectionString = eventStoreDB.getConnectionString(); + eventStore = getEventStoreDBEventStore(eventStoreDB.getClient()); + }); + + after(async () => { + try { + await eventStoreDB.stop(); + } catch (error) { + console.log(error); + } + }); + + void describe('eachMessage', () => { + void it('handles all events appended to event store BEFORE subscription was started', async () => { + // Given + const guestId = uuid(); + const streamName = `guestStay-${guestId}`; + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + const appendResult = await eventStore.appendToStream(streamName, events); + + const result: GuestStayEvent[] = []; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + }); + consumer.subscribe({ + subscriptionId: uuid(), + stopAfter: (event) => + event.metadata.globalPosition === + appendResult.lastEventGlobalPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + try { + await consumer.start(); + + assertThatArray(result).containsElementsMatching(events); + } finally { + await consumer.close(); + } + }); + + void it('handles all events appended to event store AFTER subscription was started', async () => { + // Given + + const result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = undefined; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + }); + consumer.subscribe({ + subscriptionId: uuid(), + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + const guestId = uuid(); + const streamName = `guestStay-${guestId}`; + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsElementsMatching(events); + } finally { + await consumer.close(); + } + }); + + void it('handles ONLY events AFTER provided global position', async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${guestId}`; + + const initialEvents: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + const { lastEventGlobalPosition: startPosition } = + await eventStore.appendToStream(streamName, initialEvents); + + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + ]; + + const result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = undefined; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + }); + consumer.subscribe({ + subscriptionId: uuid(), + startFrom: { globalPosition: startPosition }, + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsOnlyElementsMatching(events); + } finally { + await consumer.close(); + } + }); + + void it('handles all events when CURRENT position is NOT stored', async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${guestId}`; + + const initialEvents: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + + await eventStore.appendToStream(streamName, initialEvents); + + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + ]; + + const result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = undefined; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + }); + consumer.subscribe({ + subscriptionId: uuid(), + startFrom: 'CURRENT', + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsElementsMatching([ + ...initialEvents, + ...events, + ]); + } finally { + await consumer.close(); + } + }); + + void it('handles only new events when CURRENT position is stored for restarted consumer', async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${guestId}`; + + const initialEvents: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + const { lastEventGlobalPosition } = await eventStore.appendToStream( + streamName, + initialEvents, + ); + + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + ]; + + let result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + }); + consumer.subscribe({ + subscriptionId: uuid(), + startFrom: 'CURRENT', + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + await consumer.start(); + await consumer.stop(); + + result = []; + + stopAfterPosition = undefined; + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsOnlyElementsMatching(events); + } finally { + await consumer.close(); + } + }); + + void it('handles only new events when CURRENT position is stored for a new consumer', async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${guestId}`; + + const initialEvents: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + const { lastEventGlobalPosition } = await eventStore.appendToStream( + streamName, + initialEvents, + ); + + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + ]; + + let result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; + + const subscriptionOptions: EventStoreDBEventStoreSubscriptionOptions = + { + subscriptionId: uuid(), + startFrom: 'CURRENT', + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + }); + try { + consumer.subscribe(subscriptionOptions); + + await consumer.start(); + } finally { + await consumer.close(); + } + + result = []; + + stopAfterPosition = undefined; + + const newConsumer = eventStoreDBEventStoreConsumer({ + connectionString, + }); + newConsumer.subscribe(subscriptionOptions); + + try { + const consumerPromise = newConsumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsOnlyElementsMatching(events); + } finally { + await newConsumer.close(); + } + }); + }); +}); + +type GuestCheckedIn = Event<'GuestCheckedIn', { guestId: string }>; +type GuestCheckedOut = Event<'GuestCheckedOut', { guestId: string }>; + +type GuestStayEvent = GuestCheckedIn | GuestCheckedOut; diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.int.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.int.ts new file mode 100644 index 00000000..edc085ab --- /dev/null +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.int.ts @@ -0,0 +1,148 @@ +import { + assertFails, + assertFalse, + assertThrowsAsync, + assertTrue, + EmmettError, +} from '@event-driven-io/emmett'; +import { + EventStoreDBContainer, + StartedEventStoreDBContainer, +} from '@event-driven-io/emmett-testcontainers'; +import { after, afterEach, before, beforeEach, describe, it } from 'node:test'; +import { v4 as uuid } from 'uuid'; +import { + eventStoreDBEventStoreConsumer, + type EventStoreDBEventStoreConsumer, +} from './eventStoreDBEventStoreConsumer'; +import type { EventStoreDBEventStoreSubscription } from './eventStoreDBEventStoreSubscription'; +import { + type EventStoreDBEventStore, + getEventStoreDBEventStore, +} from '../eventstoreDBEventStore'; + +void describe('EventStoreDB event store consumer', () => { + let eventStoreDB: StartedEventStoreDBContainer; + let connectionString: string; + let eventStore: EventStoreDBEventStore; + const dummySubscription: EventStoreDBEventStoreSubscription = { + id: uuid(), + start: () => Promise.resolve('BEGINNING'), + handle: () => Promise.resolve(), + isActive: false, + }; + + before(async () => { + eventStoreDB = await new EventStoreDBContainer().start(); + connectionString = eventStoreDB.getConnectionString(); + eventStore = getEventStoreDBEventStore(eventStoreDB.getClient()); + }); + + after(async () => { + try { + await eventStoreDB.stop(); + } catch (error) { + console.log(error); + } + }); + + void it('creates not-started consumer for the specified connection string', () => { + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + subscriptions: [dummySubscription], + }); + + assertFalse(consumer.isRunning); + }); + + void it('creates not-started consumer if connection string targets not existing EventStoreDB database', () => { + const connectionStringToNotExistingDB = + 'postgresql://postgres:postgres@not-existing-database:5432/postgres'; + const consumer = eventStoreDBEventStoreConsumer({ + connectionString: connectionStringToNotExistingDB, + subscriptions: [dummySubscription], + }); + + assertFalse(consumer.isRunning); + }); + + void describe('created consumer', () => { + let consumer: EventStoreDBEventStoreConsumer; + + beforeEach(() => { + consumer = eventStoreDBEventStoreConsumer({ + connectionString, + subscriptions: [dummySubscription], + }); + }); + afterEach(() => consumer.stop()); + + void it('subscribes to existing event store', () => { + consumer.start().catch(() => assertFails()); + + assertTrue(consumer.isRunning); + }); + + void it('fails to start if connection string targets not existing EventStoreDB database', async () => { + const connectionStringToNotExistingDB = + 'postgresql://postgres:postgres@not-existing-database:5432/postgres'; + const consumerToNotExistingServer = eventStoreDBEventStoreConsumer({ + connectionString: connectionStringToNotExistingDB, + subscriptions: [dummySubscription], + }); + await assertThrowsAsync( + () => consumerToNotExistingServer.start(), + (error) => { + return 'code' in error && error.code === 'EAI_AGAIN'; + }, + ); + }); + + void it('fails to start if there are no subscriptions', async () => { + const consumerToNotExistingServer = eventStoreDBEventStoreConsumer({ + connectionString, + subscriptions: [], + }); + await assertThrowsAsync( + () => consumerToNotExistingServer.start(), + (error) => { + return ( + error.message === + 'Cannot start consumer without at least a single subscription' + ); + }, + ); + }); + + void it(`stopping not started consumer doesn't fail`, async () => { + await consumer.stop(); + + assertFalse(consumer.isRunning); + }); + + void it(`stopping not started consumer is idempotent`, async () => { + await consumer.stop(); + await consumer.stop(); + + assertFalse(consumer.isRunning); + }); + }); + + void describe('started consumer', () => { + let consumer: EventStoreDBEventStoreConsumer; + + beforeEach(() => { + consumer = eventStoreDBEventStoreConsumer({ + connectionString, + subscriptions: [dummySubscription], + }); + }); + afterEach(() => consumer.stop()); + + void it('stops started consumer', async () => { + await consumer.stop(); + + assertFalse(consumer.isRunning); + }); + }); +}); diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.ts new file mode 100644 index 00000000..880d52d6 --- /dev/null +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.ts @@ -0,0 +1,143 @@ +import { EmmettError, type Event } from '@event-driven-io/emmett'; +import { EventStoreDBClient } from '@eventstore/db-client'; +import { + eventStoreDBEventStoreSubscription, + type EventStoreDBEventStoreSubscription, + type EventStoreDBEventStoreSubscriptionOptions, +} from './eventStoreDBEventStoreSubscription'; +import { + DefaultEventStoreDBEventStoreSubscriptionBatchSize, + eventStoreDBEventStoreMessageBatchPuller, + zipEventStoreDBEventStoreMessageBatchPullerStartFrom, + type EventStoreDBEventStoreMessageBatchPuller, + type EventStoreDBEventStoreMessagesBatchHandler, +} from './messageBatchProcessing'; + +export type EventStoreDBEventStoreConsumerOptions = { + connectionString: string; + subscriptions?: EventStoreDBEventStoreSubscription[]; + pulling?: { + batchSize?: number; + }; +}; + +export type EventStoreDBEventStoreConsumer = Readonly<{ + connectionString: string; + isRunning: boolean; + subscriptions: EventStoreDBEventStoreSubscription[]; + subscribe: ( + options: EventStoreDBEventStoreSubscriptionOptions, + ) => EventStoreDBEventStoreSubscription; + start: () => Promise; + stop: () => Promise; + close: () => Promise; +}>; + +export const eventStoreDBEventStoreConsumer = ( + options: EventStoreDBEventStoreConsumerOptions, +): EventStoreDBEventStoreConsumer => { + let isRunning = false; + const { connectionString, pulling } = options; + const subscriptions = options.subscriptions ?? []; + + let start: Promise; + + let currentMessagePooler: + | EventStoreDBEventStoreMessageBatchPuller + | undefined; + + const eventStoreDBClient = + EventStoreDBClient.connectionString(connectionString); + + const eachBatch: EventStoreDBEventStoreMessagesBatchHandler = async ( + messagesBatch, + ) => { + const activeSubscriptions = subscriptions.filter((s) => s.isActive); + + if (activeSubscriptions.length === 0) + return { + type: 'STOP', + reason: 'No active subscriptions', + }; + + const result = await Promise.allSettled( + activeSubscriptions.map((s) => { + // TODO: Add here filtering to only pass messages that can be handled by subscription + return s.handle(messagesBatch, { eventStoreDBClient }); + }), + ); + + return result.some( + (r) => r.status === 'fulfilled' && r.value?.type !== 'STOP', + ) + ? undefined + : { + type: 'STOP', + }; + }; + + const messagePuller = (currentMessagePooler = + eventStoreDBEventStoreMessageBatchPuller({ + eventStoreDBClient, + eachBatch, + batchSize: + pulling?.batchSize ?? + DefaultEventStoreDBEventStoreSubscriptionBatchSize, + })); + + const stop = async () => { + if (!isRunning) return; + isRunning = false; + if (currentMessagePooler) { + await currentMessagePooler.stop(); + currentMessagePooler = undefined; + } + await start; + }; + + return { + connectionString, + subscriptions, + get isRunning() { + return isRunning; + }, + subscribe: ( + options: EventStoreDBEventStoreSubscriptionOptions, + ): EventStoreDBEventStoreSubscription => { + const subscription = + eventStoreDBEventStoreSubscription(options); + + subscriptions.push(subscription); + + return subscription; + }, + start: () => { + if (isRunning) return start; + + start = (async () => { + if (subscriptions.length === 0) + return Promise.reject( + new EmmettError( + 'Cannot start consumer without at least a single subscription', + ), + ); + + isRunning = true; + + const startFrom = zipEventStoreDBEventStoreMessageBatchPullerStartFrom( + await Promise.all( + subscriptions.map((o) => o.start(eventStoreDBClient)), + ), + ); + + return messagePuller.start({ startFrom }); + })(); + + return start; + }, + stop, + close: async () => { + await stop(); + }, + }; +}; diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreSubscription.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreSubscription.ts new file mode 100644 index 00000000..ac9cdb64 --- /dev/null +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreSubscription.ts @@ -0,0 +1,167 @@ +import { + EmmettError, + type Event, + type ReadEvent, + type ReadEventMetadataWithGlobalPosition, +} from '@event-driven-io/emmett'; +import type { EventStoreDBClient } from '@eventstore/db-client'; +import type { EventStoreDBEventStoreMessageBatchPullerStartFrom } from './messageBatchProcessing'; + +export type EventStoreDBEventStoreSubscriptionEventsBatch< + EventType extends Event = Event, +> = { + messages: ReadEvent[]; +}; + +export type EventStoreDBEventStoreSubscription< + EventType extends Event = Event, +> = { + id: string; + start: ( + eventStoreDBClient: EventStoreDBClient, + ) => Promise; + isActive: boolean; + handle: ( + messagesBatch: EventStoreDBEventStoreSubscriptionEventsBatch, + context: { eventStoreDBClient: EventStoreDBClient }, + ) => Promise; +}; + +export const EventStoreDBEventStoreSubscription = { + result: { + skip: (options?: { + reason?: string; + }): EventStoreDBEventStoreSubscriptionMessageHandlerResult => ({ + type: 'SKIP', + ...(options ?? {}), + }), + stop: (options?: { + reason?: string; + error?: EmmettError; + }): EventStoreDBEventStoreSubscriptionMessageHandlerResult => ({ + type: 'STOP', + ...(options ?? {}), + }), + }, +}; + +export type EventStoreDBEventStoreSubscriptionMessageHandlerResult = + | void + | { type: 'SKIP'; reason?: string } + | { type: 'STOP'; reason?: string; error?: EmmettError }; + +export type EventStoreDBEventStoreSubscriptionEachMessageHandler< + EventType extends Event = Event, +> = ( + event: ReadEvent, +) => + | Promise + | EventStoreDBEventStoreSubscriptionMessageHandlerResult; + +export type EventStoreDBEventStoreSubscriptionStartFrom = + | EventStoreDBEventStoreMessageBatchPullerStartFrom + | 'CURRENT'; + +export type EventStoreDBEventStoreSubscriptionOptions< + EventType extends Event = Event, +> = { + subscriptionId: string; + version?: number; + partition?: string; + startFrom?: EventStoreDBEventStoreSubscriptionStartFrom; + stopAfter?: ( + message: ReadEvent, + ) => boolean; + eachMessage: EventStoreDBEventStoreSubscriptionEachMessageHandler; +}; + +export const eventStoreDBEventStoreSubscription = < + EventType extends Event = Event, +>( + options: EventStoreDBEventStoreSubscriptionOptions, +): EventStoreDBEventStoreSubscription => { + const { eachMessage } = options; + let isActive = true; + //let lastProcessedPosition: bigint | null = null; + + return { + id: options.subscriptionId, + start: ( + _eventStoreDBClient: EventStoreDBClient, + ): Promise< + EventStoreDBEventStoreMessageBatchPullerStartFrom | undefined + > => { + isActive = true; + if (options.startFrom !== 'CURRENT') + return Promise.resolve(options.startFrom); + + // const { lastProcessedPosition } = await readSubscriptionCheckpoint( + // execute, + // { + // subscriptionId: options.subscriptionId, + // partition: options.partition, + // }, + // ); + + // if (lastProcessedPosition === null) return 'BEGINNING'; + + // return { globalPosition: lastProcessedPosition }; + return Promise.resolve('BEGINNING'); + }, + get isActive() { + return isActive; + }, + handle: async ( + { messages }, + { eventStoreDBClient }, + ): Promise => { + if (!isActive) return; + + let result: + | EventStoreDBEventStoreSubscriptionMessageHandlerResult + | undefined = undefined; + + //let lastProcessedPosition: bigint | null = null; + + for (const message of messages) { + const typedMessage = message as ReadEvent< + EventType, + ReadEventMetadataWithGlobalPosition + >; + + const messageProcessingResult = await eachMessage(typedMessage); + + // TODO: Add correct handling of the storing checkpoint + // await storeSubscriptionCheckpoint(tx.execute, { + // subscriptionId: options.subscriptionId, + // version: options.version, + // lastProcessedPosition, + // newPosition: typedMessage.metadata.globalPosition, + // partition: options.partition, + // }); + + //lastProcessedPosition = typedMessage.metadata.globalPosition; + + if ( + messageProcessingResult && + messageProcessingResult.type === 'STOP' + ) { + isActive = false; + result = messageProcessingResult; + break; + } + + if (options.stopAfter && options.stopAfter(typedMessage)) { + isActive = false; + result = { type: 'STOP', reason: 'Stop condition reached' }; + break; + } + + if (messageProcessingResult && messageProcessingResult.type === 'SKIP') + continue; + } + + return result; + }, + }; +}; diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/index.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/index.ts new file mode 100644 index 00000000..03601d01 --- /dev/null +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/index.ts @@ -0,0 +1,3 @@ +export * from './eventStoreDBEventStoreConsumer'; +export * from './eventStoreDBEventStoreSubscription'; +export * from './messageBatchProcessing'; diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/messageBatchProcessing/index.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/messageBatchProcessing/index.ts new file mode 100644 index 00000000..c8313665 --- /dev/null +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/messageBatchProcessing/index.ts @@ -0,0 +1,190 @@ +import type { + EmmettError, + Event, + ReadEvent, + ReadEventMetadataWithGlobalPosition, +} from '@event-driven-io/emmett'; +import { + END, + EventStoreDBClient, + excludeSystemEvents, + START, + type AllStreamJSONRecordedEvent, + type AllStreamResolvedEvent, +} from '@eventstore/db-client'; +import { finished, Readable } from 'stream'; +import { mapFromESDBEvent } from '../../eventstoreDBEventStore'; + +export const DefaultEventStoreDBEventStoreSubscriptionBatchSize = 100; +export const DefaultEventStoreDBEventStoreSubscriptionPullingFrequencyInMs = 50; + +export type EventStoreDBEventStoreMessagesBatch< + EventType extends Event = Event, +> = { + messages: ReadEvent[]; +}; + +export type EventStoreDBEventStoreMessagesBatchHandlerResult = void | { + type: 'STOP'; + reason?: string; + error?: EmmettError; +}; + +export type EventStoreDBEventStoreMessagesBatchHandler< + EventType extends Event = Event, +> = ( + messagesBatch: EventStoreDBEventStoreMessagesBatch, +) => + | Promise + | EventStoreDBEventStoreMessagesBatchHandlerResult; + +export type EventStoreDBEventStoreMessageBatchPullerOptions< + EventType extends Event = Event, +> = { + eventStoreDBClient: EventStoreDBClient; + batchSize: number; + eachBatch: EventStoreDBEventStoreMessagesBatchHandler; +}; + +export type EventStoreDBEventStoreMessageBatchPullerStartFrom = + | { globalPosition: bigint } + | 'BEGINNING' + | 'END'; + +export type EventStoreDBEventStoreMessageBatchPullerStartOptions = { + startFrom: EventStoreDBEventStoreMessageBatchPullerStartFrom; +}; + +export type EventStoreDBEventStoreMessageBatchPuller = { + isRunning: boolean; + start( + options: EventStoreDBEventStoreMessageBatchPullerStartOptions, + ): Promise; + stop(): Promise; +}; + +export const eventStoreDBEventStoreMessageBatchPuller = < + EventType extends Event = Event, +>({ + eventStoreDBClient, + //batchSize, + eachBatch, +}: EventStoreDBEventStoreMessageBatchPullerOptions): EventStoreDBEventStoreMessageBatchPuller => { + let isRunning = false; + + let start: Promise; + + const pullMessages = async ( + options: EventStoreDBEventStoreMessageBatchPullerStartOptions, + ) => { + const fromPosition = + options.startFrom === 'BEGINNING' + ? START + : options.startFrom === 'END' + ? END + : { + prepare: options.startFrom.globalPosition, + commit: options.startFrom.globalPosition, + }; + + const subscription = eventStoreDBClient.subscribeToAll({ + fromPosition, + filter: excludeSystemEvents(), + }); + + return new Promise((resolve, reject) => { + finished( + subscription.on( + 'data', + async (resolvedEvent: AllStreamResolvedEvent) => { + if (!resolvedEvent.event) return; + + const event = mapFromESDBEvent( + resolvedEvent.event as AllStreamJSONRecordedEvent, + ); + + const result = await eachBatch({ messages: [event] }); + + if (result && result.type === 'STOP') { + subscription.destroy(); + } + }, + ) as unknown as Readable, + (error) => { + if (!error) { + console.info(`Stopping subscription.`); + resolve(); + return; + } + console.error(`Received error: ${JSON.stringify(error)}.`); + reject(error); + }, + ); + }); + //return subscription; + + // let waitTime = 100; + + // do { + // const { messages, currentGlobalPosition, areEventsLeft } = + // await readMessagesBatch(executor, readMessagesOptions); + + // if (messages.length > 0) { + // const result = await eachBatch({ messages }); + + // if (result && result.type === 'STOP') { + // isRunning = false; + // break; + // } + // } + + // readMessagesOptions.after = currentGlobalPosition; + + // await new Promise((resolve) => setTimeout(resolve, waitTime)); + + // if (!areEventsLeft) { + // waitTime = Math.min(waitTime * 2, 1000); + // } else { + // waitTime = pullingFrequencyInMs; + // } + // } while (isRunning); + }; + + return { + get isRunning() { + return isRunning; + }, + start: (options) => { + if (isRunning) return start; + + start = (async () => { + isRunning = true; + + return pullMessages(options); + })(); + + return start; + }, + stop: async () => { + if (!isRunning) return; + isRunning = false; + await start; + }, + }; +}; + +export const zipEventStoreDBEventStoreMessageBatchPullerStartFrom = ( + options: (EventStoreDBEventStoreMessageBatchPullerStartFrom | undefined)[], +): EventStoreDBEventStoreMessageBatchPullerStartFrom => { + if ( + options.length === 0 || + options.some((o) => o === undefined || o === 'BEGINNING') + ) + return 'BEGINNING'; + + if (options.every((o) => o === 'END')) return 'END'; + + return options + .filter((o) => o !== undefined && o !== 'BEGINNING' && o !== 'END') + .sort((a, b) => (a > b ? 1 : -1))[0]!; +}; diff --git a/src/packages/emmett-postgresql/src/eventStore/subscriptions/postgreSQLEventStoreConsumer.ts b/src/packages/emmett-postgresql/src/eventStore/subscriptions/postgreSQLEventStoreConsumer.ts index fadc81bd..82e8547d 100644 --- a/src/packages/emmett-postgresql/src/eventStore/subscriptions/postgreSQLEventStoreConsumer.ts +++ b/src/packages/emmett-postgresql/src/eventStore/subscriptions/postgreSQLEventStoreConsumer.ts @@ -17,7 +17,7 @@ import { export type PostgreSQLEventStoreConsumerOptions = { connectionString: string; subscriptions?: PostgreSQLEventStoreSubscription[]; - pooling?: { + pulling?: { batchSize?: number; pullingFrequencyInMs?: number; }; @@ -39,12 +39,12 @@ export const postgreSQLEventStoreConsumer = ( options: PostgreSQLEventStoreConsumerOptions, ): PostgreSQLEventStoreConsumer => { let isRunning = false; - const { connectionString, pooling } = options; + const { connectionString, pulling } = options; const subscriptions = options.subscriptions ?? []; let start: Promise; - let currentMessagePooler: PostgreSQLEventStoreMessageBatchPuller | undefined; + let currentMessagePuller: PostgreSQLEventStoreMessageBatchPuller | undefined; const pool = dumbo({ connectionString }); @@ -75,23 +75,23 @@ export const postgreSQLEventStoreConsumer = ( }; }; - const messagePooler = (currentMessagePooler = + const messagePooler = (currentMessagePuller = postgreSQLEventStoreMessageBatchPuller({ executor: pool.execute, eachBatch, batchSize: - pooling?.batchSize ?? DefaultPostgreSQLEventStoreSubscriptionBatchSize, + pulling?.batchSize ?? DefaultPostgreSQLEventStoreSubscriptionBatchSize, pullingFrequencyInMs: - pooling?.pullingFrequencyInMs ?? + pulling?.pullingFrequencyInMs ?? DefaultPostgreSQLEventStoreSubscriptionPullingFrequencyInMs, })); const stop = async () => { if (!isRunning) return; isRunning = false; - if (currentMessagePooler) { - await currentMessagePooler.stop(); - currentMessagePooler = undefined; + if (currentMessagePuller) { + await currentMessagePuller.stop(); + currentMessagePuller = undefined; } await start; };