Skip to content

Commit efc7f2a

Browse files
perf: optimize deduplication (#3351)
Co-authored-by: Aleksandar Petrov <[email protected]>
1 parent e178664 commit efc7f2a

File tree

9 files changed

+441
-263
lines changed

9 files changed

+441
-263
lines changed

api/gen/proto/go/ingester/v1/ingester.pb.go

+223-212
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/gen/proto/go/ingester/v1/ingester_vtproto.pb.go

+117
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/ingester/v1/ingester.proto

+2
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,10 @@ message MergeSpanProfileResult {
136136
}
137137

138138
message ProfileSets {
139+
// DEPRECATED: Use fingerprints instead.
139140
repeated types.v1.Labels labelsSets = 1;
140141
repeated SeriesProfile profiles = 2;
142+
repeated uint64 fingerprints = 3;
141143
}
142144

143145
message SeriesProfile {

api/openapiv2/gen/phlare.swagger.json

+9-1
Original file line numberDiff line numberDiff line change
@@ -1080,14 +1080,22 @@
10801080
"items": {
10811081
"type": "object",
10821082
"$ref": "#/definitions/v1Labels"
1083-
}
1083+
},
1084+
"description": "DEPRECATED: Use fingerprints instead."
10841085
},
10851086
"profiles": {
10861087
"type": "array",
10871088
"items": {
10881089
"type": "object",
10891090
"$ref": "#/definitions/v1SeriesProfile"
10901091
}
1092+
},
1093+
"fingerprints": {
1094+
"type": "array",
1095+
"items": {
1096+
"type": "string",
1097+
"format": "uint64"
1098+
}
10911099
}
10921100
}
10931101
},

pkg/phlaredb/block_querier.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import (
4949
)
5050

5151
const (
52-
defaultBatchSize = 4096
52+
defaultBatchSize = 64 << 10
5353

5454
// This controls the buffer size for reads to a parquet io.Reader. This value should be small for memory or
5555
// disk backed readers, but when the reader is backed by network storage a larger size will be advantageous.

pkg/phlaredb/filter_profiles_bidi.go

+12-21
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,14 @@ import (
1111
"github.com/prometheus/common/model"
1212

1313
ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
14-
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
1514
"github.com/grafana/pyroscope/pkg/iter"
16-
phlaremodel "github.com/grafana/pyroscope/pkg/model"
1715
)
1816

1917
type BidiServerMerge[Res any, Req any] interface {
2018
Send(Res) error
2119
Receive() (Req, error)
2220
}
2321

24-
type labelWithIndex struct {
25-
phlaremodel.Labels
26-
index int
27-
}
28-
2922
type ProfileWithIndex struct {
3023
Profile
3124
Index int
@@ -72,8 +65,8 @@ func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterR
7265
defer sp.Finish()
7366
selection := make([][]Profile, len(profiles))
7467
selectProfileResult := &ingestv1.ProfileSets{
75-
Profiles: make([]*ingestv1.SeriesProfile, 0, batchProfileSize),
76-
LabelsSets: make([]*typesv1.Labels, 0, batchProfileSize),
68+
Profiles: make([]*ingestv1.SeriesProfile, 0, batchProfileSize),
69+
Fingerprints: make([]uint64, 0, batchProfileSize),
7770
}
7871
its := make([]iter.Iterator[ProfileWithIndex], len(profiles))
7972
for i, iter := range profiles {
@@ -92,28 +85,26 @@ func filterProfiles[B BidiServerMerge[Res, Req], Res filterResponse, Req filterR
9285
otlog.Int("batch_requested_size", batchProfileSize),
9386
)
9487

95-
seriesByFP := map[model.Fingerprint]labelWithIndex{}
96-
selectProfileResult.LabelsSets = selectProfileResult.LabelsSets[:0]
88+
seriesByFP := map[model.Fingerprint]int{}
9789
selectProfileResult.Profiles = selectProfileResult.Profiles[:0]
90+
selectProfileResult.Fingerprints = selectProfileResult.Fingerprints[:0]
9891

9992
for _, profile := range batch {
10093
var ok bool
101-
var lblsIdx labelWithIndex
102-
lblsIdx, ok = seriesByFP[profile.Fingerprint()]
94+
var idx int
95+
fp := profile.Fingerprint()
96+
idx, ok = seriesByFP[fp]
10397
if !ok {
104-
lblsIdx = labelWithIndex{
105-
Labels: profile.Labels(),
106-
index: len(selectProfileResult.LabelsSets),
107-
}
108-
seriesByFP[profile.Fingerprint()] = lblsIdx
109-
selectProfileResult.LabelsSets = append(selectProfileResult.LabelsSets, &typesv1.Labels{Labels: profile.Labels()})
98+
idx = len(selectProfileResult.Fingerprints)
99+
seriesByFP[fp] = idx
100+
selectProfileResult.Fingerprints = append(selectProfileResult.Fingerprints, uint64(fp))
110101
}
111102
selectProfileResult.Profiles = append(selectProfileResult.Profiles, &ingestv1.SeriesProfile{
112-
LabelIndex: int32(lblsIdx.index),
103+
LabelIndex: int32(idx),
113104
Timestamp: int64(profile.Timestamp()),
114105
})
115-
116106
}
107+
117108
sp.LogFields(otlog.String("msg", "sending batch to client"))
118109
var err error
119110
switch s := BidiServerMerge[Res, Req](stream).(type) {

pkg/phlaredb/filter_profiles_bidi_test.go

+13-11
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/stretchr/testify/require"
1212

1313
ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
14-
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
1514
"github.com/grafana/pyroscope/pkg/iter"
1615
phlaremodel "github.com/grafana/pyroscope/pkg/model"
1716
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
@@ -39,34 +38,36 @@ func TestFilterProfiles(t *testing.T) {
3938
require.NoError(t, err)
4039
require.Equal(t, 2, len(filtered[0]))
4140
require.Equal(t, 3, len(bidi.profilesSent))
42-
testhelper.EqualProto(t, []*ingestv1.ProfileSets{
41+
42+
expectedSent := []*ingestv1.ProfileSets{
4343
{
44-
LabelsSets: lo.Times(5, func(i int) *typesv1.Labels {
45-
return &typesv1.Labels{Labels: phlaremodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i))}
44+
Fingerprints: lo.Times(5, func(i int) uint64 {
45+
return phlaremodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i)).Hash()
4646
}),
4747
Profiles: lo.Times(5, func(i int) *ingestv1.SeriesProfile {
4848
return &ingestv1.SeriesProfile{Timestamp: int64(model.TimeFromUnixNano(int64(i * int(time.Minute)))), LabelIndex: int32(i)}
4949
}),
5050
},
5151
{
52-
LabelsSets: lo.Times(5, func(i int) *typesv1.Labels {
53-
return &typesv1.Labels{Labels: phlaremodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i+5))}
52+
Fingerprints: lo.Times(5, func(i int) uint64 {
53+
return phlaremodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i+5)).Hash()
5454
}),
5555
Profiles: lo.Times(5, func(i int) *ingestv1.SeriesProfile {
5656
return &ingestv1.SeriesProfile{Timestamp: int64(model.TimeFromUnixNano(int64((i + 5) * int(time.Minute)))), LabelIndex: int32(i)}
5757
}),
5858
},
5959
{
60-
LabelsSets: lo.Times(1, func(i int) *typesv1.Labels {
61-
return &typesv1.Labels{Labels: phlaremodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i+10))}
60+
Fingerprints: lo.Times(1, func(i int) uint64 {
61+
return phlaremodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i+10)).Hash()
6262
}),
6363
Profiles: lo.Times(1, func(i int) *ingestv1.SeriesProfile {
6464
return &ingestv1.SeriesProfile{Timestamp: int64(model.TimeFromUnixNano(int64((i + 10) * int(time.Minute)))), LabelIndex: int32(i)}
6565
}),
6666
},
67-
}, bidi.profilesSent)
67+
}
68+
testhelper.EqualProto(t, expectedSent, bidi.profilesSent)
6869

