From 7a5a900c6086225d4ed1eb69f15bb298cc8200e2 Mon Sep 17 00:00:00 2001 From: sondremaereoverskaug Date: Mon, 10 Feb 2025 09:29:59 +0100 Subject: [PATCH] fix(unsubscribe): only punsubscribe when a pattern matcher actually has been used Instead of both unsubscribing and punsubcsribing towards redis, we select the method based on the configuration when the subscribe-method was issued. This is important for clients wanting to use a serverless redis or valkey with AWS, as they do not support the PUNSUBSCRIBE command at all. --- src/redis-pubsub.ts | 16 ++++++++++------ src/test/tests.ts | 21 ++++++++++++++++++++- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/redis-pubsub.ts b/src/redis-pubsub.ts index 6e25e4f..bead346 100644 --- a/src/redis-pubsub.ts +++ b/src/redis-pubsub.ts @@ -102,7 +102,8 @@ export class RedisPubSub implements PubSubEngine { const triggerName: string = this.triggerTransform(trigger, options); const id = this.currentSubscriptionId++; - this.subscriptionMap[id] = [triggerName, onMessage]; + const patternSubscription: boolean = !!options['pattern']; + this.subscriptionMap[id] = [triggerName, onMessage, patternSubscription]; if (!this.subsRefsMap.has(triggerName)) { this.subsRefsMap.set(triggerName, new Set()); @@ -127,7 +128,7 @@ export class RedisPubSub implements PubSubEngine { subsPendingRefsMap.set(triggerName, { refs: [], pending }); const sub = new Promise((resolve, reject) => { - const subscribeFn = options['pattern'] ? this.redisSubscriber.psubscribe : this.redisSubscriber.subscribe; + const subscribeFn = patternSubscription ? this.redisSubscriber.psubscribe : this.redisSubscriber.subscribe; subscribeFn.call(this.redisSubscriber, triggerName, err => { if (err) { @@ -151,15 +152,18 @@ export class RedisPubSub implements PubSubEngine { } public unsubscribe(subId: number): void { - const [triggerName = null] = this.subscriptionMap[subId] || []; + const [triggerName = null,, patternSubscription] = this.subscriptionMap[subId] || []; const refs = this.subsRefsMap.get(triggerName); if (!refs) throw new Error(`There is no subscription of id "${subId}"`); if (refs.size === 1) { // unsubscribe from specific channel and pattern match - this.redisSubscriber.unsubscribe(triggerName); - this.redisSubscriber.punsubscribe(triggerName); + if (patternSubscription) { + this.redisSubscriber.punsubscribe(triggerName); + } else { + this.redisSubscriber.unsubscribe(triggerName); + } this.subsRefsMap.delete(triggerName); } else { @@ -198,7 +202,7 @@ export class RedisPubSub implements PubSubEngine { private readonly redisPublisher: RedisClient; private readonly reviver: Reviver; - private readonly subscriptionMap: { [subId: number]: [string, OnMessage] }; + private readonly subscriptionMap: { [subId: number]: [string, OnMessage, patternSubscription: boolean] }; private readonly subsRefsMap: Map>; private readonly subsPendingRefsMap: Map }>; private currentSubscriptionId: number; diff --git a/src/test/tests.ts b/src/test/tests.ts index 759b6b0..23228cb 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -96,7 +96,7 @@ describe('RedisPubSub', () => { }); }); - it('can unsubscribe from specific redis channel', done => { + it('can unsubscribe from specific redis channel with pattern matching disabled', done => { const pubSub = new RedisPubSub(mockOptions); pubSub.subscribe('Posts', () => null).then(subId => { pubSub.unsubscribe(subId); @@ -106,6 +106,25 @@ describe('RedisPubSub', () => { expect(unsubscribeSpy.callCount).to.equals(1); expect(unsubscribeSpy.lastCall.args).to.have.members(['Posts']); + expect(punsubscribeSpy.callCount).to.equals(0); + + done(); + + } catch (e) { + done(e); + } + }); + }); + + it('can unsubscribe from specific redis channel with pattern matching enabled', done => { + const pubSub = new RedisPubSub(mockOptions); + pubSub.subscribe('Posts', () => null, { pattern: true }).then(subId => { + pubSub.unsubscribe(subId); + + try { + + expect(unsubscribeSpy.callCount).to.equals(0); + expect(punsubscribeSpy.callCount).to.equals(1); expect(punsubscribeSpy.lastCall.args).to.have.members(['Posts']);