Skip to content

Commit 9979a44

Browse files
davecosecoskardudycz
authored andcommitted
Added database table creation and tests
1 parent 0b10c42 commit 9979a44

File tree

5 files changed

+179
-0
lines changed

5 files changed

+179
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import assert from 'assert';
2+
import { after, before, describe, it } from 'node:test';
3+
import sqlite3 from 'sqlite3';
4+
import { dbConn, type SQLiteConnection } from '../../sqliteConnection';
5+
import { createEventStoreSchema } from '../schema';
6+
7+
type TableExists = {
8+
name: string;
9+
};
10+
11+
const tableExists = async (
12+
db: SQLiteConnection,
13+
tableName: string,
14+
): Promise<boolean> => {
15+
const result = await db.querySingle<TableExists>(
16+
`SELECT name FROM sqlite_master WHERE type='table' AND name='${tableName}';`,
17+
);
18+
19+
return result?.name ? true : false;
20+
};
21+
22+
void describe('createEventStoreSchema', () => {
23+
let conn: sqlite3.Database;
24+
let db: SQLiteConnection;
25+
26+
before(async () => {
27+
conn = new sqlite3.Database(':memory:');
28+
29+
db = dbConn(conn);
30+
31+
await createEventStoreSchema(db);
32+
});
33+
34+
after(() => {
35+
conn.close();
36+
});
37+
38+
void describe('creates tables', () => {
39+
void it('creates the streams table', async () => {
40+
assert.ok(await tableExists(db, 'emt_streams'));
41+
});
42+
43+
void it('creates the events table', async () => {
44+
assert.ok(await tableExists(db, 'emt_events'));
45+
});
46+
});
47+
});
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { type SQLiteConnection } from '../../sqliteConnection';
2+
import { eventsTableSQL, streamsTableSQL } from './tables';
3+
4+
export * from './tables';
5+
6+
export const schemaSQL: string[] = [streamsTableSQL, eventsTableSQL];
7+
8+
export const createEventStoreSchema = async (
9+
db: SQLiteConnection,
10+
): Promise<void> => {
11+
for (const sql of schemaSQL) {
12+
try {
13+
await db.command(sql);
14+
} catch (error) {
15+
console.log(error);
16+
}
17+
}
18+
};
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { eventsTable, globalTag, streamsTable } from './typing';
2+
3+
export const sql = (sql: string) => sql;
4+
5+
export const streamsTableSQL = sql(
6+
`CREATE TABLE IF NOT EXISTS ${streamsTable.name}(
7+
stream_id TEXT NOT NULL,
8+
stream_position BIGINT NOT NULL DEFAULT 0,
9+
partition TEXT NOT NULL DEFAULT '${globalTag}__${globalTag}',
10+
stream_type TEXT NOT NULL,
11+
stream_metadata JSONB NOT NULL,
12+
is_archived BOOLEAN NOT NULL DEFAULT FALSE,
13+
PRIMARY KEY (stream_id, stream_position, partition, is_archived),
14+
UNIQUE (stream_id, partition, is_archived)
15+
);`,
16+
);
17+
18+
export const eventsTableSQL = sql(
19+
`CREATE TABLE IF NOT EXISTS ${eventsTable.name}(
20+
stream_id TEXT NOT NULL,
21+
stream_position BIGINT NOT NULL,
22+
partition TEXT NOT NULL DEFAULT '${globalTag}',
23+
event_data JSONB NOT NULL,
24+
event_metadata JSONB NOT NULL,
25+
event_schema_version TEXT NOT NULL,
26+
event_type TEXT NOT NULL,
27+
event_id TEXT NOT NULL,
28+
is_archived BOOLEAN NOT NULL DEFAULT FALSE,
29+
global_position BIGINT ,
30+
created DATETIME DEFAULT CURRENT_TIMESTAMP,
31+
PRIMARY KEY (stream_id, stream_position, partition, is_archived)
32+
);
33+
`,
34+
);
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
export const emmettPrefix = 'emt';
2+
3+
export const globalTag = 'global';
4+
export const defaultTag = 'emt:default';
5+
6+
export const globalNames = {
7+
module: `${emmettPrefix}:module:${globalTag}`,
8+
};
9+
10+
const columns = {
11+
partition: {
12+
name: 'partition',
13+
},
14+
isArchived: { name: 'is_archived' },
15+
};
16+
17+
export const streamsTable = {
18+
name: `${emmettPrefix}_streams`,
19+
columns: {
20+
partition: columns.partition,
21+
isArchived: columns.isArchived,
22+
},
23+
};
24+
25+
export const eventsTable = {
26+
name: `${emmettPrefix}_events`,
27+
columns: {
28+
partition: columns.partition,
29+
isArchived: columns.isArchived,
30+
},
31+
};
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import type sqlite3 from 'sqlite3';
2+
3+
type Parameters = object | string | number | null;
4+
5+
export type SQLiteConnection = {
6+
command: (sql: string, values?: Parameters[]) => Promise<void>;
7+
query: <T>(sql: string, values?: Parameters[]) => Promise<T[]>;
8+
querySingle: <T>(sql: string, values?: Parameters[]) => Promise<T | null>;
9+
};
10+
11+
export const dbConn = (conn: sqlite3.Database): SQLiteConnection => {
12+
const db = conn;
13+
14+
return {
15+
command: (sql: string, params?: Parameters[]) =>
16+
new Promise((resolve, reject) => {
17+
db.run(sql, params ?? [], (err: Error | null) => {
18+
if (err) {
19+
reject(err);
20+
return;
21+
}
22+
23+
resolve();
24+
});
25+
}),
26+
query: <T>(sql: string, params?: Parameters[]): Promise<T[]> =>
27+
new Promise((resolve, reject) => {
28+
db.all(sql, params ?? [], (err: Error | null, result: T[]) => {
29+
if (err) {
30+
reject(err);
31+
return;
32+
}
33+
34+
resolve(result);
35+
});
36+
}),
37+
querySingle: <T>(sql: string, params?: Parameters[]): Promise<T | null> =>
38+
new Promise((resolve, reject) => {
39+
db.get(sql, params ?? [], (err: Error | null, result: T | null) => {
40+
if (err) {
41+
reject(err);
42+
return;
43+
}
44+
45+
resolve(result);
46+
});
47+
}),
48+
};
49+
};

0 commit comments

Comments
 (0)