Skip to content

Commit 5ed0c14

Browse files
committed
Add meta snapshot metrics to jsz monitoring
```
"snapshot": {
  "pending_entries": 1,
  "pending_size": 1314,
  "last_time": "2025-11-06T18:14:40.659678019Z",
  "last_duration": 161096363
}
```

Signed-off-by: Waldemar Quevedo <[email protected]>
1 parent a098c7e commit 5ed0c14

File tree

3 files changed

+93
-8
lines changed

3 files changed

+93
-8
lines changed

server/jetstream_cluster.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ type jetStreamCluster struct {
7070
peerStreamCancelMove *subscription
7171
// To pop out the monitorCluster before the raft layer.
7272
qch chan struct{}
73+
// Track last meta snapshot time and duration for monitoring.
74+
lastMetaSnapTime int64 // Unix nanoseconds
75+
lastMetaSnapDuration int64 // Duration in nanoseconds
7376
}
7477

7578
// Used to track inflight stream add requests to properly re-use same group and sync subject.
@@ -1667,15 +1670,23 @@ func (js *jetStream) metaSnapshot() ([]byte, error) {
16671670
return nil, err
16681671
}
16691672

1670-
// Track how long it took to compress the JSON
1673+
// Track how long it took to compress the JSON.
16711674
cstart := time.Now()
16721675
snap := s2.Encode(nil, b)
16731676
cend := time.Since(cstart)
1677+
took := time.Since(start)
16741678

1675-
if took := time.Since(start); took > time.Second {
1679+
if took > time.Second {
16761680
s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, s2: %.3fs, uncompressed: %d, compressed: %d)",
16771681
took.Seconds(), nsa, nca, mend.Seconds(), cend.Seconds(), len(b), len(snap))
16781682
}
1683+
1684+
// Track in jsz monitoring as well.
1685+
if cc != nil {
1686+
atomic.StoreInt64(&cc.lastMetaSnapTime, start.UnixNano())
1687+
atomic.StoreInt64(&cc.lastMetaSnapDuration, int64(took))
1688+
}
1689+
16791690
return snap, nil
16801691
}
16811692

server/monitor.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2969,14 +2969,23 @@ type AccountDetail struct {
29692969
Streams []StreamDetail `json:"stream_detail,omitempty"`
29702970
}
29712971

2972+
// MetaSnapshotStats shows information about meta snapshots.
// It is reported as the "snapshot" object of MetaClusterInfo.
// NOTE(review): encoding/json's omitempty never omits a struct value, so a zero
// LastTime is still serialized (as "0001-01-01T00:00:00Z"); only LastDuration is
// actually dropped when zero. Confirm this is the intended wire format.
2973+
type MetaSnapshotStats struct {
2974+
PendingEntries uint64 `json:"pending_entries"` // PendingEntries is the count of pending entries in the meta layer
2975+
PendingSize uint64 `json:"pending_size"` // PendingSize is the size in bytes of pending entries in the meta layer
2976+
LastTime time.Time `json:"last_time,omitempty"` // LastTime is when the last meta snapshot was taken; left as the zero time until one has been recorded
2977+
LastDuration time.Duration `json:"last_duration,omitempty"` // LastDuration is how long the last meta snapshot took; serialized as integer nanoseconds
2978+
}
2979+
29722980
// MetaClusterInfo shows information about the meta group.
// NOTE(review): the Pending field is documented as a count of RAFT messages, but the
// Jsz hunk shown in this commit fills it from s.jsAPIRoutedReqs.len() — confirm the wording.
29732981
type MetaClusterInfo struct {
2974-
Name string `json:"name,omitempty"` // Name is the name of the cluster
2975-
Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader
2976-
Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader
2977-
Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers
2978-
Size int `json:"cluster_size"` // Size is the known size of the cluster
2979-
Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed
2982+
Name string `json:"name,omitempty"` // Name is the name of the cluster
2983+
Leader string `json:"leader,omitempty"` // Leader is the server name of the cluster leader
2984+
Peer string `json:"peer,omitempty"` // Peer is unique ID of the leader
2985+
Replicas []*PeerInfo `json:"replicas,omitempty"` // Replicas is a list of known peers
2986+
Size int `json:"cluster_size"` // Size is the known size of the cluster
2987+
Pending int `json:"pending"` // Pending is how many RAFT messages are not yet processed
2988+
Snapshot *MetaSnapshotStats `json:"snapshot"` // Snapshot contains meta snapshot statistics; the tag has no omitempty, so a nil pointer is encoded as null
29802989
}
29812990

