Skip to content

Commit

Permalink
Fixing type errors
Browse files Browse the repository at this point in the history
  • Loading branch information
dave committed Jan 3, 2025
1 parent cc0fd38 commit b5732d6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
47 changes: 27 additions & 20 deletions src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const appendToStream = async (
streamName,
eventId: uuid(),
streamPosition: BigInt(i + 1),
...e.metadata,
...('metadata' in e ? (e.metadata ?? {}) : {}),
},
}),
);
Expand Down Expand Up @@ -123,10 +123,9 @@ const appendEventsRaw = async (
options?.partition?.toString() ?? defaultTag,
);

const returningId = await db.querySingle<{ global_position: string }>(
sqlString,
values,
);
const returningId = await db.querySingle<{
global_position: string;
} | null>(sqlString, values);

if (returningId?.global_position == null) {
throw new Error('Could not find global position');
Expand Down Expand Up @@ -185,7 +184,7 @@ async function getLastStreamPosition(
db: SQLiteConnection,
streamId: string,
expectedStreamVersion: bigint | null,
) {
): Promise<bigint> {
const result = await db.querySingle<{ stream_position: string } | null>(
`SELECT CAST(MAX(stream_position) AS VARCHAR) AS stream_position FROM ${streamsTable.name} WHERE stream_id = ?`,
[streamId],
Expand All @@ -200,32 +199,40 @@ async function getLastStreamPosition(
}

const buildEventInsertQuery = (
events: Event[],
expectedStreamVersion: bigint | null,
events: ReadEvent[],
expectedStreamVersion: bigint,
streamId: string,
streamType: string,
partition: string | null | undefined,
) => {
): {
sqlString: string;
values: Parameters[];
} => {
const query = events.reduce(
(
queryBuilder: {
parameterMarkers: string[];
values: Parameters[];
},
e: ReadEvent,
queryBuilder: { parameterMarkers: string[]; values: Parameters[] },
event: ReadEvent,
) => {
const streamPosition = e.metadata.streamPosition + expectedStreamVersion;
if (
event.metadata?.streamPosition == null ||
typeof event.metadata.streamPosition !== 'bigint'
) {
throw new Error('Stream position is required');
}

const streamPosition =
BigInt(event.metadata.streamPosition) + BigInt(expectedStreamVersion);

queryBuilder.parameterMarkers.push(`(?,?,?,?,?,?,?,?,?)`);
queryBuilder.values.push(
streamId,
streamPosition.toString(),
streamPosition.toString() ?? 0,
partition ?? defaultTag,
JSONParser.stringify(e.data),
JSONParser.stringify({ streamType: streamType, ...e.metadata }),
JSONParser.stringify(event.data),
JSONParser.stringify({ streamType: streamType, ...event.metadata }),
expectedStreamVersion?.toString() ?? 0,
e.type,
e.metadata.eventId,
event.type,
event.metadata.eventId,
false,
);

Expand Down
6 changes: 1 addition & 5 deletions src/packages/emmett-sqlite/src/eventStore/schema/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ export const createEventStoreSchema = async (
db: SQLiteConnection,
): Promise<void> => {
for (const sql of schemaSQL) {
try {
await db.command(sql);
} catch (err: unknown) {
throw err;
}
await db.command(sql);
}
};

0 comments on commit b5732d6

Please sign in to comment.