Skip to content

Commit

Permalink
Made consumer options really optional when creating it from event stores
Browse files Browse the repository at this point in the history
By accident, there was a leftover forcing user to still provide even empty object, which doesn't make sense as it should be passed.
  • Loading branch information
oskardudycz committed Feb 14, 2025
1 parent 909b36c commit ef694ca
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
export type EventStoreDBEventStoreConsumerConfig<
ConsumerEventType extends Event = Event,
> = {
connectionString: string;
from?: EventStoreDBEventStoreConsumerType;
processors?: EventStoreDBEventStoreProcessor<ConsumerEventType>[];
pulling?: {
Expand All @@ -35,7 +34,7 @@ export type EventStoreDBEventStoreConsumerOptions<
| {
connectionString: string;
}
| { eventStoreDBClient: EventStoreDBClient }
| { client: EventStoreDBClient }
);

export type $all = '$all';
Expand All @@ -54,7 +53,6 @@ export type EventStoreDBEventStoreConsumerType =
export type EventStoreDBEventStoreConsumer<
ConsumerEventType extends Event = Event,
> = Readonly<{
connectionString: string;
isRunning: boolean;
processors: EventStoreDBEventStoreProcessor<ConsumerEventType>[];
processor: <EventType extends ConsumerEventType = ConsumerEventType>(
Expand All @@ -71,17 +69,17 @@ export const eventStoreDBEventStoreConsumer = <
options: EventStoreDBEventStoreConsumerOptions<ConsumerEventType>,
): EventStoreDBEventStoreConsumer<ConsumerEventType> => {
let isRunning = false;
const { connectionString, pulling } = options;
const { pulling } = options;
const processors = options.processors ?? [];

let start: Promise<void>;

let currentMessagePooler:
| EventStoreDBEventStoreMessageBatchPuller
| undefined;
let currentSubscription: EventStoreDBEventStoreMessageBatchPuller | undefined;

const eventStoreDBClient =
EventStoreDBClient.connectionString(connectionString);
const client =
'client' in options
? options.client
: EventStoreDBClient.connectionString(options.connectionString);

const eachBatch: EventStoreDBEventStoreMessagesBatchHandler<
ConsumerEventType
Expand All @@ -97,7 +95,7 @@ export const eventStoreDBEventStoreConsumer = <
const result = await Promise.allSettled(
activeProcessors.map((s) => {
// TODO: Add here filtering to only pass messages that can be handled by processor
return s.handle(messagesBatch, { eventStoreDBClient });
return s.handle(messagesBatch, { client });
}),
);

Expand All @@ -110,8 +108,8 @@ export const eventStoreDBEventStoreConsumer = <
};
};

const messagePuller = (currentMessagePooler = eventStoreDBSubscription({
eventStoreDBClient,
const subscription = (currentSubscription = eventStoreDBSubscription({
client,
eachBatch,
batchSize:
pulling?.batchSize ?? DefaultEventStoreDBEventStoreProcessorBatchSize,
Expand All @@ -120,15 +118,14 @@ export const eventStoreDBEventStoreConsumer = <
const stop = async () => {
if (!isRunning) return;
isRunning = false;
if (currentMessagePooler) {
await currentMessagePooler.stop();
currentMessagePooler = undefined;
if (currentSubscription) {
await currentSubscription.stop();
currentSubscription = undefined;
}
await start;
};

return {
connectionString,
processors,
get isRunning() {
return isRunning;
Expand Down Expand Up @@ -156,10 +153,10 @@ export const eventStoreDBEventStoreConsumer = <
isRunning = true;

const startFrom = zipEventStoreDBEventStoreMessageBatchPullerStartFrom(
await Promise.all(processors.map((o) => o.start(eventStoreDBClient))),
await Promise.all(processors.map((o) => o.start(client))),
);

return messagePuller.start({ startFrom });
return subscription.start({ startFrom });
})();

return start;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ export type EventStoreDBEventStoreProcessorEventsBatch<
export type EventStoreDBEventStoreProcessor<EventType extends Event = Event> = {
id: string;
start: (
eventStoreDBClient: EventStoreDBClient,
client: EventStoreDBClient,
) => Promise<EventStoreDBSubscriptionStartFrom | undefined>;
isActive: boolean;
handle: (
messagesBatch: EventStoreDBEventStoreProcessorEventsBatch<EventType>,
context: { eventStoreDBClient: EventStoreDBClient },
context: { client: EventStoreDBClient },
) => Promise<EventStoreDBEventStoreProcessorMessageHandlerResult>;
};

Expand Down Expand Up @@ -88,7 +88,7 @@ export const eventStoreDBEventStoreProcessor = <
return {
id: options.processorId,
start: (
_eventStoreDBClient: EventStoreDBClient,
_client: EventStoreDBClient,
): Promise<EventStoreDBSubscriptionStartFrom | undefined> => {
isActive = true;
if (options.startFrom !== 'CURRENT')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export type EventStoreDBEventStoreMessagesBatchHandler<

export type EventStoreDBSubscriptionOptions<EventType extends Event = Event> = {
from?: EventStoreDBEventStoreConsumerType;
eventStoreDBClient: EventStoreDBClient;
client: EventStoreDBClient;
batchSize: number;
eachBatch: EventStoreDBEventStoreMessagesBatchHandler<EventType>;
};
Expand Down Expand Up @@ -82,23 +82,23 @@ const toStreamPosition = (startFrom: EventStoreDBSubscriptionStartFrom) =>
: startFrom.position;

const subscribe = (
eventStoreDBClient: EventStoreDBClient,
client: EventStoreDBClient,
from: EventStoreDBEventStoreConsumerType | undefined,
options: EventStoreDBSubscriptionStartOptions,
) =>
from == undefined || from.stream == $all
? eventStoreDBClient.subscribeToAll({
? client.subscribeToAll({
fromPosition: toGlobalPosition(options.startFrom),
filter: excludeSystemEvents(),
...(from?.options ?? {}),
})
: eventStoreDBClient.subscribeToStream(from.stream, {
: client.subscribeToStream(from.stream, {
fromRevision: toStreamPosition(options.startFrom),
...(from.options ?? {}),
});

export const eventStoreDBSubscription = <EventType extends Event = Event>({
eventStoreDBClient,
client,
from,
//batchSize,
eachBatch,
Expand All @@ -112,7 +112,7 @@ export const eventStoreDBSubscription = <EventType extends Event = Event>({
const pullMessages = async (
options: EventStoreDBSubscriptionStartOptions,
) => {
subscription = subscribe(eventStoreDBClient, from, options);
subscription = subscribe(client, from, options);

return new Promise<void>((resolve, reject) => {
finished(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export interface EventStoreDBEventStore
options?: AppendToStreamOptions,
): Promise<AppendToStreamResultWithGlobalPosition>;
consumer<ConsumerEventType extends Event = Event>(
options: EventStoreDBEventStoreConsumerConfig<ConsumerEventType>,
options?: EventStoreDBEventStoreConsumerConfig<ConsumerEventType>,
): EventStoreDBEventStoreConsumer<ConsumerEventType>;
}

Expand Down Expand Up @@ -211,11 +211,11 @@ export const getEventStoreDBEventStore = (
},

consumer: <ConsumerEventType extends Event = Event>(
options: EventStoreDBEventStoreConsumerConfig<ConsumerEventType>,
options?: EventStoreDBEventStoreConsumerConfig<ConsumerEventType>,
): EventStoreDBEventStoreConsumer<ConsumerEventType> =>
eventStoreDBEventStoreConsumer<ConsumerEventType>({
...options,
eventStoreDBClient: eventStore,
...(options ?? {}),
client: eventStore,
}),

//streamEvents: streamEvents(eventStore),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export interface PostgresEventStore
options?: AppendToStreamOptions,
): Promise<AppendToStreamResultWithGlobalPosition>;
consumer<ConsumerEventType extends Event = Event>(
options: PostgreSQLEventStoreConsumerConfig<ConsumerEventType>,
options?: PostgreSQLEventStoreConsumerConfig<ConsumerEventType>,
): PostgreSQLEventStoreConsumer<ConsumerEventType>;
close(): Promise<void>;
schema: {
Expand Down Expand Up @@ -284,9 +284,12 @@ export const getPostgreSQLEventStore = (
};
},
consumer: <ConsumerEventType extends Event = Event>(
options: PostgreSQLEventStoreConsumerConfig<ConsumerEventType>,
options?: PostgreSQLEventStoreConsumerConfig<ConsumerEventType>,
): PostgreSQLEventStoreConsumer<ConsumerEventType> =>
postgreSQLEventStoreConsumer<ConsumerEventType>({ ...options, pool }),
postgreSQLEventStoreConsumer<ConsumerEventType>({
...(options ?? {}),
pool,
}),
close: () => pool.close(),

async withSession<T = unknown>(
Expand Down

0 comments on commit ef694ca

Please sign in to comment.