Skip to content

Commit ad5f656

Browse files
mgravellvandyvillaatakavci
authored
Support sharded pubsub commands (#2887)
* Support sharded pubsub commands (#2498) Co-authored-by: vandyvilla <[email protected]> Co-authored-by: xli <[email protected]> Co-authored-by: atakavci <[email protected]>
1 parent cfbd474 commit ad5f656

File tree

20 files changed

+396
-69
lines changed

20 files changed

+396
-69
lines changed

docs/ReleaseNotes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ Current package versions:
99
## Unreleased
1010
No pending unreleased changes
1111

12+
- Add support for sharded pub/sub via `RedisChannel.Sharded` - ([#2887 by vandyvilla, atakavci and mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2887))
13+
1214
## 2.8.37
1315

1416
- Add `ConfigurationOptions.SetUserPemCertificate(...)` and `ConfigurationOptions.SetUserPfxCertificate(...)` methods to simplify using client certificates ([#2873 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2873))

docs/Timeouts.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ By default Redis Timeout exception(s) includes useful information, which can hel
8888
|qs | Queue-Awaiting-Response : {int}|There are x operations currently awaiting replies from redis server.|
8989
|aw | Active-Writer: {bool}||
9090
|bw | Backlog-Writer: {enum} | Possible values are Inactive, Started, CheckingForWork, CheckingForTimeout, RecordingTimeout, WritingMessage, Flushing, MarkingInactive, RecordingWriteFailure, RecordingFault, SettingIdle, SpinningDown, Faulted|
91-
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
91+
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubSMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
9292
|ws | Write-State: {enum}| Possible values are Initializing, Idle, Writing, Flushing, Flushed, NA|
9393
|in | Inbound-Bytes : {long}|there are x bytes waiting to be read from the input stream from redis|
9494
|in-pipe | Inbound-Pipe-Bytes: {long}|Bytes waiting to be read|

src/StackExchange.Redis/ClientInfo.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,15 @@ public sealed class ClientInfo
129129
public string? Name { get; private set; }
130130

131131
/// <summary>
132-
/// Number of pattern matching subscriptions.
132+
/// Number of pattern-matching subscriptions.
133133
/// </summary>
134134
public int PatternSubscriptionCount { get; private set; }
135135

136+
/// <summary>
137+
/// Number of sharded subscriptions.
138+
/// </summary>
139+
public int ShardedSubscriptionCount { get; private set; }
140+
136141
/// <summary>
137142
/// The port of the client.
138143
/// </summary>
@@ -236,6 +241,7 @@ internal static bool TryParse(string? input, [NotNullWhen(true)] out ClientInfo[
236241
case "name": client.Name = value; break;
237242
case "sub": client.SubscriptionCount = Format.ParseInt32(value); break;
238243
case "psub": client.PatternSubscriptionCount = Format.ParseInt32(value); break;
244+
case "ssub": client.ShardedSubscriptionCount = Format.ParseInt32(value); break;
239245
case "multi": client.TransactionCommandLength = Format.ParseInt32(value); break;
240246
case "cmd": client.LastCommand = value; break;
241247
case "flags":

src/StackExchange.Redis/CommandMap.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public sealed class CommandMap
3131

3232
RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!
3333

34-
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
34+
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,
3535

3636
RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,
3737

@@ -57,7 +57,9 @@ public sealed class CommandMap
5757

5858
RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!
5959

60-
RedisCommand.PSUBSCRIBE, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
60+
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,
61+
62+
RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,
6163

6264
RedisCommand.SCRIPT,
6365

src/StackExchange.Redis/Enums/RedisCommand.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,16 @@ internal enum RedisCommand
181181
SORT,
182182
SORT_RO,
183183
SPOP,
184+
SPUBLISH,
184185
SRANDMEMBER,
185186
SREM,
186187
STRLEN,
187188
SUBSCRIBE,
188189
SUNION,
189190
SUNIONSTORE,
190191
SSCAN,
192+
SSUBSCRIBE,
193+
SUNSUBSCRIBE,
191194
SWAPDB,
192195
SYNC,
193196

@@ -447,10 +450,13 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
447450
case RedisCommand.SMEMBERS:
448451
case RedisCommand.SMISMEMBER:
449452
case RedisCommand.SORT_RO:
453+
case RedisCommand.SPUBLISH:
450454
case RedisCommand.SRANDMEMBER:
455+
case RedisCommand.SSUBSCRIBE:
451456
case RedisCommand.STRLEN:
452457
case RedisCommand.SUBSCRIBE:
453458
case RedisCommand.SUNION:
459+
case RedisCommand.SUNSUBSCRIBE:
454460
case RedisCommand.SSCAN:
455461
case RedisCommand.SYNC:
456462
case RedisCommand.TIME:

src/StackExchange.Redis/Interfaces/ISubscriber.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public interface ISubscriber : IRedis
110110
/// See
111111
/// <seealso href="https://redis.io/commands/unsubscribe"/>,
112112
/// <seealso href="https://redis.io/commands/punsubscribe"/>.
113+
/// <seealso href="https://redis.io/commands/sunsubscribe"/>.
113114
/// </remarks>
114115
void UnsubscribeAll(CommandFlags flags = CommandFlags.None);
115116

src/StackExchange.Redis/Message.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,9 @@ internal static bool RequiresDatabase(RedisCommand command)
569569
case RedisCommand.SLAVEOF:
570570
case RedisCommand.SLOWLOG:
571571
case RedisCommand.SUBSCRIBE:
572+
case RedisCommand.SPUBLISH:
573+
case RedisCommand.SSUBSCRIBE:
574+
case RedisCommand.SUNSUBSCRIBE:
572575
case RedisCommand.SWAPDB:
573576
case RedisCommand.SYNC:
574577
case RedisCommand.TIME:

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,12 @@ public enum State : byte
124124
public RedisCommand LastCommand { get; private set; }
125125

126126
/// <summary>
127-
/// If we have a connection, report the protocol being used.
127+
/// If we have (or had) a connection, report the protocol being used.
128128
/// </summary>
129-
public RedisProtocol? Protocol => physical?.Protocol;
129+
/// <remarks>The value remains after disconnect, so that appropriate follow-up actions (pub/sub etc) can work reliably.</remarks>
130+
public RedisProtocol? Protocol => _protocol == 0 ? default(RedisProtocol?) : _protocol;
131+
private RedisProtocol _protocol; // note starts at zero, not RESP2
132+
internal void SetProtocol(RedisProtocol protocol) => _protocol = protocol;
130133

131134
public void Dispose()
132135
{

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable
2929

3030
private const int DefaultRedisDatabaseCount = 16;
3131

32-
private static readonly CommandBytes message = "message", pmessage = "pmessage";
32+
private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";
3333

3434
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
3535
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
@@ -276,7 +276,11 @@ private enum ReadMode : byte
276276
private RedisProtocol _protocol; // note starts at **zero**, not RESP2
277277
public RedisProtocol? Protocol => _protocol == 0 ? null : _protocol;
278278

279-
internal void SetProtocol(RedisProtocol value) => _protocol = value;
279+
internal void SetProtocol(RedisProtocol value)
280+
{
281+
_protocol = value;
282+
BridgeCouldBeNull?.SetProtocol(value);
283+
}
280284

281285
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")]
282286
internal void Shutdown()
@@ -384,7 +388,7 @@ public void RecordConnectionFailed(
384388
bool isInitialConnect = false,
385389
IDuplexPipe? connectingPipe = null)
386390
{
387-
bool weAskedForThis = false;
391+
bool weAskedForThis;
388392
Exception? outerException = innerException;
389393
IdentifyFailureType(innerException, ref failureType);
390394
var bridge = BridgeCouldBeNull;
@@ -1644,9 +1648,9 @@ private void MatchResult(in RawResult result)
16441648

16451649
// out of band message does not match to a queued message
16461650
var items = result.GetItems();
1647-
if (items.Length >= 3 && items[0].IsEqual(message))
1651+
if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))
16481652
{
1649-
_readStatus = ReadStatus.PubSubMessage;
1653+
_readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;
16501654

16511655
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
16521656
var configChanged = muxer.ConfigurationChangedChannel;
@@ -1668,8 +1672,17 @@ private void MatchResult(in RawResult result)
16681672
}
16691673

16701674
// invoke the handlers
1671-
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
1672-
Trace("MESSAGE: " + channel);
1675+
RedisChannel channel;
1676+
if (items[0].IsEqual(message))
1677+
{
1678+
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
1679+
Trace("MESSAGE: " + channel);
1680+
}
1681+
else // see check on outer-if that restricts to message / smessage
1682+
{
1683+
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
1684+
Trace("SMESSAGE: " + channel);
1685+
}
16731686
if (!channel.IsNull)
16741687
{
16751688
if (TryGetPubSubPayload(items[2], out var payload))
@@ -1690,27 +1703,30 @@ private void MatchResult(in RawResult result)
16901703
{
16911704
_readStatus = ReadStatus.PubSubPMessage;
16921705

1693-
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
1706+
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1707+
16941708
Trace("PMESSAGE: " + channel);
16951709
if (!channel.IsNull)
16961710
{
16971711
if (TryGetPubSubPayload(items[3], out var payload))
16981712
{
1699-
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
1713+
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1714+
17001715
_readStatus = ReadStatus.InvokePubSub;
17011716
muxer.OnMessage(sub, channel, payload);
17021717
}
17031718
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
17041719
{
1705-
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
1720+
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1721+
17061722
_readStatus = ReadStatus.InvokePubSub;
17071723
muxer.OnMessage(sub, channel, payloads);
17081724
}
17091725
}
17101726
return; // AND STOP PROCESSING!
17111727
}
17121728

1713-
// if it didn't look like "[p]message", then we still need to process the pending queue
1729+
// if it didn't look like "[p|s]message", then we still need to process the pending queue
17141730
}
17151731
Trace("Matching result...");
17161732

@@ -2110,6 +2126,7 @@ internal enum ReadStatus
21102126
MatchResult,
21112127
PubSubMessage,
21122128
PubSubPMessage,
2129+
PubSubSMessage,
21132130
Reconfigure,
21142131
InvokePubSub,
21152132
ResponseSequenceCheck, // high-integrity mode only

src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,6 +1309,7 @@ StackExchange.Redis.RedisChannel
13091309
StackExchange.Redis.RedisChannel.Equals(StackExchange.Redis.RedisChannel other) -> bool
13101310
StackExchange.Redis.RedisChannel.IsNullOrEmpty.get -> bool
13111311
StackExchange.Redis.RedisChannel.IsPattern.get -> bool
1312+
StackExchange.Redis.RedisChannel.IsSharded.get -> bool
13121313
StackExchange.Redis.RedisChannel.PatternMode
13131314
StackExchange.Redis.RedisChannel.PatternMode.Auto = 0 -> StackExchange.Redis.RedisChannel.PatternMode
13141315
StackExchange.Redis.RedisChannel.PatternMode.Literal = 1 -> StackExchange.Redis.RedisChannel.PatternMode
@@ -1893,4 +1894,8 @@ virtual StackExchange.Redis.RedisResult.Length.get -> int
18931894
virtual StackExchange.Redis.RedisResult.this[int index].get -> StackExchange.Redis.RedisResult!
18941895
StackExchange.Redis.ConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
18951896
StackExchange.Redis.IConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
1897+
StackExchange.Redis.RedisFeatures.ShardedPubSub.get -> bool
1898+
static StackExchange.Redis.RedisChannel.Sharded(byte[]? value) -> StackExchange.Redis.RedisChannel
1899+
static StackExchange.Redis.RedisChannel.Sharded(string! value) -> StackExchange.Redis.RedisChannel
1900+
StackExchange.Redis.ClientInfo.ShardedSubscriptionCount.get -> int
18961901
StackExchange.Redis.ConfigurationOptions.SetUserPfxCertificate(string! userCertificatePath, string? password = null) -> void

src/StackExchange.Redis/RawResult.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,20 +161,21 @@ public bool MoveNext()
161161
}
162162
public ReadOnlySequence<byte> Current { get; private set; }
163163
}
164-
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode)
164+
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.RedisChannelOptions options)
165165
{
166166
switch (Resp2TypeBulkString)
167167
{
168168
case ResultType.SimpleString:
169169
case ResultType.BulkString:
170170
if (channelPrefix == null)
171171
{
172-
return new RedisChannel(GetBlob(), mode);
172+
return new RedisChannel(GetBlob(), options);
173173
}
174174
if (StartsWith(channelPrefix))
175175
{
176176
byte[] copy = Payload.Slice(channelPrefix.Length).ToArray();
177-
return new RedisChannel(copy, mode);
177+
178+
return new RedisChannel(copy, options);
178179
}
179180
return default;
180181
default:

src/StackExchange.Redis/RedisChannel.cs

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,18 @@ namespace StackExchange.Redis
99
public readonly struct RedisChannel : IEquatable<RedisChannel>
1010
{
1111
internal readonly byte[]? Value;
12-
internal readonly bool _isPatternBased;
12+
13+
internal readonly RedisChannelOptions Options;
14+
15+
[Flags]
16+
internal enum RedisChannelOptions
17+
{
18+
None = 0,
19+
Pattern = 1 << 0,
20+
Sharded = 1 << 1,
21+
}
22+
23+
internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;
1324

1425
/// <summary>
1526
/// Indicates whether the channel-name is either null or a zero-length value.
@@ -19,7 +30,12 @@ namespace StackExchange.Redis
1930
/// <summary>
2031
/// Indicates whether this channel represents a wildcard pattern (see <c>PSUBSCRIBE</c>).
2132
/// </summary>
22-
public bool IsPattern => _isPatternBased;
33+
public bool IsPattern => (Options & RedisChannelOptions.Pattern) != 0;
34+
35+
/// <summary>
36+
/// Indicates whether this channel represents a shard channel (see <c>SSUBSCRIBE</c>).
37+
/// </summary>
38+
public bool IsSharded => (Options & RedisChannelOptions.Sharded) != 0;
2339

2440
internal bool IsNull => Value == null;
2541

@@ -59,19 +75,35 @@ public static bool UseImplicitAutoPattern
5975
/// </summary>
6076
/// <param name="value">The name of the channel to create.</param>
6177
/// <param name="mode">The mode for name matching.</param>
62-
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { }
78+
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None)
79+
{
80+
}
6381

6482
/// <summary>
6583
/// Create a new redis channel from a string, explicitly controlling the pattern mode.
6684
/// </summary>
6785
/// <param name="value">The string name of the channel to create.</param>
6886
/// <param name="mode">The mode for name matching.</param>
69-
public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { }
87+
public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
88+
{
89+
}
90+
91+
/// <summary>
92+
/// Create a new redis channel from a buffer, representing a sharded channel.
93+
/// </summary>
94+
/// <param name="value">The name of the channel to create.</param>
95+
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);
96+
97+
/// <summary>
98+
/// Create a new redis channel from a string, representing a sharded channel.
99+
/// </summary>
100+
/// <param name="value">The string name of the channel to create.</param>
101+
public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);
70102

71-
private RedisChannel(byte[]? value, bool isPatternBased)
103+
internal RedisChannel(byte[]? value, RedisChannelOptions options)
72104
{
73105
Value = value;
74-
_isPatternBased = isPatternBased;
106+
Options = options;
75107
}
76108

77109
private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
@@ -123,7 +155,7 @@ private RedisChannel(byte[]? value, bool isPatternBased)
123155
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
124156
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
125157
public static bool operator ==(RedisChannel x, RedisChannel y) =>
126-
x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value);
158+
x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);
127159

128160
/// <summary>
129161
/// Indicate whether two channel names are equal.
@@ -171,10 +203,10 @@ private RedisChannel(byte[]? value, bool isPatternBased)
171203
/// Indicate whether two channel names are equal.
172204
/// </summary>
173205
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
174-
public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value);
206+
public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);
175207

176208
/// <inheritdoc/>
177-
public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0);
209+
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;
178210

179211
/// <summary>
180212
/// Obtains a string representation of the channel name.
@@ -203,7 +235,7 @@ internal RedisChannel Clone()
203235
return this;
204236
}
205237
var copy = (byte[])Value.Clone(); // defensive array copy
206-
return new RedisChannel(copy, _isPatternBased);
238+
return new RedisChannel(copy, Options);
207239
}
208240

209241
/// <summary>

0 commit comments

Comments
 (0)