Skip to content
This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit 7aba207

Browse files
committed
1ms to the stale marker to guarantee the accuracy
1 parent 1190621 commit 7aba207

File tree

4 files changed

+36
-14
lines changed

4 files changed

+36
-14
lines changed

pkg/remote/client_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import (
1010
"testing"
1111
"time"
1212

13-
"github.com/golang/snappy"
1413
"github.com/grafana/xk6-output-prometheus-remote/pkg/stale"
14+
15+
"github.com/golang/snappy"
1516
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/require"
1718
prompb "go.buf.build/grpc/go/prometheus/prometheus"

pkg/remotewrite/remotewrite.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Output struct {
2525

2626
config Config
2727
logger logrus.FieldLogger
28+
now func() time.Time
2829
periodicFlusher *output.PeriodicFlusher
2930
tsdb map[metrics.TimeSeries]*seriesWithMeasure
3031
trendStatsResolver map[string]func(*metrics.TrendSink) float64
@@ -55,6 +56,10 @@ func New(params output.Params) (*Output, error) {
5556
o := &Output{
5657
client: wc,
5758
config: config,
59+
// TODO: consider to do this function millisecond-based
60+
// so we don't need to truncate all the time we invoke it.
61+
// Before we should analyze if in some cases is it useful to have it in ns.
62+
now: time.Now,
5863
logger: logger,
5964
tsdb: make(map[metrics.TimeSeries]*seriesWithMeasure),
6065
}
@@ -93,8 +98,7 @@ func (o *Output) Stop() error {
9398
if !o.config.StaleMarkers.Bool {
9499
return nil
95100
}
96-
97-
staleMarkers := o.staleMarkers(time.Now())
101+
staleMarkers := o.staleMarkers()
98102
if len(staleMarkers) < 1 {
99103
o.logger.Debug("No time series to mark as stale")
100104
return nil
@@ -109,10 +113,18 @@ func (o *Output) Stop() error {
109113
}
110114

111115
// staleMarkers maps all the seen time series with a stale marker.
112-
func (o *Output) staleMarkers(t time.Time) []*prompb.TimeSeries {
113-
timestamp := t.UnixMilli()
114-
staleMarkers := make([]*prompb.TimeSeries, 0, len(o.tsdb))
116+
func (o *Output) staleMarkers() []*prompb.TimeSeries {
117+
// Add 1ms so in the extreme case that the time frame
118+
// between the last and the next flush operation is under-millisecond,
119+
// we can avoid the sample being seen as a duplicate,
120+
// if we force it in the future.
121+
// It is essential because if it overlaps, the remote write discards the last sample,
122+
// so the stale marker and the metric will remain active for the next 5 min
123+
// as the default logic without stale markers.
124+
timestamp := o.now().
125+
Truncate(time.Millisecond).Add(1 * time.Millisecond).UnixMilli()
115126

127+
staleMarkers := make([]*prompb.TimeSeries, 0, len(o.tsdb))
116128
for _, swm := range o.tsdb {
117129
series := swm.MapPrompb()
118130
// series' length is expected to be equal to 1 for most of the cases

pkg/remotewrite/remotewrite_test.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,11 @@ func TestOutputStaleMarkers(t *testing.T) {
312312
Tags: registry.RootTagSet(),
313313
}
314314

315-
o := Output{}
315+
o := Output{
316+
now: func() time.Time {
317+
return time.Unix(1, 0)
318+
},
319+
}
316320
err := o.setTrendStatsResolver([]string{"p(99)"})
317321
require.NoError(t, err)
318322
trendSink, err := newExtendedTrendSink(o.trendStatsResolver)
@@ -321,27 +325,25 @@ func TestOutputStaleMarkers(t *testing.T) {
321325
o.tsdb = map[metrics.TimeSeries]*seriesWithMeasure{
322326
trendSinkSeries: {
323327
TimeSeries: trendSinkSeries,
324-
Latest: time.Now(),
325-
// TODO: if Measure would be a lighter interface
326-
// then it could be just a mapper mock.
328+
// TODO: if Measure is a lighter interface
329+
// then it can be just a mapper mock.
327330
Measure: trendSink,
328331
},
329332
counterSinkSeries: {
330333
TimeSeries: counterSinkSeries,
331-
Latest: time.Now(),
332334
Measure: &metrics.CounterSink{},
333335
},
334336
}
335337

336-
now := time.Now()
337-
markers := o.staleMarkers(now)
338+
markers := o.staleMarkers()
338339
require.Len(t, markers, 2)
339340

340341
sortByNameLabel(markers)
341342
expNameLabels := []string{"k6_metric1_p99", "k6_metric2_total"}
343+
expTimestamp := time.Unix(1, int64(1*time.Millisecond)).UnixMilli()
342344
for i, expName := range expNameLabels {
343345
assert.Equal(t, expName, markers[i].Labels[0].Value)
344-
assert.Equal(t, now.UnixMilli(), markers[i].Samples[0].Timestamp)
346+
assert.Equal(t, expTimestamp, markers[i].Samples[0].Timestamp)
345347
assert.True(t, math.IsNaN(markers[i].Samples[0].Value), "it isn't a StaleNaN value")
346348
}
347349
}
@@ -360,9 +362,12 @@ func TestOutputStopWithStaleMarkers(t *testing.T) {
360362
logger: logger,
361363
config: Config{
362364
// setting a large interval so it does not trigger
365+
// and the trigger can be inoked only when Stop is
366+
// invoked.
363367
PushInterval: types.NullDurationFrom(1 * time.Hour),
364368
StaleMarkers: null.BoolFrom(tc),
365369
},
370+
now: time.Now,
366371
}
367372

368373
err := o.Start()

pkg/stale/stale.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,9 @@ import "math"
1616
// The value is the same used by the Prometheus package.
1717
// https://pkg.go.dev/github.com/prometheus/prometheus/pkg/value#pkg-constants
1818
//
19+
// It isn't imported directly to avoid the direct dependency
20+
// from the big Prometheus project that would bring more
21+
// dependencies.
22+
//
1923
//nolint:gochecknoglobals
2024
var Marker = math.Float64frombits(0x7ff0000000000002)

0 commit comments

Comments
 (0)