From 68542af8ed21b58252e5a23b75c82c25b72d2fb9 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Thu, 13 Feb 2025 21:26:31 +0100 Subject: [PATCH] Added support for subscribing to stream for ESDB Consumer --- .../src/eventStore/eventstoreDBEventStore.ts | 3 +- ...eDBEventStoreConsumer.handling.int.spec.ts | 383 ++++++++++++++++++ ...tStoreDBEventStoreConsumer.handling.int.ts | 358 ---------------- ...ventStoreDBEventStoreConsumer.int.spec.ts} | 6 +- .../eventStoreDBEventStoreConsumer.ts | 20 +- .../messageBatchProcessing/index.ts | 123 +++--- 6 files changed, 469 insertions(+), 424 deletions(-) create mode 100644 src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.spec.ts delete mode 100644 src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.ts rename src/packages/emmett-esdb/src/eventStore/subscriptions/{eventStoreDBEventStoreConsumer.int.ts => eventStoreDBEventStoreConsumer.int.spec.ts} (94%) diff --git a/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts b/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts index 36367910..0764d4ee 100644 --- a/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts +++ b/src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts @@ -24,7 +24,6 @@ import { StreamNotFoundError, WrongExpectedVersionError, jsonEvent, - type AllStreamJSONRecordedEvent, type AppendExpectedRevision, type ReadStreamOptions as ESDBReadStreamOptions, type JSONRecordedEvent, @@ -208,7 +207,7 @@ export const getEventStoreDBEventStore = ( }; export const mapFromESDBEvent = ( - event: JSONRecordedEvent | AllStreamJSONRecordedEvent, + event: JSONRecordedEvent, ): ReadEvent => { return >{ type: event.type, diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.spec.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.spec.ts new file mode 100644 index 00000000..1470e797 --- /dev/null +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.spec.ts @@ -0,0 +1,383 @@ +import { assertThatArray, type Event } from '@event-driven-io/emmett'; +import { + EventStoreDBContainer, + StartedEventStoreDBContainer, +} from '@event-driven-io/emmett-testcontainers'; +import { after, before, describe, it } from 'node:test'; +import { v4 as uuid } from 'uuid'; +import { + type EventStoreDBEventStore, + getEventStoreDBEventStore, +} from '../eventstoreDBEventStore'; +import { + $all, + eventStoreDBEventStoreConsumer, + type EventStoreDBEventStoreConsumerType, +} from './eventStoreDBEventStoreConsumer'; +import type { EventStoreDBEventStoreSubscriptionOptions } from './eventStoreDBEventStoreSubscription'; + +void describe('EventStoreDB event store started consumer', () => { + let eventStoreDB: StartedEventStoreDBContainer; + let connectionString: string; + let eventStore: EventStoreDBEventStore; + + before(async () => { + eventStoreDB = await new EventStoreDBContainer().start(); + connectionString = eventStoreDB.getConnectionString(); + eventStore = getEventStoreDBEventStore(eventStoreDB.getClient()); + }); + + after(async () => { + try { + await eventStoreDB.stop(); + } catch (error) { + console.log(error); + } + }); + + const consumeFrom: [ + string, + (streamName: string) => EventStoreDBEventStoreConsumerType, + ][] = [ + ['all', () => ({ stream: $all })], + ['stream', (streamName) => ({ stream: streamName })], + ]; + + consumeFrom.forEach(([displayName, from]) => { + void describe('eachMessage', () => { + void it(`handles all events from ${displayName} appended to event store BEFORE subscription was started`, async () => { + // Given + const guestId = uuid(); + const streamName = `guestStay-${guestId}`; + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + + const result: GuestStayEvent[] = []; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + from: from(streamName), + }); + consumer.subscribe({ + subscriptionId: uuid(), + stopAfter: (event) => + event.metadata.globalPosition === + appendResult.lastEventGlobalPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + try { + await consumer.start(); + + assertThatArray(result).containsElementsMatching(events); + } finally { + await consumer.close(); + } + }); + + void it(`handles all events from ${displayName} appended to event store AFTER subscription was started`, async () => { + // Given + + const result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = undefined; + + const guestId = uuid(); + const streamName = `guestStay-${guestId}`; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + from: from(streamName), + }); + consumer.subscribe({ + subscriptionId: uuid(), + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsElementsMatching(events); + } finally { + await consumer.close(); + } + }); + + void it(`handles ONLY events from ${displayName} AFTER provided global position`, async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${guestId}`; + + const initialEvents: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + const { lastEventGlobalPosition: startPosition } = + await eventStore.appendToStream(streamName, initialEvents); + + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + ]; + + const result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = undefined; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + from: from(streamName), + }); + consumer.subscribe({ + subscriptionId: uuid(), + startFrom: { position: startPosition }, + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsOnlyElementsMatching(events); + } finally { + await consumer.close(); + } + }); + + void it(`handles all events from ${displayName} when CURRENT position is NOT stored`, async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${guestId}`; + + const initialEvents: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + + await eventStore.appendToStream(streamName, initialEvents); + + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + ]; + + const result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = undefined; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + from: from(streamName), + }); + consumer.subscribe({ + subscriptionId: uuid(), + startFrom: 'CURRENT', + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsElementsMatching([ + ...initialEvents, + ...events, + ]); + } finally { + await consumer.close(); + } + }); + + void it.skip(`handles only new events when CURRENT position is stored for restarted consumer from ${displayName}`, async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${guestId}`; + + const initialEvents: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + const { lastEventGlobalPosition } = await eventStore.appendToStream( + streamName, + initialEvents, + ); + + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + ]; + + let result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + from: from(streamName), + }); + consumer.subscribe({ + subscriptionId: uuid(), + startFrom: 'CURRENT', + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }); + + await consumer.start(); + await consumer.stop(); + + result = []; + + stopAfterPosition = undefined; + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsOnlyElementsMatching(events); + } finally { + await consumer.close(); + } + }); + + void it.skip(`handles only new events when CURRENT position is stored for a new consumer from ${displayName}`, async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${guestId}`; + + const initialEvents: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + const { lastEventGlobalPosition } = await eventStore.appendToStream( + streamName, + initialEvents, + ); + + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + ]; + + let result: GuestStayEvent[] = []; + let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; + + const subscriptionOptions: EventStoreDBEventStoreSubscriptionOptions = + { + subscriptionId: uuid(), + startFrom: 'CURRENT', + stopAfter: (event) => + event.metadata.globalPosition === stopAfterPosition, + eachMessage: (event) => { + result.push(event); + }, + }; + + // When + const consumer = eventStoreDBEventStoreConsumer({ + connectionString, + from: from(streamName), + }); + try { + consumer.subscribe(subscriptionOptions); + + await consumer.start(); + } finally { + await consumer.close(); + } + + result = []; + + stopAfterPosition = undefined; + + const newConsumer = eventStoreDBEventStoreConsumer({ + connectionString, + from: from(streamName), + }); + newConsumer.subscribe(subscriptionOptions); + + try { + const consumerPromise = newConsumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.lastEventGlobalPosition; + + await consumerPromise; + + assertThatArray(result).containsOnlyElementsMatching(events); + } finally { + await newConsumer.close(); + } + }); + }); + }); +}); + +type GuestCheckedIn = Event<'GuestCheckedIn', { guestId: string }>; +type GuestCheckedOut = Event<'GuestCheckedOut', { guestId: string }>; + +type GuestStayEvent = GuestCheckedIn | GuestCheckedOut; diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.ts deleted file mode 100644 index a3a84949..00000000 --- a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.handling.int.ts +++ /dev/null @@ -1,358 +0,0 @@ -import { assertThatArray, type Event } from '@event-driven-io/emmett'; -import { - EventStoreDBContainer, - StartedEventStoreDBContainer, -} from '@event-driven-io/emmett-testcontainers'; -import { after, before, describe, it } from 'node:test'; -import { v4 as uuid } from 'uuid'; -import { - type EventStoreDBEventStore, - getEventStoreDBEventStore, -} from '../eventstoreDBEventStore'; -import { eventStoreDBEventStoreConsumer } from './eventStoreDBEventStoreConsumer'; -import type { EventStoreDBEventStoreSubscriptionOptions } from './eventStoreDBEventStoreSubscription'; - -void describe('EventStoreDB event store started consumer', () => { - let eventStoreDB: StartedEventStoreDBContainer; - let connectionString: string; - let eventStore: EventStoreDBEventStore; - - before(async () => { - eventStoreDB = await new EventStoreDBContainer().start(); - connectionString = eventStoreDB.getConnectionString(); - eventStore = getEventStoreDBEventStore(eventStoreDB.getClient()); - }); - - after(async () => { - try { - await eventStoreDB.stop(); - } catch (error) { - console.log(error); - } - }); - - void describe('eachMessage', () => { - void it('handles all events appended to event store BEFORE subscription was started', async () => { - // Given - const guestId = uuid(); - const streamName = `guestStay-${guestId}`; - const events: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId } }, - { type: 'GuestCheckedOut', data: { guestId } }, - ]; - const appendResult = await eventStore.appendToStream(streamName, events); - - const result: GuestStayEvent[] = []; - - // When - const consumer = eventStoreDBEventStoreConsumer({ - connectionString, - }); - consumer.subscribe({ - subscriptionId: uuid(), - stopAfter: (event) => - event.metadata.globalPosition === - appendResult.lastEventGlobalPosition, - eachMessage: (event) => { - result.push(event); - }, - }); - - try { - await consumer.start(); - - assertThatArray(result).containsElementsMatching(events); - } finally { - await consumer.close(); - } - }); - - void it('handles all events appended to event store AFTER subscription was started', async () => { - // Given - - const result: GuestStayEvent[] = []; - let stopAfterPosition: bigint | undefined = undefined; - - // When - const consumer = eventStoreDBEventStoreConsumer({ - connectionString, - }); - consumer.subscribe({ - subscriptionId: uuid(), - stopAfter: (event) => - event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, - }); - - const guestId = uuid(); - const streamName = `guestStay-${guestId}`; - const events: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId } }, - { type: 'GuestCheckedOut', data: { guestId } }, - ]; - - try { - const consumerPromise = consumer.start(); - - const appendResult = await eventStore.appendToStream( - streamName, - events, - ); - stopAfterPosition = appendResult.lastEventGlobalPosition; - - await consumerPromise; - - assertThatArray(result).containsElementsMatching(events); - } finally { - await consumer.close(); - } - }); - - void it('handles ONLY events AFTER provided global position', async () => { - // Given - const guestId = uuid(); - const otherGuestId = uuid(); - const streamName = `guestStay-${guestId}`; - - const initialEvents: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId } }, - { type: 'GuestCheckedOut', data: { guestId } }, - ]; - const { lastEventGlobalPosition: startPosition } = - await eventStore.appendToStream(streamName, initialEvents); - - const events: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, - { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, - ]; - - const result: GuestStayEvent[] = []; - let stopAfterPosition: bigint | undefined = undefined; - - // When - const consumer = eventStoreDBEventStoreConsumer({ - connectionString, - }); - consumer.subscribe({ - subscriptionId: uuid(), - startFrom: { globalPosition: startPosition }, - stopAfter: (event) => - event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, - }); - - try { - const consumerPromise = consumer.start(); - - const appendResult = await eventStore.appendToStream( - streamName, - events, - ); - stopAfterPosition = appendResult.lastEventGlobalPosition; - - await consumerPromise; - - assertThatArray(result).containsOnlyElementsMatching(events); - } finally { - await consumer.close(); - } - }); - - void it('handles all events when CURRENT position is NOT stored', async () => { - // Given - const guestId = uuid(); - const otherGuestId = uuid(); - const streamName = `guestStay-${guestId}`; - - const initialEvents: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId } }, - { type: 'GuestCheckedOut', data: { guestId } }, - ]; - - await eventStore.appendToStream(streamName, initialEvents); - - const events: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, - { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, - ]; - - const result: GuestStayEvent[] = []; - let stopAfterPosition: bigint | undefined = undefined; - - // When - const consumer = eventStoreDBEventStoreConsumer({ - connectionString, - }); - consumer.subscribe({ - subscriptionId: uuid(), - startFrom: 'CURRENT', - stopAfter: (event) => - event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, - }); - - try { - const consumerPromise = consumer.start(); - - const appendResult = await eventStore.appendToStream( - streamName, - events, - ); - stopAfterPosition = appendResult.lastEventGlobalPosition; - - await consumerPromise; - - assertThatArray(result).containsElementsMatching([ - ...initialEvents, - ...events, - ]); - } finally { - await consumer.close(); - } - }); - - void it('handles only new events when CURRENT position is stored for restarted consumer', async () => { - // Given - const guestId = uuid(); - const otherGuestId = uuid(); - const streamName = `guestStay-${guestId}`; - - const initialEvents: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId } }, - { type: 'GuestCheckedOut', data: { guestId } }, - ]; - const { lastEventGlobalPosition } = await eventStore.appendToStream( - streamName, - initialEvents, - ); - - const events: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, - { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, - ]; - - let result: GuestStayEvent[] = []; - let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; - - // When - const consumer = eventStoreDBEventStoreConsumer({ - connectionString, - }); - consumer.subscribe({ - subscriptionId: uuid(), - startFrom: 'CURRENT', - stopAfter: (event) => - event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, - }); - - await consumer.start(); - await consumer.stop(); - - result = []; - - stopAfterPosition = undefined; - - try { - const consumerPromise = consumer.start(); - - const appendResult = await eventStore.appendToStream( - streamName, - events, - ); - stopAfterPosition = appendResult.lastEventGlobalPosition; - - await consumerPromise; - - assertThatArray(result).containsOnlyElementsMatching(events); - } finally { - await consumer.close(); - } - }); - - void it('handles only new events when CURRENT position is stored for a new consumer', async () => { - // Given - const guestId = uuid(); - const otherGuestId = uuid(); - const streamName = `guestStay-${guestId}`; - - const initialEvents: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId } }, - { type: 'GuestCheckedOut', data: { guestId } }, - ]; - const { lastEventGlobalPosition } = await eventStore.appendToStream( - streamName, - initialEvents, - ); - - const events: GuestStayEvent[] = [ - { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, - { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, - ]; - - let result: GuestStayEvent[] = []; - let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; - - const subscriptionOptions: EventStoreDBEventStoreSubscriptionOptions = - { - subscriptionId: uuid(), - startFrom: 'CURRENT', - stopAfter: (event) => - event.metadata.globalPosition === stopAfterPosition, - eachMessage: (event) => { - result.push(event); - }, - }; - - // When - const consumer = eventStoreDBEventStoreConsumer({ - connectionString, - }); - try { - consumer.subscribe(subscriptionOptions); - - await consumer.start(); - } finally { - await consumer.close(); - } - - result = []; - - stopAfterPosition = undefined; - - const newConsumer = eventStoreDBEventStoreConsumer({ - connectionString, - }); - newConsumer.subscribe(subscriptionOptions); - - try { - const consumerPromise = newConsumer.start(); - - const appendResult = await eventStore.appendToStream( - streamName, - events, - ); - stopAfterPosition = appendResult.lastEventGlobalPosition; - - await consumerPromise; - - assertThatArray(result).containsOnlyElementsMatching(events); - } finally { - await newConsumer.close(); - } - }); - }); -}); - -type GuestCheckedIn = Event<'GuestCheckedIn', { guestId: string }>; -type GuestCheckedOut = Event<'GuestCheckedOut', { guestId: string }>; - -type GuestStayEvent = GuestCheckedIn | GuestCheckedOut; diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.int.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.int.spec.ts similarity index 94% rename from src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.int.ts rename to src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.int.spec.ts index 2cfed85b..ef86c6a4 100644 --- a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.int.ts +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.int.spec.ts @@ -51,7 +51,7 @@ void describe('EventStoreDB event store consumer', () => { void it('creates not-started consumer if connection string targets not existing EventStoreDB database', () => { const connectionStringToNotExistingDB = - 'postgresql://postgres:postgres@not-existing-database:5432/postgres'; + 'esdb://not-existing:2113?tls=false'; const consumer = eventStoreDBEventStoreConsumer({ connectionString: connectionStringToNotExistingDB, subscriptions: [dummySubscription], @@ -79,7 +79,7 @@ void describe('EventStoreDB event store consumer', () => { void it('fails to start if connection string targets not existing EventStoreDB database', async () => { const connectionStringToNotExistingDB = - 'postgresql://postgres:postgres@not-existing-database:5432/postgres'; + 'esdb://not-existing:2113?tls=false'; const consumerToNotExistingServer = eventStoreDBEventStoreConsumer({ connectionString: connectionStringToNotExistingDB, subscriptions: [dummySubscription], @@ -87,7 +87,7 @@ void describe('EventStoreDB event store consumer', () => { await assertThrowsAsync( () => consumerToNotExistingServer.start(), (error) => { - return 'code' in error && error.code === 'EAI_AGAIN'; + return 'type' in error && error.type === 'unavailable'; }, ); }); diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.ts index 880d52d6..aed2188c 100644 --- a/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.ts +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/eventStoreDBEventStoreConsumer.ts @@ -1,5 +1,9 @@ import { EmmettError, type Event } from '@event-driven-io/emmett'; -import { EventStoreDBClient } from '@eventstore/db-client'; +import { + EventStoreDBClient, + type SubscribeToAllOptions, + type SubscribeToStreamOptions, +} from '@eventstore/db-client'; import { eventStoreDBEventStoreSubscription, type EventStoreDBEventStoreSubscription, @@ -15,12 +19,26 @@ import { export type EventStoreDBEventStoreConsumerOptions = { connectionString: string; + from?: EventStoreDBEventStoreConsumerType; subscriptions?: EventStoreDBEventStoreSubscription[]; pulling?: { batchSize?: number; }; }; +export type $all = '$all'; +export const $all = '$all'; + +export type EventStoreDBEventStoreConsumerType = + | { + stream: $all; + options?: Exclude; + } + | { + stream: string; + options?: Exclude; + }; + export type EventStoreDBEventStoreConsumer = Readonly<{ connectionString: string; isRunning: boolean; diff --git a/src/packages/emmett-esdb/src/eventStore/subscriptions/messageBatchProcessing/index.ts b/src/packages/emmett-esdb/src/eventStore/subscriptions/messageBatchProcessing/index.ts index c8313665..4a2c0c72 100644 --- a/src/packages/emmett-esdb/src/eventStore/subscriptions/messageBatchProcessing/index.ts +++ b/src/packages/emmett-esdb/src/eventStore/subscriptions/messageBatchProcessing/index.ts @@ -9,11 +9,15 @@ import { EventStoreDBClient, excludeSystemEvents, START, - type AllStreamJSONRecordedEvent, - type AllStreamResolvedEvent, + type JSONRecordedEvent, + type StreamSubscription, } from '@eventstore/db-client'; import { finished, Readable } from 'stream'; import { mapFromESDBEvent } from '../../eventstoreDBEventStore'; +import { + $all, + type EventStoreDBEventStoreConsumerType, +} from '../eventStoreDBEventStoreConsumer'; export const DefaultEventStoreDBEventStoreSubscriptionBatchSize = 100; export const DefaultEventStoreDBEventStoreSubscriptionPullingFrequencyInMs = 50; @@ -41,13 +45,14 @@ export type EventStoreDBEventStoreMessagesBatchHandler< export type EventStoreDBEventStoreMessageBatchPullerOptions< EventType extends Event = Event, > = { + from?: EventStoreDBEventStoreConsumerType; eventStoreDBClient: EventStoreDBClient; batchSize: number; eachBatch: EventStoreDBEventStoreMessagesBatchHandler; }; export type EventStoreDBEventStoreMessageBatchPullerStartFrom = - | { globalPosition: bigint } + | { position: bigint } | 'BEGINNING' | 'END'; @@ -63,10 +68,48 @@ export type EventStoreDBEventStoreMessageBatchPuller = { stop(): Promise; }; +const toGlobalPosition = ( + startFrom: EventStoreDBEventStoreMessageBatchPullerStartFrom, +) => + startFrom === 'BEGINNING' + ? START + : startFrom === 'END' + ? END + : { + prepare: startFrom.position, + commit: startFrom.position, + }; + +const toStreamPosition = ( + startFrom: EventStoreDBEventStoreMessageBatchPullerStartFrom, +) => + startFrom === 'BEGINNING' + ? START + : startFrom === 'END' + ? END + : startFrom.position; + +const subscribe = ( + eventStoreDBClient: EventStoreDBClient, + from: EventStoreDBEventStoreConsumerType | undefined, + options: EventStoreDBEventStoreMessageBatchPullerStartOptions, +) => + from == undefined || from.stream == $all + ? eventStoreDBClient.subscribeToAll({ + fromPosition: toGlobalPosition(options.startFrom), + filter: excludeSystemEvents(), + ...(from?.options ?? {}), + }) + : eventStoreDBClient.subscribeToStream(from.stream, { + fromRevision: toStreamPosition(options.startFrom), + ...(from.options ?? {}), + }); + export const eventStoreDBEventStoreMessageBatchPuller = < EventType extends Event = Event, >({ eventStoreDBClient, + from, //batchSize, eachBatch, }: EventStoreDBEventStoreMessageBatchPullerOptions): EventStoreDBEventStoreMessageBatchPuller => { @@ -74,42 +117,28 @@ export const eventStoreDBEventStoreMessageBatchPuller = < let start: Promise; + let subscription: StreamSubscription; + const pullMessages = async ( options: EventStoreDBEventStoreMessageBatchPullerStartOptions, ) => { - const fromPosition = - options.startFrom === 'BEGINNING' - ? START - : options.startFrom === 'END' - ? END - : { - prepare: options.startFrom.globalPosition, - commit: options.startFrom.globalPosition, - }; - - const subscription = eventStoreDBClient.subscribeToAll({ - fromPosition, - filter: excludeSystemEvents(), - }); + subscription = subscribe(eventStoreDBClient, from, options); return new Promise((resolve, reject) => { finished( - subscription.on( - 'data', - async (resolvedEvent: AllStreamResolvedEvent) => { - if (!resolvedEvent.event) return; - - const event = mapFromESDBEvent( - resolvedEvent.event as AllStreamJSONRecordedEvent, - ); - - const result = await eachBatch({ messages: [event] }); - - if (result && result.type === 'STOP') { - subscription.destroy(); - } - }, - ) as unknown as Readable, + subscription.on('data', async (resolvedEvent) => { + if (!resolvedEvent.event) return; + + const event = mapFromESDBEvent( + resolvedEvent.event as JSONRecordedEvent, + ); + + const result = await eachBatch({ messages: [event] }); + + if (result && result.type === 'STOP') { + await subscription.unsubscribe(); + } + }) as unknown as Readable, (error) => { if (!error) { console.info(`Stopping subscription.`); @@ -121,33 +150,6 @@ export const eventStoreDBEventStoreMessageBatchPuller = < }, ); }); - //return subscription; - - // let waitTime = 100; - - // do { - // const { messages, currentGlobalPosition, areEventsLeft } = - // await readMessagesBatch(executor, readMessagesOptions); - - // if (messages.length > 0) { - // const result = await eachBatch({ messages }); - - // if (result && result.type === 'STOP') { - // isRunning = false; - // break; - // } - // } - - // readMessagesOptions.after = currentGlobalPosition; - - // await new Promise((resolve) => setTimeout(resolve, waitTime)); - - // if (!areEventsLeft) { - // waitTime = Math.min(waitTime * 2, 1000); - // } else { - // waitTime = pullingFrequencyInMs; - // } - // } while (isRunning); }; return { @@ -168,6 +170,7 @@ export const eventStoreDBEventStoreMessageBatchPuller = < stop: async () => { if (!isRunning) return; isRunning = false; + await subscription?.unsubscribe(); await start; }, };