From b7cf5155b3abcf541ee7967874ab597b0397c8df Mon Sep 17 00:00:00 2001 From: Jeremy Gillick Date: Tue, 30 Jul 2024 14:26:55 -0700 Subject: [PATCH 1/2] Add pipeline support for batching publish calls --- README.md | 29 +++ package-lock.json | 2 +- package.json | 2 +- src/redis-pipeline.ts | 39 +++ src/redis-pubsub.ts | 31 ++- src/test/tests.ts | 592 ++++++++++++++++++++++++------------------ 6 files changed, 436 insertions(+), 259 deletions(-) create mode 100644 src/redis-pipeline.ts diff --git a/README.md b/README.md index d0177e55..198a773b 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,35 @@ pubSub.subscribe('Test', message => { }); ``` +## Use a pipeline to batch publish calls + +If you need to publish a number of triggers at the same time, you can batch them in a single HTTP request to redis using the pub/sub pipeline: + +```javascript +const pubSub = new RedisPubSub({ ... }); + +// Create pipeline +const pipeline = pubSub.pipeline(); + +[1,2,3,4,5,6].forEach((id) => pipeline.publish('Test', {id})) + +// Execute the pipeline to redis +await pipeline.exec(); +``` + +The publish method is also chainable. + +```javascript +const pubSub = new RedisPubSub({ ... }); +const pipeline = pubSub.pipeline(); + +await pipeline + .publish('Test1', {}) + .publish('Test2', {}) + .publish('Test3', {}) + .exec(); +``` + ## Old Usage (Deprecated) ```javascript diff --git a/package-lock.json b/package-lock.json index 1b721abc..9fc64b79 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,7 +14,7 @@ "@types/chai-as-promised": "^7.1.3", "@types/ioredis": "^5.0.0", "@types/mocha": "^9.1.1", - "@types/node": "20.12.9", + "@types/node": "^20.12.9", "@types/simple-mock": "^0.8.1", "@typescript-eslint/eslint-plugin": "^5.36.0", "@typescript-eslint/parser": "^5.36.0", diff --git a/package.json b/package.json index 3fb3902a..fe7e8ec5 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,7 @@ "@types/chai-as-promised": "^7.1.3", "@types/ioredis": "^5.0.0", "@types/mocha": "^9.1.1", - "@types/node": "20.12.9", + "@types/node": "^20.12.9", "@types/simple-mock": "^0.8.1", "@typescript-eslint/eslint-plugin": "^5.36.0", "@typescript-eslint/parser": "^5.36.0", diff --git a/src/redis-pipeline.ts b/src/redis-pipeline.ts new file mode 100644 index 00000000..fd256616 --- /dev/null +++ b/src/redis-pipeline.ts @@ -0,0 +1,39 @@ +import { ChainableCommander } from "ioredis"; +import type { RedisClient, Serializer } from "./redis-pubsub"; + +export interface RedisPubSubPipelineOptions { + publisher: RedisClient; + serializer: Serializer; +} + +/** + * Create a pipelined publisher that will send all the pub/sub messages in a single batch + */ +export class RedisPubSubPipeline { + private pipeline: ChainableCommander; + private publisher: RedisClient; + private serializer: Serializer; + + constructor(options: RedisPubSubPipelineOptions) { + this.publisher = options.publisher; + this.serializer = options.serializer; + + // Start pipeline + this.pipeline = this.publisher.pipeline(); + } + + /** + * Publish to the redis pipeline + */ + public publish(trigger: string, payload: T): this { + this.pipeline.publish(trigger, this.serializer(payload)); + return this; + } + + /** + * Execute the entire pipeline + */ + public async exec() { + return this.pipeline.exec(); + } +} diff --git a/src/redis-pubsub.ts b/src/redis-pubsub.ts index 9ca6f488..405fcc59 100644 --- a/src/redis-pubsub.ts +++ b/src/redis-pubsub.ts @@ -1,11 +1,19 @@ import {Cluster, Redis, RedisOptions} from 'ioredis'; import {PubSubEngine} from 'graphql-subscriptions'; import {PubSubAsyncIterator} from './pubsub-async-iterator'; +import { RedisPubSubPipeline } from "./redis-pipeline"; -type RedisClient = Redis | Cluster; +export type RedisClient = Redis | Cluster; type OnMessage = (message: T) => void; type DeserializerContext = { channel: string, pattern?: string }; +function defaultSerializer(payload: unknown): string | Buffer { + if (payload instanceof Buffer) { + return payload; + } + return JSON.stringify(payload); +} + export interface PubSubRedisOptions { connection?: RedisOptions | string; triggerTransform?: TriggerTransform; @@ -29,8 +37,8 @@ export class RedisPubSub implements PubSubEngine { subscriber, publisher, reviver, - serializer, deserializer, + serializer = defaultSerializer, messageEventName = 'message', pmessageEventName = 'pmessage', } = options; @@ -85,13 +93,14 @@ export class RedisPubSub implements PubSubEngine { } public async publish(trigger: string, payload: T): Promise { - if(this.serializer) { - await this.redisPublisher.publish(trigger, this.serializer(payload)); - } else if (payload instanceof Buffer){ - await this.redisPublisher.publish(trigger, payload); - } else { - await this.redisPublisher.publish(trigger, JSON.stringify(payload)); - } + await this.redisPublisher.publish(trigger, this.serializer(payload)); + } + + public pipeline() { + return new RedisPubSubPipeline({ + publisher: this.redisPublisher, + serializer: this.serializer, + }); } public subscribe( @@ -187,7 +196,7 @@ export class RedisPubSub implements PubSubEngine { ]); } - private readonly serializer?: Serializer; + private readonly serializer: Serializer; private readonly deserializer?: Deserializer; private readonly triggerTransform: TriggerTransform; private readonly redisSubscriber: RedisClient; @@ -247,5 +256,5 @@ export type TriggerTransform = ( channelOptions?: unknown, ) => string; export type Reviver = (key: any, value: any) => any; -export type Serializer = (source: any) => string; +export type Serializer = (source: unknown) => string | Buffer; export type Deserializer = (source: string | Buffer, context: DeserializerContext) => any; diff --git a/src/test/tests.ts b/src/test/tests.ts index 331f0400..666b8d92 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -1,8 +1,9 @@ -import * as chai from 'chai'; -import * as chaiAsPromised from 'chai-as-promised'; -import { spy, restore, stub } from 'simple-mock'; -import { RedisPubSub } from '../redis-pubsub'; -import * as IORedis from 'ioredis'; +import * as chai from "chai"; +import * as chaiAsPromised from "chai-as-promised"; +import { spy, restore, stub } from "simple-mock"; +import { RedisPubSub } from "../redis-pubsub"; +import { RedisPubSubPipeline } from "../redis-pipeline"; +import * as IORedis from "ioredis"; chai.use(chaiAsPromised); const expect = chai.expect; @@ -11,38 +12,46 @@ const expect = chai.expect; let listener; -const publishSpy = spy((channel, message) => listener && listener(channel, message)); +const publishSpy = spy( + (channel, message) => listener && listener(channel, message) +); const subscribeSpy = spy((channel, cb) => cb && cb(null, channel)); const unsubscribeSpy = spy((channel, cb) => cb && cb(channel)); const psubscribeSpy = spy((channel, cb) => cb && cb(null, channel)); const punsubscribeSpy = spy((channel, cb) => cb && cb(channel)); -const quitSpy = spy(cb => cb); +const pipelinePublishSpy = spy(() => {}); +const pipelineExecSpy = spy(() => Promise.resolve()); +const pipelineSpy = spy(() => mockPipelineClient); + +const quitSpy = spy((cb) => cb); const mockRedisClient = { publish: publishSpy, subscribe: subscribeSpy, unsubscribe: unsubscribeSpy, psubscribe: psubscribeSpy, punsubscribe: punsubscribeSpy, + pipeline: pipelineSpy, on: (event, cb) => { - if (event === 'message') { + if (event === "message") { listener = cb; } }, quit: quitSpy, }; +const mockPipelineClient = { + publish: pipelinePublishSpy, + exec: pipelineExecSpy, +}; const mockOptions = { - publisher: (mockRedisClient as any), - subscriber: (mockRedisClient as any), + publisher: mockRedisClient as any, + subscriber: mockRedisClient as any, }; - - // -------------- Mocking Redis Client ------------------ -describe('RedisPubSub', () => { - - it('should create default ioredis clients if none were provided', done => { +describe("RedisPubSub", () => { + it("should create default ioredis clients if none were provided", (done) => { const pubSub = new RedisPubSub(); expect(pubSub.getSubscriber()).to.be.an.instanceOf(IORedis); expect(pubSub.getPublisher()).to.be.an.instanceOf(IORedis); @@ -50,10 +59,11 @@ describe('RedisPubSub', () => { done(); }); - it('should verify close calls pub and sub quit methods', done => { + it("should verify close calls pub and sub quit methods", (done) => { const pubSub = new RedisPubSub(mockOptions); - pubSub.close() + pubSub + .close() .then(() => { expect(quitSpy.callCount).to.equal(2); done(); @@ -61,148 +71,168 @@ describe('RedisPubSub', () => { .catch(done); }); - it('can subscribe to specific redis channel and called when a message is published on it', done => { + it("can subscribe to specific redis channel and called when a message is published on it", (done) => { const pubSub = new RedisPubSub(mockOptions); - pubSub.subscribe('Posts', message => { - try { - expect(message).to.equals('test'); - done(); - } catch (e) { - done(e); - } - - }).then(async subId => { - expect(subId).to.be.a('number'); - await pubSub.publish('Posts', 'test'); - pubSub.unsubscribe(subId); - }); + pubSub + .subscribe("Posts", (message) => { + try { + expect(message).to.equals("test"); + done(); + } catch (e) { + done(e); + } + }) + .then(async (subId) => { + expect(subId).to.be.a("number"); + await pubSub.publish("Posts", "test"); + pubSub.unsubscribe(subId); + }); }); - it('can subscribe to a redis channel pattern and called when a message is published on it', done => { + it("can subscribe to a redis channel pattern and called when a message is published on it", (done) => { const pubSub = new RedisPubSub(mockOptions); - pubSub.subscribe('Posts*', message => { - try { - expect(psubscribeSpy.callCount).to.equal(1); - expect(message).to.equals('test'); - done(); - } catch (e) { - done(e); - } - }, { pattern: true }).then(async subId => { - expect(subId).to.be.a('number'); - await pubSub.publish('Posts*', 'test'); - pubSub.unsubscribe(subId); - }); + pubSub + .subscribe( + "Posts*", + (message) => { + try { + expect(psubscribeSpy.callCount).to.equal(1); + expect(message).to.equals("test"); + done(); + } catch (e) { + done(e); + } + }, + { pattern: true } + ) + .then(async (subId) => { + expect(subId).to.be.a("number"); + await pubSub.publish("Posts*", "test"); + pubSub.unsubscribe(subId); + }); }); - it('can unsubscribe from specific redis channel', done => { + it("can unsubscribe from specific redis channel", (done) => { const pubSub = new RedisPubSub(mockOptions); - pubSub.subscribe('Posts', () => null).then(subId => { - pubSub.unsubscribe(subId); + pubSub + .subscribe("Posts", () => null) + .then((subId) => { + pubSub.unsubscribe(subId); - try { + try { + expect(unsubscribeSpy.callCount).to.equals(1); + expect(unsubscribeSpy.lastCall.args).to.have.members(["Posts"]); - expect(unsubscribeSpy.callCount).to.equals(1); - expect(unsubscribeSpy.lastCall.args).to.have.members(['Posts']); + expect(punsubscribeSpy.callCount).to.equals(1); + expect(punsubscribeSpy.lastCall.args).to.have.members(["Posts"]); - expect(punsubscribeSpy.callCount).to.equals(1); - expect(punsubscribeSpy.lastCall.args).to.have.members(['Posts']); + done(); + } catch (e) { + done(e); + } + }); + }); + it("cleans up correctly the memory when unsubscribing", (done) => { + const pubSub = new RedisPubSub(mockOptions); + Promise.all([ + pubSub.subscribe("Posts", () => null), + pubSub.subscribe("Posts", () => null), + ]).then(([subId, secondSubId]) => { + try { + // This assertion is done against a private member, if you change the internals, you may want to change that + expect((pubSub as any).subscriptionMap[subId]).not.to.be.an( + "undefined" + ); + pubSub.unsubscribe(subId); + // This assertion is done against a private member, if you change the internals, you may want to change that + expect((pubSub as any).subscriptionMap[subId]).to.be.an("undefined"); + expect(() => pubSub.unsubscribe(subId)).to.throw( + `There is no subscription of id "${subId}"` + ); + pubSub.unsubscribe(secondSubId); done(); - } catch (e) { done(e); } }); }); - it('cleans up correctly the memory when unsubscribing', done => { - const pubSub = new RedisPubSub(mockOptions); - Promise.all([ - pubSub.subscribe('Posts', () => null), - pubSub.subscribe('Posts', () => null), - ]) - .then(([subId, secondSubId]) => { - try { - // This assertion is done against a private member, if you change the internals, you may want to change that - expect((pubSub as any).subscriptionMap[subId]).not.to.be.an('undefined'); - pubSub.unsubscribe(subId); - // This assertion is done against a private member, if you change the internals, you may want to change that - expect((pubSub as any).subscriptionMap[subId]).to.be.an('undefined'); - expect(() => pubSub.unsubscribe(subId)).to.throw(`There is no subscription of id "${subId}"`); - pubSub.unsubscribe(secondSubId); - done(); - - } catch (e) { - done(e); - } - }); - }); - - it('concurrent subscribe, unsubscribe first sub before second sub complete', done => { + it("concurrent subscribe, unsubscribe first sub before second sub complete", (done) => { const promises = { firstSub: null as Promise, secondSub: null as Promise, - } + }; - let firstCb, secondCb + let firstCb, secondCb; const redisSubCallback = (channel, cb) => { process.nextTick(() => { if (!firstCb) { - firstCb = () => cb(null, channel) + firstCb = () => cb(null, channel); // Handling first call, init second sub - promises.secondSub = pubSub.subscribe('Posts', () => null) + promises.secondSub = pubSub.subscribe("Posts", () => null); // Continue first sub callback - firstCb() + firstCb(); } else { - secondCb = () => cb(null, channel) + secondCb = () => cb(null, channel); } - }) - } + }); + }; const subscribeStub = stub().callFn(redisSubCallback); - const mockRedisClientWithSubStub = {...mockRedisClient, ...{subscribe: subscribeStub}}; - const mockOptionsWithSubStub = {...mockOptions, ...{subscriber: (mockRedisClientWithSubStub as any)}} + const mockRedisClientWithSubStub = { + ...mockRedisClient, + ...{ subscribe: subscribeStub }, + }; + const mockOptionsWithSubStub = { + ...mockOptions, + ...{ subscriber: mockRedisClientWithSubStub as any }, + }; const pubSub = new RedisPubSub(mockOptionsWithSubStub); // First leg of the test, init first sub and immediately unsubscribe. The second sub is triggered in the redis cb // before the first promise sub complete - promises.firstSub = pubSub.subscribe('Posts', () => null) - .then(subId => { + promises.firstSub = pubSub + .subscribe("Posts", () => null) + .then((subId) => { // This assertion is done against a private member, if you change the internals, you may want to change that - expect((pubSub as any).subscriptionMap[subId]).not.to.be.an('undefined'); + expect((pubSub as any).subscriptionMap[subId]).not.to.be.an( + "undefined" + ); pubSub.unsubscribe(subId); // Continue second sub callback - promises.firstSub.then(() => secondCb()) + promises.firstSub.then(() => secondCb()); return subId; }); // Second leg of the test, here we have unsubscribed from the first sub. We try unsubbing from the second sub // as soon it is ready - promises.firstSub - .then((subId) => { - // This assertion is done against a private member, if you change the internals, you may want to change that - expect((pubSub as any).subscriptionMap[subId]).to.be.an('undefined'); - expect(() => pubSub.unsubscribe(subId)).to.throw(`There is no subscription of id "${subId}"`); - - return promises.secondSub.then(secondSubId => { + promises.firstSub.then((subId) => { + // This assertion is done against a private member, if you change the internals, you may want to change that + expect((pubSub as any).subscriptionMap[subId]).to.be.an("undefined"); + expect(() => pubSub.unsubscribe(subId)).to.throw( + `There is no subscription of id "${subId}"` + ); + + return promises.secondSub + .then((secondSubId) => { pubSub.unsubscribe(secondSubId); }) - .then(done) - .catch(done) + .then(done) + .catch(done); }); }); - it('will not unsubscribe from the redis channel if there is another subscriber on it\'s subscriber list', done => { + it("will not unsubscribe from the redis channel if there is another subscriber on it's subscriber list", (done) => { const pubSub = new RedisPubSub(mockOptions); const subscriptionPromises = [ - pubSub.subscribe('Posts', () => { - done('Not supposed to be triggered'); + pubSub.subscribe("Posts", () => { + done("Not supposed to be triggered"); }), - pubSub.subscribe('Posts', (msg) => { + pubSub.subscribe("Posts", (msg) => { try { - expect(msg).to.equals('test'); + expect(msg).to.equals("test"); done(); } catch (e) { done(e); @@ -210,14 +240,14 @@ describe('RedisPubSub', () => { }), ]; - Promise.all(subscriptionPromises).then(async subIds => { + Promise.all(subscriptionPromises).then(async (subIds) => { try { expect(subIds.length).to.equals(2); pubSub.unsubscribe(subIds[0]); expect(unsubscribeSpy.callCount).to.equals(0); - await pubSub.publish('Posts', 'test'); + await pubSub.publish("Posts", "test"); pubSub.unsubscribe(subIds[1]); expect(unsubscribeSpy.callCount).to.equals(1); } catch (e) { @@ -226,15 +256,15 @@ describe('RedisPubSub', () => { }); }); - it('will subscribe to redis channel only once', done => { + it("will subscribe to redis channel only once", (done) => { const pubSub = new RedisPubSub(mockOptions); const onMessage = () => null; const subscriptionPromises = [ - pubSub.subscribe('Posts', onMessage), - pubSub.subscribe('Posts', onMessage), + pubSub.subscribe("Posts", onMessage), + pubSub.subscribe("Posts", onMessage), ]; - Promise.all(subscriptionPromises).then(subIds => { + Promise.all(subscriptionPromises).then((subIds) => { try { expect(subIds.length).to.equals(2); expect(subscribeSpy.callCount).to.equals(1); @@ -248,23 +278,23 @@ describe('RedisPubSub', () => { }); }); - it('can have multiple subscribers and all will be called when a message is published to this channel', done => { + it("can have multiple subscribers and all will be called when a message is published to this channel", (done) => { const pubSub = new RedisPubSub(mockOptions); const onMessageSpy = spy(() => null); const subscriptionPromises = [ - pubSub.subscribe('Posts', onMessageSpy), - pubSub.subscribe('Posts', onMessageSpy), + pubSub.subscribe("Posts", onMessageSpy), + pubSub.subscribe("Posts", onMessageSpy), ]; - Promise.all(subscriptionPromises).then(async subIds => { + Promise.all(subscriptionPromises).then(async (subIds) => { try { expect(subIds.length).to.equals(2); - await pubSub.publish('Posts', 'test'); + await pubSub.publish("Posts", "test"); expect(onMessageSpy.callCount).to.equals(2); - onMessageSpy.calls.forEach(call => { - expect(call.args).to.have.members(['test']); + onMessageSpy.calls.forEach((call) => { + expect(call.args).to.have.members(["test"]); }); pubSub.unsubscribe(subIds[0]); @@ -276,29 +306,32 @@ describe('RedisPubSub', () => { }); }); - it('can publish objects as well', done => { + it("can publish objects as well", (done) => { const pubSub = new RedisPubSub(mockOptions); - pubSub.subscribe('Posts', message => { - try { - expect(message).to.have.property('comment', 'This is amazing'); - done(); - } catch (e) { - done(e); - } - }).then(async subId => { - try { - await pubSub.publish('Posts', { comment: 'This is amazing' }); - pubSub.unsubscribe(subId); - } catch (e) { - done(e); - } - }); + pubSub + .subscribe("Posts", (message) => { + try { + expect(message).to.have.property("comment", "This is amazing"); + done(); + } catch (e) { + done(e); + } + }) + .then(async (subId) => { + try { + await pubSub.publish("Posts", { comment: "This is amazing" }); + pubSub.unsubscribe(subId); + } catch (e) { + done(e); + } + }); }); - it('can accept custom reviver option (eg. for Javascript Dates)', done => { + it("can accept custom reviver option (eg. for Javascript Dates)", (done) => { const dateReviver = (key, value) => { - const isISO8601Z = /^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}(?:\.\d*)?)Z$/; - if (typeof value === 'string' && isISO8601Z.test(value)) { + const isISO8601Z = + /^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}(?:\.\d*)?)Z$/; + if (typeof value === "string" && isISO8601Z.test(value)) { const tempDateNumber = Date.parse(value); if (!isNaN(tempDateNumber)) { return new Date(tempDateNumber); @@ -307,161 +340,185 @@ describe('RedisPubSub', () => { return value; }; - const pubSub = new RedisPubSub({...mockOptions, reviver: dateReviver}); + const pubSub = new RedisPubSub({ ...mockOptions, reviver: dateReviver }); const validTime = new Date(); - const invalidTime = '2018-13-01T12:00:00Z'; - pubSub.subscribe('Times', message => { - try { - expect(message).to.have.property('invalidTime', invalidTime); - expect(message).to.have.property('validTime'); - expect(message.validTime.getTime()).to.equals(validTime.getTime()); - done(); - } catch (e) { - done(e); - } - }).then(async subId => { - try { - await pubSub.publish('Times', { validTime, invalidTime }); - pubSub.unsubscribe(subId); - } catch (e) { - done(e); - } - }); + const invalidTime = "2018-13-01T12:00:00Z"; + pubSub + .subscribe("Times", (message) => { + try { + expect(message).to.have.property("invalidTime", invalidTime); + expect(message).to.have.property("validTime"); + expect(message.validTime.getTime()).to.equals(validTime.getTime()); + done(); + } catch (e) { + done(e); + } + }) + .then(async (subId) => { + try { + await pubSub.publish("Times", { validTime, invalidTime }); + pubSub.unsubscribe(subId); + } catch (e) { + done(e); + } + }); }); - it('refuses custom reviver with a deserializer', done => { + it("refuses custom reviver with a deserializer", (done) => { const reviver = stub(); const deserializer = stub(); try { - expect(() => new RedisPubSub({...mockOptions, reviver, deserializer})) - .to.throw("Reviver and deserializer can't be used together"); + expect( + () => new RedisPubSub({ ...mockOptions, reviver, deserializer }) + ).to.throw("Reviver and deserializer can't be used together"); done(); } catch (e) { done(e); } }); - it('allows to use a custom serializer', done => { + it("allows to use a custom serializer", (done) => { const serializer = stub(); const serializedPayload = `{ "hello": "custom" }`; serializer.returnWith(serializedPayload); - const pubSub = new RedisPubSub({...mockOptions, serializer }); + const pubSub = new RedisPubSub({ ...mockOptions, serializer }); try { - pubSub.subscribe('TOPIC', message => { - try { - expect(message).to.eql({hello: 'custom'}); - done(); - } catch (e) { - done(e); - } - }).then(() => { - pubSub.publish('TOPIC', {hello: 'world'}); - }); + pubSub + .subscribe("TOPIC", (message) => { + try { + expect(message).to.eql({ hello: "custom" }); + done(); + } catch (e) { + done(e); + } + }) + .then(() => { + pubSub.publish("TOPIC", { hello: "world" }); + }); } catch (e) { done(e); } }); - it('custom serializer can throw an error', done => { + it("custom serializer can throw an error", (done) => { const serializer = stub(); - serializer.throwWith(new Error('Custom serialization error')); + serializer.throwWith(new Error("Custom serialization error")); - const pubSub = new RedisPubSub({...mockOptions, serializer }); + const pubSub = new RedisPubSub({ ...mockOptions, serializer }); try { - pubSub.publish('TOPIC', {hello: 'world'}).then(() => { - done(new Error('Expected error to be thrown upon publish')); - }, err => { - expect(err.message).to.eql('Custom serialization error'); - done(); - }); + pubSub.publish("TOPIC", { hello: "world" }).then( + () => { + done(new Error("Expected error to be thrown upon publish")); + }, + (err) => { + expect(err.message).to.eql("Custom serialization error"); + done(); + } + ); } catch (e) { done(e); } }); - it('allows to use a custom deserializer', done => { + it("allows to use a custom deserializer", (done) => { const deserializer = stub(); - const deserializedPayload = { hello: 'custom' }; + const deserializedPayload = { hello: "custom" }; deserializer.returnWith(deserializedPayload); - const pubSub = new RedisPubSub({...mockOptions, deserializer }); + const pubSub = new RedisPubSub({ ...mockOptions, deserializer }); try { - pubSub.subscribe('TOPIC', message => { - try { - expect(message).to.eql({hello: 'custom'}); - done(); - } catch (e) { - done(e); - } - }).then(() => { - pubSub.publish('TOPIC', {hello: 'world'}); - }); + pubSub + .subscribe("TOPIC", (message) => { + try { + expect(message).to.eql({ hello: "custom" }); + done(); + } catch (e) { + done(e); + } + }) + .then(() => { + pubSub.publish("TOPIC", { hello: "world" }); + }); } catch (e) { done(e); } }); - it('unparsed payload is returned if custom deserializer throws an error', done => { + it("unparsed payload is returned if custom deserializer throws an error", (done) => { const deserializer = stub(); - deserializer.throwWith(new Error('Custom deserialization error')); + deserializer.throwWith(new Error("Custom deserialization error")); - const pubSub = new RedisPubSub({...mockOptions, deserializer }); + const pubSub = new RedisPubSub({ ...mockOptions, deserializer }); try { - pubSub.subscribe('TOPIC', message => { - try { - expect(message).to.be.a('string'); - expect(message).to.eql('{"hello":"world"}'); - done(); - } catch (e) { - done(e); - } - }).then(() => { - pubSub.publish('TOPIC', {hello: 'world'}); - }); + pubSub + .subscribe("TOPIC", (message) => { + try { + expect(message).to.be.a("string"); + expect(message).to.eql('{"hello":"world"}'); + done(); + } catch (e) { + done(e); + } + }) + .then(() => { + pubSub.publish("TOPIC", { hello: "world" }); + }); } catch (e) { done(e); } }); - it('throws if you try to unsubscribe with an unknown id', () => { + it("throws if you try to unsubscribe with an unknown id", () => { const pubSub = new RedisPubSub(mockOptions); - return expect(() => pubSub.unsubscribe(123)) - .to.throw('There is no subscription of id "123"'); + return expect(() => pubSub.unsubscribe(123)).to.throw( + 'There is no subscription of id "123"' + ); }); - it('can use transform function to convert the trigger name given into more explicit channel name', done => { - const triggerTransform = (trigger, { repoName }) => `${trigger}.${repoName}`; + it("can use transform function to convert the trigger name given into more explicit channel name", (done) => { + const triggerTransform = (trigger, { repoName }) => + `${trigger}.${repoName}`; const pubSub = new RedisPubSub({ triggerTransform, - publisher: (mockRedisClient as any), - subscriber: (mockRedisClient as any), + publisher: mockRedisClient as any, + subscriber: mockRedisClient as any, }); - const validateMessage = message => { + const validateMessage = (message) => { try { - expect(message).to.equals('test'); + expect(message).to.equals("test"); done(); } catch (e) { done(e); } }; - pubSub.subscribe('comments', validateMessage, { repoName: 'graphql-redis-subscriptions' }).then(async subId => { - await pubSub.publish('comments.graphql-redis-subscriptions', 'test'); - pubSub.unsubscribe(subId); - }); + pubSub + .subscribe("comments", validateMessage, { + repoName: "graphql-redis-subscriptions", + }) + .then(async (subId) => { + await pubSub.publish("comments.graphql-redis-subscriptions", "test"); + pubSub.unsubscribe(subId); + }); + }); + it("creates a pipline", (done) => { + const pubSub = new RedisPubSub(mockOptions); + const pipeline = pubSub.pipeline(); + expect(pipeline).to.be.an.instanceOf(RedisPubSubPipeline); + done(); }); // TODO pattern subs - afterEach('Reset spy count', () => { + afterEach("Reset spy count", () => { publishSpy.reset(); subscribeSpy.reset(); unsubscribeSpy.reset(); @@ -469,30 +526,28 @@ describe('RedisPubSub', () => { punsubscribeSpy.reset(); }); - after('Restore redis client', () => { + after("Restore redis client", () => { restore(); }); - }); -describe('PubSubAsyncIterator', () => { - - it('should expose valid asyncItrator for a specific event', () => { +describe("PubSubAsyncIterator", () => { + it("should expose valid asyncItrator for a specific event", () => { const pubSub = new RedisPubSub(mockOptions); - const eventName = 'test'; + const eventName = "test"; const iterator = pubSub.asyncIterator(eventName); // tslint:disable-next-line:no-unused-expression - expect(iterator).to.exist; - // tslint:disable-next-line:no-unused-expression - expect(iterator[Symbol.asyncIterator]).not.to.be.undefined; + expect(iterator).to.exist; + // tslint:disable-next-line:no-unused-expression + expect(iterator[Symbol.asyncIterator]).not.to.be.undefined; }); - it('should trigger event on asyncIterator when published', done => { + it("should trigger event on asyncIterator when published", (done) => { const pubSub = new RedisPubSub(mockOptions); - const eventName = 'test'; + const eventName = "test"; const iterator = pubSub.asyncIterator(eventName); - iterator.next().then(result => { + iterator.next().then((result) => { // tslint:disable-next-line:no-unused-expression expect(result).to.exist; // tslint:disable-next-line:no-unused-expression @@ -505,10 +560,10 @@ describe('PubSubAsyncIterator', () => { pubSub.publish(eventName, { test: true }); }); - it('should not trigger event on asyncIterator when publishing other event', async () => { + it("should not trigger event on asyncIterator when publishing other event", async () => { const pubSub = new RedisPubSub(mockOptions); - const eventName = 'test2'; - const iterator = pubSub.asyncIterator('test'); + const eventName = "test2"; + const iterator = pubSub.asyncIterator("test"); const triggerSpy = spy(() => undefined); iterator.next().then(triggerSpy); @@ -516,10 +571,10 @@ describe('PubSubAsyncIterator', () => { expect(triggerSpy.callCount).to.equal(0); }); - it('register to multiple events', done => { + it("register to multiple events", (done) => { const pubSub = new RedisPubSub(mockOptions); - const eventName = 'test2'; - const iterator = pubSub.asyncIterator(['test', 'test2']); + const eventName = "test2"; + const iterator = pubSub.asyncIterator(["test", "test2"]); const triggerSpy = spy(() => undefined); iterator.next().then(() => { @@ -530,23 +585,23 @@ describe('PubSubAsyncIterator', () => { pubSub.publish(eventName, { test: true }); }); - it('should not trigger event on asyncIterator already returned', done => { + it("should not trigger event on asyncIterator already returned", (done) => { const pubSub = new RedisPubSub(mockOptions); - const eventName = 'test'; + const eventName = "test"; const iterator = pubSub.asyncIterator(eventName); - iterator.next().then(result => { + iterator.next().then((result) => { // tslint:disable-next-line:no-unused-expression expect(result).to.exist; // tslint:disable-next-line:no-unused-expression expect(result.value).to.exist; - expect(result.value.test).to.equal('word'); + expect(result.value.test).to.equal("word"); // tslint:disable-next-line:no-unused-expression expect(result.done).to.be.false; }); - pubSub.publish(eventName, { test: 'word' }).then(() => { - iterator.next().then(result => { + pubSub.publish(eventName, { test: "word" }).then(() => { + iterator.next().then((result) => { // tslint:disable-next-line:no-unused-expression expect(result).to.exist; // tslint:disable-next-line:no-unused-expression @@ -560,5 +615,50 @@ describe('PubSubAsyncIterator', () => { pubSub.publish(eventName, { test: true }); }); }); +}); + +describe("RedisPubSubPipeline", () => { + it("should publish to pipeline", () => { + const pubSub = new RedisPubSub(mockOptions); + const pipeline = pubSub.pipeline(); + + pipeline.publish("TOPIC", "test"); + expect(pipelinePublishSpy.lastCall.args).to.have.members([ + "TOPIC", + '"test"', + ]); + }); + + it("should be chainable", () => { + const pubSub = new RedisPubSub(mockOptions); + const pipeline = pubSub.pipeline(); + + const result = pipeline.publish("TOPIC", { hello: "world" }); + expect(result).to.be.an.instanceOf(RedisPubSubPipeline); + }); + it("should use the serializer to transform the payload before publishing", () => { + const serializer = stub(); + const serializedPayload = `{ "hello": "custom" }`; + serializer.returnWith(serializedPayload); + + const pipeline = new RedisPubSubPipeline({ + publisher: mockRedisClient as any, + serializer, + }); + pipeline.publish("TOPIC", { hello: "world" }); + + expect(serializer.callCount).to.equal(1); + expect(pipelinePublishSpy.lastCall.args).to.have.members([ + "TOPIC", + serializedPayload, + ]); + }); + + it("should execute the pipeline", async () => { + const pubSub = new RedisPubSub(mockOptions); + const pipeline = pubSub.pipeline(); + await pipeline.exec(); + expect(pipelineExecSpy.callCount).to.equal(1); + }); }); From 55b204ad7ce674bd815d34585811b5cec4c61a88 Mon Sep 17 00:00:00 2001 From: Jeremy Gillick Date: Fri, 27 Dec 2024 10:57:53 -0800 Subject: [PATCH 2/2] Revert test file formatting --- src/test/tests.ts | 564 +++++++++++++++++++++------------------------- 1 file changed, 263 insertions(+), 301 deletions(-) diff --git a/src/test/tests.ts b/src/test/tests.ts index 666b8d92..b357ccf8 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -1,9 +1,9 @@ -import * as chai from "chai"; -import * as chaiAsPromised from "chai-as-promised"; -import { spy, restore, stub } from "simple-mock"; -import { RedisPubSub } from "../redis-pubsub"; +import * as chai from 'chai'; +import * as chaiAsPromised from 'chai-as-promised'; +import { spy, restore, stub } from 'simple-mock'; +import { RedisPubSub } from '../redis-pubsub'; import { RedisPubSubPipeline } from "../redis-pipeline"; -import * as IORedis from "ioredis"; +import * as IORedis from 'ioredis'; chai.use(chaiAsPromised); const expect = chai.expect; @@ -12,9 +12,7 @@ const expect = chai.expect; let listener; -const publishSpy = spy( - (channel, message) => listener && listener(channel, message) -); +const publishSpy = spy((channel, message) => listener && listener(channel, message)); const subscribeSpy = spy((channel, cb) => cb && cb(null, channel)); const unsubscribeSpy = spy((channel, cb) => cb && cb(channel)); const psubscribeSpy = spy((channel, cb) => cb && cb(null, channel)); @@ -23,8 +21,7 @@ const punsubscribeSpy = spy((channel, cb) => cb && cb(channel)); const pipelinePublishSpy = spy(() => {}); const pipelineExecSpy = spy(() => Promise.resolve()); const pipelineSpy = spy(() => mockPipelineClient); - -const quitSpy = spy((cb) => cb); +const quitSpy = spy(cb => cb); const mockRedisClient = { publish: publishSpy, subscribe: subscribeSpy, @@ -33,7 +30,7 @@ const mockRedisClient = { punsubscribe: punsubscribeSpy, pipeline: pipelineSpy, on: (event, cb) => { - if (event === "message") { + if (event === 'message') { listener = cb; } }, @@ -44,14 +41,17 @@ const mockPipelineClient = { exec: pipelineExecSpy, }; const mockOptions = { - publisher: mockRedisClient as any, - subscriber: mockRedisClient as any, + publisher: (mockRedisClient as any), + subscriber: (mockRedisClient as any), }; + + // -------------- Mocking Redis Client ------------------ -describe("RedisPubSub", () => { - it("should create default ioredis clients if none were provided", (done) => { +describe('RedisPubSub', () => { + + it('should create default ioredis clients if none were provided', done => { const pubSub = new RedisPubSub(); expect(pubSub.getSubscriber()).to.be.an.instanceOf(IORedis); expect(pubSub.getPublisher()).to.be.an.instanceOf(IORedis); @@ -59,11 +59,10 @@ describe("RedisPubSub", () => { done(); }); - it("should verify close calls pub and sub quit methods", (done) => { + it('should verify close calls pub and sub quit methods', done => { const pubSub = new RedisPubSub(mockOptions); - pubSub - .close() + pubSub.close() .then(() => { expect(quitSpy.callCount).to.equal(2); done(); @@ -71,168 +70,148 @@ describe("RedisPubSub", () => { .catch(done); }); - it("can subscribe to specific redis channel and called when a message is published on it", (done) => { + it('can subscribe to specific redis channel and called when a message is published on it', done => { const pubSub = new RedisPubSub(mockOptions); - pubSub - .subscribe("Posts", (message) => { - try { - expect(message).to.equals("test"); - done(); - } catch (e) { - done(e); - } - }) - .then(async (subId) => { - expect(subId).to.be.a("number"); - await pubSub.publish("Posts", "test"); - pubSub.unsubscribe(subId); - }); + pubSub.subscribe('Posts', message => { + try { + expect(message).to.equals('test'); + done(); + } catch (e) { + done(e); + } + + }).then(async subId => { + expect(subId).to.be.a('number'); + await pubSub.publish('Posts', 'test'); + pubSub.unsubscribe(subId); + }); }); - it("can subscribe to a redis channel pattern and called when a message is published on it", (done) => { + it('can subscribe to a redis channel pattern and called when a message is published on it', done => { const pubSub = new RedisPubSub(mockOptions); - pubSub - .subscribe( - "Posts*", - (message) => { - try { - expect(psubscribeSpy.callCount).to.equal(1); - expect(message).to.equals("test"); - done(); - } catch (e) { - done(e); - } - }, - { pattern: true } - ) - .then(async (subId) => { - expect(subId).to.be.a("number"); - await pubSub.publish("Posts*", "test"); - pubSub.unsubscribe(subId); - }); + pubSub.subscribe('Posts*', message => { + try { + expect(psubscribeSpy.callCount).to.equal(1); + expect(message).to.equals('test'); + done(); + } catch (e) { + done(e); + } + }, { pattern: true }).then(async subId => { + expect(subId).to.be.a('number'); + await pubSub.publish('Posts*', 'test'); + pubSub.unsubscribe(subId); + }); }); - it("can unsubscribe from specific redis channel", (done) => { + it('can unsubscribe from specific redis channel', done => { const pubSub = new RedisPubSub(mockOptions); - pubSub - .subscribe("Posts", () => null) - .then((subId) => { - pubSub.unsubscribe(subId); + pubSub.subscribe('Posts', () => null).then(subId => { + pubSub.unsubscribe(subId); - try { - expect(unsubscribeSpy.callCount).to.equals(1); - expect(unsubscribeSpy.lastCall.args).to.have.members(["Posts"]); + try { - expect(punsubscribeSpy.callCount).to.equals(1); - expect(punsubscribeSpy.lastCall.args).to.have.members(["Posts"]); + expect(unsubscribeSpy.callCount).to.equals(1); + expect(unsubscribeSpy.lastCall.args).to.have.members(['Posts']); - done(); - } catch (e) { - done(e); - } - }); - }); + expect(punsubscribeSpy.callCount).to.equals(1); + expect(punsubscribeSpy.lastCall.args).to.have.members(['Posts']); - it("cleans up correctly the memory when unsubscribing", (done) => { - const pubSub = new RedisPubSub(mockOptions); - Promise.all([ - pubSub.subscribe("Posts", () => null), - pubSub.subscribe("Posts", () => null), - ]).then(([subId, secondSubId]) => { - try { - // This assertion is done against a private member, if you change the internals, you may want to change that - expect((pubSub as any).subscriptionMap[subId]).not.to.be.an( - "undefined" - ); - pubSub.unsubscribe(subId); - // This assertion is done against a private member, if you change the internals, you may want to change that - expect((pubSub as any).subscriptionMap[subId]).to.be.an("undefined"); - expect(() => pubSub.unsubscribe(subId)).to.throw( - `There is no subscription of id "${subId}"` - ); - pubSub.unsubscribe(secondSubId); done(); + } catch (e) { done(e); } }); }); - it("concurrent subscribe, unsubscribe first sub before second sub complete", (done) => { + it('cleans up correctly the memory when unsubscribing', done => { + const pubSub = new RedisPubSub(mockOptions); + Promise.all([ + pubSub.subscribe('Posts', () => null), + pubSub.subscribe('Posts', () => null), + ]) + .then(([subId, secondSubId]) => { + try { + // This assertion is done against a private member, if you change the internals, you may want to change that + expect((pubSub as any).subscriptionMap[subId]).not.to.be.an('undefined'); + pubSub.unsubscribe(subId); + // This assertion is done against a private member, if you change the internals, you may want to change that + expect((pubSub as any).subscriptionMap[subId]).to.be.an('undefined'); + expect(() => pubSub.unsubscribe(subId)).to.throw(`There is no subscription of id "${subId}"`); + pubSub.unsubscribe(secondSubId); + done(); + + } catch (e) { + done(e); + } + }); + }); + + it('concurrent subscribe, unsubscribe first sub before second sub complete', done => { const promises = { firstSub: null as Promise, secondSub: null as Promise, - }; + } - let firstCb, secondCb; + let firstCb, secondCb const redisSubCallback = (channel, cb) => { process.nextTick(() => { if (!firstCb) { - firstCb = () => cb(null, channel); + firstCb = () => cb(null, channel) // Handling first call, init second sub - promises.secondSub = pubSub.subscribe("Posts", () => null); + promises.secondSub = pubSub.subscribe('Posts', () => null) // Continue first sub callback - firstCb(); + firstCb() } else { - secondCb = () => cb(null, channel); + secondCb = () => cb(null, channel) } - }); - }; + }) + } const subscribeStub = stub().callFn(redisSubCallback); - const mockRedisClientWithSubStub = { - ...mockRedisClient, - ...{ subscribe: subscribeStub }, - }; - const mockOptionsWithSubStub = { - ...mockOptions, - ...{ subscriber: mockRedisClientWithSubStub as any }, - }; + const mockRedisClientWithSubStub = {...mockRedisClient, ...{subscribe: subscribeStub}}; + const mockOptionsWithSubStub = {...mockOptions, ...{subscriber: (mockRedisClientWithSubStub as any)}} const pubSub = new RedisPubSub(mockOptionsWithSubStub); // First leg of the test, init first sub and immediately unsubscribe. The second sub is triggered in the redis cb // before the first promise sub complete - promises.firstSub = pubSub - .subscribe("Posts", () => null) - .then((subId) => { + promises.firstSub = pubSub.subscribe('Posts', () => null) + .then(subId => { // This assertion is done against a private member, if you change the internals, you may want to change that - expect((pubSub as any).subscriptionMap[subId]).not.to.be.an( - "undefined" - ); + expect((pubSub as any).subscriptionMap[subId]).not.to.be.an('undefined'); pubSub.unsubscribe(subId); // Continue second sub callback - promises.firstSub.then(() => secondCb()); + promises.firstSub.then(() => secondCb()) return subId; }); // Second leg of the test, here we have unsubscribed from the first sub. We try unsubbing from the second sub // as soon it is ready - promises.firstSub.then((subId) => { - // This assertion is done against a private member, if you change the internals, you may want to change that - expect((pubSub as any).subscriptionMap[subId]).to.be.an("undefined"); - expect(() => pubSub.unsubscribe(subId)).to.throw( - `There is no subscription of id "${subId}"` - ); - - return promises.secondSub - .then((secondSubId) => { + promises.firstSub + .then((subId) => { + // This assertion is done against a private member, if you change the internals, you may want to change that + expect((pubSub as any).subscriptionMap[subId]).to.be.an('undefined'); + expect(() => pubSub.unsubscribe(subId)).to.throw(`There is no subscription of id "${subId}"`); + + return promises.secondSub.then(secondSubId => { pubSub.unsubscribe(secondSubId); }) - .then(done) - .catch(done); + .then(done) + .catch(done) }); }); - it("will not unsubscribe from the redis channel if there is another subscriber on it's subscriber list", (done) => { + it('will not unsubscribe from the redis channel if there is another subscriber on it\'s subscriber list', done => { const pubSub = new RedisPubSub(mockOptions); const subscriptionPromises = [ - pubSub.subscribe("Posts", () => { - done("Not supposed to be triggered"); + pubSub.subscribe('Posts', () => { + done('Not supposed to be triggered'); }), - pubSub.subscribe("Posts", (msg) => { + pubSub.subscribe('Posts', (msg) => { try { - expect(msg).to.equals("test"); + expect(msg).to.equals('test'); done(); } catch (e) { done(e); @@ -240,14 +219,14 @@ describe("RedisPubSub", () => { }), ]; - Promise.all(subscriptionPromises).then(async (subIds) => { + Promise.all(subscriptionPromises).then(async subIds => { try { expect(subIds.length).to.equals(2); pubSub.unsubscribe(subIds[0]); expect(unsubscribeSpy.callCount).to.equals(0); - await pubSub.publish("Posts", "test"); + await pubSub.publish('Posts', 'test'); pubSub.unsubscribe(subIds[1]); expect(unsubscribeSpy.callCount).to.equals(1); } catch (e) { @@ -256,15 +235,15 @@ describe("RedisPubSub", () => { }); }); - it("will subscribe to redis channel only once", (done) => { + it('will subscribe to redis channel only once', done => { const pubSub = new RedisPubSub(mockOptions); const onMessage = () => null; const subscriptionPromises = [ - pubSub.subscribe("Posts", onMessage), - pubSub.subscribe("Posts", onMessage), + pubSub.subscribe('Posts', onMessage), + pubSub.subscribe('Posts', onMessage), ]; - Promise.all(subscriptionPromises).then((subIds) => { + Promise.all(subscriptionPromises).then(subIds => { try { expect(subIds.length).to.equals(2); expect(subscribeSpy.callCount).to.equals(1); @@ -278,23 +257,23 @@ describe("RedisPubSub", () => { }); }); - it("can have multiple subscribers and all will be called when a message is published to this channel", (done) => { + it('can have multiple subscribers and all will be called when a message is published to this channel', done => { const pubSub = new RedisPubSub(mockOptions); const onMessageSpy = spy(() => null); const subscriptionPromises = [ - pubSub.subscribe("Posts", onMessageSpy), - pubSub.subscribe("Posts", onMessageSpy), + pubSub.subscribe('Posts', onMessageSpy), + pubSub.subscribe('Posts', onMessageSpy), ]; - Promise.all(subscriptionPromises).then(async (subIds) => { + Promise.all(subscriptionPromises).then(async subIds => { try { expect(subIds.length).to.equals(2); - await pubSub.publish("Posts", "test"); + await pubSub.publish('Posts', 'test'); expect(onMessageSpy.callCount).to.equals(2); - onMessageSpy.calls.forEach((call) => { - expect(call.args).to.have.members(["test"]); + onMessageSpy.calls.forEach(call => { + expect(call.args).to.have.members(['test']); }); pubSub.unsubscribe(subIds[0]); @@ -306,32 +285,29 @@ describe("RedisPubSub", () => { }); }); - it("can publish objects as well", (done) => { + it('can publish objects as well', done => { const pubSub = new RedisPubSub(mockOptions); - pubSub - .subscribe("Posts", (message) => { - try { - expect(message).to.have.property("comment", "This is amazing"); - done(); - } catch (e) { - done(e); - } - }) - .then(async (subId) => { - try { - await pubSub.publish("Posts", { comment: "This is amazing" }); - pubSub.unsubscribe(subId); - } catch (e) { - done(e); - } - }); + pubSub.subscribe('Posts', message => { + try { + expect(message).to.have.property('comment', 'This is amazing'); + done(); + } catch (e) { + done(e); + } + }).then(async subId => { + try { + await pubSub.publish('Posts', { comment: 'This is amazing' }); + pubSub.unsubscribe(subId); + } catch (e) { + done(e); + } + }); }); - it("can accept custom reviver option (eg. for Javascript Dates)", (done) => { + it('can accept custom reviver option (eg. for Javascript Dates)', done => { const dateReviver = (key, value) => { - const isISO8601Z = - /^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}(?:\.\d*)?)Z$/; - if (typeof value === "string" && isISO8601Z.test(value)) { + const isISO8601Z = /^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}(?:\.\d*)?)Z$/; + if (typeof value === 'string' && isISO8601Z.test(value)) { const tempDateNumber = Date.parse(value); if (!isNaN(tempDateNumber)) { return new Date(tempDateNumber); @@ -340,176 +316,159 @@ describe("RedisPubSub", () => { return value; }; - const pubSub = new RedisPubSub({ ...mockOptions, reviver: dateReviver }); + const pubSub = new RedisPubSub({...mockOptions, reviver: dateReviver}); const validTime = new Date(); - const invalidTime = "2018-13-01T12:00:00Z"; - pubSub - .subscribe("Times", (message) => { - try { - expect(message).to.have.property("invalidTime", invalidTime); - expect(message).to.have.property("validTime"); - expect(message.validTime.getTime()).to.equals(validTime.getTime()); - done(); - } catch (e) { - done(e); - } - }) - .then(async (subId) => { - try { - await pubSub.publish("Times", { validTime, invalidTime }); - pubSub.unsubscribe(subId); - } catch (e) { - done(e); - } - }); + const invalidTime = '2018-13-01T12:00:00Z'; + pubSub.subscribe('Times', message => { + try { + expect(message).to.have.property('invalidTime', invalidTime); + expect(message).to.have.property('validTime'); + expect(message.validTime.getTime()).to.equals(validTime.getTime()); + done(); + } catch (e) { + done(e); + } + }).then(async subId => { + try { + await pubSub.publish('Times', { validTime, invalidTime }); + pubSub.unsubscribe(subId); + } catch (e) { + done(e); + } + }); }); - it("refuses custom reviver with a deserializer", (done) => { + it('refuses custom reviver with a deserializer', done => { const reviver = stub(); const deserializer = stub(); try { - expect( - () => new RedisPubSub({ ...mockOptions, reviver, deserializer }) - ).to.throw("Reviver and deserializer can't be used together"); + expect(() => new RedisPubSub({...mockOptions, reviver, deserializer})) + .to.throw("Reviver and deserializer can't be used together"); done(); } catch (e) { done(e); } }); - it("allows to use a custom serializer", (done) => { + it('allows to use a custom serializer', done => { const serializer = stub(); const serializedPayload = `{ "hello": "custom" }`; serializer.returnWith(serializedPayload); - const pubSub = new RedisPubSub({ ...mockOptions, serializer }); + const pubSub = new RedisPubSub({...mockOptions, serializer }); try { - pubSub - .subscribe("TOPIC", (message) => { - try { - expect(message).to.eql({ hello: "custom" }); - done(); - } catch (e) { - done(e); - } - }) - .then(() => { - pubSub.publish("TOPIC", { hello: "world" }); - }); + pubSub.subscribe('TOPIC', message => { + try { + expect(message).to.eql({hello: 'custom'}); + done(); + } catch (e) { + done(e); + } + }).then(() => { + pubSub.publish('TOPIC', {hello: 'world'}); + }); } catch (e) { done(e); } }); - it("custom serializer can throw an error", (done) => { + it('custom serializer can throw an error', done => { const serializer = stub(); - serializer.throwWith(new Error("Custom serialization error")); + serializer.throwWith(new Error('Custom serialization error')); - const pubSub = new RedisPubSub({ ...mockOptions, serializer }); + const pubSub = new RedisPubSub({...mockOptions, serializer }); try { - pubSub.publish("TOPIC", { hello: "world" }).then( - () => { - done(new Error("Expected error to be thrown upon publish")); - }, - (err) => { - expect(err.message).to.eql("Custom serialization error"); - done(); - } - ); + pubSub.publish('TOPIC', {hello: 'world'}).then(() => { + done(new Error('Expected error to be thrown upon publish')); + }, err => { + expect(err.message).to.eql('Custom serialization error'); + done(); + }); } catch (e) { done(e); } }); - it("allows to use a custom deserializer", (done) => { + it('allows to use a custom deserializer', done => { const deserializer = stub(); - const deserializedPayload = { hello: "custom" }; + const deserializedPayload = { hello: 'custom' }; deserializer.returnWith(deserializedPayload); - const pubSub = new RedisPubSub({ ...mockOptions, deserializer }); + const pubSub = new RedisPubSub({...mockOptions, deserializer }); try { - pubSub - .subscribe("TOPIC", (message) => { - try { - expect(message).to.eql({ hello: "custom" }); - done(); - } catch (e) { - done(e); - } - }) - .then(() => { - pubSub.publish("TOPIC", { hello: "world" }); - }); + pubSub.subscribe('TOPIC', message => { + try { + expect(message).to.eql({hello: 'custom'}); + done(); + } catch (e) { + done(e); + } + }).then(() => { + pubSub.publish('TOPIC', {hello: 'world'}); + }); } catch (e) { done(e); } }); - it("unparsed payload is returned if custom deserializer throws an error", (done) => { + it('unparsed payload is returned if custom deserializer throws an error', done => { const deserializer = stub(); - deserializer.throwWith(new Error("Custom deserialization error")); + deserializer.throwWith(new Error('Custom deserialization error')); - const pubSub = new RedisPubSub({ ...mockOptions, deserializer }); + const pubSub = new RedisPubSub({...mockOptions, deserializer }); try { - pubSub - .subscribe("TOPIC", (message) => { - try { - expect(message).to.be.a("string"); - expect(message).to.eql('{"hello":"world"}'); - done(); - } catch (e) { - done(e); - } - }) - .then(() => { - pubSub.publish("TOPIC", { hello: "world" }); - }); + pubSub.subscribe('TOPIC', message => { + try { + expect(message).to.be.a('string'); + expect(message).to.eql('{"hello":"world"}'); + done(); + } catch (e) { + done(e); + } + }).then(() => { + pubSub.publish('TOPIC', {hello: 'world'}); + }); } catch (e) { done(e); } }); - it("throws if you try to unsubscribe with an unknown id", () => { + it('throws if you try to unsubscribe with an unknown id', () => { const pubSub = new RedisPubSub(mockOptions); - return expect(() => pubSub.unsubscribe(123)).to.throw( - 'There is no subscription of id "123"' - ); + return expect(() => pubSub.unsubscribe(123)) + .to.throw('There is no subscription of id "123"'); }); - it("can use transform function to convert the trigger name given into more explicit channel name", (done) => { - const triggerTransform = (trigger, { repoName }) => - `${trigger}.${repoName}`; + it('can use transform function to convert the trigger name given into more explicit channel name', done => { + const triggerTransform = (trigger, { repoName }) => `${trigger}.${repoName}`; const pubSub = new RedisPubSub({ triggerTransform, - publisher: mockRedisClient as any, - subscriber: mockRedisClient as any, + publisher: (mockRedisClient as any), + subscriber: (mockRedisClient as any), }); - const validateMessage = (message) => { + const validateMessage = message => { try { - expect(message).to.equals("test"); + expect(message).to.equals('test'); done(); } catch (e) { done(e); } }; - pubSub - .subscribe("comments", validateMessage, { - repoName: "graphql-redis-subscriptions", - }) - .then(async (subId) => { - await pubSub.publish("comments.graphql-redis-subscriptions", "test"); - pubSub.unsubscribe(subId); - }); + pubSub.subscribe('comments', validateMessage, { repoName: 'graphql-redis-subscriptions' }).then(async subId => { + await pubSub.publish('comments.graphql-redis-subscriptions', 'test'); + pubSub.unsubscribe(subId); + }); + }); - it("creates a pipline", (done) => { + it('creates a pipline', (done) => { const pubSub = new RedisPubSub(mockOptions); const pipeline = pubSub.pipeline(); expect(pipeline).to.be.an.instanceOf(RedisPubSubPipeline); @@ -518,7 +477,7 @@ describe("RedisPubSub", () => { // TODO pattern subs - afterEach("Reset spy count", () => { + afterEach('Reset spy count', () => { publishSpy.reset(); subscribeSpy.reset(); unsubscribeSpy.reset(); @@ -526,28 +485,30 @@ describe("RedisPubSub", () => { punsubscribeSpy.reset(); }); - after("Restore redis client", () => { + after('Restore redis client', () => { restore(); }); + }); -describe("PubSubAsyncIterator", () => { - it("should expose valid asyncItrator for a specific event", () => { +describe('PubSubAsyncIterator', () => { + + it('should expose valid asyncItrator for a specific event', () => { const pubSub = new RedisPubSub(mockOptions); - const eventName = "test"; - const iterator = pubSub.asyncIterator(eventName); - // tslint:disable-next-line:no-unused-expression - expect(iterator).to.exist; + const eventName = 'test'; + const iterator = pubSub.asyncIterableIterator(eventName); // tslint:disable-next-line:no-unused-expression - expect(iterator[Symbol.asyncIterator]).not.to.be.undefined; + expect(iterator).to.exist; + // tslint:disable-next-line:no-unused-expression + expect(iterator[Symbol.asyncIterator]).not.to.be.undefined; }); - it("should trigger event on asyncIterator when published", (done) => { + it('should trigger event on asyncIterableIterator when published', done => { const pubSub = new RedisPubSub(mockOptions); - const eventName = "test"; - const iterator = pubSub.asyncIterator(eventName); + const eventName = 'test'; + const iterator = pubSub.asyncIterableIterator(eventName); - iterator.next().then((result) => { + iterator.next().then(result => { // tslint:disable-next-line:no-unused-expression expect(result).to.exist; // tslint:disable-next-line:no-unused-expression @@ -560,10 +521,10 @@ describe("PubSubAsyncIterator", () => { pubSub.publish(eventName, { test: true }); }); - it("should not trigger event on asyncIterator when publishing other event", async () => { + it('should not trigger event on asyncIterableIterator when publishing other event', async () => { const pubSub = new RedisPubSub(mockOptions); - const eventName = "test2"; - const iterator = pubSub.asyncIterator("test"); + const eventName = 'test2'; + const iterator = pubSub.asyncIterableIterator('test'); const triggerSpy = spy(() => undefined); iterator.next().then(triggerSpy); @@ -571,10 +532,10 @@ describe("PubSubAsyncIterator", () => { expect(triggerSpy.callCount).to.equal(0); }); - it("register to multiple events", (done) => { + it('register to multiple events', done => { const pubSub = new RedisPubSub(mockOptions); - const eventName = "test2"; - const iterator = pubSub.asyncIterator(["test", "test2"]); + const eventName = 'test2'; + const iterator = pubSub.asyncIterableIterator(['test', 'test2']); const triggerSpy = spy(() => undefined); iterator.next().then(() => { @@ -585,23 +546,23 @@ describe("PubSubAsyncIterator", () => { pubSub.publish(eventName, { test: true }); }); - it("should not trigger event on asyncIterator already returned", (done) => { + it('should not trigger event on asyncIterableIterator already returned', done => { const pubSub = new RedisPubSub(mockOptions); - const eventName = "test"; - const iterator = pubSub.asyncIterator(eventName); + const eventName = 'test'; + const iterator = pubSub.asyncIterableIterator(eventName); - iterator.next().then((result) => { + iterator.next().then(result => { // tslint:disable-next-line:no-unused-expression expect(result).to.exist; // tslint:disable-next-line:no-unused-expression expect(result.value).to.exist; - expect(result.value.test).to.equal("word"); + expect(result.value.test).to.equal('word'); // tslint:disable-next-line:no-unused-expression expect(result.done).to.be.false; }); - pubSub.publish(eventName, { test: "word" }).then(() => { - iterator.next().then((result) => { + pubSub.publish(eventName, { test: 'word' }).then(() => { + iterator.next().then(result => { // tslint:disable-next-line:no-unused-expression expect(result).to.exist; // tslint:disable-next-line:no-unused-expression @@ -615,47 +576,48 @@ describe("PubSubAsyncIterator", () => { pubSub.publish(eventName, { test: true }); }); }); + }); -describe("RedisPubSubPipeline", () => { - it("should publish to pipeline", () => { +describe('RedisPubSubPipeline', () => { + it('should publish to pipeline', () => { const pubSub = new RedisPubSub(mockOptions); const pipeline = pubSub.pipeline(); - pipeline.publish("TOPIC", "test"); + pipeline.publish('TOPIC', 'test'); expect(pipelinePublishSpy.lastCall.args).to.have.members([ - "TOPIC", + 'TOPIC', '"test"', ]); }); - it("should be chainable", () => { + it('should be chainable', () => { const pubSub = new RedisPubSub(mockOptions); const pipeline = pubSub.pipeline(); - const result = pipeline.publish("TOPIC", { hello: "world" }); + const result = pipeline.publish('TOPIC', { hello: 'world' }); expect(result).to.be.an.instanceOf(RedisPubSubPipeline); }); - it("should use the serializer to transform the payload before publishing", () => { + it('should use the serializer to transform the payload before publishing', () => { const serializer = stub(); - const serializedPayload = `{ "hello": "custom" }`; + const serializedPayload = `{ 'hello': 'custom' }`; serializer.returnWith(serializedPayload); const pipeline = new RedisPubSubPipeline({ publisher: mockRedisClient as any, serializer, }); - pipeline.publish("TOPIC", { hello: "world" }); + pipeline.publish('TOPIC', { hello: 'world' }); expect(serializer.callCount).to.equal(1); expect(pipelinePublishSpy.lastCall.args).to.have.members([ - "TOPIC", + 'TOPIC', serializedPayload, ]); }); - it("should execute the pipeline", async () => { + it('should execute the pipeline', async () => { const pubSub = new RedisPubSub(mockOptions); const pipeline = pubSub.pipeline(); await pipeline.exec();