diff --git a/.gitignore b/.gitignore index a32634ee..778bd3e4 100644 --- a/.gitignore +++ b/.gitignore @@ -126,4 +126,6 @@ lib */.output e2e/esmCompatibility/.output src/e2e/esmCompatibility/.output -**/0x \ No newline at end of file +**/0x + +**/*.db \ No newline at end of file diff --git a/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.e2e.spec.ts b/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.e2e.spec.ts index 14ff4fe9..c58dc1f5 100644 --- a/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.e2e.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.e2e.spec.ts @@ -5,10 +5,12 @@ import { assertThrowsAsync, ExpectedVersionConflictError, } from '@event-driven-io/emmett'; +import fs from 'fs'; import { afterEach, beforeEach, describe, it } from 'node:test'; -import sqlite3 from 'sqlite3'; +import path from 'path'; +import { fileURLToPath } from 'url'; import { v4 as uuid } from 'uuid'; -import { sqliteConnection, type SQLiteConnection } from '../sqliteConnection'; +import { InMemorySQLiteDatabase, sqliteConnection } from '../sqliteConnection'; import { type DiscountApplied, type PricedProductItem, @@ -16,111 +18,155 @@ import { type ShoppingCartEvent, } from '../testing/shoppingCart.domain'; import { createEventStoreSchema } from './schema'; -import { getSQLiteEventStore } from './SQLiteEventStore'; - -void describe('EventStoreDBEventStore', () => { - let db: SQLiteConnection; - let conn: sqlite3.Database; - - beforeEach(() => { - conn = new sqlite3.Database(':memory:'); - db = sqliteConnection(conn); - }); +import { + getSQLiteEventStore, + type SQLiteEventStoreOptions, +} from './SQLiteEventStore'; + +void describe('SQLiteEventStore', () => { + const testDatabasePath = path.resolve( + path.dirname(fileURLToPath(import.meta.url)), + '..', + 'testing', + ); + const fileName = path.resolve(testDatabasePath, 'test.db'); afterEach(() => { - conn.close(); + if (!fs.existsSync(fileName)) { + return; + } + fs.unlinkSync(fileName); }); - void it('should append events', async () => { - await createEventStoreSchema(db); - const eventStore = getSQLiteEventStore(db); - - const productItem: PricedProductItem = { - productId: '123', - quantity: 10, - price: 3, + void describe('With manual Schema Creation', () => { + const config: SQLiteEventStoreOptions = { + schema: { + autoMigration: 'None', + }, + fileName, }; - const discount = 10; - const shoppingCartId = `shopping_cart-${uuid()}`; - const result = await eventStore.appendToStream( - shoppingCartId, - [{ type: 'ProductItemAdded', data: { productItem } }], - ); - - const result2 = await eventStore.appendToStream( - shoppingCartId, - [{ type: 'ProductItemAdded', data: { productItem } }], - { expectedStreamVersion: result.nextExpectedStreamVersion }, - ); - - await eventStore.appendToStream( - shoppingCartId, - [ - { - type: 'DiscountApplied', - data: { percent: discount, couponId: uuid() }, - }, - ], - { expectedStreamVersion: result2.nextExpectedStreamVersion }, - ); + beforeEach(() => createEventStoreSchema(sqliteConnection({ fileName }))); - const { events } = await eventStore.readStream(shoppingCartId); + void it('should append events', async () => { + const eventStore = getSQLiteEventStore(config); - assertIsNotNull(events); - assertEqual(3, events.length); - }); + const productItem: PricedProductItem = { + productId: '123', + quantity: 10, + price: 3, + }; + const discount = 10; + const shoppingCartId = `shopping_cart-${uuid()}`; - void it('should aggregate stream', async () => { - await createEventStoreSchema(db); - const eventStore = getSQLiteEventStore(db); + const result = await eventStore.appendToStream( + shoppingCartId, + [{ type: 'ProductItemAdded', data: { productItem } }], + ); - const productItem: PricedProductItem = { - productId: '123', - quantity: 10, - price: 3, - }; - const discount = 10; - const shoppingCartId = `shopping_cart-${uuid()}`; + const result2 = await eventStore.appendToStream( + shoppingCartId, + [{ type: 'ProductItemAdded', data: { productItem } }], + { expectedStreamVersion: result.nextExpectedStreamVersion }, + ); - const result = await eventStore.appendToStream( - shoppingCartId, - [{ type: 'ProductItemAdded', data: { productItem } }], - ); - - const result2 = await eventStore.appendToStream( - shoppingCartId, - [{ type: 'ProductItemAdded', data: { productItem } }], - { expectedStreamVersion: result.nextExpectedStreamVersion }, - ); - - await eventStore.appendToStream( - shoppingCartId, - [ - { - type: 'DiscountApplied', - data: { percent: discount, couponId: uuid() }, - }, - ], - { expectedStreamVersion: result2.nextExpectedStreamVersion }, - ); + await eventStore.appendToStream( + shoppingCartId, + [ + { + type: 'DiscountApplied', + data: { percent: discount, couponId: uuid() }, + }, + ], + { expectedStreamVersion: result2.nextExpectedStreamVersion }, + ); - const aggregation = await eventStore.aggregateStream(shoppingCartId, { - evolve, - initialState: () => null, + const { events } = await eventStore.readStream(shoppingCartId); + + assertIsNotNull(events); + assertEqual(3, events.length); }); - assertDeepEqual( - { totalAmount: 54, productItemsCount: 20 }, - aggregation.state, - ); + void it('should aggregate stream', async () => { + const eventStore = getSQLiteEventStore(config); + + const productItem: PricedProductItem = { + productId: '123', + quantity: 10, + price: 3, + }; + const discount = 10; + const shoppingCartId = `shopping_cart-${uuid()}`; + + const result = await eventStore.appendToStream( + shoppingCartId, + [{ type: 'ProductItemAdded', data: { productItem } }], + ); + + const result2 = await eventStore.appendToStream( + shoppingCartId, + [{ type: 'ProductItemAdded', data: { productItem } }], + { expectedStreamVersion: result.nextExpectedStreamVersion }, + ); + + await eventStore.appendToStream( + shoppingCartId, + [ + { + type: 'DiscountApplied', + data: { percent: discount, couponId: uuid() }, + }, + ], + { expectedStreamVersion: result2.nextExpectedStreamVersion }, + ); + + const aggregation = await eventStore.aggregateStream(shoppingCartId, { + evolve, + initialState: () => null, + }); + + assertDeepEqual( + { totalAmount: 54, productItemsCount: 20 }, + aggregation.state, + ); + }); + + void it('should throw an error if concurrency check has failed when appending stream', async () => { + const eventStore = getSQLiteEventStore(config); + + const productItem: PricedProductItem = { + productId: '123', + quantity: 10, + price: 3, + }; + + const shoppingCartId = `shopping_cart-${uuid()}`; + + await assertThrowsAsync>( + async () => { + await eventStore.appendToStream( + shoppingCartId, + [ + { + type: 'ProductItemAdded', + data: { productItem }, + }, + ], + { + expectedStreamVersion: 5n, + }, + ); + }, + ); + }); }); void it('should automatically create schema', async () => { - const eventStore = getSQLiteEventStore(db, { + const eventStore = getSQLiteEventStore({ schema: { autoMigration: 'CreateOrUpdate', }, + fileName, }); const productItem: PricedProductItem = { @@ -141,13 +187,13 @@ void describe('EventStoreDBEventStore', () => { assertEqual(1, events.length); }); - void it('should not overwrite event store if it exists', async () => { - const eventStore = getSQLiteEventStore(db, { + void it('should create the sqlite connection in memory, and not close the connection', async () => { + const eventStore = getSQLiteEventStore({ schema: { autoMigration: 'CreateOrUpdate', }, + fileName: InMemorySQLiteDatabase, }); - const productItem: PricedProductItem = { productId: '123', quantity: 10, @@ -164,23 +210,14 @@ void describe('EventStoreDBEventStore', () => { assertIsNotNull(events); assertEqual(1, events.length); - - const sameEventStore = getSQLiteEventStore(db, { - schema: { - autoMigration: 'CreateOrUpdate', - }, - }); - - const stream = await sameEventStore.readStream(shoppingCartId); - assertIsNotNull(stream.events); - assertEqual(1, stream.events.length); }); - void it('should throw an error if concurrency check has failed when appending stream', async () => { - const eventStore = getSQLiteEventStore(db, { + void it('should not overwrite event store if it exists', async () => { + const eventStore = getSQLiteEventStore({ schema: { autoMigration: 'CreateOrUpdate', }, + fileName, }); const productItem: PricedProductItem = { @@ -191,20 +228,25 @@ void describe('EventStoreDBEventStore', () => { const shoppingCartId = `shopping_cart-${uuid()}`; - await assertThrowsAsync>(async () => { - await eventStore.appendToStream( - shoppingCartId, - [ - { - type: 'ProductItemAdded', - data: { productItem }, - }, - ], - { - expectedStreamVersion: 5n, - }, - ); + await eventStore.appendToStream(shoppingCartId, [ + { type: 'ProductItemAdded', data: { productItem } }, + ]); + + const { events } = await eventStore.readStream(shoppingCartId); + + assertIsNotNull(events); + assertEqual(1, events.length); + const sameEventStore = getSQLiteEventStore({ + schema: { + autoMigration: 'CreateOrUpdate', + }, + fileName, }); + + const stream = await sameEventStore.readStream(shoppingCartId); + + assertIsNotNull(stream.events); + assertEqual(1, stream.events.length); }); }); diff --git a/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.ts b/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.ts index a3fc826d..e4462bed 100644 --- a/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.ts +++ b/src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.ts @@ -1,4 +1,5 @@ import type { + AppendToStreamResultWithGlobalPosition, BigIntStreamPosition, Event, ReadEvent, @@ -12,12 +13,15 @@ import { type AggregateStreamOptions, type AggregateStreamResult, type AppendToStreamOptions, - type AppendToStreamResult, type EventStore, type ReadStreamOptions, type ReadStreamResult, } from '@event-driven-io/emmett'; -import type { SQLiteConnection } from '../sqliteConnection'; +import { + InMemorySQLiteDatabase, + sqliteConnection, + type SQLiteConnection, +} from '../sqliteConnection'; import { createEventStoreSchema } from './schema'; import { appendToStream } from './schema/appendToStream'; import { readStream } from './schema/readStream'; @@ -41,26 +45,68 @@ export type SQLiteEventStoreOptions = { schema?: { autoMigration?: 'None' | 'CreateOrUpdate'; }; + // eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents + fileName: InMemorySQLiteDatabase | string | undefined; }; export const getSQLiteEventStore = ( - db: SQLiteConnection, - options?: SQLiteEventStoreOptions, + options: SQLiteEventStoreOptions, ): SQLiteEventStore => { let schemaMigrated = false; - let autoGenerateSchema = false; + let database: SQLiteConnection | null; + const fileName = options.fileName ?? InMemorySQLiteDatabase; + + const isInMemory: boolean = fileName === InMemorySQLiteDatabase; + + const createConnection = () => { + if (database != null) { + return database; + } + + return sqliteConnection({ + fileName, + }); + }; + + const closeConnection = () => { + if (isInMemory) { + return; + } + if (database != null) { + database.close(); + database = null; + } + }; + + const withConnection = async ( + handler: (db: SQLiteConnection) => Promise, + ): Promise => { + if (database == null) { + database = createConnection(); + } + + try { + await ensureSchemaExists(database); + return await handler(database); + } finally { + closeConnection(); + } + }; + if (options) { autoGenerateSchema = options.schema?.autoMigration === undefined || options.schema?.autoMigration !== 'None'; } - const ensureSchemaExists = async (): Promise => { + const ensureSchemaExists = async ( + connection: SQLiteConnection, + ): Promise => { if (!autoGenerateSchema) return Promise.resolve(); if (!schemaMigrated) { - await createEventStoreSchema(db); + await createEventStoreSchema(connection); schemaMigrated = true; } @@ -83,9 +129,16 @@ export const getSQLiteEventStore = ( let state = initialState(); if (typeof streamName !== 'string') { - throw new Error('not string'); + throw new Error('Stream name is not string'); + } + + if (database == null) { + database = createConnection(); } - const result = await this.readStream(streamName, options.read); + + const result = await withConnection((db) => + readStream(db, streamName, options.read), + ); const currentStreamVersion = result.currentStreamVersion; @@ -113,29 +166,25 @@ export const getSQLiteEventStore = ( options?: ReadStreamOptions, ): Promise< ReadStreamResult - > => { - await ensureSchemaExists(); - return await readStream(db, streamName, options); - }, + > => withConnection((db) => readStream(db, streamName, options)), appendToStream: async ( streamName: string, events: EventType[], options?: AppendToStreamOptions, - ): Promise => { - await ensureSchemaExists(); + ): Promise => { + if (database == null) { + database = createConnection(); + } + // TODO: This has to be smarter when we introduce urn-based resolution const [firstPart, ...rest] = streamName.split('-'); const streamType = firstPart && rest.length > 0 ? firstPart : 'emt:unknown'; - const appendResult = await appendToStream( - db, - streamName, - streamType, - events, - options, + const appendResult = await withConnection((db) => + appendToStream(db, streamName, streamType, events, options), ); if (!appendResult.success) diff --git a/src/packages/emmett-sqlite/src/eventStore/db.sqlite b/src/packages/emmett-sqlite/src/eventStore/db.sqlite new file mode 100644 index 00000000..aa387b9a Binary files /dev/null and b/src/packages/emmett-sqlite/src/eventStore/db.sqlite differ diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts index 510cfdc0..4b5631a6 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/appendToStream.int.spec.ts @@ -6,10 +6,10 @@ import { type Event, } from '@event-driven-io/emmett'; import { after, before, describe, it } from 'node:test'; -import sqlite3 from 'sqlite3'; import { v4 as uuid } from 'uuid'; import { createEventStoreSchema } from '.'; import { + InMemorySQLiteDatabase, sqliteConnection, type SQLiteConnection, } from '../../sqliteConnection'; @@ -28,25 +28,27 @@ export type ShoppingCart = { export type ProductItemAdded = Event< 'ProductItemAdded', - { productItem: PricedProductItem } + { productItem: PricedProductItem }, + { meta: string } +>; +export type DiscountApplied = Event< + 'DiscountApplied', + { percent: number }, + { meta: string } >; -export type DiscountApplied = Event<'DiscountApplied', { percent: number }>; export type ShoppingCartEvent = ProductItemAdded | DiscountApplied; void describe('appendEvent', () => { let db: SQLiteConnection; - let conn: sqlite3.Database; before(async () => { - conn = new sqlite3.Database(':memory:'); - - db = sqliteConnection(conn); + db = sqliteConnection({ fileName: InMemorySQLiteDatabase }); await createEventStoreSchema(db); }); after(() => { - conn.close(); + db.close(); }); const events: ShoppingCartEvent[] = [ diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts index 30c9c76a..8f3e18d2 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts @@ -1,7 +1,7 @@ import assert from 'assert'; import { after, before, describe, it } from 'node:test'; -import sqlite3 from 'sqlite3'; import { + InMemorySQLiteDatabase, sqliteConnection, type SQLiteConnection, } from '../../sqliteConnection'; @@ -23,19 +23,16 @@ const tableExists = async ( }; void describe('createEventStoreSchema', () => { - let conn: sqlite3.Database; let db: SQLiteConnection; before(async () => { - conn = new sqlite3.Database(':memory:'); - - db = sqliteConnection(conn); + db = sqliteConnection({ fileName: InMemorySQLiteDatabase }); await createEventStoreSchema(db); }); after(() => { - conn.close(); + db.close(); }); void describe('creates tables', () => { diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.int.spec.ts index 95b611f3..c1ee9ae0 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.int.spec.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.int.spec.ts @@ -6,10 +6,10 @@ import { type Event, } from '@event-driven-io/emmett'; import { after, before, describe, it } from 'node:test'; -import sqlite3 from 'sqlite3'; import { v4 as uuid } from 'uuid'; import { createEventStoreSchema } from '.'; import { + InMemorySQLiteDatabase, sqliteConnection, type SQLiteConnection, } from '../../sqliteConnection'; @@ -29,25 +29,27 @@ export type ShoppingCart = { export type ProductItemAdded = Event< 'ProductItemAdded', - { productItem: PricedProductItem } + { productItem: PricedProductItem }, + { meta: string } +>; +export type DiscountApplied = Event< + 'DiscountApplied', + { percent: number }, + { meta: string } >; -export type DiscountApplied = Event<'DiscountApplied', { percent: number }>; export type ShoppingCartEvent = ProductItemAdded | DiscountApplied; void describe('appendEvent', () => { let db: SQLiteConnection; - let conn: sqlite3.Database; before(async () => { - conn = new sqlite3.Database(':memory:'); - - db = sqliteConnection(conn); + db = sqliteConnection({ fileName: InMemorySQLiteDatabase }); await createEventStoreSchema(db); }); after(() => { - conn.close(); + db.close(); }); const events: ShoppingCartEvent[] = [ diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts index 43d18cfc..baf1105f 100644 --- a/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts +++ b/src/packages/emmett-sqlite/src/eventStore/schema/readStream.ts @@ -1,10 +1,7 @@ import { - event, JSONParser, + type CombinedReadEventMetadata, type Event, - type EventDataOf, - type EventMetaDataOf, - type EventTypeOf, type ReadEvent, type ReadEventMetadataWithGlobalPosition, type ReadStreamOptions, @@ -14,12 +11,12 @@ import { type SQLiteConnection } from '../../sqliteConnection'; import { SQLiteEventStoreDefaultStreamVersion } from '../SQLiteEventStore'; import { defaultTag, eventsTable } from './typing'; -type ReadStreamSqlResult = { +type ReadStreamSqlResult = { stream_position: string; - event_data: EventDataOf; - event_metadata: EventMetaDataOf; + event_data: string; + event_metadata: string; event_schema_version: string; - event_type: EventTypeOf; + event_type: string; event_id: string; global_position: string; created: string; @@ -47,7 +44,7 @@ export const readStream = async ( const toCondition = !isNaN(to) ? `AND stream_position <= ${to}` : ''; - const results = await db.query>( + const results = await db.query( `SELECT stream_id, stream_position, global_position, event_data, event_metadata, event_schema_version, event_type, event_id FROM ${eventsTable.name} WHERE stream_id = ? AND partition = ? AND is_archived = FALSE ${fromCondition} ${toCondition}`, @@ -56,21 +53,26 @@ export const readStream = async ( const events: ReadEvent[] = results.map((row) => { - const rawEvent = event( - row.event_type, - JSONParser.parse(row.event_data), - JSONParser.parse(row.event_metadata), - ); + const rawEvent = { + type: row.event_type, + data: JSONParser.parse(row.event_data), + metadata: JSONParser.parse(row.event_metadata), + } as unknown as EventType; + + const metadata: ReadEventMetadataWithGlobalPosition = { + ...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}), + eventId: row.event_id, + streamName: streamId, + streamPosition: BigInt(row.stream_position), + globalPosition: BigInt(row.global_position), + }; return { ...rawEvent, - metadata: { - ...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}), - eventId: row.event_id, - streamName: streamId, - streamPosition: BigInt(row.stream_position), - globalPosition: BigInt(row.global_position), - }, + metadata: metadata as CombinedReadEventMetadata< + EventType, + ReadEventMetadataWithGlobalPosition + >, }; }); @@ -79,6 +81,7 @@ export const readStream = async ( currentStreamVersion: events[events.length - 1]!.metadata.streamPosition, events, + streamExists: true, } : { currentStreamVersion: SQLiteEventStoreDefaultStreamVersion, diff --git a/src/packages/emmett-sqlite/src/sqliteConnection.ts b/src/packages/emmett-sqlite/src/sqliteConnection.ts index 2742f6de..7591dd67 100644 --- a/src/packages/emmett-sqlite/src/sqliteConnection.ts +++ b/src/packages/emmett-sqlite/src/sqliteConnection.ts @@ -1,8 +1,9 @@ -import type sqlite3 from 'sqlite3'; +import sqlite3 from 'sqlite3'; export type Parameters = object | string | bigint | number | boolean | null; export type SQLiteConnection = { + close: () => void; command: (sql: string, values?: Parameters[]) => Promise; query: (sql: string, values?: Parameters[]) => Promise; querySingle: (sql: string, values?: Parameters[]) => Promise; @@ -20,10 +21,21 @@ export const isSQLiteError = (error: unknown): error is SQLiteError => { return false; }; -export const sqliteConnection = (conn: sqlite3.Database): SQLiteConnection => { - const db = conn; +export type InMemorySQLiteDatabase = ':memory:'; +export const InMemorySQLiteDatabase = ':memory:'; + +type SQLiteConnectionOptions = { + // eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents + fileName: InMemorySQLiteDatabase | string | undefined; +}; + +export const sqliteConnection = ( + options: SQLiteConnectionOptions, +): SQLiteConnection => { + const db = new sqlite3.Database(options.fileName ?? InMemorySQLiteDatabase); return { + close: (): void => db.close(), command: (sql: string, params?: Parameters[]) => new Promise((resolve, reject) => { db.run(sql, params ?? [], (err: Error | null) => { diff --git a/src/packages/emmett-tests/tsconfig.json b/src/packages/emmett-tests/tsconfig.json index 943f6f5f..be0ee341 100644 --- a/src/packages/emmett-tests/tsconfig.json +++ b/src/packages/emmett-tests/tsconfig.json @@ -9,6 +9,7 @@ "@event-driven-io/emmett": ["../packages/emmett"], "@event-driven-io/emmett-esdb": ["../packages/emmett-esdb"], "@event-driven-io/emmett-postgresql": ["../packages/emmett-postgresql"], + "@event-driven-io/emmett-sqlite": ["../packages/emmett-sqlite"], "@event-driven-io/emmett-mongodb": ["../packages/emmett-mongodb"], "@event-driven-io/emmett-testcontainers": [ "../packages/emmett-testcontainers" @@ -25,6 +26,9 @@ { "path": "../emmett-postgresql/" }, + { + "path": "../emmett-sqlite/" + }, { "path": "../emmett-mongodb/" }, diff --git a/src/tsconfig.json b/src/tsconfig.json index d48e55ae..24a74e39 100644 --- a/src/tsconfig.json +++ b/src/tsconfig.json @@ -32,6 +32,9 @@ { "path": "./packages/emmett-postgresql/" }, + { + "path": "./packages/emmett-sqlite/" + }, { "path": "./packages/emmett-mongodb/" },