Skip to content

Commit

Permalink
Feature(RDB): add support to RDB, Generate RDB by SAVE command is rea…
Browse files Browse the repository at this point in the history
…dy, refactor in the methods of the SharedStorage, add Thread workers for RDB lifeCycle
  • Loading branch information
proXDhiya committed Aug 25, 2024
1 parent 7d03ce5 commit 398d5d3
Show file tree
Hide file tree
Showing 23 changed files with 258 additions and 44 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
node_modules/
.idea/
.idea/
dumps/
6 changes: 3 additions & 3 deletions app/commands/dataManipulation/COPY.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ const COPY = (data: IRESP[]): ICommand => {
const destination = data[2].data.toString();
const replaceFlag = data[3] ? /REPLACE/gim.test(data[3].data.toString()) : false;

const isDestinationExists = SharedStorage.find(destination);
const value = SharedStorage.get(source);
const isDestinationExists = SharedStorage.findKey(destination);
const value = SharedStorage.getKey(source);

if ((isDestinationExists && !replaceFlag) || !value)
return <ICommand>{
type: 'integer',
data: 0
};

SharedStorage.set(destination, value);
SharedStorage.setKey(destination, value);

return <ICommand>{
type: 'integer',
Expand Down
2 changes: 1 addition & 1 deletion app/commands/dataManipulation/DEL.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const DEL = (data: IRESP[]): ICommand => {
let deleted = 0;

keys.map(key => {
if (SharedStorage.delete(key))
if (SharedStorage.deleteKey(key))
deleted++;
});

Expand Down
2 changes: 1 addition & 1 deletion app/commands/dataManipulation/EXISTS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const EXISTS = (data: IRESP[]): ICommand => {
let exists = 0;

keys.map(key => {
if (SharedStorage.find(key))
if (SharedStorage.findKey(key))
exists++;
});

Expand Down
2 changes: 1 addition & 1 deletion app/commands/dataManipulation/GET.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const GET = (data: IRESP[]): ICommand => {
if (data.length !== 2)
return NUMBER_ARGS(data);

const value = SharedStorage.get(data[1].data.toString());
const value = SharedStorage.getKey(data[1].data.toString());

if (!value) return <ICommand>{
type: 'bulk',
Expand Down
6 changes: 3 additions & 3 deletions app/commands/dataManipulation/RENAME.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ const RENAME = (data: IRESP[]): ICommand => {
const key = data[1].data.toString();
const newKey = data[2].data.toString();

if (!SharedStorage.find(key)) {
if (!SharedStorage.findKey(key)) {
return <ICommand>{
type: 'error',
data: 'ERR'
}
}

SharedStorage.set(newKey, SharedStorage.get(key));
SharedStorage.delete(key);
SharedStorage.setKey(newKey, SharedStorage.getKey(key));
SharedStorage.deleteKey(key);

return <ICommand>{
type: 'string',
Expand Down
8 changes: 4 additions & 4 deletions app/commands/dataManipulation/RENAMENX.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ const RENAMENX = (data: IRESP[]): ICommand => {
const key = data[1].data.toString();
const newKey = data[2].data.toString();

if (!SharedStorage.find(key)) {
if (!SharedStorage.findKey(key)) {
return <ICommand>{
type: 'error',
data: 'ERR'
}
}

if (SharedStorage.find(newKey))
if (SharedStorage.findKey(newKey))
return <ICommand>{
type: 'integer',
data: 0
}

SharedStorage.set(newKey, SharedStorage.get(key));
SharedStorage.delete(key);
SharedStorage.setKey(newKey, SharedStorage.getKey(key));
SharedStorage.deleteKey(key);

return <ICommand>{
type: 'integer',
Expand Down
2 changes: 1 addition & 1 deletion app/commands/dataManipulation/SET.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const SET = (data: IRESP[]): ICommand => {
data: 'ERR'
};

SharedStorage.set(key, value, duration);
SharedStorage.setKey(key, value, duration);

return <ICommand>{
type: 'string',
Expand Down
14 changes: 14 additions & 0 deletions app/commands/presistence/SAVE.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import ICommand from "../../interfaces/command.interface";
import SharedStorage from "../../storage/SharedStorage";
import {subject} from "../../workers/worker.observer";

const SAVE = (): ICommand => {
subject.send('RDB.SAVE', SharedStorage.copyAll());

return <ICommand>{
type: 'string',
data: 'OK'
}
};

export default SAVE;
2 changes: 2 additions & 0 deletions app/config/_setup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './worker.config';
export * from './args.config';
2 changes: 1 addition & 1 deletion app/setup.ts → app/config/args.config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import SharedEnv from './storage/SharedEnv';
import SharedEnv from '../storage/SharedEnv';

(
() => {
Expand Down
7 changes: 7 additions & 0 deletions app/config/worker.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {WorkerObserver, subject} from "../workers/worker.observer";

const rdbUrl = new URL('../workers/rdb/rdb.worker.ts', import.meta.url).pathname;
const RDBWorker = new WorkerObserver('RDBWorker', rdbUrl);
subject.attach(RDBWorker);

export {RDBWorker}
6 changes: 5 additions & 1 deletion app/main.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {} from "./setup";
import {RDBWorker} from "./config/_setup";

import RESP from "./protocol/RESP";
import Router from "./router";
Expand All @@ -12,4 +12,8 @@ const server: net.Server = net.createServer((connection: net.Socket) => {
});
});

RDBWorker.onmessage((event: MessageEvent) => {
console.log(`[${new Date().toLocaleTimeString()}] [${RDBWorker.name}] ${event.data}`);
});

server.listen(6379, "127.0.0.1");
4 changes: 4 additions & 0 deletions app/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import TTL from "./commands/dataManipulation/TTL";
import UNKNOWN from "./commands/errors/unknown";
import PING from "./commands/basicUtility/PING";
import ECHO from "./commands/basicUtility/ECHO";
import SAVE from "./commands/presistence/SAVE";
import IRESP from "./protocol/IRESP";

const router = (commands: IRESP[]): ICommand => {
Expand All @@ -37,6 +38,9 @@ const router = (commands: IRESP[]): ICommand => {
// Configuration commands
if (/config/gim.test(command)) return CONFIG(commands);

// Persistence commands
if (/save$/gim.test(command)) return SAVE();

return UNKNOWN(commands[0].data);
}

Expand Down
36 changes: 26 additions & 10 deletions app/storage/SharedStorage.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
import {isNumber} from "node:util";

interface ISharedStorage {
set(key: string, value: any, expiration?: number): void;
get(key: string): any | undefined;
find(key: string): boolean;
delete(key: string): void;
setKey(key: string, value: any, expiration?: number): void;
getKey(key: string): any | undefined;
findKey(key: string): boolean;
deleteKey(key: string): void;
search(pattern: string): string[];
getTTL(key: string): number;
copyAll(): Map<string, any>;
deleteAll(): void;
}

interface IStorage {
value: any;
type: 'string' | 'number';
expiredAt?: number;
createdAt?: number;
}

class SharedStorage implements ISharedStorage {
private storage: Map<string, IStorage>;
private readonly storage: Map<string, IStorage>;

constructor() {
this.storage = new Map<string, IStorage>();
}

public set(key: string, value: any, expiration?: number): void {
this.storage.set(key, { value, expiredAt: expiration ? Date.now() + expiration : undefined });
public setKey(key: string, value: any, expiration?: number): void {
this.storage.set(
key, {
value,
type: !isNaN(Number(value)) ? 'number' : 'string',
expiredAt: expiration ? Date.now() + expiration : undefined,
createdAt: Date.now()
}
);

if (expiration) {
setTimeout(() => {
Expand All @@ -30,15 +42,15 @@ class SharedStorage implements ISharedStorage {
}
}

public get(key: string): any | undefined {
public getKey(key: string): any | undefined {
return this.storage.get(key)?.value;
}

public find(key: string): boolean {
public findKey(key: string): boolean {
return this.storage.has(key);
}

public delete(key: string): boolean {
public deleteKey(key: string): boolean {
return this.storage.delete(key);
}

Expand All @@ -57,6 +69,10 @@ class SharedStorage implements ISharedStorage {
return Math.max(0, Math.ceil((data.expiredAt - Date.now()) / 1000));
}

public copyAll(): Map<string, any> {
return new Map(this.storage);
}

public deleteAll(): void {
this.storage.clear();
}
Expand Down
13 changes: 13 additions & 0 deletions app/workers/rdb/rdb.worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import generateRDB from "./utils/generateRDB";
declare var self: Worker;

self.onmessage = (event: MessageEvent) => {
const message: string = event.data.signal;

switch (message) {
case 'RDB.SAVE':
const result = generateRDB(event.data.content);
postMessage(result);
break;
}
};
72 changes: 72 additions & 0 deletions app/workers/rdb/utils/generateRDB.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import unixTimeEncoder from "./unixTimeEncoder";
import stringEncoder from "./stringEncoder";
import fs from "fs";

enum OP_CODES {
REDIS_WITH_MAGIC_NUMBER = 'REDIS0007',
REDIS_VERSION = '6.0.16',
METADATA = 'FA',
DATA_SECTION = 'FE',
HASH_INFORMATION_SECTION = 'FB',
END_OF_FILE_SECTION = 'FF',
}

enum DATA_TYPES {
STRING = '00',
}

const generateRDB = (data: Map<string, any>): string => {
const folderPath = process.cwd() + '/dumps';
const fileName = 'dump.rdb';

if (!fs.existsSync(folderPath)) fs.mkdirSync(folderPath);

// this will generate the header
const header = Buffer.from(OP_CODES.REDIS_WITH_MAGIC_NUMBER).toString('hex');

// this will generate the metadata section
const metadata =
OP_CODES.METADATA +
Buffer.from('redis-ver').toString('hex') +
Buffer.from(OP_CODES.REDIS_VERSION).toString('hex');

// this will generate the data section
let dataSection = OP_CODES.DATA_SECTION + '00' + OP_CODES.HASH_INFORMATION_SECTION

// get the encoding size
dataSection += (data.size + '').padStart(2, '0');
dataSection += Array.from(data.values())
.filter((item: any) => item.expiredAt !== undefined)
.length.toString(16).padStart(2, '0');

// for each key-value pair, generate the data section
data.forEach((value, key) => {
const type = value.type;
const expiredAt = value.expiredAt;
const data = value.value;

let result: string | null = null;
if (type === 'string') result = DATA_TYPES.STRING;
else return;

result += stringEncoder(key);
result += stringEncoder(data);

if (expiredAt) result += 'FC' + unixTimeEncoder(expiredAt);
dataSection += result;
});

// this will generate the end of the file section
dataSection += OP_CODES.END_OF_FILE_SECTION;

// Temporary CRC64 checksum
const CRC64CheckSum = '0000000000000000';
dataSection += CRC64CheckSum;

const buffer = Buffer.from(header + metadata + dataSection, 'hex');
fs.writeFileSync(folderPath + '/' + fileName, buffer);

return `RDB file generated ${fileName}`;
};

export default generateRDB;
16 changes: 16 additions & 0 deletions app/workers/rdb/utils/stringEncoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Encodes a string to a hex string with a length prefix.
* @param str
* @returns string
* @example
* stringEncoder('foobar');
* // => '06 66 6F 6F 62 61 72'
*/
const stringEncoder = (str: string): string => {
const length = (str.length + '').padStart(2, '0');
const encoded = Buffer.from(str).toString('hex');

return length + encoded;
}

export default stringEncoder;
13 changes: 13 additions & 0 deletions app/workers/rdb/utils/unixTimeEncoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/**
* Encodes a unix time into a 8-byte unsigned long, in little-endian.
* @param time
* @returns string
* @example
* unixTimeEncoder(1713824559637);
* // => '15 72 E7 07 8F 01 00 00'
*/
const unixTimeEncoder = (time: number): string => {
return time.toString(16).padStart(16, '0');
};

export default unixTimeEncoder;
Loading

0 comments on commit 398d5d3

Please sign in to comment.