@@ -1066,51 +1066,64 @@ func (s *Server) Node() string {
1066
1066
// Tradeoff is subscription and interest graph events vs connect and
1067
1067
// disconnect events, etc.
1068
1068
func (s * Server ) initEventTracking () {
1069
- if ! s .EventsEnabled () {
1069
+ // Capture sys in case we are shutdown while setting up.
1070
+ s .mu .RLock ()
1071
+ sys := s .sys
1072
+ s .mu .RUnlock ()
1073
+
1074
+ if sys == nil || sys .client == nil || sys .account == nil {
1070
1075
return
1071
1076
}
1072
1077
// Create a system hash which we use for other servers to target us specifically.
1073
- s . sys .shash = getHash (s .info .Name )
1078
+ sys .shash = getHash (s .info .Name )
1074
1079
1075
1080
// This will be for all inbox responses.
1076
- subject := fmt .Sprintf (inboxRespSubj , s . sys .shash , "*" )
1081
+ subject := fmt .Sprintf (inboxRespSubj , sys .shash , "*" )
1077
1082
if _ , err := s .sysSubscribe (subject , s .inboxReply ); err != nil {
1078
1083
s .Errorf ("Error setting up internal tracking: %v" , err )
1084
+ return
1079
1085
}
1080
- s . sys .inboxPre = subject
1086
+ sys .inboxPre = subject
1081
1087
// This is for remote updates for connection accounting.
1082
1088
subject = fmt .Sprintf (accConnsEventSubjOld , "*" )
1083
1089
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (s .remoteConnsUpdate )); err != nil {
1084
1090
s .Errorf ("Error setting up internal tracking for %s: %v" , subject , err )
1091
+ return
1085
1092
}
1086
1093
// This will be for responses for account info that we send out.
1087
1094
subject = fmt .Sprintf (connsRespSubj , s .info .ID )
1088
1095
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (s .remoteConnsUpdate )); err != nil {
1089
1096
s .Errorf ("Error setting up internal tracking: %v" , err )
1097
+ return
1090
1098
}
1091
1099
// Listen for broad requests to respond with number of subscriptions for a given subject.
1092
1100
if _ , err := s .sysSubscribe (accNumSubsReqSubj , s .noInlineCallback (s .nsubsRequest )); err != nil {
1093
1101
s .Errorf ("Error setting up internal tracking: %v" , err )
1102
+ return
1094
1103
}
1095
1104
// Listen for statsz from others.
1096
1105
subject = fmt .Sprintf (serverStatsSubj , "*" )
1097
1106
if sub , err := s .sysSubscribe (subject , s .noInlineCallback (s .remoteServerUpdate )); err != nil {
1098
1107
s .Errorf ("Error setting up internal tracking: %v" , err )
1108
+ return
1099
1109
} else {
1100
1110
// Keep track of this one.
1101
- s . sys .remoteStatsSub = sub
1111
+ sys .remoteStatsSub = sub
1102
1112
}
1113
+
1103
1114
// Listen for all server shutdowns.
1104
1115
subject = fmt .Sprintf (shutdownEventSubj , "*" )
1105
1116
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (s .remoteServerShutdown )); err != nil {
1106
1117
s .Errorf ("Error setting up internal tracking: %v" , err )
1118
+ return
1107
1119
}
1108
1120
// Listen for servers entering lame-duck mode.
1109
1121
// NOTE: This currently is handled in the same way as a server shutdown, but has
1110
1122
// a different subject in case we need to handle differently in future.
1111
1123
subject = fmt .Sprintf (lameDuckEventSubj , "*" )
1112
1124
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (s .remoteServerShutdown )); err != nil {
1113
1125
s .Errorf ("Error setting up internal tracking: %v" , err )
1126
+ return
1114
1127
}
1115
1128
// Listen for account claims updates.
1116
1129
subscribeToUpdate := true
@@ -1121,13 +1134,15 @@ func (s *Server) initEventTracking() {
1121
1134
for _ , sub := range []string {accUpdateEventSubjOld , accUpdateEventSubjNew } {
1122
1135
if _ , err := s .sysSubscribe (fmt .Sprintf (sub , "*" ), s .noInlineCallback (s .accountClaimUpdate )); err != nil {
1123
1136
s .Errorf ("Error setting up internal tracking: %v" , err )
1137
+ return
1124
1138
}
1125
1139
}
1126
1140
}
1127
1141
// Listen for ping messages that will be sent to all servers for statsz.
1128
1142
// This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below
1129
1143
if _ , err := s .sysSubscribe (serverStatsPingReqSubj , s .noInlineCallback (s .statszReq )); err != nil {
1130
1144
s .Errorf ("Error setting up internal tracking: %v" , err )
1145
+ return
1131
1146
}
1132
1147
monSrvc := map [string ]sysMsgHandler {
1133
1148
"IDZ" : s .idzReq ,
@@ -1181,10 +1196,12 @@ func (s *Server) initEventTracking() {
1181
1196
subject = fmt .Sprintf (serverDirectReqSubj , s .info .ID , name )
1182
1197
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (req )); err != nil {
1183
1198
s .Errorf ("Error setting up internal tracking: %v" , err )
1199
+ return
1184
1200
}
1185
1201
subject = fmt .Sprintf (serverPingReqSubj , name )
1186
1202
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (req )); err != nil {
1187
1203
s .Errorf ("Error setting up internal tracking: %v" , err )
1204
+ return
1188
1205
}
1189
1206
}
1190
1207
extractAccount := func (subject string ) (string , error ) {
@@ -1277,6 +1294,7 @@ func (s *Server) initEventTracking() {
1277
1294
for name , req := range monAccSrvc {
1278
1295
if _ , err := s .sysSubscribe (fmt .Sprintf (accDirectReqSubj , "*" , name ), s .noInlineCallback (req )); err != nil {
1279
1296
s .Errorf ("Error setting up internal tracking: %v" , err )
1297
+ return
1280
1298
}
1281
1299
}
1282
1300
@@ -1285,6 +1303,7 @@ func (s *Server) initEventTracking() {
1285
1303
// is only one that will answer. This breaks tests since we still forward on remote server connect.
1286
1304
if _ , err := s .sysSubscribe (fmt .Sprintf (userDirectReqSubj , "*" ), s .userInfoReq ); err != nil {
1287
1305
s .Errorf ("Error setting up internal tracking: %v" , err )
1306
+ return
1288
1307
}
1289
1308
1290
1309
// For now only the STATZ subject has an account specific ping equivalent.
@@ -1302,39 +1321,46 @@ func (s *Server) initEventTracking() {
1302
1321
})
1303
1322
})); err != nil {
1304
1323
s .Errorf ("Error setting up internal tracking: %v" , err )
1324
+ return
1305
1325
}
1306
1326
1307
1327
// Listen for updates when leaf nodes connect for a given account. This will
1308
1328
// force any gateway connections to move to `modeInterestOnly`
1309
1329
subject = fmt .Sprintf (leafNodeConnectEventSubj , "*" )
1310
1330
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (s .leafNodeConnected )); err != nil {
1311
1331
s .Errorf ("Error setting up internal tracking: %v" , err )
1332
+ return
1312
1333
}
1313
1334
// For tracking remote latency measurements.
1314
- subject = fmt .Sprintf (remoteLatencyEventSubj , s . sys .shash )
1335
+ subject = fmt .Sprintf (remoteLatencyEventSubj , sys .shash )
1315
1336
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (s .remoteLatencyUpdate )); err != nil {
1316
1337
s .Errorf ("Error setting up internal latency tracking: %v" , err )
1338
+ return
1317
1339
}
1318
1340
// This is for simple debugging of number of subscribers that exist in the system.
1319
1341
if _ , err := s .sysSubscribeInternal (accSubsSubj , s .noInlineCallback (s .debugSubscribers )); err != nil {
1320
1342
s .Errorf ("Error setting up internal debug service for subscribers: %v" , err )
1343
+ return
1321
1344
}
1322
1345
1323
1346
// Listen for requests to reload the server configuration.
1324
1347
subject = fmt .Sprintf (serverReloadReqSubj , s .info .ID )
1325
1348
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (s .reloadConfig )); err != nil {
1326
1349
s .Errorf ("Error setting up server reload handler: %v" , err )
1350
+ return
1327
1351
}
1328
1352
1329
1353
// Client connection kick
1330
1354
subject = fmt .Sprintf (clientKickReqSubj , s .info .ID )
1331
1355
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (s .kickClient )); err != nil {
1332
1356
s .Errorf ("Error setting up client kick service: %v" , err )
1357
+ return
1333
1358
}
1334
1359
// Client connection LDM
1335
1360
subject = fmt .Sprintf (clientLDMReqSubj , s .info .ID )
1336
1361
if _ , err := s .sysSubscribe (subject , s .noInlineCallback (s .ldmClient )); err != nil {
1337
1362
s .Errorf ("Error setting up client LDM service: %v" , err )
1363
+ return
1338
1364
}
1339
1365
}
1340
1366
0 commit comments