Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3 from aspecto-io/fix/events
Browse files Browse the repository at this point in the history
Fix/events
  • Loading branch information
mzahor authored Mar 25, 2020
2 parents 73e1eb8 + 9b1d7a8 commit d48c31a
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 15 deletions.
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ await sqsProducer.sendJSON({
### SQS Consumer

```ts
import { SqsConsumer } from 'sns-sqs-big-payload';
import { SqsConsumer, SqsConsumerEvents } from 'sns-sqs-big-payload';

const sqsConsumer = SqsConsumer.create({
queueUrl: '...',
Expand All @@ -95,6 +95,11 @@ const sqsConsumer = SqsConsumer.create({
},
});

// to subscribe for events
sqsConsumer.on(SqsConsumerEvents.messageProcessed, () => {
// ...
});

sqsConsumer.start();

// to stop processing
Expand Down Expand Up @@ -140,7 +145,14 @@ consumer.start();

## Events and logging

SqsConsumer has an [EventEmitter](https://nodejs.org/api/events.html) and send the following events:
SqsConsumer has an internal [EventEmitter](https://nodejs.org/api/events.html), you can subscribe for events like this:
```ts
sqsConsumer.on(SqsConsumerEvents.messageProcessed, () => {
// ...
});
```

It sends the following events:

| Event | Params | Description |
| ------------------- | ---------------- | ----------------------------------------------------------------------------------- |
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "sns-sqs-big-payload",
"version": "0.0.3",
"version": "0.0.4",
"license": "MIT",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
12 changes: 10 additions & 2 deletions src/sqs-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ export interface SqsConsumerOptions {
transformMessageBody?(messageBody: any): any;
}

enum SqsConsumerEvents {
export enum SqsConsumerEvents {
started = 'started',
messageReceived = 'message-received',
messageParsed = 'message-parsed',
messageProcessed = 'message-processed',
stopped = 'stopped',
error = 'error',
Expand Down Expand Up @@ -96,6 +97,10 @@ export class SqsConsumer {
this.events.emit(SqsConsumerEvents.stopped);
}

on(event: string | symbol, handler: (...args: any) => void): void {
this.events.on(event, handler);
}

private poll(): void {
if (!this.started) return;
let currentPollingInterval = this.pollingInterval;
Expand Down Expand Up @@ -141,7 +146,10 @@ export class SqsConsumer {
const messageBody = this.transformMessageBody ? this.transformMessageBody(message.Body) : message.Body;
const rawPayload = await this.getMessagePayload(messageBody);
const payload = this.parseMessagePayload(rawPayload);
await this.handleMessage({ payload, message });
this.events.emit(SqsConsumerEvents.messageParsed, { message, payload });
if (this.handleMessage) {
await this.handleMessage({ payload, message });
}
await this.deleteMessage(message);
this.events.emit(SqsConsumerEvents.messageProcessed, message);
} catch (err) {
Expand Down
99 changes: 89 additions & 10 deletions tests/sns-sqs.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
SqsConsumerEvents,
SqsProducer,
SqsConsumer,
SqsProducerOptions,
Expand Down Expand Up @@ -93,28 +94,51 @@ async function publishMessage(msg: any, options: Partial<SnsProducerOptions> = {
await snsProducer.publishJSON(msg);
}

async function receiveMessages(expectedMsgsCount: number, options: Partial<SqsConsumerOptions> = {}): Promise<any> {
async function receiveMessages(
expectedMsgsCount: number,
options: Partial<SqsConsumerOptions> = {},
eventHandlers?: Record<string | symbol, (...args) => void>
): Promise<any> {
const { s3 } = getClients();
return new Promise((res, rej) => {
return new Promise((resolve, rej) => {
const messages = [];
let timeoutId;

const sqsConsumer = SqsConsumer.create({
queueUrl: TEST_QUEUE_URL,
region: TEST_REGION,
parsePayload: (raw) => JSON.parse(raw),
handleMessage: async ({ payload }) => {
messages.push(payload);
if (messages.length === expectedMsgsCount) {
sqsConsumer.stop();
clearTimeout(timeoutId);
res(messages);
}
},
...options,
s3,
});

sqsConsumer.on(SqsConsumerEvents.messageParsed, ({ payload }) => {
messages.push(payload);
});

sqsConsumer.on(SqsConsumerEvents.messageProcessed, () => {
if (messages.length === expectedMsgsCount) {
sqsConsumer.stop();
clearTimeout(timeoutId);
resolve(messages);
}
});

sqsConsumer.on(SqsConsumerEvents.error, () => {
sqsConsumer.stop();
clearTimeout(timeoutId);
});

sqsConsumer.on(SqsConsumerEvents.processingError, () => {
sqsConsumer.stop();
clearTimeout(timeoutId);
resolve();
});

if (eventHandlers) {
Object.entries(eventHandlers).forEach(([event, handler]) => sqsConsumer.on(event, handler));
}

timeoutId = setTimeout(() => {
rej(new Error("Timeout: SqsConsumer didn't get any messages for 5 seconds."));
sqsConsumer.stop();
Expand Down Expand Up @@ -153,6 +177,61 @@ describe('sns-sqs-big-payload', () => {
});
});

describe('events', () => {
function getEventHandlers() {
const handlers = Object.keys(SqsConsumerEvents).reduce((acc, key) => {
acc[SqsConsumerEvents[key]] = jest.fn();
return acc;
}, {});
return handlers;
}

it('should trigger success events event', async () => {
const message = { it: 'works' };
const handlers = getEventHandlers();
sendMessage(message);
const [receivedMessage] = await receiveMessages(1, {}, handlers);
expect(receivedMessage).toEqual(message);

// success
expect(handlers[SqsConsumerEvents.started]).toBeCalled();
expect(handlers[SqsConsumerEvents.messageReceived]).toBeCalled();
expect(handlers[SqsConsumerEvents.messageParsed]).toBeCalled();
expect(handlers[SqsConsumerEvents.messageProcessed]).toBeCalled();
expect(handlers[SqsConsumerEvents.stopped]).toBeCalled();
// errors
expect(handlers[SqsConsumerEvents.error]).not.toBeCalled();
expect(handlers[SqsConsumerEvents.processingError]).not.toBeCalled();
expect(handlers[SqsConsumerEvents.payloadParseError]).not.toBeCalled();
expect(handlers[SqsConsumerEvents.s3PayloadError]).not.toBeCalled();
expect(handlers[SqsConsumerEvents.connectionError]).not.toBeCalled();
});

it('should should trigger processingError event', async () => {
const message = { it: 'works' };
const handlers = getEventHandlers();
sendMessage(message);
await receiveMessages(
1,
{
handleMessage: () => {
throw new Error('Processing error');
},
},
handlers
);

// errors
expect(handlers[SqsConsumerEvents.messageReceived]).toBeCalled();
expect(handlers[SqsConsumerEvents.messageParsed]).toBeCalled();
expect(handlers[SqsConsumerEvents.processingError]).toBeCalled();
expect(handlers[SqsConsumerEvents.messageProcessed]).not.toBeCalled();
expect(handlers[SqsConsumerEvents.error]).not.toBeCalled();
expect(handlers[SqsConsumerEvents.connectionError]).not.toBeCalled();
expect(handlers[SqsConsumerEvents.payloadParseError]).not.toBeCalled();
});
});

describe('sending message through s3', () => {
it('should send all message though s3 if configured', async () => {
const message = { it: 'works' };
Expand Down

0 comments on commit d48c31a

Please sign in to comment.