pkg/profile: Remove pprof_labels column
We don't write to it and we don't read from it anymore. It's all part of the labels column now.
metalmatze committed Sep 20, 2024
1 parent 22f590f commit 45692a1
Showing 4 changed files with 30 additions and 126 deletions.
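For orientation before the per-file diffs, a minimal, self-contained sketch of the idea; the pprof types here are made-up stand-ins, not the real pprofpb structs. Both string-valued and number-valued pprof sample label keys now feed one shared set of label names, which becomes sub-columns of the single labels dynamic column.

package main

import "fmt"

// Minimal stand-ins for pprof's Sample/Label, for illustration only.
type pprofLabel struct {
	Key int64 // index into the string table
	Str int64 // index of a string value (0 if the label is numeric)
	Num int64 // numeric value (0 if the label is a string)
}

type pprofSample struct {
	Label []pprofLabel
}

func main() {
	stringTable := []string{"", "thread", "worker-1", "alloc_size"}
	samples := []pprofSample{
		{Label: []pprofLabel{{Key: 1, Str: 2}, {Key: 3, Num: 4096}}},
	}

	// Previously string keys went to pprof_labels.* and numeric keys to
	// pprof_num_labels.*; now both key sets end up in the same map, which
	// drives the dynamic "labels" column.
	allLabelNames := map[string]struct{}{}
	for _, s := range samples {
		for _, l := range s.Label {
			allLabelNames[stringTable[l.Key]] = struct{}{}
		}
	}

	fmt.Println(allLabelNames) // map[alloc_size:{} thread:{}]
}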
103 changes: 11 additions & 92 deletions pkg/normalizer/normalizer.go
@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"slices"
"sort"
"strings"
"time"
@@ -66,10 +67,8 @@ type Series struct {
}

type NormalizedWriteRawRequest struct {
- Series                []Series
- AllLabelNames         []string
- AllPprofLabelNames    []string
- AllPprofNumLabelNames []string
+ Series        []Series
+ AllLabelNames []string
}

func MetaFromPprof(p *pprofpb.Profile, name string, sampleIndex int) profile.Meta {
@@ -129,9 +128,7 @@ func WriteRawRequestToArrowRecord(
}

ps, err := schema.GetDynamicParquetSchema(map[string][]string{
- profile.ColumnLabels:         normalizedRequest.AllLabelNames,
- profile.ColumnPprofLabels:    normalizedRequest.AllPprofLabelNames,
- profile.ColumnPprofNumLabels: normalizedRequest.AllPprofNumLabelNames,
+ profile.ColumnLabels: normalizedRequest.AllLabelNames,
})
if err != nil {
return nil, err
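A small illustrative helper, not from the codebase, showing what the single remaining dynamic column expands to; the column-dot-name leaf naming is taken from the col.Name + "." + name lookups later in this file.

package main

import "fmt"

// expandDynamic mirrors, for illustration only, the "<column>.<name>" leaf
// naming visible in the builder code below: a dynamic column "labels" with
// names ["api", "thread"] yields the concrete columns "labels.api" and
// "labels.thread". The former pprof_labels.* and pprof_num_labels.* leaves
// no longer exist.
func expandDynamic(column string, names []string) []string {
	leaves := make([]string, 0, len(names))
	for _, name := range names {
		leaves = append(leaves, column+"."+name)
	}
	return leaves
}

func main() {
	fmt.Println(expandDynamic("labels", []string{"api", "thread"}))
	// Output: [labels.api labels.thread]
}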
@@ -310,42 +307,6 @@ func WriteRawRequestToArrowRecord(
}
}
}
case profile.ColumnPprofLabels:
for _, name := range normalizedRequest.AllPprofLabelNames {
cBuilder := b.Field(b.Schema().FieldIndices(col.Name + "." + name)[0]).(*array.BinaryDictionaryBuilder)
for _, series := range normalizedRequest.Series {
for _, sample := range series.Samples {
for _, p := range sample {
for _, ns := range p.Samples {
if val, ok := ns.Label[name]; ok {
if err := cBuilder.AppendString(val); err != nil {
return nil, err
}
} else {
cBuilder.AppendNull()
}
}
}
}
}
}
case profile.ColumnPprofNumLabels:
for _, name := range normalizedRequest.AllPprofNumLabelNames {
cBuilder := b.Field(b.Schema().FieldIndices(col.Name + "." + name)[0]).(*array.Int64Builder)
for _, series := range normalizedRequest.Series {
for _, sample := range series.Samples {
for _, p := range sample {
for _, ns := range p.Samples {
if val, ok := ns.NumLabel[name]; ok {
cBuilder.Append(val)
} else {
cBuilder.AppendNull()
}
}
}
}
}
}
default:
panic(fmt.Sprintf("unknown column: %s", col.Name))
}
@@ -527,8 +488,6 @@ func serializePprofStacktrace(

func NormalizeWriteRawRequest(ctx context.Context, req *profilestorepb.WriteRawRequest) (NormalizedWriteRawRequest, error) {
allLabelNames := make(map[string]struct{})
allPprofLabelNames := make(map[string]struct{})
allPprofNumLabelNames := make(map[string]struct{})

series := make([]Series, 0, len(req.Series))
for _, rawSeries := range req.Series {
@@ -581,12 +540,12 @@ func NormalizeWriteRawRequest(ctx context.Context, req *profilestorepb.WriteRawRequest) (NormalizedWriteRawRequest, error) {
return NormalizedWriteRawRequest{}, status.Errorf(codes.InvalidArgument, "invalid profile: %v", err)
}

+ // Find all pprof label names and add them to the list of (infrastructure) label names
LabelNamesFromSamples(
ls,
p.StringTable,
p.Sample,
- allPprofLabelNames,
- allPprofNumLabelNames,
+ allLabelNames,
)

normalizedProfiles, err := NormalizePprof(ctx, name, ls, p, req.Normalized, sample.ExecutableInfo)
@@ -603,11 +562,12 @@ func NormalizeWriteRawRequest(ctx context.Context, req *profilestorepb.WriteRawRequest) (NormalizedWriteRawRequest, error) {
})
}

+ allLabelNamesKeys := maps.Keys(allLabelNames)
+ slices.Sort(allLabelNamesKeys)

return NormalizedWriteRawRequest{
- Series:                series,
- AllLabelNames:         sortedKeys(allLabelNames),
- AllPprofLabelNames:    sortedKeys(allPprofLabelNames),
- AllPprofNumLabelNames: sortedKeys(allPprofNumLabelNames),
+ Series:        series,
+ AllLabelNames: allLabelNamesKeys,
}, nil
}
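The removed sortedKeys helper is replaced by maps.Keys plus slices.Sort. A standalone sketch of that idiom, assuming the golang.org/x/exp/maps package, whose Keys returns a slice; the Go 1.23 standard-library maps.Keys returns an iterator and would not work here unchanged.

package main

import (
	"fmt"
	"slices"

	"golang.org/x/exp/maps" // assumption: the exp variant, whose Keys returns a slice
)

func main() {
	allLabelNames := map[string]struct{}{"thread": {}, "api": {}, "region": {}}

	// Deterministic ordering matters because these names become schema columns.
	keys := maps.Keys(allLabelNames)
	slices.Sort(keys)

	fmt.Println(keys) // [api region thread]
}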

@@ -616,7 +576,6 @@ func LabelNamesFromSamples(
stringTable []string,
samples []*pprofpb.Sample,
allLabels map[string]struct{},
allNumLabels map[string]struct{},
) {
labels := map[string]struct{}{}
for _, sample := range samples {
@@ -648,28 +607,6 @@ func LabelNamesFromSamples(
for labelName := range resLabels {
allLabels[labelName] = struct{}{}
}

for _, sample := range samples {
for _, label := range sample.Label {
key := stringTable[label.Key]
if label.Num != 0 {
key = strutil.SanitizeLabelName(key)
if _, ok := allNumLabels[key]; !ok {
allNumLabels[key] = struct{}{}
}
}
}
}
}

func sortedKeys(m map[string]struct{}) []string {
if len(m) == 0 {
return nil
}

out := maps.Keys(m)
sort.Strings(out)
return out
}

// SampleToParquetRow converts a sample to a Parquet row. The passed labels
@@ -739,24 +676,6 @@ func SampleToParquetRow(
}
columnIndex++
}
case profile.ColumnPprofLabels:
for _, name := range profileLabelNames {
if value, ok := s.Label[name]; ok {
row = append(row, parquet.ValueOf(value).Level(0, 1, columnIndex))
} else {
row = append(row, parquet.ValueOf(nil).Level(0, 0, columnIndex))
}
columnIndex++
}
case profile.ColumnPprofNumLabels:
for _, name := range profileNumLabelNames {
if value, ok := s.NumLabel[name]; ok {
row = append(row, parquet.ValueOf(value).Level(0, 1, columnIndex))
} else {
row = append(row, parquet.ValueOf(nil).Level(0, 0, columnIndex))
}
columnIndex++
}
default:
panic(fmt.Errorf("conversion not implement for column: %s", column.Name))
}
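The remaining labels.* handling in SampleToParquetRow relies on Parquet definition levels exactly as the removed branches above did: level 1 marks a present optional value, level 0 a null. A minimal sketch of that convention; the parquet-go module path is an assumption.

package main

import (
	"fmt"

	"github.com/parquet-go/parquet-go"
)

func main() {
	columnIndex := 0

	// A sample that has the label: definition level 1 marks the optional
	// value as present.
	present := parquet.ValueOf("worker-1").Level(0, 1, columnIndex)

	// A sample without the label: written as a null with definition level 0.
	missing := parquet.ValueOf(nil).Level(0, 0, columnIndex)

	fmt.Println(present, missing)
}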
5 changes: 4 additions & 1 deletion pkg/parca/parca_test.go
@@ -469,6 +469,9 @@ func TestLabels(t *testing.T) {

ts := timestamppb.New(timestamp.Time(1677488315039)) // time_nanos of the profile divided by 1e6
res, err := api.Query(ctx, &querypb.QueryRequest{
GroupBy: &querypb.GroupBy{
Fields: []string{"labels.api"},
},
ReportType: querypb.QueryRequest_REPORT_TYPE_PPROF,
Options: &querypb.QueryRequest_Single{
Single: &querypb.SingleProfile{
@@ -489,5 +492,5 @@ func TestLabels(t *testing.T) {
}
}
want := map[string]struct{}{"api": {}}
require.Equal(t, want, got, "profile should contain pprof_labels from the original profile only")
require.Equal(t, want, got, "profile should contain labels from the original profile only")
}
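With the pprof label columns gone, the test has to group by the dynamic labels column explicitly, and the field names follow the labels.<key> convention. An illustrative helper, not from the codebase.

package main

import "fmt"

// groupByField maps a pprof label key to the dynamic-column field name used
// in query GroupBy clauses after this change. Before the change the same key
// would have been addressed as "pprof_labels.<key>".
func groupByField(labelKey string) string {
	return "labels." + labelKey
}

func main() {
	fmt.Println(groupByField("api")) // labels.api
}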
8 changes: 4 additions & 4 deletions pkg/parcacol/querier.go
@@ -109,8 +109,10 @@ func (q *Querier) Labels(
start := timestamp.FromTime(startTime)
end := timestamp.FromTime(endTime)

- filterExpr = append(filterExpr, logicalplan.Col(profile.ColumnTimestamp).Gt(logicalplan.Literal(start)),
- logicalplan.Col(profile.ColumnTimestamp).Lt(logicalplan.Literal(end)))
+ filterExpr = append(filterExpr,
+ logicalplan.Col(profile.ColumnTimestamp).Gt(logicalplan.Literal(start)),
+ logicalplan.Col(profile.ColumnTimestamp).Lt(logicalplan.Literal(end)),
+ )
}

err := q.engine.ScanTable(q.tableName).
@@ -1287,8 +1289,6 @@ func (q *Querier) findSingle(ctx context.Context, query string, t time.Time) ([]

aggrCols := []logicalplan.Expr{
logicalplan.Col(profile.ColumnStacktrace),
logicalplan.DynCol(profile.ColumnPprofLabels),
logicalplan.DynCol(profile.ColumnPprofNumLabels),
}

totalSum := logicalplan.Sum(logicalplan.Col(profile.ColumnValue))
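The Labels query above keeps filtering with Gt and Lt, so both ends of the time range are exclusive. A tiny standalone illustration of that semantics.

package main

import "fmt"

// inRange mirrors the Gt(start) / Lt(end) filter built in Labels above:
// samples exactly at start or end are excluded.
func inRange(ts, start, end int64) bool {
	return ts > start && ts < end
}

func main() {
	fmt.Println(inRange(1000, 1000, 2000)) // false: start itself is excluded
	fmt.Println(inRange(1500, 1000, 2000)) // true
	fmt.Println(inRange(2000, 1000, 2000)) // false: end itself is excluded
}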
40 changes: 11 additions & 29 deletions pkg/profile/schema.go
@@ -21,19 +21,17 @@ import (
const (
SchemaName = "parca"
// The columns are sorted by their name in the schema too.
ColumnDuration = "duration"
ColumnLabels = "labels"
ColumnName = "name"
ColumnPeriod = "period"
ColumnPeriodType = "period_type"
ColumnPeriodUnit = "period_unit"
ColumnPprofLabels = "pprof_labels"
ColumnPprofNumLabels = "pprof_num_labels"
ColumnSampleType = "sample_type"
ColumnSampleUnit = "sample_unit"
ColumnStacktrace = "stacktrace"
ColumnTimestamp = "timestamp"
ColumnValue = "value"
ColumnDuration = "duration"
ColumnLabels = "labels"
ColumnName = "name"
ColumnPeriod = "period"
ColumnPeriodType = "period_type"
ColumnPeriodUnit = "period_unit"
ColumnSampleType = "sample_type"
ColumnSampleUnit = "sample_unit"
ColumnStacktrace = "stacktrace"
ColumnTimestamp = "timestamp"
ColumnValue = "value"
)

func SchemaDefinition() *schemapb.Schema {
@@ -83,22 +81,6 @@ func SchemaDefinition() *schemapb.Schema {
Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
},
Dynamic: false,
}, {
Name: ColumnPprofLabels,
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_STRING,
Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
Nullable: true,
},
Dynamic: true,
}, {
Name: ColumnPprofNumLabels,
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
Nullable: true,
},
Dynamic: true,
}, {
Name: ColumnSampleType,
StorageLayout: &schemapb.StorageLayout{
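For reference, the one dynamic column that remains is labels. Its definition is not part of this diff; presumably it keeps the same shape the removed pprof_labels entry had. A sketch only, not taken from the change itself:

	{
		Name: ColumnLabels,
		StorageLayout: &schemapb.StorageLayout{
			Type:     schemapb.StorageLayout_TYPE_STRING,
			Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
			Nullable: true,
		},
		Dynamic: true,
	},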
