diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts b/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts new file mode 100644 index 00000000..6d82beb3 --- /dev/null +++ b/src/packages/emmett-sqlite/src/eventStore/schema/createEventStoreSchema.int.spec.ts @@ -0,0 +1,47 @@ +import assert from 'assert'; +import { after, before, describe, it } from 'node:test'; +import sqlite3 from 'sqlite3'; +import { dbConn, type SQLiteConnection } from '../../sqliteConnection'; +import { createEventStoreSchema } from '../schema'; + +type TableExists = { + name: string; +}; + +const tableExists = async ( + db: SQLiteConnection, + tableName: string, +): Promise => { + const result = await db.querySingle( + `SELECT name FROM sqlite_master WHERE type='table' AND name='${tableName}';`, + ); + + return result?.name ? true : false; +}; + +void describe('createEventStoreSchema', () => { + let conn: sqlite3.Database; + let db: SQLiteConnection; + + before(async () => { + conn = new sqlite3.Database(':memory:'); + + db = dbConn(conn); + + await createEventStoreSchema(db); + }); + + after(() => { + conn.close(); + }); + + void describe('creates tables', () => { + void it('creates the streams table', async () => { + assert.ok(await tableExists(db, 'emt_streams')); + }); + + void it('creates the events table', async () => { + assert.ok(await tableExists(db, 'emt_events')); + }); + }); +}); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/index.ts b/src/packages/emmett-sqlite/src/eventStore/schema/index.ts new file mode 100644 index 00000000..8f6ddea0 --- /dev/null +++ b/src/packages/emmett-sqlite/src/eventStore/schema/index.ts @@ -0,0 +1,18 @@ +import { type SQLiteConnection } from '../../sqliteConnection'; +import { eventsTableSQL, streamsTableSQL } from './tables'; + +export * from './tables'; + +export const schemaSQL: string[] = [streamsTableSQL, eventsTableSQL]; + +export const createEventStoreSchema = async ( + db: SQLiteConnection, +): Promise => { + for (const sql of schemaSQL) { + try { + await db.command(sql); + } catch (error) { + console.log(error); + } + } +}; diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts new file mode 100644 index 00000000..1523f412 --- /dev/null +++ b/src/packages/emmett-sqlite/src/eventStore/schema/tables.ts @@ -0,0 +1,34 @@ +import { eventsTable, globalTag, streamsTable } from './typing'; + +export const sql = (sql: string) => sql; + +export const streamsTableSQL = sql( + `CREATE TABLE IF NOT EXISTS ${streamsTable.name}( + stream_id TEXT NOT NULL, + stream_position BIGINT NOT NULL DEFAULT 0, + partition TEXT NOT NULL DEFAULT '${globalTag}__${globalTag}', + stream_type TEXT NOT NULL, + stream_metadata JSONB NOT NULL, + is_archived BOOLEAN NOT NULL DEFAULT FALSE, + PRIMARY KEY (stream_id, stream_position, partition, is_archived), + UNIQUE (stream_id, partition, is_archived) + );`, +); + +export const eventsTableSQL = sql( + `CREATE TABLE IF NOT EXISTS ${eventsTable.name}( + stream_id TEXT NOT NULL, + stream_position BIGINT NOT NULL, + partition TEXT NOT NULL DEFAULT '${globalTag}', + event_data JSONB NOT NULL, + event_metadata JSONB NOT NULL, + event_schema_version TEXT NOT NULL, + event_type TEXT NOT NULL, + event_id TEXT NOT NULL, + is_archived BOOLEAN NOT NULL DEFAULT FALSE, + global_position BIGINT , + created DATETIME DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (stream_id, stream_position, partition, is_archived) + ); +`, +); diff --git a/src/packages/emmett-sqlite/src/eventStore/schema/typing.ts b/src/packages/emmett-sqlite/src/eventStore/schema/typing.ts new file mode 100644 index 00000000..d1b13a9a --- /dev/null +++ b/src/packages/emmett-sqlite/src/eventStore/schema/typing.ts @@ -0,0 +1,31 @@ +export const emmettPrefix = 'emt'; + +export const globalTag = 'global'; +export const defaultTag = 'emt:default'; + +export const globalNames = { + module: `${emmettPrefix}:module:${globalTag}`, +}; + +const columns = { + partition: { + name: 'partition', + }, + isArchived: { name: 'is_archived' }, +}; + +export const streamsTable = { + name: `${emmettPrefix}_streams`, + columns: { + partition: columns.partition, + isArchived: columns.isArchived, + }, +}; + +export const eventsTable = { + name: `${emmettPrefix}_events`, + columns: { + partition: columns.partition, + isArchived: columns.isArchived, + }, +}; diff --git a/src/packages/emmett-sqlite/src/sqliteConnection.ts b/src/packages/emmett-sqlite/src/sqliteConnection.ts new file mode 100644 index 00000000..0a95d1b6 --- /dev/null +++ b/src/packages/emmett-sqlite/src/sqliteConnection.ts @@ -0,0 +1,49 @@ +import type sqlite3 from 'sqlite3'; + +type Parameters = object | string | number | null; + +export type SQLiteConnection = { + command: (sql: string, values?: Parameters[]) => Promise; + query: (sql: string, values?: Parameters[]) => Promise; + querySingle: (sql: string, values?: Parameters[]) => Promise; +}; + +export const dbConn = (conn: sqlite3.Database): SQLiteConnection => { + const db = conn; + + return { + command: (sql: string, params?: Parameters[]) => + new Promise((resolve, reject) => { + db.run(sql, params ?? [], (err: Error | null) => { + if (err) { + reject(err); + return; + } + + resolve(); + }); + }), + query: (sql: string, params?: Parameters[]): Promise => + new Promise((resolve, reject) => { + db.all(sql, params ?? [], (err: Error | null, result: T[]) => { + if (err) { + reject(err); + return; + } + + resolve(result); + }); + }), + querySingle: (sql: string, params?: Parameters[]): Promise => + new Promise((resolve, reject) => { + db.get(sql, params ?? [], (err: Error | null, result: T | null) => { + if (err) { + reject(err); + return; + } + + resolve(result); + }); + }), + }; +};