From fbf49372423ff8b4657df4a404c5a183e8ba9914 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Tue, 11 Jul 2023 14:40:02 +0100 Subject: [PATCH 1/3] Select profiles per rowGroup in parallel --- pkg/phlaredb/block_querier.go | 101 ++++++++++++++++++++++------------ pkg/phlaredb/query/iters.go | 15 +++++ 2 files changed, 81 insertions(+), 35 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 1211a9ae8..1ecbef39c 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -936,53 +936,84 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params var ( buf [][]parquet.Value ) - - pIt := query.NewBinaryJoinIterator( - 0, - b.profiles.columnIter(ctx, "SeriesIndex", query.NewMapPredicate(lblsPerRef), "SeriesIndex"), - b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), - ) - if b.meta.Version >= 2 { - pIt = query.NewBinaryJoinIterator( - 0, - pIt, - b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"), - ) buf = make([][]parquet.Value, 3) } else { buf = make([][]parquet.Value, 2) } + pItPerRG := query.NewPerRowGroupIter(b.profiles.file.RowGroups(), func(rowGroups []parquet.RowGroup, rowNumOffset int64) query.Iterator { + pIt := query.NewBinaryJoinIterator( + 0, + b.profiles.columnIter(ctx, rowGroups, rowNumOffset, "SeriesIndex", query.NewMapPredicate(lblsPerRef), "SeriesIndex"), + b.profiles.columnIter(ctx, rowGroups, rowNumOffset, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), + ) + if b.meta.Version >= 2 { + pIt = query.NewBinaryJoinIterator( + 0, + pIt, + b.profiles.columnIter(ctx, rowGroups, rowNumOffset, "StacktracePartition", nil, "StacktracePartition"), + ) + } + return pIt + }) + + itersCh := make(chan iter.Iterator[Profile], len(pItPerRG)) iters := 
make([]iter.Iterator[Profile], 0, len(lblsPerRef)) - defer pIt.Close() - - currSeriesIndex := int64(-1) - var currentSeriesSlice []Profile - for pIt.Next() { - res := pIt.At() - buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition") - seriesIndex := buf[0][0].Int64() - if seriesIndex != currSeriesIndex { - currSeriesIndex = seriesIndex + resultsReceived := make(chan struct{}, 0) + go func() { + for it := range itersCh { + iters = append(iters, it) + } + close(resultsReceived) + }() + + g, ctx := errgroup.WithContext(ctx) + + // pull the profiles in parallel per rg + for _, pIt := range pItPerRG { + g.Go(func() error { + defer pIt.Close() + + currSeriesIndex := int64(-1) + var currentSeriesSlice []Profile + for pIt.Next() { + res := pIt.At() + buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition") + seriesIndex := buf[0][0].Int64() + if seriesIndex != currSeriesIndex { + currSeriesIndex = seriesIndex + if len(currentSeriesSlice) > 0 { + itersCh <- iter.NewSliceIterator(currentSeriesSlice) + } + currentSeriesSlice = make([]Profile, 0, 100) + } + + currentSeriesSlice = append(currentSeriesSlice, BlockProfile{ + labels: lblsPerRef[seriesIndex].lbs, + fp: lblsPerRef[seriesIndex].fp, + ts: model.TimeFromUnixNano(buf[1][0].Int64()), + stacktracePartition: retrieveStacktracePartition(buf, 2), + RowNum: res.RowNumber[0], + }) + } if len(currentSeriesSlice) > 0 { - iters = append(iters, iter.NewSliceIterator(currentSeriesSlice)) + itersCh <- iter.NewSliceIterator(currentSeriesSlice) } - currentSeriesSlice = make([]Profile, 0, 100) - } - currentSeriesSlice = append(currentSeriesSlice, BlockProfile{ - labels: lblsPerRef[seriesIndex].lbs, - fp: lblsPerRef[seriesIndex].fp, - ts: model.TimeFromUnixNano(buf[1][0].Int64()), - stacktracePartition: retrieveStacktracePartition(buf, 2), - RowNum: res.RowNumber[0], + return nil + }) } - if len(currentSeriesSlice) > 0 { - iters = append(iters, iter.NewSliceIterator(currentSeriesSlice)) + + // wait 
for all iters to complete + if err := g.Wait(); err != nil { + return nil, err } + close(itersCh) + <-resultsReceived + return iter.NewMergeIterator(maxBlockProfile, false, iters...), nil } @@ -1146,13 +1177,13 @@ func (r *parquetReader[M, P]) relPath() string { return r.persister.Name() + block.ParquetSuffix } -func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string, predicate query.Predicate, alias string) query.Iterator { +func (r *parquetReader[M, P]) columnIter(ctx context.Context, rowGroups []parquet.RowGroup, rowNumOffset int64, columnName string, predicate query.Predicate, alias string) query.Iterator { index, _ := query.GetColumnIndexByPath(r.file, columnName) if index == -1 { return query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath())) } ctx = query.AddMetricsToContext(ctx, r.metrics.query) - return query.NewSyncIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias) + return query.NewSyncIteratorWithRowNumOffset(ctx, rowGroups, rowNumOffset, index, columnName, 1000, predicate, alias) } func repeatedColumnIter[T any](ctx context.Context, source Source, columnName string, rows iter.Iterator[T]) iter.Iterator[*query.RepeatedRow[T]] { diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index 545740c20..336d412cb 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -19,6 +19,16 @@ import ( const MaxDefinitionLevel = 5 +func NewPerRowGroupIter(rowGroups []parquet.RowGroup, makeIter func(rowGroups []parquet.RowGroup, rowNumOffset int64) Iterator) []Iterator { + iters := make([]Iterator, len(rowGroups)) + var offset int64 + for idx := range iters { + iters[idx] = makeIter(rowGroups[idx:idx+1], offset) + offset += rowGroups[idx].NumRows() + } + return iters +} + // RowNumber is the sequence of row numbers uniquely identifying a value // in a tree of nested columns, starting at the top-level and including // another row number 
for each level of nesting. -1 is a placeholder @@ -836,11 +846,16 @@ func syncIteratorPoolPut(b []parquet.Value) { } func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *SyncIterator { + return NewSyncIteratorWithRowNumOffset(ctx, rgs, 0, column, columnName, readSize, filter, selectAs) +} + +func NewSyncIteratorWithRowNumOffset(ctx context.Context, rgs []parquet.RowGroup, rowNumOffset int64, column int, columnName string, readSize int, filter Predicate, selectAs string) *SyncIterator { // Assign row group bounds. // Lower bound is inclusive // Upper bound is exclusive, points at the first row of the next group rn := EmptyRowNumber() + rn.Skip(rowNumOffset) rgsMin := make([]RowNumber, len(rgs)) rgsMax := make([]RowNumber, len(rgs)) for i, rg := range rgs { From 85250af6070a50f826846921e78ccbf07f99469c Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Tue, 11 Jul 2023 14:52:35 +0100 Subject: [PATCH 2/3] WIP --- pkg/phlaredb/block_querier.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 1ecbef39c..626db87d5 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -960,7 +960,7 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params itersCh := make(chan iter.Iterator[Profile], len(pItPerRG)) iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef)) - resultsReceived := make(chan struct{}, 0) + resultsReceived := make(chan struct{}) go func() { for it := range itersCh { iters = append(iters, it) @@ -971,7 +971,8 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params g, ctx := errgroup.WithContext(ctx) // pull the profiles in parallel per rg - for _, pIt := range pItPerRG { + for idx := range pItPerRG { + pIt := pItPerRG[idx] g.Go(func() error { defer pIt.Close() From 
6c4685a4918e9b3df24b388a349f1292fc804350 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Tue, 11 Jul 2023 17:20:03 +0100 Subject: [PATCH 3/3] more tests --- pkg/phlaredb/block_querier.go | 18 ++++----- pkg/phlaredb/block_querier_test.go | 63 ++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 9 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 626db87d5..5f5ab8764 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -933,15 +933,6 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params } } - var ( - buf [][]parquet.Value - ) - if b.meta.Version >= 2 { - buf = make([][]parquet.Value, 3) - } else { - buf = make([][]parquet.Value, 2) - } - pItPerRG := query.NewPerRowGroupIter(b.profiles.file.RowGroups(), func(rowGroups []parquet.RowGroup, rowNumOffset int64) query.Iterator { pIt := query.NewBinaryJoinIterator( 0, @@ -976,6 +967,15 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params g.Go(func() error { defer pIt.Close() + var ( + buf [][]parquet.Value + ) + if b.meta.Version >= 2 { + buf = make([][]parquet.Value, 3) + } else { + buf = make([][]parquet.Value, 2) + } + currSeriesIndex := int64(-1) var currentSeriesSlice []Profile for pIt.Next() { diff --git a/pkg/phlaredb/block_querier_test.go b/pkg/phlaredb/block_querier_test.go index 495107d00..32d46c8ce 100644 --- a/pkg/phlaredb/block_querier_test.go +++ b/pkg/phlaredb/block_querier_test.go @@ -3,6 +3,8 @@ package phlaredb import ( "context" "fmt" + "os" + rpprof "runtime/pprof" "strings" "testing" "time" @@ -191,3 +193,64 @@ func TestBlockCompatability(t *testing.T) { } } + +func BenchmarkSelectMatchingProfilesRealBlock(t *testing.B) { + blockPath := os.Getenv("PROFILES_BLOCK_PATH") + if blockPath == "" { + t.Skip("PROFILES_BLOCK_PATH not set") + } + + bucket, err := filesystem.NewBucket(blockPath) + require.NoError(t, err) + + ctx := context.Background() + metas, err := 
NewBlockQuerier(ctx, bucket).BlockMetas(ctx) + require.NoError(t, err) + + for _, meta := range metas { + + q := NewSingleBlockQuerierFromMeta(ctx, bucket, meta) + require.NoError(t, q.Open(ctx)) + + profilesTypes, err := q.index.LabelValues("__profile_type__") + require.NoError(t, err) + t.Logf("block %s has %d profile types", meta.ULID.String(), len(profilesTypes)) + + for _, profileType := range profilesTypes { + name := fmt.Sprintf("block-%s-%s", meta.ULID.String(), profileType) + t.Run(name, func(t *testing.B) { + profileTypeParts := strings.Split(profileType, ":") + + it, err := q.SelectMatchingProfiles(ctx, &ingestv1.SelectProfilesRequest{ + LabelSelector: `{namespace="profiles-ops-001"}`, + Start: 0, + End: time.Now().UnixMilli(), + Type: &typesv1.ProfileType{ + Name: profileTypeParts[0], + SampleType: profileTypeParts[1], + SampleUnit: profileTypeParts[2], + PeriodType: profileTypeParts[3], + PeriodUnit: profileTypeParts[4], + }, + }) + require.NoError(t, err) + + f, err := os.Create("heap-after-" + name + ".pprof") + require.NoError(t, err) + + require.NoError(t, rpprof.WriteHeapProfile(f)) + + require.NoError(t, f.Close()) + + // TODO: It would be nice actually comparing the whole profile, but at present the result is not deterministic. + p, err := q.MergePprof(ctx, it) + + var sampleSum int64 + for _, s := range p.Sample { + sampleSum += s.Value[0] + } + t.Logf("profileType=%s sum=%d", profileType, sampleSum) + }) + } + } +}