Skip to content

Commit 7dd9c20

Browse files
mergify[bot]strawgaterdner
authored
Add latency metrics for logstash async output (#42565) (#42586)
* Add latency metrics for logstash async output * Properly handle per-batch latency (cherry picked from commit accc5e1) Co-authored-by: William Easton <[email protected]> Co-authored-by: Denis <[email protected]>
1 parent e4bddf3 commit 7dd9c20

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

libbeat/outputs/logstash/async.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,8 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error {
221221
window[i] = &events[i].Content
222222
}
223223
ref.count.Add(1)
224-
return client.Send(ref.callback, window)
224+
225+
return client.Send(ref.customizedCallback(), window)
225226
}
226227

227228
func (c *asyncClient) getClient() *v2.AsyncClient {
@@ -231,7 +232,15 @@ func (c *asyncClient) getClient() *v2.AsyncClient {
231232
return client
232233
}
233234

234-
func (r *msgRef) callback(n uint32, err error) {
235+
func (r *msgRef) customizedCallback() func(uint32, error) {
236+
start := time.Now()
237+
238+
return func(n uint32, err error) {
239+
r.callback(start, n, err)
240+
}
241+
}
242+
243+
func (r *msgRef) callback(start time.Time, n uint32, err error) {
235244
r.client.observer.AckedEvents(int(n))
236245
r.slice = r.slice[n:]
237246
r.deadlockListener.ack(int(n))
@@ -246,6 +255,11 @@ func (r *msgRef) callback(n uint32, err error) {
246255
r.win.tryGrowWindow(r.batchSize)
247256
}
248257
}
258+
259+
// Report the latency for the batch of events
260+
duration := time.Since(start)
261+
r.client.observer.ReportLatency(duration)
262+
249263
r.dec()
250264
}
251265

0 commit comments

Comments
 (0)