Skip to content

Commit

Permalink
Added SQLite append to stream
Browse files Browse the repository at this point in the history
  • Loading branch information
dave committed Jan 1, 2025
1 parent 1552a73 commit 70dc842
Show file tree
Hide file tree
Showing 5 changed files with 487 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import {
assertEqual,
assertFalse,
assertIsNotNull,
assertTrue,
type Event,
} from '@event-driven-io/emmett';
import { after, before, describe, it } from 'node:test';
import sqlite3 from 'sqlite3';
import { v4 as uuid } from 'uuid';
import { createEventStoreSchema } from '.';
import { dbConn, type SQLiteConnection } from '../../sqliteConnection';
import { appendToStream } from './appendToStream';

export type PricedProductItem = {
productId: string;
quantity: number;
price: number;
};

export type ShoppingCart = {
productItems: PricedProductItem[];
totalAmount: number;
};

export type ProductItemAdded = Event<
'ProductItemAdded',
{ productItem: PricedProductItem }
>;
export type DiscountApplied = Event<'DiscountApplied', { percent: number }>;

export type ShoppingCartEvent = ProductItemAdded | DiscountApplied;

void describe('appendEvent', () => {
let db: SQLiteConnection;
let conn: sqlite3.Database;

before(async () => {
conn = new sqlite3.Database(':memory:');

db = dbConn(conn);
await createEventStoreSchema(db);
});

after(() => {
conn.close();
});

const events: ShoppingCartEvent[] = [
{
type: 'ProductItemAdded',
data: { productItem: { productId: '1', quantity: 2, price: 30 } },
metadata: { meta: 'data1' },
},
{
type: 'DiscountApplied',
data: { percent: 10 },
metadata: { meta: 'data2' },
},
];

void it('should append events correctly', async () => {
const result = await appendToStream(db, uuid(), 'shopping_cart', events, {
expectedStreamVersion: 0n,
});

assertTrue(result.success);
assertEqual(result.nextStreamPosition, 2n);
assertIsNotNull(result.lastGlobalPosition);
assertTrue(result.lastGlobalPosition > 0n);
});

void it('should append events correctly without expected stream position', async () => {
const result = await appendToStream(
db,
uuid(),
'shopping_cart',
events,
{},
);

assertTrue(result.success);
assertEqual(result.nextStreamPosition, 2n);
assertIsNotNull(result.lastGlobalPosition);
assertTrue(result.lastGlobalPosition > 0n);
});

void it('should append events correctly without optimistic concurrency', async () => {
const streamId = uuid();
await appendToStream(db, streamId, 'shopping_cart', events);
const result = await appendToStream(db, streamId, 'shopping_cart', events);
const resultEvents = await db.query(
'SELECT * FROM emt_events WHERE stream_id = $1',
[streamId],
);

assertEqual(4, resultEvents.length);
assertTrue(result.success);
});

void it('should handle stream position conflict correctly when two streams are created', async () => {
// Given
const streamId = uuid();

const firstResult = await appendToStream(
db,
streamId,
'shopping_cart',
events,
{
expectedStreamVersion: 0n,
},
);
assertTrue(firstResult.success);

// When
const secondResult = await appendToStream(
db,
streamId,
'shopping_cart',
events,
{
expectedStreamVersion: 0n,
},
);

// Then
assertFalse(secondResult.success);

const resultEvents = await db.query(
'SELECT * FROM emt_events WHERE stream_id = $1',
[streamId],
);

assertEqual(events.length, resultEvents.length);
});

void it('should handle stream position conflict correctly when version mismatches', async () => {
// Given
const streamId = uuid();

const creationResult = await appendToStream(
db,
streamId,
'shopping_cart',
events,
);
assertTrue(creationResult.success);
const expectedStreamVersion = creationResult.nextStreamPosition;

const firstResult = await appendToStream(
db,
streamId,
'shopping_cart',
events,
{
expectedStreamVersion,
},
);

assertTrue(firstResult.success);

// When
const secondResult = await appendToStream(
db,
streamId,
'shopping_cart',
events,
{
expectedStreamVersion,
},
);

// Then
assertFalse(secondResult.success);

const resultEvents = await db.query(
'SELECT * FROM emt_events WHERE stream_id = $1',
[streamId],
);

assertEqual(events.length * 2, resultEvents.length);
});

void it('should not have stream position conflict when version matches', async () => {
// Given
const streamId = uuid();
const expectedStreamVersion = 0n;

const firstResult = await appendToStream(
db,
streamId,
'shopping_cart',
events,
{
expectedStreamVersion,
},
);
assertTrue(firstResult.success);

// When
const secondResult = await appendToStream(
db,
streamId,
'shopping_cart',
events,
{
expectedStreamVersion: firstResult.nextStreamPosition,
},
);

// Then
assertTrue(secondResult.success);

const resultEvents = await db.query(
'SELECT * FROM emt_events WHERE stream_id = $1',
[streamId],
);

assertEqual(events.length * 2, resultEvents.length);
});

void it('should handle appending an empty events array gracefully', async () => {
const result = await appendToStream(db, uuid(), 'shopping_cart', []);

assertFalse(result.success);
});
});
Loading

0 comments on commit 70dc842

Please sign in to comment.