Skip to content

Commit cc0fd38

Browse files
committed
Added extra check for too high of a expected stream position and moved global positioning to be auto increment in sqlite
1 parent 70dc842 commit cc0fd38

File tree

4 files changed

+127
-52
lines changed

4 files changed

+127
-52
lines changed

src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,43 @@ void describe('appendEvent', () => {
9898
assertTrue(result.success);
9999
});
100100

101+
void it('should handle stream position if expected version is too high', async () => {
102+
// Given
103+
const streamId = uuid();
104+
105+
const firstResult = await appendToStream(
106+
db,
107+
streamId,
108+
'shopping_cart',
109+
events,
110+
{
111+
expectedStreamVersion: 0n,
112+
},
113+
);
114+
assertTrue(firstResult.success);
115+
116+
// When
117+
const secondResult = await appendToStream(
118+
db,
119+
streamId,
120+
'shopping_cart',
121+
events,
122+
{
123+
expectedStreamVersion: 4n,
124+
},
125+
);
126+
127+
// Then
128+
assertFalse(secondResult.success);
129+
130+
const resultEvents = await db.query(
131+
'SELECT * FROM emt_events WHERE stream_id = $1',
132+
[streamId],
133+
);
134+
135+
assertEqual(events.length, resultEvents.length);
136+
});
137+
101138
void it('should handle stream position conflict correctly when two streams are created', async () => {
102139
// Given
103140
const streamId = uuid();

src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts

Lines changed: 86 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ const appendEventsRaw = async (
103103
): Promise<AppendEventResult> => {
104104
let streamPosition;
105105
let globalPosition;
106+
106107
try {
107108
let expectedStreamVersion = options?.expectedStreamVersion ?? null;
108109

@@ -114,56 +115,33 @@ const appendEventsRaw = async (
114115
);
115116
}
116117

117-
const buildQuery = `INSERT INTO ${eventsTable.name} (stream_id, stream_position, partition, event_data, event_metadata, event_schema_version, event_type, event_id, is_archived) VALUES `;
118-
119-
const query = events.reduce(
120-
(
121-
queryBuilder: {
122-
sql: string[];
123-
values: Parameters[];
124-
},
125-
e: ReadEvent,
126-
) => {
127-
const streamPosition =
128-
e.metadata.streamPosition + expectedStreamVersion;
129-
130-
queryBuilder.sql.push(`(?,?,?,?,?,?,?,?,?)`);
131-
queryBuilder.values.push(
132-
streamId,
133-
streamPosition.toString(),
134-
options?.partition?.toString() ?? defaultTag,
135-
JSONParser.stringify(e.data),
136-
JSONParser.stringify({ streamType: streamType, ...e.metadata }),
137-
expectedStreamVersion?.toString() ?? 0,
138-
e.type,
139-
e.metadata.eventId,
140-
false,
141-
);
142-
143-
return queryBuilder;
144-
},
145-
{
146-
sql: [],
147-
values: [],
148-
},
118+
const { sqlString, values } = buildEventInsertQuery(
119+
events,
120+
expectedStreamVersion,
121+
streamId,
122+
streamType,
123+
options?.partition?.toString() ?? defaultTag,
149124
);
150125

151-
const sqlString = buildQuery + query.sql.join(', ');
126+
const returningId = await db.querySingle<{ global_position: string }>(
127+
sqlString,
128+
values,
129+
);
152130

153-
await db.command(sqlString, query.values);
131+
if (returningId?.global_position == null) {
132+
throw new Error('Could not find global position');
133+
}
134+
135+
globalPosition = BigInt(returningId.global_position);
154136

155137
const positions = await db.querySingle<{
156138
stream_position: string;
157-
global_position: string;
158139
} | null>(
159140
`
160141
SELECT
161-
CAST(stream_position AS VARCHAR) AS stream_position,
162-
CAST(global_position AS VARCHAR) AS global_position
163-
FROM ${eventsTable.name}
164-
WHERE stream_id = ?
165-
ORDER BY stream_position DESC
166-
LIMIT 1`,
142+
CAST(stream_position AS VARCHAR) AS stream_position
143+
FROM ${streamsTable.name}
144+
WHERE stream_id = ?`,
167145
[streamId],
168146
);
169147

@@ -172,7 +150,16 @@ const appendEventsRaw = async (
172150
}
173151

174152
streamPosition = BigInt(positions.stream_position);
175-
globalPosition = BigInt(positions.global_position);
153+
154+
if (expectedStreamVersion != null) {
155+
const expectedStreamPositionAfterSave =
156+
BigInt(expectedStreamVersion) + BigInt(events.length);
157+
if (streamPosition !== expectedStreamPositionAfterSave) {
158+
return {
159+
success: false,
160+
};
161+
}
162+
}
176163
} catch (err: unknown) {
177164
if (isSQLiteError(err) && isOptimisticConcurrencyError(err)) {
178165
return {
@@ -211,3 +198,60 @@ async function getLastStreamPosition(
211198
}
212199
return expectedStreamVersion;
213200
}
201+
202+
const buildEventInsertQuery = (
203+
events: Event[],
204+
expectedStreamVersion: bigint | null,
205+
streamId: string,
206+
streamType: string,
207+
partition: string | null | undefined,
208+
) => {
209+
const query = events.reduce(
210+
(
211+
queryBuilder: {
212+
parameterMarkers: string[];
213+
values: Parameters[];
214+
},
215+
e: ReadEvent,
216+
) => {
217+
const streamPosition = e.metadata.streamPosition + expectedStreamVersion;
218+
219+
queryBuilder.parameterMarkers.push(`(?,?,?,?,?,?,?,?,?)`);
220+
queryBuilder.values.push(
221+
streamId,
222+
streamPosition.toString(),
223+
partition ?? defaultTag,
224+
JSONParser.stringify(e.data),
225+
JSONParser.stringify({ streamType: streamType, ...e.metadata }),
226+
expectedStreamVersion?.toString() ?? 0,
227+
e.type,
228+
e.metadata.eventId,
229+
false,
230+
);
231+
232+
return queryBuilder;
233+
},
234+
{
235+
parameterMarkers: [],
236+
values: [],
237+
},
238+
);
239+
240+
const sqlString = `
241+
INSERT INTO ${eventsTable.name} (
242+
stream_id,
243+
stream_position,
244+
partition,
245+
event_data,
246+
event_metadata,
247+
event_schema_version,
248+
event_type,
249+
event_id,
250+
is_archived
251+
)
252+
VALUES ${query.parameterMarkers.join(', ')}
253+
RETURNING
254+
CAST(global_position as VARCHAR) AS global_position
255+
`;
256+
return { sqlString, values: query.values };
257+
};

src/packages/emmett-sqlite/src/eventStore/schema/index.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@ export const createEventStoreSchema = async (
1515
for (const sql of schemaSQL) {
1616
try {
1717
await db.command(sql);
18-
} catch (error) {
19-
console.log(error);
20-
21-
return;
18+
} catch (err: unknown) {
19+
throw err;
2220
}
2321
}
2422
};

src/packages/emmett-sqlite/src/eventStore/schema/tables.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ export const eventsTableSQL = sql(
2626
event_type TEXT NOT NULL,
2727
event_id TEXT NOT NULL,
2828
is_archived BOOLEAN NOT NULL DEFAULT FALSE,
29-
global_position BIGINT ,
29+
global_position INTEGER PRIMARY KEY,
3030
created DATETIME DEFAULT CURRENT_TIMESTAMP,
31-
PRIMARY KEY (stream_id, stream_position, partition, is_archived)
31+
UNIQUE (stream_id, stream_position, partition, is_archived)
3232
);
3333
`,
3434
);
@@ -51,9 +51,5 @@ export const eventStreamTrigger = sql(
5151
)
5252
ON CONFLICT(stream_id, partition, is_archived)
5353
DO UPDATE SET stream_position=stream_position + 1;
54-
55-
UPDATE ${eventsTable.name}
56-
SET global_position = IFNULL((SELECT MAX(global_position) from ${eventsTable.name})+1, 1)
57-
WHERE (stream_id, stream_position, partition, is_archived) = (NEW.stream_id, NEW.stream_position, NEW.partition, NEW.is_archived);
5854
END;`,
5955
);

0 commit comments

Comments
 (0)