Skip to content

Commit 03848b7

Browse files
committed
Add meta snapshot metrics to jsz monitoring
Signed-off-by: Waldemar Quevedo <[email protected]>
1 parent a098c7e commit 03848b7

File tree

3 files changed

+66
-6
lines changed

3 files changed

+66
-6
lines changed

server/jetstream_cluster.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ type jetStreamCluster struct {
7070
peerStreamCancelMove *subscription
7171
// To pop out the monitorCluster before the raft layer.
7272
qch chan struct{}
73+
// Track last meta snapshot time and duration for monitoring (atomic)
74+
lastMetaSnapTime int64 // Unix nanoseconds, accessed atomically
75+
lastMetaSnapDuration int64 // Duration in nanoseconds, accessed atomically
7376
}
7477

7578
// Used to track inflight stream add requests to properly re-use same group and sync subject.
@@ -1405,11 +1408,20 @@ func (js *jetStream) monitorCluster() {
14051408
byNeither := !byEntries && !bySize
14061409
// For the meta layer we want to snapshot when over the above threshold (which could be 0 by default).
14071410
if ne, nsz := n.Size(); force || byNeither || (byEntries && ne > ethresh) || (bySize && nsz > szthresh) || n.NeedSnapshot() {
1411+
snapStart := time.Now()
14081412
snap, err := js.metaSnapshot()
14091413
if err != nil {
14101414
s.Warnf("Error generating JetStream cluster snapshot: %v", err)
14111415
} else if err = n.InstallSnapshot(snap); err == nil {
14121416
lastSnapTime = time.Now()
1417+
snapDuration := lastSnapTime.Sub(snapStart)
1418+
// Update cluster-level last snap time and duration for monitoring (atomic)
1419+
js.mu.RLock()
1420+
if js.cluster != nil {
1421+
atomic.StoreInt64(&js.cluster.lastMetaSnapTime, lastSnapTime.UnixNano())
1422+
atomic.StoreInt64(&js.cluster.lastMetaSnapDuration, int64(snapDuration))
1423+
}
1424+
js.mu.RUnlock()
14131425
} else if err != errNoSnapAvailable && err != errNodeClosed {
14141426
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
14151427
}

server/monitor.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2969,14 +2969,23 @@ type AccountDetail struct {
29692969
Streams []StreamDetail `json:"stream_detail,omitempty"`
29702970
}
29712971

2972+
// MetaSnapshotStats shows information about meta snapshots.
2973+
type MetaSnapshotStats struct {
2974+
PendingEntries uint64 `json:"pending_entries"` // PendingEntries is the count of pending entries in the meta layer
2975+
PendingSize uint64 `json:"pending_size"` // PendingSize is the size in bytes of pending entries in the meta layer
2976+
LastTime time.Time `json:"last_time,omitempty"` // LastTime is when the last meta snapshot was taken
2977+
LastDuration time.Duration `json:"last_duration,omitempty"` // LastDuration is how long the last meta snapshot took
2978+
}
2979+
29722980
// MetaClusterInfo shows information about the meta group.
29732981
type MetaClusterInfo struct {
2974-
Name string `json:"name,omitempty"` // Name is the name of the cluster
2975-
Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader
2976-
Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader
2977-
Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers
2978-
Size int `json:"cluster_size"` // Size is the known size of the cluster
2979-
Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed
2982+
Name string `json:"name,omitempty"` // Name is the name of the cluster
2983+
Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader
2984+
Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader
2985+
Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers
2986+
Size int `json:"cluster_size"` // Size is the known size of the cluster
2987+
Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed
2988+
Snapshot *MetaSnapshotStats `json:"snapshot"` // Snapshot contains meta snapshot statistics
29802989
}
29812990

29822991
// JSInfo has detailed information on JetStream.
@@ -3181,13 +3190,33 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
31813190

31823191
if mg := js.getMetaGroup(); mg != nil {
31833192
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
3193+
entries, bytes := mg.Size()
31843194
jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()}
31853195
if isLeader {
31863196
jsi.Meta.Replicas = ci.Replicas
31873197
}
31883198
if ipq := s.jsAPIRoutedReqs; ipq != nil {
31893199
jsi.Meta.Pending = ipq.len()
31903200
}
3201+
// Add meta snapshot stats
3202+
jsi.Meta.Snapshot = &MetaSnapshotStats{
3203+
PendingEntries: entries,
3204+
PendingSize: bytes,
3205+
}
3206+
// Read atomic snapshot timing (no lock needed for atomics)
3207+
js.mu.RLock()
3208+
cluster := js.cluster
3209+
js.mu.RUnlock()
3210+
if cluster != nil {
3211+
timeNanos := atomic.LoadInt64(&cluster.lastMetaSnapTime)
3212+
durationNanos := atomic.LoadInt64(&cluster.lastMetaSnapDuration)
3213+
if timeNanos > 0 {
3214+
jsi.Meta.Snapshot.LastTime = time.Unix(0, timeNanos).UTC()
3215+
}
3216+
if durationNanos > 0 {
3217+
jsi.Meta.Snapshot.LastDuration = time.Duration(durationNanos)
3218+
}
3219+
}
31913220
}
31923221
}
31933222

server/monitor_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5306,6 +5306,25 @@ func TestMonitorJsz(t *testing.T) {
53065306
t.Fatalf("received cluster info from multiple nodes")
53075307
}
53085308
})
5309+
t.Run("meta-snapshot-stats", func(t *testing.T) {
5310+
for _, url := range []string{monUrl1, monUrl2} {
5311+
info := readJsInfo(url)
5312+
require_True(t, info.Meta != nil)
5313+
require_True(t, info.Meta.Snapshot != nil)
5314+
5315+
snapshot := info.Meta.Snapshot
5316+
5317+
require_True(t, snapshot.PendingEntries >= 0)
5318+
require_True(t, snapshot.PendingSize >= 0)
5319+
5320+
if !snapshot.LastTime.IsZero() {
5321+
require_Equal(t, snapshot.LastTime.Location(), time.UTC)
5322+
require_True(t, snapshot.LastDuration >= 0)
5323+
require_True(t, snapshot.LastDuration < 30*time.Second)
5324+
require_True(t, time.Since(snapshot.LastTime) < 5*time.Minute)
5325+
}
5326+
}
5327+
})
53095328
t.Run("account-non-existing", func(t *testing.T) {
53105329
for _, url := range []string{monUrl1, monUrl2} {
53115330
info := readJsInfo(url + "?acc=DOES_NOT_EXIST")

0 commit comments

Comments
 (0)