Skip to content

Commit 60b2d33

Browse files
kozlovicneilalexander
authored andcommitted
Fix comments and improve one test
Signed-off-by: Ivan Kozlovic <[email protected]>
1 parent 1b7f497 commit 60b2d33

File tree

3 files changed

+74
-19
lines changed

3 files changed

+74
-19
lines changed

server/leafnode.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2281,10 +2281,10 @@ const (
22812281
// Helper function to build the key that prevents collisions between normal
22822282
// routed subscriptions and routed subscriptions on behalf of a leafnode.
22832283
// Keys will look like this:
2284-
// "R foo" -> plain routed sub on "foo"
2285-
// "R foo bar" -> queue routed sub on "foo", queue "bar"
2286-
// "L foo bar" -> plain routed leaf sub on "foo", leaf "bar"
2287-
// "L foo bar baz" -> queue routed sub on "foo", queue "bar", leaf "baz"
2284+
// "R foo" -> plain routed sub on "foo"
2285+
// "R foo bar" -> queue routed sub on "foo", queue "bar"
2286+
// "L foo bar" -> plain routed leaf sub on "foo", leaf "bar"
2287+
// "L foo bar baz" -> queue routed sub on "foo", queue "bar", leaf "baz"
22882288
func keyFromSubWithOrigin(sub *subscription) string {
22892289
var sb strings.Builder
22902290
sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))

