diff --git a/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.ts b/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.ts index e775e4a4..f627df6a 100644 --- a/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.ts +++ b/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.ts @@ -20,7 +20,6 @@ import { export type EventStoreDBEventStoreConsumerConfig< ConsumerEventType extends Event = Event, > = { - connectionString: string; from?: EventStoreDBEventStoreConsumerType; processors?: EventStoreDBEventStoreProcessor[]; pulling?: { @@ -35,7 +34,7 @@ export type EventStoreDBEventStoreConsumerOptions< | { connectionString: string; } - | { eventStoreDBClient: EventStoreDBClient } + | { client: EventStoreDBClient } ); export type $all = '$all'; @@ -54,7 +53,6 @@ export type EventStoreDBEventStoreConsumerType = export type EventStoreDBEventStoreConsumer< ConsumerEventType extends Event = Event, > = Readonly<{ - connectionString: string; isRunning: boolean; processors: EventStoreDBEventStoreProcessor[]; processor: ( @@ -71,17 +69,17 @@ export const eventStoreDBEventStoreConsumer = < options: EventStoreDBEventStoreConsumerOptions, ): EventStoreDBEventStoreConsumer => { let isRunning = false; - const { connectionString, pulling } = options; + const { pulling } = options; const processors = options.processors ?? []; let start: Promise; - let currentMessagePooler: - | EventStoreDBEventStoreMessageBatchPuller - | undefined; + let currentSubscription: EventStoreDBEventStoreMessageBatchPuller | undefined; - const eventStoreDBClient = - EventStoreDBClient.connectionString(connectionString); + const client = + 'client' in options + ? options.client + : EventStoreDBClient.connectionString(options.connectionString); const eachBatch: EventStoreDBEventStoreMessagesBatchHandler< ConsumerEventType @@ -97,7 +95,7 @@ export const eventStoreDBEventStoreConsumer = < const result = await Promise.allSettled( activeProcessors.map((s) => { // TODO: Add here filtering to only pass messages that can be handled by processor - return s.handle(messagesBatch, { eventStoreDBClient }); + return s.handle(messagesBatch, { client }); }), ); @@ -110,8 +108,8 @@ export const eventStoreDBEventStoreConsumer = < }; }; - const messagePuller = (currentMessagePooler = eventStoreDBSubscription({ - eventStoreDBClient, + const subscription = (currentSubscription = eventStoreDBSubscription({ + client, eachBatch, batchSize: pulling?.batchSize ?? DefaultEventStoreDBEventStoreProcessorBatchSize, @@ -120,15 +118,14 @@ export const eventStoreDBEventStoreConsumer = < const stop = async () => { if (!isRunning) return; isRunning = false; - if (currentMessagePooler) { - await currentMessagePooler.stop(); - currentMessagePooler = undefined; + if (currentSubscription) { + await currentSubscription.stop(); + currentSubscription = undefined; } await start; }; return { - connectionString, processors, get isRunning() { return isRunning; @@ -156,10 +153,10 @@ export const eventStoreDBEventStoreConsumer = < isRunning = true; const startFrom = zipEventStoreDBEventStoreMessageBatchPullerStartFrom( - await Promise.all(processors.map((o) => o.start(eventStoreDBClient))), + await Promise.all(processors.map((o) => o.start(client))), ); - return messagePuller.start({ startFrom }); + return subscription.start({ startFrom }); })(); return start; diff --git a/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreProcessor.ts b/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreProcessor.ts index 43e7989f..a10b6a36 100644 --- a/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreProcessor.ts +++ b/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreProcessor.ts @@ -17,12 +17,12 @@ export type EventStoreDBEventStoreProcessorEventsBatch< export type EventStoreDBEventStoreProcessor = { id: string; start: ( - eventStoreDBClient: EventStoreDBClient, + client: EventStoreDBClient, ) => Promise; isActive: boolean; handle: ( messagesBatch: EventStoreDBEventStoreProcessorEventsBatch, - context: { eventStoreDBClient: EventStoreDBClient }, + context: { client: EventStoreDBClient }, ) => Promise; }; @@ -88,7 +88,7 @@ export const eventStoreDBEventStoreProcessor = < return { id: options.processorId, start: ( - _eventStoreDBClient: EventStoreDBClient, + _client: EventStoreDBClient, ): Promise => { isActive = true; if (options.startFrom !== 'CURRENT') diff --git a/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts index a8c32ebc..96fac4b8 100644 --- a/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts @@ -44,7 +44,7 @@ export type EventStoreDBEventStoreMessagesBatchHandler< export type EventStoreDBSubscriptionOptions = { from?: EventStoreDBEventStoreConsumerType; - eventStoreDBClient: EventStoreDBClient; + client: EventStoreDBClient; batchSize: number; eachBatch: EventStoreDBEventStoreMessagesBatchHandler; }; @@ -82,23 +82,23 @@ const toStreamPosition = (startFrom: EventStoreDBSubscriptionStartFrom) => : startFrom.position; const subscribe = ( - eventStoreDBClient: EventStoreDBClient, + client: EventStoreDBClient, from: EventStoreDBEventStoreConsumerType | undefined, options: EventStoreDBSubscriptionStartOptions, ) => from == undefined || from.stream == $all - ? eventStoreDBClient.subscribeToAll({ + ? client.subscribeToAll({ fromPosition: toGlobalPosition(options.startFrom), filter: excludeSystemEvents(), ...(from?.options ?? {}), }) - : eventStoreDBClient.subscribeToStream(from.stream, { + : client.subscribeToStream(from.stream, { fromRevision: toStreamPosition(options.startFrom), ...(from.options ?? {}), }); export const eventStoreDBSubscription = ({ - eventStoreDBClient, + client, from, //batchSize, eachBatch, @@ -112,7 +112,7 @@ export const eventStoreDBSubscription = ({ const pullMessages = async ( options: EventStoreDBSubscriptionStartOptions, ) => { - subscription = subscribe(eventStoreDBClient, from, options); + subscription = subscribe(client, from, options); return new Promise((resolve, reject) => { finished( diff --git a/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts b/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts index ffa1412e..e805b71b 100644 --- a/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts +++ b/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts @@ -67,7 +67,7 @@ export interface EventStoreDBEventStore options?: AppendToStreamOptions, ): Promise; consumer( - options: EventStoreDBEventStoreConsumerConfig, + options?: EventStoreDBEventStoreConsumerConfig, ): EventStoreDBEventStoreConsumer; } @@ -211,11 +211,11 @@ export const getEventStoreDBEventStore = ( }, consumer: ( - options: EventStoreDBEventStoreConsumerConfig, + options?: EventStoreDBEventStoreConsumerConfig, ): EventStoreDBEventStoreConsumer => eventStoreDBEventStoreConsumer({ - ...options, - eventStoreDBClient: eventStore, + ...(options ?? {}), + client: eventStore, }), //streamEvents: streamEvents(eventStore), diff --git a/src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts b/src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts index 60e46586..67334c80 100644 --- a/src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts +++ b/src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts @@ -51,7 +51,7 @@ export interface PostgresEventStore options?: AppendToStreamOptions, ): Promise; consumer( - options: PostgreSQLEventStoreConsumerConfig, + options?: PostgreSQLEventStoreConsumerConfig, ): PostgreSQLEventStoreConsumer; close(): Promise; schema: { @@ -284,9 +284,12 @@ export const getPostgreSQLEventStore = ( }; }, consumer: ( - options: PostgreSQLEventStoreConsumerConfig, + options?: PostgreSQLEventStoreConsumerConfig, ): PostgreSQLEventStoreConsumer => - postgreSQLEventStoreConsumer({ ...options, pool }), + postgreSQLEventStoreConsumer({ + ...(options ?? {}), + pool, + }), close: () => pool.close(), async withSession(