diff --git a/README.md b/README.md index f639b94..238c271 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,35 @@ pubSub.subscribe('Test', message => { }); ``` +## Use a pipeline to batch publish calls + +If you need to publish a number of triggers at the same time, you can batch them in a single HTTP request to redis using the pub/sub pipeline: + +```javascript +const pubSub = new RedisPubSub({ ... }); + +// Create pipeline +const pipeline = pubSub.pipeline(); + +[1,2,3,4,5,6].forEach((id) => pipeline.publish('Test', {id})) + +// Execute the pipeline to redis +await pipeline.exec(); +``` + +The publish method is also chainable. + +```javascript +const pubSub = new RedisPubSub({ ... }); +const pipeline = pubSub.pipeline(); + +await pipeline + .publish('Test1', {}) + .publish('Test2', {}) + .publish('Test3', {}) + .exec(); +``` + ## Old Usage (Deprecated) ```javascript diff --git a/src/redis-pipeline.ts b/src/redis-pipeline.ts new file mode 100644 index 0000000..fd25661 --- /dev/null +++ b/src/redis-pipeline.ts @@ -0,0 +1,39 @@ +import { ChainableCommander } from "ioredis"; +import type { RedisClient, Serializer } from "./redis-pubsub"; + +export interface RedisPubSubPipelineOptions { + publisher: RedisClient; + serializer: Serializer; +} + +/** + * Create a pipelined publisher that will send all the pub/sub messages in a single batch + */ +export class RedisPubSubPipeline { + private pipeline: ChainableCommander; + private publisher: RedisClient; + private serializer: Serializer; + + constructor(options: RedisPubSubPipelineOptions) { + this.publisher = options.publisher; + this.serializer = options.serializer; + + // Start pipeline + this.pipeline = this.publisher.pipeline(); + } + + /** + * Publish to the redis pipeline + */ + public publish(trigger: string, payload: T): this { + this.pipeline.publish(trigger, this.serializer(payload)); + return this; + } + + /** + * Execute the entire pipeline + */ + public async exec() { + return this.pipeline.exec(); + } +} diff --git a/src/redis-pubsub.ts b/src/redis-pubsub.ts index 6e25e4f..49c0f02 100644 --- a/src/redis-pubsub.ts +++ b/src/redis-pubsub.ts @@ -1,11 +1,19 @@ import {Cluster, Redis, RedisOptions} from 'ioredis'; import {PubSubEngine} from 'graphql-subscriptions'; import {PubSubAsyncIterator} from './pubsub-async-iterator'; +import { RedisPubSubPipeline } from "./redis-pipeline"; -type RedisClient = Redis | Cluster; +export type RedisClient = Redis | Cluster; type OnMessage = (message: T) => void; type DeserializerContext = { channel: string, pattern?: string }; +function defaultSerializer(payload: unknown): string | Buffer { + if (payload instanceof Buffer) { + return payload; + } + return JSON.stringify(payload); +} + export interface PubSubRedisOptions { connection?: RedisOptions | string; triggerTransform?: TriggerTransform; @@ -29,8 +37,8 @@ export class RedisPubSub implements PubSubEngine { subscriber, publisher, reviver, - serializer, deserializer, + serializer = defaultSerializer, messageEventName = 'message', pmessageEventName = 'pmessage', } = options; @@ -85,13 +93,14 @@ export class RedisPubSub implements PubSubEngine { } public async publish(trigger: string, payload: T): Promise { - if(this.serializer) { - await this.redisPublisher.publish(trigger, this.serializer(payload)); - } else if (payload instanceof Buffer){ - await this.redisPublisher.publish(trigger, payload); - } else { - await this.redisPublisher.publish(trigger, JSON.stringify(payload)); - } + await this.redisPublisher.publish(trigger, this.serializer(payload)); + } + + public pipeline() { + return new RedisPubSubPipeline({ + publisher: this.redisPublisher, + serializer: this.serializer, + }); } public subscribe( @@ -191,7 +200,7 @@ export class RedisPubSub implements PubSubEngine { ]); } - private readonly serializer?: Serializer; + private readonly serializer: Serializer; private readonly deserializer?: Deserializer; private readonly triggerTransform: TriggerTransform; private readonly redisSubscriber: RedisClient; @@ -251,5 +260,5 @@ export type TriggerTransform = ( channelOptions?: unknown, ) => string; export type Reviver = (key: any, value: any) => any; -export type Serializer = (source: any) => string; +export type Serializer = (source: unknown) => string | Buffer; export type Deserializer = (source: string | Buffer, context: DeserializerContext) => any; diff --git a/src/test/tests.ts b/src/test/tests.ts index 759b6b0..b357ccf 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -2,6 +2,7 @@ import * as chai from 'chai'; import * as chaiAsPromised from 'chai-as-promised'; import { spy, restore, stub } from 'simple-mock'; import { RedisPubSub } from '../redis-pubsub'; +import { RedisPubSubPipeline } from "../redis-pipeline"; import * as IORedis from 'ioredis'; chai.use(chaiAsPromised); @@ -17,6 +18,9 @@ const unsubscribeSpy = spy((channel, cb) => cb && cb(channel)); const psubscribeSpy = spy((channel, cb) => cb && cb(null, channel)); const punsubscribeSpy = spy((channel, cb) => cb && cb(channel)); +const pipelinePublishSpy = spy(() => {}); +const pipelineExecSpy = spy(() => Promise.resolve()); +const pipelineSpy = spy(() => mockPipelineClient); const quitSpy = spy(cb => cb); const mockRedisClient = { publish: publishSpy, @@ -24,6 +28,7 @@ const mockRedisClient = { unsubscribe: unsubscribeSpy, psubscribe: psubscribeSpy, punsubscribe: punsubscribeSpy, + pipeline: pipelineSpy, on: (event, cb) => { if (event === 'message') { listener = cb; @@ -31,6 +36,10 @@ const mockRedisClient = { }, quit: quitSpy, }; +const mockPipelineClient = { + publish: pipelinePublishSpy, + exec: pipelineExecSpy, +}; const mockOptions = { publisher: (mockRedisClient as any), subscriber: (mockRedisClient as any), @@ -459,6 +468,13 @@ describe('RedisPubSub', () => { }); + it('creates a pipline', (done) => { + const pubSub = new RedisPubSub(mockOptions); + const pipeline = pubSub.pipeline(); + expect(pipeline).to.be.an.instanceOf(RedisPubSubPipeline); + done(); + }); + // TODO pattern subs afterEach('Reset spy count', () => { @@ -562,3 +578,49 @@ describe('PubSubAsyncIterator', () => { }); }); + +describe('RedisPubSubPipeline', () => { + it('should publish to pipeline', () => { + const pubSub = new RedisPubSub(mockOptions); + const pipeline = pubSub.pipeline(); + + pipeline.publish('TOPIC', 'test'); + expect(pipelinePublishSpy.lastCall.args).to.have.members([ + 'TOPIC', + '"test"', + ]); + }); + + it('should be chainable', () => { + const pubSub = new RedisPubSub(mockOptions); + const pipeline = pubSub.pipeline(); + + const result = pipeline.publish('TOPIC', { hello: 'world' }); + expect(result).to.be.an.instanceOf(RedisPubSubPipeline); + }); + + it('should use the serializer to transform the payload before publishing', () => { + const serializer = stub(); + const serializedPayload = `{ 'hello': 'custom' }`; + serializer.returnWith(serializedPayload); + + const pipeline = new RedisPubSubPipeline({ + publisher: mockRedisClient as any, + serializer, + }); + pipeline.publish('TOPIC', { hello: 'world' }); + + expect(serializer.callCount).to.equal(1); + expect(pipelinePublishSpy.lastCall.args).to.have.members([ + 'TOPIC', + serializedPayload, + ]); + }); + + it('should execute the pipeline', async () => { + const pubSub = new RedisPubSub(mockOptions); + const pipeline = pubSub.pipeline(); + await pipeline.exec(); + expect(pipelineExecSpy.callCount).to.equal(1); + }); +});