Skip to content

Commit 1b7f497

Browse files
kozlovicneilalexander
authored andcommitted
[FIXED] LeafNode: propagation of (no)interest issues
There were multiple issues, but basically the fact that we would not store the routed subscriptions with the origin of the LEAF they came from made the server unable to differentiate those compared to "local" routed subscriptions, which in some cases (like a server restart and the resend of subscriptions) could lead to servers sending incorrectly subscription interest to leaf connections. We are now storing the subscriptions with a sub type indicator and the origin (for leaf subscriptions) as part of the key. This allows to differentiate "regular" routed subs versus the ones on behalf of a leafnode. An INFO boolean is added `LNOCU` to indicate support for origin in the `LS-` protocol, which is required to properly handle the removal. Therefore, if a route does not have `LNOCU`, the server will behave like an old server, and store with the key that does not contain the origin, so that it can be removed when getting an LS- without the origin. Note that in the case of a mix of servers in the same cluster, some of the issues this PR is trying to fix will be present (since the server will basically behave like a server without the fix). Having a different routed subs for leaf connections allow to revisit the fix #5982 that was done for issue #5972, which was about a more fair queue distribution to a cluster of leaf connections. That fix actually introduced a change in that we always wanted to favor queue subscriptions of the cluster where the message is produced, which that fix possibly changed. With this current PR, the server can now know if a remote queue sub is for a "local" queue sub there or on behalf of a leaf and therefore will not favor that route compared to a leaf subscription that it may have directly attached. Resolves #5972 Resolves #6148 Signed-off-by: Ivan Kozlovic <[email protected]>
1 parent d0be6af commit 1b7f497

File tree

12 files changed

+951
-133
lines changed

12 files changed

+951
-133
lines changed

server/client.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4684,19 +4684,32 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
46844684
continue
46854685
}
46864686
// Remember that leaf in case we don't find any other candidate.
4687+
// We already start randomly in lqs slice, so we don't need
4688+
// to do a random swap if we already have an rsub like we do
4689+
// when src == ROUTER above.
46874690
if rsub == nil {
46884691
rsub = sub
46894692
}
46904693
continue
46914694
} else {
4692-
// We would be picking a route, but if we had remembered a "hub" leaf,
4693-
// then pick that one instead of the route.
4694-
if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() {
4695-
break
4695+
// We want to favor qsubs in our own cluster. If the routed
4696+
// qsub has an origin, it means that is on behalf of a leaf.
4697+
// We need to treat it differently.
4698+
if len(sub.origin) > 0 {
4699+
// If we already have an rsub, nothing to do. Also, do
4700+
// not pick a routed qsub for a LEAF origin cluster
4701+
// that is the same than where the message comes from.
4702+
if rsub == nil && (leafOrigin == _EMPTY_ || leafOrigin != bytesToString(sub.origin)) {
4703+
rsub = sub
4704+
}
4705+
continue
46964706
}
4707+
// This is a qsub that is local on the remote server (or
4708+
// we are connected to an older server and we don't know).
4709+
// Pick this one and be done.
46974710
rsub = sub
4711+
break
46984712
}
4699-
break
47004713
}
47014714

47024715
// Assume delivery subject is normal subject to this point.

