Skip to content

Commit

Permalink
feat(bus): implement EventBus for RabbitMQ (#6)
Browse files Browse the repository at this point in the history
closes #5
  • Loading branch information
RomanReznichenko authored Apr 10, 2022
1 parent 3a33da5 commit 5700409
Show file tree
Hide file tree
Showing 18 changed files with 1,912 additions and 83 deletions.
392 changes: 309 additions & 83 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
},
"homepage": "https://github.com/NeuraLegion/secbox-sdk-js#readme",
"dependencies": {
"amqp-connection-manager": "^4.1.1",
"amqplib": "^0.8.0",
"reflect-metadata": "^0.1.13",
"tslib": "~2.3.1",
"tsyringe": "^4.6.0",
Expand All @@ -92,6 +94,8 @@
"@nrwl/workspace": "13.9.4",
"@semantic-release/exec": "^6.0.3",
"@semantic-release/git": "^10.0.1",
"@types/amqp-connection-manager": "^2.0.12",
"@types/amqplib": "^0.8.2",
"@types/jest": "^27.4.0",
"@types/node": "~16.11.25",
"@types/uuid": "^8.3.4",
Expand Down
3 changes: 3 additions & 0 deletions packages/bus/.babelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"presets": [["@nrwl/web/babel", { "useBuiltIns": "usage" }]]
}
21 changes: 21 additions & 0 deletions packages/bus/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"extends": ["../../.eslintrc.json"],
"ignorePatterns": ["!**/*"],
"overrides": [
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"parserOptions": {
"project": ["packages/bus/tsconfig.*?.json"]
},
"rules": {}
},
{
"files": ["*.ts", "*.tsx"],
"rules": {}
},
{
"files": ["*.js", "*.jsx"],
"rules": {}
}
]
}
167 changes: 167 additions & 0 deletions packages/bus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# @secbox/bus

The package includes a simplified implementation of the `EventBus`, one based on `RabbitMQ`, to establish synchronous and asynchronous communication between services and agents.

## Setup

```bash
npm i -s @secbox/bus
```

## Usage

### Overview

To use the RabbitMQ Event Bus, pass the following options object to the constructor method:

```ts
import { RMQEventBus } from '@secbox/bus';

const config = new Configuration({
cluster: 'app.neuralegion.com'
});

const repeaterId = 'your Repeater ID';
const token = 'your API key';

const bus = new RMQEventBus(config.container, {
exchange: 'EventBus',
clientQueue: `agent:${repeaterId}`,
appQueue: 'app',
credentials: {
username: 'bot',
password: token
}
});
```

The options are specific to the chosen transporter. The `RabbitMQ` implementation exposes the properties described below:

| Option | Description |
| :------------------ | ------------------------------------------------------------------------------------ |
| `exchange` | Exchange name which routes a message to a particular queue. |
| `clientQueue` | Queue name which your bus will listen to. |
| `appQueue` | Queue name which application will listen to. |
| `prefetchCount` | Sets the prefetch count for the channel. By default, `1` |
| `connectTimeout` | Time to wait for initial connect. If not specified, defaults to `heartbeatInterval`. |
| `reconnectTime` | The time to wait before trying to reconnect. By default, `20` seconds. |
| `heartbeatInterval` | The interval, in seconds, to send heartbeats. By default, `30` seconds. |
| `credentials` | The `username` and `password` to perform authentication. |

Finally, to establish a connection with `RabbitMQ`, you have to the `init()` method.

```ts
await bus.init();
```

In case of unrecoverable or operational errors, you will get an exception while initial connecting.

### Subscribing to events

To subscribe an event handler to the particular event, you should use the `@bind()` decorator as follows:

```ts
import { bind, EventHandler } from '@secbox/core';
import { injectable } from 'tsyringe';

@bind(IssueDetected)
@injectable()
class IssueDetectedHandler implements EventHandler<Issue> {
public handle(payload: Issue): Promise<void> {
// implementation
}
}
```

> ⚡ Make sure that you use `@injectable()` decorator to register the corresponding provider in the IoC. Otherwise, you get an error while trying to register a handler in the `EventBus`.
Then you just need to register the handler in the `EvenBus`:

```ts
await bus.register(IssueDetectedHandler);
```

Now the `IssueDetectedHandler` event handler listens for the `IssueDetected` event. As soon as the `IssueDetected` event appers,
the `EventBus` will call the `handle()` method with the payload passed from the application.