69-
require.Equal(t, []Profile{
70+
expectedFiltered := []Profile{
7071
ProfileWithLabels{
7172
profile: &schemav1.InMemoryProfile{TimeNanos: int64(5 * int(time.Minute))},
7273
lbs: phlaremodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", 5)),
@@ -77,5 +78,6 @@ func TestFilterProfiles(t *testing.T) {
7778
lbs: phlaremodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", 10)),
7879
fp: model.Fingerprint(phlaremodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", 10)).Hash()),
7980
},
80-
}, filtered[0])
81+
}
82+
require.Equal(t, expectedFiltered, filtered[0])
8183
}

pkg/phlaredb/phlaredb_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func TestMergeProfilesStacktraces(t *testing.T) {
195195
resp, err := bidi.Receive()
196196
require.NoError(t, err)
197197
require.Nil(t, resp.Result)
198-
require.Len(t, resp.SelectedProfiles.LabelsSets, 1)
198+
require.Len(t, resp.SelectedProfiles.Fingerprints, 1)
199199
require.Len(t, resp.SelectedProfiles.Profiles, 5)
200200

201201
require.NoError(t, bidi.Send(&ingestv1.MergeProfilesStacktracesRequest{
@@ -325,7 +325,7 @@ func TestMergeProfilesPprof(t *testing.T) {
325325
resp, err := bidi.Receive()
326326
require.NoError(t, err)
327327
require.Nil(t, resp.Result)
328-
require.Len(t, resp.SelectedProfiles.LabelsSets, 1)
328+
require.Len(t, resp.SelectedProfiles.Fingerprints, 1)
329329
require.Len(t, resp.SelectedProfiles.Profiles, 5)
330330

331331
require.NoError(t, bidi.Send(&ingestv1.MergeProfilesPprofRequest{

0 commit comments

Comments
 (0)