diff --git a/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.e2e.spec.ts b/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.e2e.spec.ts index b8dc4f66..0944cedb 100644 --- a/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.e2e.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.e2e.spec.ts @@ -7,10 +7,9 @@ import { } from '@event-driven-io/emmett'; import fs from 'fs'; import { afterEach, describe, it } from 'node:test'; -import { dirname } from 'path'; -import { fileURLToPath } from 'url'; +import path from 'path'; import { v4 as uuid } from 'uuid'; -import { sqliteConnection, type AbsolutePath } from '../sqliteConnection'; +import { InMemorySQLiteDatabase, sqliteConnection } from '../sqliteConnection'; import { type DiscountApplied, type PricedProductItem, @@ -20,26 +19,21 @@ import { import { createEventStoreSchema } from './schema'; import { getSQLiteEventStore } from './SQLiteEventStore'; -const __dirname = dirname(fileURLToPath(import.meta.url)) as AbsolutePath; - void describe('SQLiteEventStore', () => { - const testDatabasePath: AbsolutePath = __dirname + '/../testing/database/'; + const testDatabasePath = path.resolve(process.cwd(), 'src', 'testing'); + const dbLocation = path.resolve(testDatabasePath, 'db.sqlite'); afterEach(() => { - if (!fs.existsSync(`${testDatabasePath}/test.db`)) { + if (!fs.existsSync(dbLocation)) { return; } - fs.unlink(`${testDatabasePath}/test.db`, (err) => { - if (err) console.error('Error deleting file:', err); - }); + fs.unlinkSync(dbLocation); }); void it('should append events', async () => { - await createEventStoreSchema( - sqliteConnection({ location: `/${testDatabasePath}/test.db` }), - ); + await createEventStoreSchema(sqliteConnection({ fileName: dbLocation })); const eventStore = getSQLiteEventStore({ - databaseLocation: `${testDatabasePath}/test.db`, + fileName: `${testDatabasePath}/test.db`, }); const productItem: PricedProductItem = { @@ -80,10 +74,10 @@ void describe('SQLiteEventStore', () => { void it('should aggregate stream', async () => { await createEventStoreSchema( - sqliteConnection({ location: `${testDatabasePath}/test.db` }), + sqliteConnection({ fileName: `${testDatabasePath}/test.db` }), ); const eventStore = getSQLiteEventStore({ - databaseLocation: `${testDatabasePath}/test.db`, + fileName: `${testDatabasePath}/test.db`, }); const productItem: PricedProductItem = { @@ -132,7 +126,7 @@ void describe('SQLiteEventStore', () => { schema: { autoMigration: 'CreateOrUpdate', }, - databaseLocation: `${testDatabasePath}/test.db`, + fileName: `${testDatabasePath}/test.db`, }); const productItem: PricedProductItem = { @@ -158,7 +152,7 @@ void describe('SQLiteEventStore', () => { schema: { autoMigration: 'CreateOrUpdate', }, - databaseLocation: ':memory:', + fileName: InMemorySQLiteDatabase, }); const productItem: PricedProductItem = { productId: '123', @@ -183,7 +177,7 @@ void describe('SQLiteEventStore', () => { schema: { autoMigration: 'CreateOrUpdate', }, - databaseLocation: `${testDatabasePath}/test.db`, + fileName: `${testDatabasePath}/test.db`, }); const productItem: PricedProductItem = { @@ -206,7 +200,7 @@ void describe('SQLiteEventStore', () => { schema: { autoMigration: 'CreateOrUpdate', }, - databaseLocation: `${testDatabasePath}/test.db`, + fileName: `${testDatabasePath}/test.db`, }); const stream = await sameEventStore.readStream(shoppingCartId); @@ -220,7 +214,7 @@ void describe('SQLiteEventStore', () => { schema: { autoMigration: 'CreateOrUpdate', }, - databaseLocation: `${testDatabasePath}/test.db`, + fileName: `${testDatabasePath}/test.db`, }); const productItem: PricedProductItem = { diff --git a/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.ts b/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.ts index da4f70d6..e4462bed 100644 --- a/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.ts +++ b/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.ts @@ -1,4 +1,5 @@ import type { + AppendToStreamResultWithGlobalPosition, BigIntStreamPosition, Event, ReadEvent, @@ -12,15 +13,13 @@ import { type AggregateStreamOptions, type AggregateStreamResult, type AppendToStreamOptions, - type AppendToStreamResult, type EventStore, type ReadStreamOptions, type ReadStreamResult, } from '@event-driven-io/emmett'; import { + InMemorySQLiteDatabase, sqliteConnection, - type AbsolutePath, - type RelativePath, type SQLiteConnection, } from '../sqliteConnection'; import { createEventStoreSchema } from './schema'; @@ -46,7 +45,8 @@ export type SQLiteEventStoreOptions = { schema?: { autoMigration?: 'None' | 'CreateOrUpdate'; }; - databaseLocation: AbsolutePath | RelativePath | ':memory:'; + // eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents + fileName: InMemorySQLiteDatabase | string | undefined; }; export const getSQLiteEventStore = ( @@ -54,18 +54,18 @@ export const getSQLiteEventStore = ( ): SQLiteEventStore => { let schemaMigrated = false; let autoGenerateSchema = false; - let db: SQLiteConnection | null; - const databaseLocation = options.databaseLocation ?? null; + let database: SQLiteConnection | null; + const fileName = options.fileName ?? InMemorySQLiteDatabase; - const isInMemory: boolean = databaseLocation === ':memory:'; + const isInMemory: boolean = fileName === InMemorySQLiteDatabase; const createConnection = () => { - if (db != null) { - return db; + if (database != null) { + return database; } return sqliteConnection({ - location: databaseLocation, + fileName, }); }; @@ -73,9 +73,24 @@ export const getSQLiteEventStore = ( if (isInMemory) { return; } - if (db != null) { - db.close(); - db = null; + if (database != null) { + database.close(); + database = null; + } + }; + + const withConnection = async ( + handler: (db: SQLiteConnection) => Promise, + ): Promise => { + if (database == null) { + database = createConnection(); + } + + try { + await ensureSchemaExists(database); + return await handler(database); + } finally { + closeConnection(); } }; @@ -85,13 +100,13 @@ export const getSQLiteEventStore = ( options.schema?.autoMigration !== 'None'; } - const ensureSchemaExists = async (): Promise => { + const ensureSchemaExists = async ( + connection: SQLiteConnection, + ): Promise => { if (!autoGenerateSchema) return Promise.resolve(); - if (db == null) { - throw new Error('Database connection does not exist'); - } + if (!schemaMigrated) { - await createEventStoreSchema(db); + await createEventStoreSchema(connection); schemaMigrated = true; } @@ -117,19 +132,13 @@ export const getSQLiteEventStore = ( throw new Error('Stream name is not string'); } - if (db == null) { - db = createConnection(); - } - - let result; - try { - result = await readStream(db, streamName, options.read); - } catch (err: Error) { - closeConnection(); - throw err; + if (database == null) { + database = createConnection(); } - closeConnection(); + const result = await withConnection((db) => + readStream(db, streamName, options.read), + ); const currentStreamVersion = result.currentStreamVersion; @@ -157,32 +166,15 @@ export const getSQLiteEventStore = ( options?: ReadStreamOptions, ): Promise< ReadStreamResult - > => { - if (db == null) { - db = createConnection(); - } - - let stream; - try { - await ensureSchemaExists(); - stream = await readStream(db, streamName, options); - } catch (err: Error) { - closeConnection(); - throw err; - } - - closeConnection(); - - return stream; - }, + > => withConnection((db) => readStream(db, streamName, options)), appendToStream: async ( streamName: string, events: EventType[], options?: AppendToStreamOptions, - ): Promise => { - if (db == null) { - db = createConnection(); + ): Promise => { + if (database == null) { + database = createConnection(); } // TODO: This has to be smarter when we introduce urn-based resolution @@ -191,24 +183,9 @@ export const getSQLiteEventStore = ( const streamType = firstPart && rest.length > 0 ? firstPart : 'emt:unknown'; - let appendResult; - - try { - await ensureSchemaExists(); - - appendResult = await appendToStream( - db, - streamName, - streamType, - events, - options, - ); - } catch (err: Error) { - closeConnection(); - throw err; - } - - closeConnection(); + const appendResult = await withConnection((db) => + appendToStream(db, streamName, streamType, events, options), + ); if (!appendResult.success) throw new ExpectedVersionConflictError( diff --git a/src/packages/emmett-sqlite/src/eventStore/db.sqlite b/src/packages/emmett-sqlite/src/eventStore/db.sqlite new file mode 100644 index 00000000..aa387b9a Binary files /dev/null and b/src/packages/emmett-sqlite/src/eventStore/db.sqlite differ diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts index efcb2d6a..4b5631a6 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts @@ -9,6 +9,7 @@ import { after, before, describe, it } from 'node:test'; import { v4 as uuid } from 'uuid'; import { createEventStoreSchema } from '.'; import { + InMemorySQLiteDatabase, sqliteConnection, type SQLiteConnection, } from '../../sqliteConnection'; @@ -27,9 +28,14 @@ export type ShoppingCart = { export type ProductItemAdded = Event< 'ProductItemAdded', - { productItem: PricedProductItem } + { productItem: PricedProductItem }, + { meta: string } +>; +export type DiscountApplied = Event< + 'DiscountApplied', + { percent: number }, + { meta: string } >; -export type DiscountApplied = Event<'DiscountApplied', { percent: number }>; export type ShoppingCartEvent = ProductItemAdded | DiscountApplied; @@ -37,7 +43,7 @@ void describe('appendEvent', () => { let db: SQLiteConnection; before(async () => { - db = sqliteConnection({ location: ':memory:' }); + db = sqliteConnection({ fileName: InMemorySQLiteDatabase }); await createEventStoreSchema(db); }); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts index 6e518321..8f3e18d2 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts @@ -1,6 +1,7 @@ import assert from 'assert'; import { after, before, describe, it } from 'node:test'; import { + InMemorySQLiteDatabase, sqliteConnection, type SQLiteConnection, } from '../../sqliteConnection'; @@ -25,7 +26,7 @@ void describe('createEventStoreSchema', () => { let db: SQLiteConnection; before(async () => { - db = sqliteConnection({ location: ':memory:' }); + db = sqliteConnection({ fileName: InMemorySQLiteDatabase }); await createEventStoreSchema(db); }); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.int.spec.ts index 2719f7a4..c1ee9ae0 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.int.spec.ts @@ -9,6 +9,7 @@ import { after, before, describe, it } from 'node:test'; import { v4 as uuid } from 'uuid'; import { createEventStoreSchema } from '.'; import { + InMemorySQLiteDatabase, sqliteConnection, type SQLiteConnection, } from '../../sqliteConnection'; @@ -28,9 +29,14 @@ export type ShoppingCart = { export type ProductItemAdded = Event< 'ProductItemAdded', - { productItem: PricedProductItem } + { productItem: PricedProductItem }, + { meta: string } +>; +export type DiscountApplied = Event< + 'DiscountApplied', + { percent: number }, + { meta: string } >; -export type DiscountApplied = Event<'DiscountApplied', { percent: number }>; export type ShoppingCartEvent = ProductItemAdded | DiscountApplied; @@ -38,7 +44,7 @@ void describe('appendEvent', () => { let db: SQLiteConnection; before(async () => { - db = sqliteConnection({ location: ':memory:' }); + db = sqliteConnection({ fileName: InMemorySQLiteDatabase }); await createEventStoreSchema(db); }); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts index 43d18cfc..baf1105f 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts @@ -1,10 +1,7 @@ import { - event, JSONParser, + type CombinedReadEventMetadata, type Event, - type EventDataOf, - type EventMetaDataOf, - type EventTypeOf, type ReadEvent, type ReadEventMetadataWithGlobalPosition, type ReadStreamOptions, @@ -14,12 +11,12 @@ import { type SQLiteConnection } from '../../sqliteConnection'; import { SQLiteEventStoreDefaultStreamVersion } from '../SQLiteEventStore'; import { defaultTag, eventsTable } from './typing'; -type ReadStreamSqlResult = { +type ReadStreamSqlResult = { stream_position: string; - event_data: EventDataOf; - event_metadata: EventMetaDataOf; + event_data: string; + event_metadata: string; event_schema_version: string; - event_type: EventTypeOf; + event_type: string; event_id: string; global_position: string; created: string; @@ -47,7 +44,7 @@ export const readStream = async ( const toCondition = !isNaN(to) ? `AND stream_position <= ${to}` : ''; - const results = await db.query>( + const results = await db.query( `SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id FROM ${eventsTable.name} WHERE stream_id = ? AND partition = ? AND is_archived = FALSE ${fromCondition} ${toCondition}`, @@ -56,21 +53,26 @@ export const readStream = async ( const events: ReadEvent[] = results.map((row) => { - const rawEvent = event( - row.event_type, - JSONParser.parse(row.event_data), - JSONParser.parse(row.event_metadata), - ); + const rawEvent = { + type: row.event_type, + data: JSONParser.parse(row.event_data), + metadata: JSONParser.parse(row.event_metadata), + } as unknown as EventType; + + const metadata: ReadEventMetadataWithGlobalPosition = { + ...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}), + eventId: row.event_id, + streamName: streamId, + streamPosition: BigInt(row.stream_position), + globalPosition: BigInt(row.global_position), + }; return { ...rawEvent, - metadata: { - ...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}), - eventId: row.event_id, - streamName: streamId, - streamPosition: BigInt(row.stream_position), - globalPosition: BigInt(row.global_position), - }, + metadata: metadata as CombinedReadEventMetadata< + EventType, + ReadEventMetadataWithGlobalPosition + >, }; }); @@ -79,6 +81,7 @@ export const readStream = async ( currentStreamVersion: events[events.length - 1]!.metadata.streamPosition, events, + streamExists: true, } : { currentStreamVersion: SQLiteEventStoreDefaultStreamVersion, diff --git a/src/packages/emmett-sqlite/src/sqliteConnection.ts b/src/packages/emmett-sqlite/src/sqliteConnection.ts index 7eabbb21..7591dd67 100644 --- a/src/packages/emmett-sqlite/src/sqliteConnection.ts +++ b/src/packages/emmett-sqlite/src/sqliteConnection.ts @@ -21,21 +21,18 @@ export const isSQLiteError = (error: unknown): error is SQLiteError => { return false; }; -export type AbsolutePath = `/${string}`; -export type RelativePath = `${'.' | '..'}/${string}`; +export type InMemorySQLiteDatabase = ':memory:'; +export const InMemorySQLiteDatabase = ':memory:'; type SQLiteConnectionOptions = { - location: AbsolutePath | RelativePath | ':memory:'; + // eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents + fileName: InMemorySQLiteDatabase | string | undefined; }; export const sqliteConnection = ( options: SQLiteConnectionOptions, ): SQLiteConnection => { - if (typeof options.location !== 'string') { - throw new Error('Path for sqlite database must be given'); - } - - const db = new sqlite3.Database(options.location); + const db = new sqlite3.Database(options.fileName ?? InMemorySQLiteDatabase); return { close: (): void => db.close(), diff --git a/src/packages/emmett-sqlite/src/testing/test.db b/src/packages/emmett-sqlite/src/testing/test.db new file mode 100644 index 00000000..a8c248c6 Binary files /dev/null and b/src/packages/emmett-sqlite/src/testing/test.db differ diff --git a/src/packages/emmett-tests/tsconfig.json b/src/packages/emmett-tests/tsconfig.json index 943f6f5f..be0ee341 100644 --- a/src/packages/emmett-tests/tsconfig.json +++ b/src/packages/emmett-tests/tsconfig.json @@ -9,6 +9,7 @@ "@event-driven-io/emmett": ["../packages/emmett"], "@event-driven-io/emmett-esdb": ["../packages/emmett-esdb"], "@event-driven-io/emmett-postgresql": ["../packages/emmett-postgresql"], + "@event-driven-io/emmett-sqlite": ["../packages/emmett-sqlite"], "@event-driven-io/emmett-mongodb": ["../packages/emmett-mongodb"], "@event-driven-io/emmett-testcontainers": [ "../packages/emmett-testcontainers" @@ -25,6 +26,9 @@ { "path": "../emmett-postgresql/" }, + { + "path": "../emmett-sqlite/" + }, { "path": "../emmett-mongodb/" }, diff --git a/src/tsconfig.json b/src/tsconfig.json index d48e55ae..24a74e39 100644 --- a/src/tsconfig.json +++ b/src/tsconfig.json @@ -32,6 +32,9 @@ { "path": "./packages/emmett-postgresql/" }, + { + "path": "./packages/emmett-sqlite/" + }, { "path": "./packages/emmett-mongodb/" },