Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.18-RC.4 #5664

Merged
merged 3 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,51 +1065,64 @@ func (s *Server) Node() string {
// Tradeoff is subscription and interest graph events vs connect and
// disconnect events, etc.
func (s *Server) initEventTracking() {
if !s.EventsEnabled() {
// Capture sys in case we are shutdown while setting up.
s.mu.RLock()
sys := s.sys
s.mu.RUnlock()

if sys == nil || sys.client == nil || sys.account == nil {
return
}
// Create a system hash which we use for other servers to target us specifically.
s.sys.shash = getHash(s.info.Name)
sys.shash = getHash(s.info.Name)

// This will be for all inbox responses.
subject := fmt.Sprintf(inboxRespSubj, s.sys.shash, "*")
subject := fmt.Sprintf(inboxRespSubj, sys.shash, "*")
if _, err := s.sysSubscribe(subject, s.inboxReply); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
s.sys.inboxPre = subject
sys.inboxPre = subject
// This is for remote updates for connection accounting.
subject = fmt.Sprintf(accConnsEventSubjOld, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil {
s.Errorf("Error setting up internal tracking for %s: %v", subject, err)
return
}
// This will be for responses for account info that we send out.
subject = fmt.Sprintf(connsRespSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for broad requests to respond with number of subscriptions for a given subject.
if _, err := s.sysSubscribe(accNumSubsReqSubj, s.noInlineCallback(s.nsubsRequest)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for statsz from others.
subject = fmt.Sprintf(serverStatsSubj, "*")
if sub, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
} else {
// Keep track of this one.
s.sys.remoteStatsSub = sub
sys.remoteStatsSub = sub
}

// Listen for all server shutdowns.
subject = fmt.Sprintf(shutdownEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for servers entering lame-duck mode.
// NOTE: This currently is handled in the same way as a server shutdown, but has
// a different subject in case we need to handle differently in future.
subject = fmt.Sprintf(lameDuckEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for account claims updates.
subscribeToUpdate := true
Expand All @@ -1120,13 +1133,15 @@ func (s *Server) initEventTracking() {
for _, sub := range []string{accUpdateEventSubjOld, accUpdateEventSubjNew} {
if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.noInlineCallback(s.accountClaimUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
}
}
// Listen for ping messages that will be sent to all servers for statsz.
// This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
monSrvc := map[string]sysMsgHandler{
"IDZ": s.idzReq,
Expand Down Expand Up @@ -1180,10 +1195,12 @@ func (s *Server) initEventTracking() {
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
subject = fmt.Sprintf(serverPingReqSubj, name)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
}
extractAccount := func(subject string) (string, error) {
Expand Down Expand Up @@ -1276,6 +1293,7 @@ func (s *Server) initEventTracking() {
for name, req := range monAccSrvc {
if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), s.noInlineCallback(req)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
}

Expand All @@ -1284,6 +1302,7 @@ func (s *Server) initEventTracking() {
// is only one that will answer. This breaks tests since we still forward on remote server connect.
if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}

// For now only the STATZ subject has an account specific ping equivalent.
Expand All @@ -1301,39 +1320,46 @@ func (s *Server) initEventTracking() {
})
})); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}

// Listen for updates when leaf nodes connect for a given account. This will
// force any gateway connections to move to `modeInterestOnly`
subject = fmt.Sprintf(leafNodeConnectEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.leafNodeConnected)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// For tracking remote latency measurements.
subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash)
subject = fmt.Sprintf(remoteLatencyEventSubj, sys.shash)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteLatencyUpdate)); err != nil {
s.Errorf("Error setting up internal latency tracking: %v", err)
return
}
// This is for simple debugging of number of subscribers that exist in the system.
if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil {
s.Errorf("Error setting up internal debug service for subscribers: %v", err)
return
}

// Listen for requests to reload the server configuration.
subject = fmt.Sprintf(serverReloadReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.reloadConfig)); err != nil {
s.Errorf("Error setting up server reload handler: %v", err)
return
}

// Client connection kick
subject = fmt.Sprintf(clientKickReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.kickClient)); err != nil {
s.Errorf("Error setting up client kick service: %v", err)
return
}
// Client connection LDM
subject = fmt.Sprintf(clientLDMReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.ldmClient)); err != nil {
s.Errorf("Error setting up client LDM service: %v", err)
return
}
}

Expand Down
10 changes: 10 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4098,6 +4098,16 @@ func (s *Server) isLameDuckMode() bool {
return s.ldm
}

// LameDuckShutdown will perform a lame duck shutdown of NATS, whereby
// the client listener is closed, existing client connections are
// kicked, Raft leaderships are transferred, JetStream is shutdown
// and then finally shutdown the the NATS Server itself.
// This function blocks and will not return until the NATS Server
// has completed the entire shutdown operation.
func (s *Server) LameDuckShutdown() {
s.lameDuckMode()
}

// This function will close the client listener then close the clients
// at some interval to avoid a reconnect storm.
// We will also transfer any raft leaders and shutdown JetStream.
Expand Down
14 changes: 13 additions & 1 deletion server/stree/stree.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (t *SubjectTree[T]) Empty() *SubjectTree[T] {

// Insert a value into the tree. Will return if the value was updated and if so the old value.
func (t *SubjectTree[T]) Insert(subject []byte, value T) (*T, bool) {
if t == nil {
return nil, false
}

old, updated := t.insert(&t.root, subject, value, 0)
if !updated {
t.size++
Expand All @@ -60,6 +64,10 @@ func (t *SubjectTree[T]) Insert(subject []byte, value T) (*T, bool) {

// Find will find the value and return it or false if it was not found.
func (t *SubjectTree[T]) Find(subject []byte) (*T, bool) {
if t == nil {
return nil, false
}

var si int
for n := t.root; n != nil; {
if n.isLeaf() {
Expand Down Expand Up @@ -88,6 +96,10 @@ func (t *SubjectTree[T]) Find(subject []byte) (*T, bool) {

// Delete will delete the item and return its value, or not found if it did not exist.
func (t *SubjectTree[T]) Delete(subject []byte) (*T, bool) {
if t == nil {
return nil, false
}

val, deleted := t.delete(&t.root, subject, 0)
if deleted {
t.size--
Expand All @@ -97,7 +109,7 @@ func (t *SubjectTree[T]) Delete(subject []byte) (*T, bool) {

// Match will match against a subject that can have wildcards and invoke the callback func for each matched value.
func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) {
if len(filter) == 0 || cb == nil {
if t == nil || t.root == nil || len(filter) == 0 || cb == nil {
return
}
// We need to break this up into chunks based on wildcards, either pwc '*' or fwc '>'.
Expand Down
11 changes: 11 additions & 0 deletions server/stree/stree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,3 +764,14 @@ func TestSubjectTreeMatchNoCallbackDupe(t *testing.T) {
})
}
}

func TestSubjectTreeNilNoPanic(t *testing.T) {
var st *SubjectTree[int]
st.Match([]byte("foo"), func(_ []byte, _ *int) {})
_, found := st.Find([]byte("foo"))
require_False(t, found)
_, found = st.Delete([]byte("foo"))
require_False(t, found)
_, found = st.Insert([]byte("foo"), 22)
require_False(t, found)
}