29822991
// JSInfo has detailed information on JetStream.
@@ -3181,13 +3190,32 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
31813190

31823191
if mg := js.getMetaGroup(); mg != nil {
31833192
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
3193+
entries, bytes := mg.Size()
31843194
jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()}
31853195
if isLeader {
31863196
jsi.Meta.Replicas = ci.Replicas
31873197
}
31883198
if ipq := s.jsAPIRoutedReqs; ipq != nil {
31893199
jsi.Meta.Pending = ipq.len()
31903200
}
3201+
// Add meta snapshot stats
3202+
jsi.Meta.Snapshot = &MetaSnapshotStats{
3203+
PendingEntries: entries,
3204+
PendingSize: bytes,
3205+
}
3206+
js.mu.RLock()
3207+
cluster := js.cluster
3208+
js.mu.RUnlock()
3209+
if cluster != nil {
3210+
timeNanos := atomic.LoadInt64(&cluster.lastMetaSnapTime)
3211+
durationNanos := atomic.LoadInt64(&cluster.lastMetaSnapDuration)
3212+
if timeNanos > 0 {
3213+
jsi.Meta.Snapshot.LastTime = time.Unix(0, timeNanos).UTC()
3214+
}
3215+
if durationNanos > 0 {
3216+
jsi.Meta.Snapshot.LastDuration = time.Duration(durationNanos)
3217+
}
3218+
}
31913219
}
31923220
}
31933221

server/monitor_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5306,6 +5306,52 @@ func TestMonitorJsz(t *testing.T) {
53065306
t.Fatalf("received cluster info from multiple nodes")
53075307
}
53085308
})
5309+
// meta-snapshot-stats checks the Meta.Snapshot section returned by readJsInfo for
// each monitoring URL, first as-is and then again after forcing a meta snapshot on
// every server in srvs.
t.Run("meta-snapshot-stats", func(t *testing.T) {
5310+
for _, url := range []string{monUrl1, monUrl2} {
5311+
info := readJsInfo(url)
5312+
require_True(t, info.Meta != nil)
5313+
require_True(t, info.Meta.Snapshot != nil)
5314+
5315+
snapshot := info.Meta.Snapshot
5316+
5317+
// In case no snapshots have happened there would be some pending entries.
5318+
if snapshot.LastTime.IsZero() {
5319+
require_True(t, snapshot.PendingEntries >= 1)
5320+
require_True(t, snapshot.PendingSize >= 1)
5321+
}
5322+
5323+
// Force meta snapshots on both servers to test snapshot timing.
// This inner loop runs once per URL, so every server is snapshotted on each outer iteration.
5324+
for _, srv := range srvs {
5325+
if js := srv.getJetStream(); js != nil {
5326+
if mg := js.getMetaGroup(); mg != nil {
5327+
// NOTE(review): a metaSnapshot error is skipped silently and nothing returned by
// InstallSnapshot is inspected, so the assertions below do not prove this call succeeded.
if snap, err := js.metaSnapshot(); err == nil {
5328+
mg.InstallSnapshot(snap)
5329+
}
5330+
}
5331+
}
5332+
}
5333+
// Wait for snapshot timing to be recorded.
// NOTE(review): the metaSnapshot hunk in this commit stores the timing before it
// returns, so this sleep may be unnecessary — confirm before removing.
5334+
time.Sleep(100 * time.Millisecond)
5335+
5336+
// Get latest stats again.
5337+
info = readJsInfo(url)
5338+
require_True(t, info.Meta != nil)
5339+
require_True(t, info.Meta.Snapshot != nil)
5340+
5341+
snapshot = info.Meta.Snapshot
5342+
5343+
// A snapshot time must now be reported.
require_True(t, !snapshot.LastTime.IsZero())
5344+
// Assert that snapshot time is in UTC.
5345+
require_Equal(t, snapshot.LastTime.Location(), time.UTC)
5346+
5347+
// Assert that duration is non-negative and reasonable.
5348+
require_True(t, snapshot.LastDuration >= 0)
5349+
require_True(t, snapshot.LastDuration < 30*time.Second)
5350+
5351+
// Assert that snapshot time is recent.
5352+
require_True(t, time.Since(snapshot.LastTime) < 5*time.Minute)
5353+
}
5354+
})
53095355
t.Run("account-non-existing", func(t *testing.T) {
53105356
for _, url := range []string{monUrl1, monUrl2} {
53115357
info := readJsInfo(url + "?acc=DOES_NOT_EXIST")

0 commit comments

Comments
 (0)