Skip to content

Commit e6ded8b

Browse files
committed
Added first draft of EventStoreDB consumer
1 parent eeae558 commit e6ded8b

File tree

8 files changed

+1022
-12
lines changed

8 files changed

+1022
-12
lines changed

src/packages/emmett-esdb/src/eventStore/eventstoreDBEventStore.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
StreamNotFoundError,
2525
WrongExpectedVersionError,
2626
jsonEvent,
27+
type AllStreamJSONRecordedEvent,
2728
type AppendExpectedRevision,
2829
type ReadStreamOptions as ESDBReadStreamOptions,
2930
type JSONRecordedEvent,
@@ -54,7 +55,14 @@ export type EventStoreDBReadEvent<EventType extends Event = Event> = ReadEvent<
5455
EventStoreDBReadEventMetadata
5556
>;
5657

57-
export type EventStoreDBEventStore = EventStore<EventStoreDBReadEventMetadata>;
58+
export interface EventStoreDBEventStore
59+
extends EventStore<EventStoreDBReadEventMetadata> {
60+
appendToStream<EventType extends Event>(
61+
streamName: string,
62+
events: EventType[],
63+
options?: AppendToStreamOptions,
64+
): Promise<AppendToStreamResultWithGlobalPosition>;
65+
}
5866

5967
export const getEventStoreDBEventStore = (
6068
eventStore: EventStoreDBClient,
@@ -199,8 +207,8 @@ export const getEventStoreDBEventStore = (
199207
};
200208
};
201209

