Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added SQLite event store the ability to create connection #178

Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import {
assertThrowsAsync,
ExpectedVersionConflictError,
} from '@event-driven-io/emmett';
import { afterEach, beforeEach, describe, it } from 'node:test';
import sqlite3 from 'sqlite3';
import fs from 'fs';
import { afterEach, describe, it } from 'node:test';
import { dirname } from 'path';
import { fileURLToPath } from 'url';
import { v4 as uuid } from 'uuid';
import { sqliteConnection, type SQLiteConnection } from '../sqliteConnection';
import { sqliteConnection, type AbsolutePath } from '../sqliteConnection';
import {
type DiscountApplied,
type PricedProductItem,
Expand All @@ -18,22 +20,27 @@ import {
import { createEventStoreSchema } from './schema';
import { getSQLiteEventStore } from './SQLiteEventStore';

void describe('EventStoreDBEventStore', () => {
let db: SQLiteConnection;
let conn: sqlite3.Database;
const __dirname = dirname(fileURLToPath(import.meta.url)) as AbsolutePath;

beforeEach(() => {
conn = new sqlite3.Database(':memory:');
db = sqliteConnection(conn);
});
void describe('SQLiteEventStore', () => {
const testDatabasePath: AbsolutePath = __dirname + '/../testing/database/';

afterEach(() => {
conn.close();
if (!fs.existsSync(`${testDatabasePath}/test.db`)) {
return;
}
fs.unlink(`${testDatabasePath}/test.db`, (err) => {
if (err) console.error('Error deleting file:', err);
});
});

void it('should append events', async () => {
await createEventStoreSchema(db);
const eventStore = getSQLiteEventStore(db);
await createEventStoreSchema(
sqliteConnection({ location: `/${testDatabasePath}/test.db` }),
);
const eventStore = getSQLiteEventStore({
databaseLocation: `${testDatabasePath}/test.db`,
});

const productItem: PricedProductItem = {
productId: '123',
Expand Down Expand Up @@ -72,8 +79,12 @@ void describe('EventStoreDBEventStore', () => {
});

void it('should aggregate stream', async () => {
await createEventStoreSchema(db);
const eventStore = getSQLiteEventStore(db);
await createEventStoreSchema(
sqliteConnection({ location: `${testDatabasePath}/test.db` }),
);
const eventStore = getSQLiteEventStore({
databaseLocation: `${testDatabasePath}/test.db`,
});

const productItem: PricedProductItem = {
productId: '123',
Expand Down Expand Up @@ -117,10 +128,11 @@ void describe('EventStoreDBEventStore', () => {
});

void it('should automatically create schema', async () => {
const eventStore = getSQLiteEventStore(db, {
const eventStore = getSQLiteEventStore({
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: `${testDatabasePath}/test.db`,
});

const productItem: PricedProductItem = {
Expand All @@ -141,11 +153,37 @@ void describe('EventStoreDBEventStore', () => {
assertEqual(1, events.length);
});

void it('should create the sqlite connection in memory, and not close the connection', async () => {
const eventStore = getSQLiteEventStore({
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: ':memory:',
});
const productItem: PricedProductItem = {
productId: '123',
quantity: 10,
price: 3,
};

const shoppingCartId = `shopping_cart-${uuid()}`;

await eventStore.appendToStream<ShoppingCartEvent>(shoppingCartId, [
{ type: 'ProductItemAdded', data: { productItem } },
]);

const { events } = await eventStore.readStream(shoppingCartId);

assertIsNotNull(events);
assertEqual(1, events.length);
});

void it('should not overwrite event store if it exists', async () => {
const eventStore = getSQLiteEventStore(db, {
const eventStore = getSQLiteEventStore({
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: `${testDatabasePath}/test.db`,
});

const productItem: PricedProductItem = {
Expand All @@ -164,23 +202,25 @@ void describe('EventStoreDBEventStore', () => {

assertIsNotNull(events);
assertEqual(1, events.length);

const sameEventStore = getSQLiteEventStore(db, {
const sameEventStore = getSQLiteEventStore({
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: `${testDatabasePath}/test.db`,
});

const stream = await sameEventStore.readStream(shoppingCartId);

assertIsNotNull(stream.events);
assertEqual(1, stream.events.length);
});

void it('should throw an error if concurrency check has failed when appending stream', async () => {
const eventStore = getSQLiteEventStore(db, {
const eventStore = getSQLiteEventStore({
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: `${testDatabasePath}/test.db`,
});

const productItem: PricedProductItem = {
Expand Down
68 changes: 62 additions & 6 deletions src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ import {
type ReadStreamOptions,
type ReadStreamResult,
} from '@event-driven-io/emmett';
import type { SQLiteConnection } from '../sqliteConnection';
import {
sqliteConnection,
type AbsolutePath,
type RelativePath,
type SQLiteConnection,
} from '../sqliteConnection';
import { createEventStoreSchema } from './schema';
import { appendToStream } from './schema/appendToStream';
import { readStream } from './schema/readStream';
Expand All @@ -41,15 +46,44 @@ export type SQLiteEventStoreOptions = {
schema?: {
autoMigration?: 'None' | 'CreateOrUpdate';
};
shouldManageClientLifetime?: boolean;
databaseLocation?: AbsolutePath | RelativePath | ':memory:';
};

export const getSQLiteEventStore = (
db: SQLiteConnection,
options?: SQLiteEventStoreOptions,
): SQLiteEventStore => {
let schemaMigrated = false;

let autoGenerateSchema = false;
let db: SQLiteConnection | null;
const databaseLocation = options?.databaseLocation ?? null;

const isInMemory: boolean = databaseLocation === ':memory:';

const createConnection = () => {
if (db != null) {
return db;
}

if (options?.databaseLocation == null) {
throw new Error('Database location is not set');
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CONCERN: Wouldn't it be better to make database location required? Then, on the compiler level, we would make it impossible to provide the wrong value. The alternative is to keep it optional and use in-memory if not provided, which I think is fine.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the fallback.

}

return sqliteConnection({
location: databaseLocation,
});
};

const closeConnection = () => {
if (isInMemory) {
return;
}
if (db != null) {
db.close();
db = null;
}
};

if (options) {
autoGenerateSchema =
options.schema?.autoMigration === undefined ||
Expand All @@ -58,7 +92,9 @@ export const getSQLiteEventStore = (

const ensureSchemaExists = async (): Promise<void> => {
if (!autoGenerateSchema) return Promise.resolve();

if (db == null) {
throw new Error('Database connection does not exist');
}
if (!schemaMigrated) {
await createEventStoreSchema(db);
schemaMigrated = true;
Expand All @@ -85,7 +121,12 @@ export const getSQLiteEventStore = (
if (typeof streamName !== 'string') {
throw new Error('not string');
}
const result = await this.readStream<EventType>(streamName, options.read);

if (db == null) {
db = createConnection();
}

const result = await readStream<EventType>(db, streamName, options.read);

const currentStreamVersion = result.currentStreamVersion;

Expand All @@ -101,6 +142,8 @@ export const getSQLiteEventStore = (
state = evolve(state, event);
}

closeConnection();

return {
currentStreamVersion: currentStreamVersion,
state,
Expand All @@ -114,15 +157,26 @@ export const getSQLiteEventStore = (
): Promise<
ReadStreamResult<EventType, ReadEventMetadataWithGlobalPosition>
> => {
if (db == null) {
db = createConnection();
}

await ensureSchemaExists();
return await readStream<EventType>(db, streamName, options);
const stream = await readStream<EventType>(db, streamName, options);

closeConnection();
return stream;
},

appendToStream: async <EventType extends Event>(
streamName: string,
events: EventType[],
options?: AppendToStreamOptions,
): Promise<AppendToStreamResult> => {
if (db == null) {
db = createConnection();
}

await ensureSchemaExists();
// TODO: This has to be smarter when we introduce urn-based resolution
const [firstPart, ...rest] = streamName.split('-');
Expand All @@ -138,6 +192,8 @@ export const getSQLiteEventStore = (
options,
);

closeConnection();

if (!appendResult.success)
throw new ExpectedVersionConflictError<bigint>(
-1n, //TODO: Return actual version in case of error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
type Event,
} from '@event-driven-io/emmett';
import { after, before, describe, it } from 'node:test';
import sqlite3 from 'sqlite3';
import { v4 as uuid } from 'uuid';
import { createEventStoreSchema } from '.';
import {
Expand Down Expand Up @@ -36,17 +35,14 @@ export type ShoppingCartEvent = ProductItemAdded | DiscountApplied;

void describe('appendEvent', () => {
let db: SQLiteConnection;
let conn: sqlite3.Database;

before(async () => {
conn = new sqlite3.Database(':memory:');

db = sqliteConnection(conn);
db = sqliteConnection({ location: ':memory:' });
await createEventStoreSchema(db);
});

after(() => {
conn.close();
db.close();
});

const events: ShoppingCartEvent[] = [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import assert from 'assert';
import { after, before, describe, it } from 'node:test';
import sqlite3 from 'sqlite3';
import {
sqliteConnection,
type SQLiteConnection,
Expand All @@ -23,19 +22,16 @@ const tableExists = async (
};

void describe('createEventStoreSchema', () => {
let conn: sqlite3.Database;
let db: SQLiteConnection;

before(async () => {
conn = new sqlite3.Database(':memory:');

db = sqliteConnection(conn);
db = sqliteConnection({ location: ':memory:' });

await createEventStoreSchema(db);
});

after(() => {
conn.close();
db.close();
});

void describe('creates tables', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
type Event,
} from '@event-driven-io/emmett';
import { after, before, describe, it } from 'node:test';
import sqlite3 from 'sqlite3';
import { v4 as uuid } from 'uuid';
import { createEventStoreSchema } from '.';
import {
Expand Down Expand Up @@ -37,17 +36,14 @@ export type ShoppingCartEvent = ProductItemAdded | DiscountApplied;

void describe('appendEvent', () => {
let db: SQLiteConnection;
let conn: sqlite3.Database;

before(async () => {
conn = new sqlite3.Database(':memory:');

db = sqliteConnection(conn);
db = sqliteConnection({ location: ':memory:' });
await createEventStoreSchema(db);
});

after(() => {
conn.close();
db.close();
});

const events: ShoppingCartEvent[] = [
Expand Down
Loading
Loading