Skip to content

Commit bf1e65f

Browse files
committed
Add remote write v2 HA tracker and relabel
Signed-off-by: SungJin1212 <[email protected]>
1 parent ba60d7a commit bf1e65f

File tree

5 files changed

+397
-49
lines changed

5 files changed

+397
-49
lines changed

Diff for: pkg/cortexpbv2/compatv2.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88
)
99

1010
// ToWriteRequestV2 converts matched slices of Labels, Samples, and Histograms into a WriteRequest proto.
11-
func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum, additionalSymbols ...string) *WriteRequest {
11+
func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum, help ...string) *WriteRequest {
1212
st := writev2.NewSymbolTable()
1313
labelRefs := make([][]uint32, 0, len(lbls))
1414
for _, lbl := range lbls {
1515
labelRefs = append(labelRefs, st.SymbolizeLabels(lbl, nil))
1616
}
1717

18-
for _, s := range additionalSymbols {
18+
for _, s := range help {
1919
st.Symbolize(s)
2020
}
2121

Diff for: pkg/distributor/distributor.go

+75-9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/prometheus/common/model"
2121
"github.com/prometheus/prometheus/model/labels"
2222
"github.com/prometheus/prometheus/model/relabel"
23+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2324
"github.com/prometheus/prometheus/scrape"
2425
"github.com/prometheus/prometheus/storage"
2526
"github.com/weaveworks/common/httpgrpc"
@@ -515,6 +516,17 @@ func shardByAllLabels(userID string, labels []cortexpb.LabelAdapter) uint32 {
515516
return h
516517
}
517518

519+
// removeLabelV2 deletes the first label whose name equals labelName from *labels in place; it does nothing if no label has that name.
520+
func removeLabelV2(labelName string, labels *labels.Labels) {
521+
for i := 0; i < len(*labels); i++ {
522+
pair := (*labels)[i]
523+
if pair.Name == labelName {
524+
*labels = append((*labels)[:i], (*labels)[i+1:]...)
525+
return
526+
}
527+
}
528+
}
529+
518530
// removeLabel removes the label named labelName from a slice of LabelAdapters if it exists.
519531
func removeLabel(labelName string, labels *[]cortexpb.LabelAdapter) {
520532
for i := 0; i < len(*labels); i++ {
@@ -619,11 +631,9 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
619631
nil
620632
}
621633

622-
func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.WriteRequest, userID string, limits *validation.Limits) ([]uint32, []cortexpbv2.TimeSeries, int64, int64, int64, int64, error, error) {
634+
func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.WriteRequest, userID string, limits *validation.Limits, b labels.ScratchBuilder, removeReplica bool) ([]uint32, []cortexpbv2.TimeSeries, int64, int64, int64, int64, error, error) {
623635
pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeysV2")
624636
defer pSpan.Finish()
625-
626-
b := labels.NewScratchBuilder(0)
627637
// For each timeseries or samples, we compute a hash to distribute across ingesters;
628638
// check each sample/metadata and discard if outside limits.
629639
validatedTimeseries := make([]cortexpbv2.TimeSeries, 0, len(req.Timeseries))
@@ -643,6 +653,7 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W
643653
}
644654
}()
645655

656+
st := writev2.NewSymbolTable()
646657
// For each timeseries, compute a hash to distribute across ingesters;
647658
// check each sample and discard if outside limits.
648659
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
@@ -656,12 +667,37 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W
656667
}
657668

658669
lbs := ts.ToLabels(&b, req.Symbols)
659-
las := cortexpb.FromLabelsToLabelAdapters(lbs)
660670

661-
// TODO(Sungjin1212): Implement relabel
662-
// TODO(Sunghin1212): Implement ha tracker
671+
if mrc := limits.MetricRelabelConfigs; len(mrc) > 0 {
672+
l, _ := relabel.Process(lbs, mrc...)
673+
if len(l) == 0 {
674+
// all labels are gone, samples will be discarded
675+
d.validateMetrics.DiscardedSamples.WithLabelValues(
676+
validation.DroppedByRelabelConfiguration,
677+
userID,
678+
).Add(float64(len(ts.Samples) + len(ts.Histograms)))
679+
680+
// all labels are gone, exemplars will be discarded
681+
d.validateMetrics.DiscardedExemplars.WithLabelValues(
682+
validation.DroppedByRelabelConfiguration,
683+
userID,
684+
).Add(float64(len(ts.Exemplars)))
685+
continue
686+
}
687+
lbs = l
688+
}
689+
690+
// If we found both the cluster and replica labels, we only want to include the cluster label when
691+
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
692+
// series we're trying to dedupe when HA tracking moves over to a different replica.
693+
if removeReplica {
694+
removeLabelV2(limits.HAReplicaLabel, &lbs)
695+
}
663696

664-
if len(las) == 0 {
697+
for _, labelName := range limits.DropLabels {
698+
removeLabelV2(labelName, &lbs)
699+
}
700+
if len(lbs) == 0 {
665701
d.validateMetrics.DiscardedSamples.WithLabelValues(
666702
validation.DroppedByUserConfigurationOverride,
667703
userID,
@@ -674,6 +710,10 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W
674710
continue
675711
}
676712

713+
// update label refs
714+
ts.LabelsRefs = st.SymbolizeLabels(lbs, nil)
715+
las := cortexpb.FromLabelsToLabelAdapters(lbs)
716+
677717
// We rely on sorted labels in different places:
678718
// 1) When computing token for labels, and sharding by all labels. Here different order of labels returns
679719
// different tokens, which is bad.
@@ -714,6 +754,7 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W
714754
validatedHistogramSamples += len(ts.Histograms)
715755
validatedExemplars += len(ts.Exemplars)
716756
}
757+
717758
return seriesKeys, validatedTimeseries, int64(validatedMetadata), int64(validatedFloatSamples), int64(validatedHistogramSamples), int64(validatedExemplars), firstPartialErr, nil
718759
}
719760

@@ -917,12 +958,37 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest)
917958
}
918959
}
919960

961+
b := labels.NewScratchBuilder(0)
962+
removeReplica := false
920963
// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
921964
limits := d.limits.GetOverridesForUser(userID)
922965

923-
// TODO(Sungjin1212): Add ha tracker
966+
if limits.AcceptHASamples && len(req.Timeseries) > 0 {
967+
cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, cortexpb.FromLabelsToLabelAdapters(req.Timeseries[0].ToLabels(&b, req.Symbols)))
968+
removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits)
969+
if err != nil {
970+
// TODO(Sungjin1212): reuse timeseries slice
971+
972+
if errors.Is(err, ha.ReplicasNotMatchError{}) {
973+
// These samples have been deduped.
974+
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numFloatSamples + numHistogramSamples))
975+
return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
976+
}
977+
978+
if errors.Is(err, ha.TooManyReplicaGroupsError{}) {
979+
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numFloatSamples + numHistogramSamples))
980+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
981+
}
982+
983+
return nil, err
984+
}
985+
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
986+
if !removeReplica { // False, Nil
987+
d.nonHASamples.WithLabelValues(userID).Add(float64(numFloatSamples + numHistogramSamples))
988+
}
989+
}
924990

925-
seriesKeys, validatedTimeseries, validatedMetadatas, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeysV2(ctx, req, userID, limits)
991+
seriesKeys, validatedTimeseries, validatedMetadatas, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeysV2(ctx, req, userID, limits, b, removeReplica)
926992
if err != nil {
927993
return nil, err
928994
}

0 commit comments

Comments
 (0)