Skip to content

Commit

Permalink
Added message type to allow store not only events but also commands
Browse files Browse the repository at this point in the history
This is the first step to enable message storing and workflows
  • Loading branch information
oskardudycz committed Feb 27, 2025
1 parent b939074 commit 65405ae
Show file tree
Hide file tree
Showing 16 changed files with 221 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable {
const eventsToAppend: ReadEvent<EventType, MongoDBReadEventMetadata>[] =
events.map((event) => {
const metadata: MongoDBReadEventMetadata = {
eventId: uuid(),
messageId: uuid(),
streamName,
streamPosition: ++streamOffset,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ export const PostgreSQLProjectionSpec = {
globalPosition: ++globalPosition,
streamPosition: globalPosition,
streamName: `test-${uuid()}`,
eventId: uuid(),
messageId: uuid(),
};

allEvents.push({
...event,
kind: 'Event',
metadata: {
...metadata,
...('metadata' in event ? (event.metadata ?? {}) : {}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ export const appendToStream = (

const eventsToAppend: ReadEvent[] = events.map((e, i) => ({
...e,
kind: e.kind ?? 'Event',
metadata: {
streamName,
eventId: uuid(),
messageId: uuid(),
streamPosition: BigInt(i),
...('metadata' in e ? (e.metadata ?? {}) : {}),
},
Expand Down Expand Up @@ -252,7 +253,7 @@ const appendEventsRaw = (
%s::bigint,
%L::text
)`,
events.map((e) => sql('%L', e.metadata.eventId)).join(','),
events.map((e) => sql('%L', e.metadata.messageId)).join(','),
events.map((e) => sql('%L', JSONParser.stringify(e.data))).join(','),
events
.map((e) => sql('%L', JSONParser.stringify(e.metadata ?? {})))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,15 @@ export const readMessagesBatch = async <

const metadata: ReadEventMetadataWithGlobalPosition = {
...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}),
eventId: row.event_id,
messageId: row.event_id,
streamName: row.stream_id,
streamPosition: BigInt(row.stream_position),
globalPosition: BigInt(row.global_position),
};

return {
...rawEvent,
kind: 'Event',
metadata: metadata as CombinedReadEventMetadata<
MessageType,
ReadEventMetadataType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ export const readStream = async <EventType extends Event>(

const metadata: ReadEventMetadataWithGlobalPosition = {
...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}),
eventId: row.event_id,
messageId: row.event_id,
streamName: streamId,
streamPosition: BigInt(row.stream_position),
globalPosition: BigInt(row.global_position),
};

return {
...rawEvent,
kind: 'Event',
metadata: metadata as CombinedReadEventMetadata<
EventType,
ReadEventMetadataWithGlobalPosition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ export const appendToStream = async (
const eventsToAppend: ReadEvent[] = events.map(
(e: Event, i: number): ReadEvent => ({
...e,
kind: e.kind ?? 'Event',
metadata: {
streamName,
eventId: uuid(),
messageId: uuid(),
streamPosition: BigInt(i + 1),
...('metadata' in e ? (e.metadata ?? {}) : {}),
},
Expand Down Expand Up @@ -268,7 +269,7 @@ const buildEventInsertQuery = (
JSONParser.stringify(event.metadata),
expectedStreamVersion?.toString() ?? 0,
event.type,
event.metadata.eventId,
event.metadata.messageId,
false,
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ export const readStream = async <EventType extends Event>(

const metadata: ReadEventMetadataWithGlobalPosition = {
...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}),
eventId: row.event_id,
messageId: row.event_id,
streamName: streamId,
streamPosition: BigInt(row.stream_position),
globalPosition: BigInt(row.global_position),
};

return {
kind: 'Event',
...rawEvent,
metadata: metadata as CombinedReadEventMetadata<
EventType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down Expand Up @@ -76,22 +78,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand All @@ -100,22 +104,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
];
const nextEvents: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down Expand Up @@ -152,22 +158,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down Expand Up @@ -79,22 +81,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand All @@ -103,22 +107,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
];
const nextEvents: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down Expand Up @@ -151,22 +157,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down
3 changes: 2 additions & 1 deletion src/packages/emmett/src/eventStore/inMemoryEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,13 @@ export const getInMemoryEventStore = (
>[] = events.map((event, index) => {
const metadata: ReadEventMetadataWithGlobalPosition = {
streamName,
eventId: uuid(),
messageId: uuid(),
streamPosition: BigInt(currentEvents.length + index + 1),
globalPosition: BigInt(getAllEventsCount() + index + 1),
};
return {
...event,
kind: event.kind ?? 'Event',
metadata: {
...('metadata' in event ? (event.metadata ?? {}) : {}),
...metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ void describe('CaughtUpTransformStream', () => {
globalPosition: bigint,
): ReadEvent<ShoppingCartOpened, ReadEventMetadataWithGlobalPosition> => ({
type: 'ShoppingCartOpened',
kind: 'Event',
data: { cartId: 'cartId' },
metadata: {
eventId: uuid(),
messageId: uuid(),
globalPosition,
streamPosition: globalPosition,
streamName: 'test',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ const createMockEvent = (
position: bigint,
): ReadEvent<MockEvent, ReadEventMetadataWithGlobalPosition> => ({
type: 'Mocked',
kind: 'Event',
data: { mocked: true },
metadata: {
streamName: 'testStream',
eventId: `event-${position}`,
messageId: `message-${position}`,
streamPosition: position,
globalPosition: position,
},
Expand Down
Loading

0 comments on commit 65405ae

Please sign in to comment.