server/leafnode_test.go

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4875,9 +4875,12 @@ func TestLeafNodeRoutedSubKeyDifferentBetweenLeafSubAndRoutedSub(t *testing.T) {
48754875
for _, test := range []struct {
48764876
name string
48774877
pinnedAccount string
4878+
lnocu bool
48784879
}{
4879-
{"without pinned account", _EMPTY_},
4880-
{"with pinned account", "accounts: [\"XYZ\"]"},
4880+
{"without pinned account", _EMPTY_, true},
4881+
{"with pinned account", "accounts: [\"XYZ\"]", true},
4882+
{"old server without pinned account", _EMPTY_, false},
4883+
{"old server with pinned account", "accounts: [\"XYZ\"]", false},
48814884
} {
48824885
t.Run(test.name, func(t *testing.T) {
48834886
leafBConf := `
@@ -4906,6 +4909,20 @@ func TestLeafNodeRoutedSubKeyDifferentBetweenLeafSubAndRoutedSub(t *testing.T) {
49064909

49074910
checkClusterFormed(t, b1, b2)
49084911

4912+
// To make route connections behave like if the server was connected
4913+
// to an older server, change the routes' lnocu field.
4914+
if !test.lnocu {
4915+
for _, s := range []*Server{b1, b2} {
4916+
s.mu.RLock()
4917+
s.forEachRoute(func(r *client) {
4918+
r.mu.Lock()
4919+
r.route.lnocu = false
4920+
r.mu.Unlock()
4921+
})
4922+
s.mu.RUnlock()
4923+
}
4924+
}
4925+
49094926
// This leaf will have a cluster name that matches an account name.
49104927
// The idea is to make sure that hub servers are not using incorrect
49114928
// keys to differentiate a routed queue interest on subject "A" with
@@ -4954,19 +4971,21 @@ func TestLeafNodeRoutedSubKeyDifferentBetweenLeafSubAndRoutedSub(t *testing.T) {
49544971
natsFlush(t, ncA)
49554972

49564973
// Check the acc.rm on B2
4957-
acc, err := b2.LookupAccount("XYZ")
4974+
b2Acc, err := b2.LookupAccount("XYZ")
49584975
require_NoError(t, err)
49594976

49604977
rsubKey := keyFromSubWithOrigin(&subscription{subject: []byte("foo")})
49614978
rqsubKey := keyFromSubWithOrigin(&subscription{subject: []byte("XYZ"), queue: []byte("foo")})
49624979
rlsubKey := keyFromSubWithOrigin(&subscription{origin: []byte("XYZ"), subject: []byte("foo")})
49634980
rlqsubKey := keyFromSubWithOrigin(&subscription{origin: []byte("XYZ"), subject: []byte("XYZ"), queue: []byte("foo")})
4981+
// Ensure all keys are different
4982+
require_True(t, rsubKey != rqsubKey && rqsubKey != rlsubKey && rlsubKey != rlqsubKey)
49644983

49654984
checkFor(t, time.Second, 10*time.Millisecond, func() error {
4966-
acc.mu.RLock()
4967-
defer acc.mu.RUnlock()
4985+
b2Acc.mu.RLock()
4986+
defer b2Acc.mu.RUnlock()
49684987
for _, key := range []string{rsubKey, rqsubKey, rlsubKey, rlqsubKey} {
4969-
v, ok := acc.rm[key]
4988+
v, ok := b2Acc.rm[key]
49704989
if !ok {
49714990
return fmt.Errorf("Did not find key %q for sub: %+v", key, sub)
49724991
}
@@ -4978,15 +4997,15 @@ func TestLeafNodeRoutedSubKeyDifferentBetweenLeafSubAndRoutedSub(t *testing.T) {
49784997
})
49794998

49804999
// Now check that on B1, we have 2 distinct subs for the route.
4981-
acc, err = b1.LookupAccount("XYZ")
5000+
b1Acc, err := b1.LookupAccount("XYZ")
49825001
require_NoError(t, err)
49835002

49845003
var route *client
49855004

49865005
if test.pinnedAccount == _EMPTY_ {
4987-
acc.mu.RLock()
4988-
rIdx := acc.routePoolIdx
4989-
acc.mu.RUnlock()
5006+
b1Acc.mu.RLock()
5007+
rIdx := b1Acc.routePoolIdx
5008+
b1Acc.mu.RUnlock()
49905009
b1.mu.RLock()
49915010
b1.forEachRouteIdx(rIdx, func(r *client) bool {
49925011
route = r
@@ -5014,8 +5033,45 @@ func TestLeafNodeRoutedSubKeyDifferentBetweenLeafSubAndRoutedSub(t *testing.T) {
50145033
}
50155034
}
50165035
route.mu.Unlock()
5017-
if len(entries) != 4 {
5018-
return fmt.Errorf("Expected 4 entries with %q, got this: %q", "foo", entries)
5036+
// With new servers, we expect 4 entries, but with older servers,
5037+
// we have collisions and have only 2.
5038+
var expected int
5039+
if test.lnocu {
5040+
expected = 4
5041+
} else {
5042+
expected = 2
5043+
}
5044+
if len(entries) != expected {
5045+
return fmt.Errorf("Expected %d entries with %q, got this: %q", expected, "foo", entries)
5046+
}
5047+
return nil
5048+
})
5049+
5050+
// Close the connections and expect all gone.
5051+
ncB2.Close()
5052+
ncA.Close()
5053+
5054+
checkFor(t, time.Second, 10*time.Millisecond, func() error {
5055+
b2Acc.mu.RLock()
5056+
defer b2Acc.mu.RUnlock()
5057+
for _, key := range []string{rsubKey, rqsubKey, rlsubKey, rlqsubKey} {
5058+
if _, ok := b2Acc.rm[key]; ok {
5059+
return fmt.Errorf("Key %q still present", key)
5060+
}
5061+
}
5062+
return nil
5063+
})
5064+
checkFor(t, time.Second, 10*time.Millisecond, func() error {
5065+
var entries []string
5066+
route.mu.Lock()
5067+
for key := range route.subs {
5068+
if strings.Contains(key, "foo") {
5069+
entries = append(entries, key)
5070+
}
5071+
}
5072+
route.mu.Unlock()
5073+
if len(entries) != 0 {
5074+
return fmt.Errorf("Still routed subscriptions on %q: %q", "foo", entries)
50195075
}
50205076
return nil
50215077
})

server/route.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2432,9 +2432,8 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
24322432
return
24332433
}
24342434

2435-
// Create the fast key which will use the subject or '[origin]<spc>subject<spc>queue'
2436-
// for queue subscribers, where "origin" will be non-empty if it is a sub
2437-
// from a leafnode which has a cluster name provided.
2435+
// Create the subscription key which will prevent collisions between regular
2436+
// and leaf routed subscriptions. See keyFromSubWithOrigin() for details.
24382437
key := keyFromSubWithOrigin(sub)
24392438

24402439
// Decide whether we need to send an update out to all the routes.

0 commit comments

Comments
 (0)