202-
const mapFromESDBEvent = <EventType extends Event = Event>(
203-
event: JSONRecordedEvent<EventType>,
210+
export const mapFromESDBEvent = <EventType extends Event = Event>(
211+
event: JSONRecordedEvent<EventType> | AllStreamJSONRecordedEvent<EventType>,
204212
): ReadEvent<EventType, EventStoreDBReadEventMetadata> => {
205213
return <ReadEvent<EventType, EventStoreDBReadEventMetadata>>{
206214
type: event.type,
Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
import { assertThatArray, type Event } from '@event-driven-io/emmett';
2+
import {
3+
EventStoreDBContainer,
4+
StartedEventStoreDBContainer,
5+
} from '@event-driven-io/emmett-testcontainers';
6+
import { after, before, describe, it } from 'node:test';
7+
import { v4 as uuid } from 'uuid';
8+
import {
9+
type EventStoreDBEventStore,
10+
getEventStoreDBEventStore,
11+
} from '../eventstoreDBEventStore';
12+
import { eventStoreDBEventStoreConsumer } from './eventStoreDBEventStoreConsumer';
13+
import type { EventStoreDBEventStoreSubscriptionOptions } from './eventStoreDBEventStoreSubscription';
14+
15+
void describe('EventStoreDB event store started consumer', () => {
16+
let eventStoreDB: StartedEventStoreDBContainer;
17+
let connectionString: string;
18+
let eventStore: EventStoreDBEventStore;
19+
20+
before(async () => {
21+
eventStoreDB = await new EventStoreDBContainer().start();
22+
connectionString = eventStoreDB.getConnectionString();
23+
eventStore = getEventStoreDBEventStore(eventStoreDB.getClient());
24+
});
25+
26+
after(async () => {
27+
try {
28+
await eventStoreDB.stop();
29+
} catch (error) {
30+
console.log(error);
31+
}
32+
});
33+
34+
void describe('eachMessage', () => {
35+
void it('handles all events appended to event store BEFORE subscription was started', async () => {
36+
// Given
37+
const guestId = uuid();
38+
const streamName = `guestStay-${guestId}`;
39+
const events: GuestStayEvent[] = [
40+
{ type: 'GuestCheckedIn', data: { guestId } },
41+
{ type: 'GuestCheckedOut', data: { guestId } },
42+
];
43+
const appendResult = await eventStore.appendToStream(streamName, events);
44+
45+
const result: GuestStayEvent[] = [];
46+
47+
// When
48+
const consumer = eventStoreDBEventStoreConsumer({
49+
connectionString,
50+
});
51+
consumer.subscribe<GuestStayEvent>({
52+
subscriptionId: uuid(),
53+
stopAfter: (event) =>
54+
event.metadata.globalPosition ===
55+
appendResult.lastEventGlobalPosition,
56+
eachMessage: (event) => {
57+
result.push(event);
58+
},
59+
});
60+
61+
try {
62+
await consumer.start();
63+
64+
assertThatArray(result).containsElementsMatching(events);
65+
} finally {
66+
await consumer.close();
67+
}
68+
});
69+
70+
void it('handles all events appended to event store AFTER subscription was started', async () => {
71+
// Given
72+
73+
const result: GuestStayEvent[] = [];
74+
let stopAfterPosition: bigint | undefined = undefined;
75+
76+
// When
77+
const consumer = eventStoreDBEventStoreConsumer({
78+
connectionString,
79+
});
80+
consumer.subscribe<GuestStayEvent>({
81+
subscriptionId: uuid(),
82+
stopAfter: (event) =>
83+
event.metadata.globalPosition === stopAfterPosition,
84+
eachMessage: (event) => {
85+
result.push(event);
86+
},
87+
});
88+
89+
const guestId = uuid();
90+
const streamName = `guestStay-${guestId}`;
91+
const events: GuestStayEvent[] = [
92+
{ type: 'GuestCheckedIn', data: { guestId } },
93+
{ type: 'GuestCheckedOut', data: { guestId } },
94+
];
95+
96+
try {
97+
const consumerPromise = consumer.start();
98+
99+
const appendResult = await eventStore.appendToStream(
100+
streamName,
101+
events,
102+
);
103+
stopAfterPosition = appendResult.lastEventGlobalPosition;
104+
105+
await consumerPromise;
106+
107+
assertThatArray(result).containsElementsMatching(events);
108+
} finally {
109+
await consumer.close();
110+
}
111+
});
112+
113+
void it('handles ONLY events AFTER provided global position', async () => {
114+
// Given
115+
const guestId = uuid();
116+
const otherGuestId = uuid();
117+
const streamName = `guestStay-${guestId}`;
118+
119+
const initialEvents: GuestStayEvent[] = [
120+
{ type: 'GuestCheckedIn', data: { guestId } },
121+
{ type: 'GuestCheckedOut', data: { guestId } },
122+
];
123+
const { lastEventGlobalPosition: startPosition } =
124+
await eventStore.appendToStream(streamName, initialEvents);
125+
126+
const events: GuestStayEvent[] = [
127+
{ type: 'GuestCheckedIn', data: { guestId: otherGuestId } },
128+
{ type: 'GuestCheckedOut', data: { guestId: otherGuestId } },
129+
];
130+
131+
const result: GuestStayEvent[] = [];
132+
let stopAfterPosition: bigint | undefined = undefined;
133+
134+
// When
135+
const consumer = eventStoreDBEventStoreConsumer({
136+
connectionString,
137+
});
138+
consumer.subscribe<GuestStayEvent>({
139+
subscriptionId: uuid(),
140+
startFrom: { globalPosition: startPosition },
141+
stopAfter: (event) =>
142+
event.metadata.globalPosition === stopAfterPosition,
143+
eachMessage: (event) => {
144+
result.push(event);
145+
},
146+
});
147+
148+
try {
149+
const consumerPromise = consumer.start();
150+
151+
const appendResult = await eventStore.appendToStream(
152+
streamName,
153+
events,
154+
);
155+
stopAfterPosition = appendResult.lastEventGlobalPosition;
156+
157+
await consumerPromise;
158+
159+
assertThatArray(result).containsOnlyElementsMatching(events);
160+
} finally {
161+
await consumer.close();
162+
}
163+
});
164+
165+
void it('handles all events when CURRENT position is NOT stored', async () => {
166+
// Given
167+
const guestId = uuid();
168+
const otherGuestId = uuid();
169+
const streamName = `guestStay-${guestId}`;
170+
171+
const initialEvents: GuestStayEvent[] = [
172+
{ type: 'GuestCheckedIn', data: { guestId } },
173+
{ type: 'GuestCheckedOut', data: { guestId } },
174+
];
175+
176+
await eventStore.appendToStream(streamName, initialEvents);
177+
178+
const events: GuestStayEvent[] = [
179+
{ type: 'GuestCheckedIn', data: { guestId: otherGuestId } },
180+
{ type: 'GuestCheckedOut', data: { guestId: otherGuestId } },
181+
];
182+
183+
const result: GuestStayEvent[] = [];
184+
let stopAfterPosition: bigint | undefined = undefined;
185+
186+
// When
187+
const consumer = eventStoreDBEventStoreConsumer({
188+
connectionString,
189+
});
190+
consumer.subscribe<GuestStayEvent>({
191+
subscriptionId: uuid(),
192+
startFrom: 'CURRENT',
193+
stopAfter: (event) =>
194+
event.metadata.globalPosition === stopAfterPosition,
195+
eachMessage: (event) => {
196+
result.push(event);
197+
},
198+
});
199+
200+
try {
201+
const consumerPromise = consumer.start();
202+
203+
const appendResult = await eventStore.appendToStream(
204+
streamName,
205+
events,
206+
);
207+
stopAfterPosition = appendResult.lastEventGlobalPosition;
208+
209+
await consumerPromise;
210+
211+
assertThatArray(result).containsElementsMatching([
212+
...initialEvents,
213+
...events,
214+
]);
215+
} finally {
216+
await consumer.close();
217+
}
218+
});
219+
220+
void it('handles only new events when CURRENT position is stored for restarted consumer', async () => {
221+
// Given
222+
const guestId = uuid();
223+
const otherGuestId = uuid();
224+
const streamName = `guestStay-${guestId}`;
225+
226+
const initialEvents: GuestStayEvent[] = [
227+
{ type: 'GuestCheckedIn', data: { guestId } },
228+
{ type: 'GuestCheckedOut', data: { guestId } },
229+
];
230+
const { lastEventGlobalPosition } = await eventStore.appendToStream(
231+
streamName,
232+
initialEvents,
233+
);
234+
235+
const events: GuestStayEvent[] = [
236+
{ type: 'GuestCheckedIn', data: { guestId: otherGuestId } },
237+
{ type: 'GuestCheckedOut', data: { guestId: otherGuestId } },
238+
];
239+
240+
let result: GuestStayEvent[] = [];
241+
let stopAfterPosition: bigint | undefined = lastEventGlobalPosition;
242+
243+
// When
244+
const consumer = eventStoreDBEventStoreConsumer({
245+
connectionString,
246+
});
247+
consumer.subscribe<GuestStayEvent>({
248+
subscriptionId: uuid(),
249+
startFrom: 'CURRENT',
250+
stopAfter: (event) =>
251+
event.metadata.globalPosition === stopAfterPosition,
252+
eachMessage: (event) => {
253+
result.push(event);
254+
},
255+
});
256+
257+
await consumer.start();
258+
await consumer.stop();
259+
260+
result = [];
261+
262+
stopAfterPosition = undefined;
263+
264+
try {
265+
const consumerPromise = consumer.start();
266+
267+
const appendResult = await eventStore.appendToStream(
268+
streamName,
269+
events,
270+
);
271+
stopAfterPosition = appendResult.lastEventGlobalPosition;
272+
273+
await consumerPromise;
274+
275+
assertThatArray(result).containsOnlyElementsMatching(events);
276+
} finally {
277+
await consumer.close();
278+
}
279+
});
280+
281+
void it('handles only new events when CURRENT position is stored for a new consumer', async () => {
282+
// Given
283+
const guestId = uuid();
284+
const otherGuestId = uuid();
285+
const streamName = `guestStay-${guestId}`;
286+
287+
const initialEvents: GuestStayEvent[] = [
288+
{ type: 'GuestCheckedIn', data: { guestId } },
289+
{ type: 'GuestCheckedOut', data: { guestId } },
290+
];
291+
const { lastEventGlobalPosition } = await eventStore.appendToStream(
292+
streamName,
293+
initialEvents,
294+
);
295+
296+
const events: GuestStayEvent[] = [
297+
{ type: 'GuestCheckedIn', data: { guestId: otherGuestId } },
298+
{ type: 'GuestCheckedOut', data: { guestId: otherGuestId } },
299+
];
300+
301+
let result: GuestStayEvent[] = [];
302+
let stopAfterPosition: bigint | undefined = lastEventGlobalPosition;
303+
304+
const subscriptionOptions: EventStoreDBEventStoreSubscriptionOptions<GuestStayEvent> =
305+
{
306+
subscriptionId: uuid(),
307+
startFrom: 'CURRENT',
308+
stopAfter: (event) =>
309+
event.metadata.globalPosition === stopAfterPosition,
310+
eachMessage: (event) => {
311+
result.push(event);
312+
},
313+
};
314+
315+
// When
316+
const consumer = eventStoreDBEventStoreConsumer({
317+
connectionString,
318+
});
319+
try {
320+
consumer.subscribe<GuestStayEvent>(subscriptionOptions);
321+
322+
await consumer.start();
323+
} finally {
324+
await consumer.close();
325+
}
326+
327+
result = [];
328+
329+
stopAfterPosition = undefined;
330+
331+
const newConsumer = eventStoreDBEventStoreConsumer({
332+
connectionString,
333+
});
334+
newConsumer.subscribe<GuestStayEvent>(subscriptionOptions);
335+
336+
try {
337+
const consumerPromise = newConsumer.start();
338+
339+
const appendResult = await eventStore.appendToStream(
340+
streamName,
341+
events,
342+
);
343+
stopAfterPosition = appendResult.lastEventGlobalPosition;
344+
345+
await consumerPromise;
346+
347+
assertThatArray(result).containsOnlyElementsMatching(events);
348+
} finally {
349+
await newConsumer.close();
350+
}
351+
});
352+
});
353+
});
354+
355+
type GuestCheckedIn = Event<'GuestCheckedIn', { guestId: string }>;
356+
type GuestCheckedOut = Event<'GuestCheckedOut', { guestId: string }>;
357+
358+
type GuestStayEvent = GuestCheckedIn | GuestCheckedOut;

0 commit comments

Comments
 (0)