From 251d619d7628656b1a13661f02ef5293e408cdb8 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Tue, 18 Jun 2024 11:49:41 +0800 Subject: [PATCH] fix: infinite loop in index writer (#3356) --- pkg/model/labels_test.go | 36 +++++++++++++++++++++++++++ pkg/phlaredb/labels/labels.go | 1 + pkg/phlaredb/phlaredb_test.go | 29 +++++++++++++++++++++ pkg/phlaredb/tsdb/index/index.go | 6 +++-- pkg/phlaredb/tsdb/index/index_test.go | 15 +++++++++++ 5 files changed, 85 insertions(+), 2 deletions(-) diff --git a/pkg/model/labels_test.go b/pkg/model/labels_test.go index 5e7fdcd26c..c164533cb6 100644 --- a/pkg/model/labels_test.go +++ b/pkg/model/labels_test.go @@ -68,6 +68,42 @@ func TestLabelsUnique(t *testing.T) { } } +func Test_LabelsBuilder_Unique(t *testing.T) { + tests := []struct { + name string + input Labels + add Labels + expected Labels + }{ + { + name: "duplicates in input are preserved", + input: Labels{ + {Name: "unique", Value: "yes"}, + {Name: "unique", Value: "no"}, + }, + add: Labels{ + {Name: "foo", Value: "bar"}, + {Name: "foo", Value: "baz"}, + }, + expected: Labels{ + {Name: "foo", Value: "baz"}, + {Name: "unique", Value: "yes"}, + {Name: "unique", Value: "no"}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + builder := NewLabelsBuilder(test.input) + for _, l := range test.add { + builder.Set(l.Name, l.Value) + } + assert.Equal(t, test.expected, builder.Labels()) + }) + } +} + func TestLabels_SessionID_Order(t *testing.T) { input := []Labels{ { diff --git a/pkg/phlaredb/labels/labels.go b/pkg/phlaredb/labels/labels.go index 405955abde..e9592c499a 100644 --- a/pkg/phlaredb/labels/labels.go +++ b/pkg/phlaredb/labels/labels.go @@ -60,6 +60,7 @@ func CreateProfileLabels(enforceOrder bool, p *profilev1.Profile, externalLabels } else { sort.Sort(lbs) } + lbs = lbs.Unique() profilesLabels[pos] = lbs seriesRefs[pos] = model.Fingerprint(lbs.Hash()) diff --git a/pkg/phlaredb/phlaredb_test.go b/pkg/phlaredb/phlaredb_test.go index 603cbe112a..a4a3a71b62 100644 --- a/pkg/phlaredb/phlaredb_test.go +++ b/pkg/phlaredb/phlaredb_test.go @@ -281,6 +281,35 @@ func TestMergeProfilesStacktraces(t *testing.T) { }) } +// See https://github.com/grafana/pyroscope/pull/3356 +func Test_HeadFlush_DuplicateLabels(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + + // ingest some sample data + var ( + ctx = testContext(t) + testDir = contextDataDir(ctx) + end = time.Unix(0, int64(time.Hour)) + start = end.Add(-time.Minute) + step = 15 * time.Second + ) + + db, err := New(ctx, Config{ + DataPath: testDir, + MaxBlockDuration: time.Duration(100000) * time.Minute, + }, NoLimit, ctx.localBucketClient) + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + + ingestProfiles(t, db, cpuProfileGenerator, start.UnixNano(), end.UnixNano(), step, + &typesv1.LabelPair{Name: "namespace", Value: "my-namespace"}, + &typesv1.LabelPair{Name: "pod", Value: "my-pod"}, + &typesv1.LabelPair{Name: "pod", Value: "not-my-pod"}, + ) +} + func TestMergeProfilesPprof(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) diff --git a/pkg/phlaredb/tsdb/index/index.go b/pkg/phlaredb/tsdb/index/index.go index 38c1527f54..31081d97ed 100644 --- a/pkg/phlaredb/tsdb/index/index.go +++ b/pkg/phlaredb/tsdb/index/index.go @@ -942,7 +942,10 @@ func (w *Writer) writePostingsToTmpFiles() error { // using more memory than a single label name can. for len(names) > 0 { if w.labelNames[names[0]]+c > maxPostings { - break + if c > 0 { + break + } + return fmt.Errorf("corruption detected when writing postings to index: label %q has %d uses, but maxPostings is %d", names[0], w.labelNames[names[0]], maxPostings) } batchNames = append(batchNames, names[0]) c += w.labelNames[names[0]] @@ -1016,7 +1019,6 @@ func (w *Writer) writePostingsToTmpFiles() error { return w.ctx.Err() default: } - } return nil } diff --git a/pkg/phlaredb/tsdb/index/index_test.go b/pkg/phlaredb/tsdb/index/index_test.go index d420937217..8f355fc6bc 100644 --- a/pkg/phlaredb/tsdb/index/index_test.go +++ b/pkg/phlaredb/tsdb/index/index_test.go @@ -584,3 +584,18 @@ func TestDecoder_Postings_WrongInput(t *testing.T) { _, _, err := (&Decoder{}).Postings([]byte("the cake is a lie")) require.Error(t, err) } + +func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) { + w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index")) + require.NoError(t, err) + + require.NoError(t, w.AddSymbol("__name__")) + require.NoError(t, w.AddSymbol("metric_1")) + require.NoError(t, w.AddSymbol("metric_2")) + + require.NoError(t, w.AddSeries(0, phlaremodel.LabelsFromStrings("__name__", "metric_1", "__name__", "metric_2"), 0)) + + err = w.Close() + require.Error(t, err) + require.ErrorContains(t, err, "corruption detected when writing postings to index") +}