Skip to content

Commit c473f94

Browse files
davecosecoskardudycz
authored andcommitted
Removed the upsert and replaced with seperate insert and update
1 parent 9bd77f9 commit c473f94

File tree

1 file changed

+46
-26
lines changed

1 file changed

+46
-26
lines changed

src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -133,35 +133,55 @@ const appendEventsRaw = async (
133133

134134
globalPosition = BigInt(returningId.global_position);
135135

136-
const positions = await db.querySingle<{ stream_position: string } | null>(
137-
`INSERT INTO ${streamsTable.name}
138-
(stream_id, stream_position, partition, stream_type, stream_metadata, is_archived)
139-
VALUES (
140-
?,
141-
?,
142-
?,
143-
?,
144-
'[]',
145-
false
146-
)
147-
ON CONFLICT(stream_id, partition, is_archived)
148-
DO UPDATE SET stream_position=stream_position + ?
149-
RETURNING stream_position;
150-
`,
151-
[
152-
streamId,
153-
events.length,
154-
options?.partition ?? '0',
155-
streamType,
156-
events.length,
157-
],
158-
);
136+
let position: { stream_position: string } | null;
137+
138+
if (expectedStreamVersion === 0n) {
139+
position = await db.querySingle<{
140+
stream_position: string;
141+
} | null>(
142+
`INSERT INTO ${streamsTable.name}
143+
(stream_id, stream_position, partition, stream_type, stream_metadata, is_archived)
144+
VALUES (
145+
?,
146+
?,
147+
?,
148+
?,
149+
'[]',
150+
false
151+
)
152+
RETURNING stream_position;
153+
`,
154+
[
155+
streamId,
156+
events.length,
157+
options?.partition ?? streamsTable.columns.partition,
158+
streamType,
159+
],
160+
);
161+
} else {
162+
position = await db.querySingle<{
163+
stream_position: string;
164+
} | null>(
165+
`UPDATE ${streamsTable.name}
166+
SET stream_position = stream_position + ?
167+
WHERE stream_id = ?
168+
AND partition = ?
169+
AND is_archived = false
170+
RETURNING stream_position;
171+
`,
172+
[
173+
events.length,
174+
streamId,
175+
options?.partition ?? streamsTable.columns.partition,
176+
],
177+
);
178+
}
159179

160-
if (positions == null) {
161-
throw new Error('Could not find stream positions');
180+
if (position == null) {
181+
throw new Error('Could not find stream position');
162182
}
163183

164-
streamPosition = BigInt(positions.stream_position);
184+
streamPosition = BigInt(position.stream_position);
165185

166186
if (expectedStreamVersion != null) {
167187
const expectedStreamPositionAfterSave =

0 commit comments

Comments
 (0)