To remove subscription, and removes the event handler, you have to call the `unregister()` method:

```ts
await bus.unregister(IssueDetectedHandler);
```

#### Publishing events through the event bus

The `EventBus` exposes a `publish()` method. This method publishes an event to the message broker.

```ts
interface Payload {
status: 'connected' | 'disconnected';
}

class StatusChanged extends Event<Payload> {
constructor(payload: Payload) {
super(payload);
}
}

const event = new StatusChanged({ status: 'connected' });

await bus.publish(event);
```

The `publish()` method takes just a single argument, an instance of the derived class of the `Event`.

> ⚡ The class name should match one defined event in the application. Otherwise, you should override it by passing the expected name via the constructor.
For more information, please see `@secbox/core`.

#### Executing RPC methods

The `EventBus` exposes a `execute()` method. This method is intended to perform a command to the application and returns an `Promise` with its response.

```ts
interface Payload {
version: string;
}

interface Response {
lastVersion: string;
}

class CheckVersion extends Command<Payload, Response> {
constructor(payload: Payload) {
super(payload);
}
}

const command = new CheckVersion({ version: '0.0.1' });

const response = await bus.execute(command);
```

This method returns a `Promise` which will eventually be resolved as a response message.

For instance, if you do not expect any response, you can easily make the `EventBus` resolve a `Promise` immediately to undefined:

```ts
class Record extends Command<Payload> {
public readonly expectReply = false;

constructor(payload: Payload) {
super(payload);
}
}

const command = new Record({ version: '0.0.1' });

await bus.execute(command);
```

For more information, please see `@secbox/core`.

## License

Copyright © 2022 [NeuraLegion](https://github.com/NeuraLegion).

This project is licensed under the MIT License - see the [LICENSE file](LICENSE) for details.
15 changes: 15 additions & 0 deletions packages/bus/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module.exports = {
displayName: 'bus',
preset: '../../jest.preset.js',
globals: {
'ts-jest': {
tsconfig: '<rootDir>/tsconfig.spec.json'
}
},
testEnvironment: 'node',
transform: {
'^.+\\.[tj]sx?$': 'ts-jest'
},
moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx'],
coverageDirectory: '../../coverage/packages/bus'
};
41 changes: 41 additions & 0 deletions packages/bus/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"name": "@secbox/bus",
"version": "0.0.1",
"description": "The package includes a simplified implementation of the `EventBus`, one based on `RabbitMQ`, to establish synchronous and asynchronous communication between services and agents.",
"repository": {
"type": "git",
"url": "git+https://github.com/NeuraLegion/secbox-sdk-js.git"
},
"engines": {
"node": ">=16",
"npm": "^8.1.0"
},
"author": {
"name": "Artem Derevnjuk",
"email": "[email protected]"
},
"license": "MIT",
"bugs": {
"url": "https://github.com/NeuraLegion/secbox-sdk-js/issues"
},
"publishConfig": {
"access": "public"
},
"keywords": [
"security",
"testing",
"e2e",
"test",
"typescript",
"appsec",
"pentesting",
"qa",
"brightsec",
"rmq",
"rabbitmq",
"bus"
],
"peerDependencies": {
"@secbox/core": "^0.2.0"
}
}
40 changes: 40 additions & 0 deletions packages/bus/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"root": "packages/bus",
"sourceRoot": "packages/bus/src",
"projectType": "library",
"targets": {
"build": {
"executor": "@nrwl/js:tsc",
"outputs": ["{options.outputPath}"],
"options": {
"outputPath": "dist/packages/bus",
"tsConfig": "packages/bus/tsconfig.lib.json",
"packageJson": "packages/bus/package.json",
"main": "packages/bus/src/index.ts",
"assets": ["packages/bus/*.md", "LICENSE"]
}
},
"lint": {
"executor": "@nrwl/linter:eslint",
"outputs": ["{options.outputFile}"],
"options": {
"lintFilePatterns": ["packages/bus/**/*.ts"]
}
},
"test": {
"executor": "@nrwl/jest:jest",
"outputs": ["coverage/packages/bus"],
"options": {
"jestConfig": "packages/bus/jest.config.js",
"passWithNoTests": true
}
},
"publish": {
"executor": "./tools/executors:publish",
"options": {
"dist": "dist/packages/bus"
}
}
},
"tags": []
}
Loading

0 comments on commit 5700409

Please sign in to comment.