Skip to content

Commit 7eb2bdd

Browse files
authored
Include full queue metrics in the monitoring index (#42439)
Add queue metrics to the Metricbeat monitoring schema so they can be included in standard Agent dashboards. This is the Beats-side half of #42093. Affected metrics are: `added.{events, bytes}`, `consumed.{events, bytes}`, `removed.{events, bytes}`, and `filled.{events, bytes, pct}`, all within `monitoring.metrics.libbeat.pipeline.queue`. The output write latency metrics (`latency.{count, max, median, p99}`) are added to the schema as well.
1 parent 074a201 commit 7eb2bdd

File tree

3 files changed

+30
-19
lines changed

3 files changed

+30
-19
lines changed

CHANGELOG.next.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
129129
- Fix setting unique registry for non beat receivers {issue}42288[42288] {pull}42292[42292]
130130
- The Kafka output now drops events when there is an authorisation error {issue}42343[42343] {pull}42401[42401]
131131
- Fix autodiscovery memory leak related to metadata of start events {pull}41748[41748]
132+
- All standard queue metrics are now included in metrics monitoring: `added.{events, bytes}`, `consumed.{events, bytes}`, `removed.{events, bytes}`, and `filled.{events, bytes, pct}`. {pull}42439[42439]
133+
- The following output latency metrics are now included in metrics monitoring: `output.write.latency.{count, max, median, p99}`. {pull}42439[42439]
132134

133135
*Auditbeat*
134136

libbeat/publisher/pipeline/monitoring.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,6 @@ type metricsObserverVars struct {
7474
eventsTotal, eventsFiltered, eventsPublished, eventsFailed *monitoring.Uint
7575
eventsDropped, eventsRetry *monitoring.Uint // (retryer) drop/retry counters
7676
activeEvents *monitoring.Uint
77-
78-
// queue metrics
79-
queueACKed *monitoring.Uint
80-
queueMaxEvents *monitoring.Uint
81-
percentQueueFull *monitoring.Float
8277
}
8378

8479
func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
@@ -118,19 +113,6 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
118113
// events.dropped counts events that were dropped because errors from
119114
// the output workers exceeded the configured maximum retry count.
120115
eventsDropped: monitoring.NewUint(reg, "events.dropped"),
121-
122-
// (Gauge) queue.max_events measures the maximum number of events the
123-
// queue will accept, or 0 if there is none.
124-
queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"),
125-
126-
// queue.acked counts events that have been acknowledged by the output
127-
// workers. This includes events that were dropped for fatal errors,
128-
// which are also reported in events.dropped.
129-
queueACKed: monitoring.NewUint(reg, "queue.acked"),
130-
131-
// (Gauge) queue.filled.pct.events measures the fraction (from 0 to 1)
132-
// of the queue's event capacity that is currently filled.
133-
percentQueueFull: monitoring.NewFloat(reg, "queue.filled.pct.events"),
134116
},
135117
}
136118
}

metricbeat/module/beat/stats/data.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,40 @@ var (
7171
"write": c.Dict("write", s.Schema{
7272
"bytes": c.Int("bytes"),
7373
"errors": c.Int("errors"),
74+
"latency": c.Dict("latency", s.Schema{
75+
"count": c.Int("count"),
76+
"max": c.Int("max"),
77+
"median": c.Float("median"),
78+
"p99": c.Float("p99"),
79+
}),
7480
}),
7581
}),
7682
"pipeline": c.Dict("pipeline", s.Schema{
7783
"clients": c.Int("clients"),
7884
"queue": c.Dict("queue", s.Schema{
79-
"acked": c.Int("acked"),
8085
"max_events": c.Int("max_events"),
86+
87+
"added": c.Dict("added", s.Schema{
88+
"events": c.Int("events"),
89+
"bytes": c.Int("bytes"),
90+
}),
91+
"consumed": c.Dict("consumed", s.Schema{
92+
"events": c.Int("events"),
93+
"bytes": c.Int("bytes"),
94+
}),
95+
"removed": c.Dict("removed", s.Schema{
96+
"events": c.Int("events"),
97+
"bytes": c.Int("bytes"),
98+
}),
99+
"filled": c.Dict("filled", s.Schema{
100+
"events": c.Int("events"),
101+
"bytes": c.Int("bytes"),
102+
"pct": c.Float("pct"),
103+
}),
104+
105+
// Backwards compatibility: "acked" is the old name for
106+
// "removed.events" and should not be used by new code/dashboards.
107+
"acked": c.Int("acked"),
81108
}),
82109
"events": c.Dict("events", s.Schema{
83110
"active": c.Int("active"),

0 commit comments

Comments
 (0)