server/const.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ const (
171171
// MAX_HPUB_ARGS Maximum possible number of arguments from HPUB proto.
172172
MAX_HPUB_ARGS = 4
173173

174+
// MAX_RSUB_ARGS Maximum possible number of arguments from a RS+/LS+ proto.
175+
MAX_RSUB_ARGS = 6
176+
174177
// DEFAULT_MAX_CLOSED_CLIENTS is the maximum number of closed connections we hold onto.
175178
DEFAULT_MAX_CLOSED_CLIENTS = 10000
176179

server/gateway.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1900,7 +1900,7 @@ func (c *client) processGatewayAccountSub(accName string) error {
19001900
// the sublist if present.
19011901
// <Invoked from outbound connection's readLoop>
19021902
func (c *client) processGatewayRUnsub(arg []byte) error {
1903-
accName, subject, queue, err := c.parseUnsubProto(arg)
1903+
_, accName, subject, queue, err := c.parseUnsubProto(arg, true, false)
19041904
if err != nil {
19051905
return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error())
19061906
}

server/leafnode.go

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2271,6 +2271,42 @@ func keyFromSub(sub *subscription) string {
22712271
return sb.String()
22722272
}
22732273

2274+
const (
2275+
keyRoutedSub = "R"
2276+
keyRoutedSubByte = 'R'
2277+
keyRoutedLeafSub = "L"
2278+
keyRoutedLeafSubByte = 'L'
2279+
)
2280+
2281+
// Helper function to build the key that prevents collisions between normal
2282+
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2283+
// Keys will look like this:
2284+
// "R foo" -> plain routed sub on "foo"
2285+
// "R foo bar" -> queue routed sub on "foo", queue "bar"
2286+
// "L foo bar" -> plain routed leaf sub on "foo", leaf "bar"
2287+
// "L foo bar baz" -> queue routed sub on "foo", queue "bar", leaf "baz"
2288+
func keyFromSubWithOrigin(sub *subscription) string {
2289+
var sb strings.Builder
2290+
sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
2291+
leaf := len(sub.origin) > 0
2292+
if leaf {
2293+
sb.WriteByte(keyRoutedLeafSubByte)
2294+
} else {
2295+
sb.WriteByte(keyRoutedSubByte)
2296+
}
2297+
sb.WriteByte(' ')
2298+
sb.Write(sub.subject)
2299+
if sub.queue != nil {
2300+
sb.WriteByte(' ')
2301+
sb.Write(sub.queue)
2302+
}
2303+
if leaf {
2304+
sb.WriteByte(' ')
2305+
sb.Write(sub.origin)
2306+
}
2307+
return sb.String()
2308+
}
2309+
22742310
// Lock should be held.
22752311
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
22762312
if key == _EMPTY_ {
@@ -2321,12 +2357,21 @@ func (c *client) processLeafSub(argo []byte) (err error) {
23212357
args := splitArg(arg)
23222358
sub := &subscription{client: c}
23232359

2360+
delta := int32(1)
23242361
switch len(args) {
23252362
case 1:
23262363
sub.queue = nil
23272364
case 3:
23282365
sub.queue = args[1]
23292366
sub.qw = int32(parseSize(args[2]))
2367+
// TODO: (ik) We should have a non empty queue name and a queue
2368+
// weight >= 1. For 2.11, we may want to return an error if that
2369+
// is not the case, but for now just overwrite `delta` if queue
2370+
// weight is greater than 1 (it is possible after a reconnect/
2371+
// server restart to receive a queue weight > 1 for a new sub).
2372+
if sub.qw > 1 {
2373+
delta = sub.qw
2374+
}
23302375
default:
23312376
return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
23322377
}
@@ -2391,7 +2436,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
23912436
key := bytesToString(sub.sid)
23922437
osub := c.subs[key]
23932438
updateGWs := false
2394-
delta := int32(1)
23952439
if osub == nil {
23962440
c.subs[key] = sub
23972441
// Now place into the account sl.
@@ -2472,6 +2516,10 @@ func (c *client) processLeafUnsub(arg []byte) error {
24722516
// We store local subs by account and subject and optionally queue name.
24732517
// LS- will have the arg exactly as the key.
24742518
sub, ok := c.subs[string(arg)]
2519+
delta := int32(1)
2520+
if ok && len(sub.queue) > 0 {
2521+
delta = sub.qw
2522+
}
24752523
c.mu.Unlock()
24762524

24772525
if ok {
@@ -2481,14 +2529,14 @@ func (c *client) processLeafUnsub(arg []byte) error {
24812529

24822530
if !spoke {
24832531
// If we are routing subtract from the route map for the associated account.
2484-
srv.updateRouteSubscriptionMap(acc, sub, -1)
2532+
srv.updateRouteSubscriptionMap(acc, sub, -delta)
24852533
// Gateways
24862534
if updateGWs {
2487-
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
2535+
srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
24882536
}
24892537
}
24902538
// Now check on leafnode updates for other leaf nodes.
2491-
acc.updateLeafNodes(sub, -1)
2539+
acc.updateLeafNodes(sub, -delta)
24922540
return nil
24932541
}
24942542

0 commit comments

Comments
 (0)