Skip to content

Commit

Permalink
Merge #140477
Browse files Browse the repository at this point in the history
140477: colexec: fix some benchmark issues due to a recent change r=yuzefovich a=yuzefovich

In 481387c, which was supposed to be a no-op, we introduced a "closer registry" which resulted in some microbenchmark regressions. This was the case because we would reuse the same `CloserRegistry` for all iterations of the benchmark. Internally, it is represented as a single slice, so it would grow quite large, and only when tearing down the benchmark we would clean it up. This introduced some extra memory usage and allocations which would artificially slow down the benchmark and is now fixed (by resetting the registry after each benchmark run).

This change logically makes sense too since it resembles how we use these objects on the main query path - the registries are pooled and reused via `vectorizedFlowCreator`.

We also apply the same change to the monitor registry. All benchmarks using both registries have been adjusted accordingly.

The observed regression on `BenchmarkExternalSort` is now removed:
```
name                                              old time/op    new time/op    delta
ExternalSort/rows=262144/cols=1/spilled=false-24    4.24ms ± 1%    3.11ms ± 2%  -26.76%  (p=0.000 n=10+9)

name                                              old speed      new speed      delta
ExternalSort/rows=262144/cols=1/spilled=false-24   495MB/s ± 1%   675MB/s ± 2%  +36.54%  (p=0.000 n=10+9)

name                                              old alloc/op   new alloc/op   delta
ExternalSort/rows=262144/cols=1/spilled=false-24    13.7MB ± 0%    13.7MB ± 0%   +0.03%  (p=0.000 n=10+9)

name                                              old allocs/op  new allocs/op  delta
ExternalSort/rows=262144/cols=1/spilled=false-24       347 ± 0%       348 ± 0%   +0.29%  (p=0.000 n=10+10)
```

Epic: None
Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Feb 5, 2025
2 parents d276833 + 904adf9 commit 2e7d5a6
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 20 deletions.
22 changes: 16 additions & 6 deletions pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ const (
// aggType is a helper struct that allows tests to test both the ordered and
// hash aggregators at the same time.
type aggType struct {
new func(context.Context, *colexecagg.NewAggregatorArgs) colexecop.ResettableOperator
name string
order ordering
new func(context.Context, *colexecagg.NewAggregatorArgs) colexecop.ResettableOperator
afterEachRun func() // if set, will be called at the end of each benchmark iteration
name string
order ordering
}

var aggTypesWithPartial = []aggType{
Expand Down Expand Up @@ -1155,10 +1156,21 @@ func benchmarkAggregateFunction(
if numSameAggs != 1 {
numSameAggsSuffix = fmt.Sprintf("/numSameAggs=%d", numSameAggs)
}
afterEachRunDefault := func(b *testing.B, op colexecop.Operator) {
if err = op.(colexecop.Closer).Close(ctx); err != nil {
b.Fatal(err)
}
}
b.Run(fmt.Sprintf(
"%s/%s/%s%s/groupSize=%d%s/numInputRows=%d",
fName, agg.name, inputTypesString, numSameAggsSuffix, groupSize, distinctProbString, numInputRows),
func(b *testing.B) {
afterEachRun := afterEachRunDefault
if agg.afterEachRun != nil {
afterEachRun = func(*testing.B, colexecop.Operator) {
agg.afterEachRun()
}
}
// Simulate the scenario when the optimizer has the perfect
// estimate.
estimatedRowCount := uint64(math.Ceil(float64(numInputRows) / float64(groupSize)))
Expand Down Expand Up @@ -1186,9 +1198,7 @@ func benchmarkAggregateFunction(
break
}
}
if err = a.(colexecop.Closer).Close(ctx); err != nil {
b.Fatal(err)
}
afterEachRun(b, a)
source.Reset(ctx)
}
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/colexec/colexecargs/closer_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,11 @@ func (r *CloserRegistry) Reset() {
}
r.toClose = r.toClose[:0]
}

// BenchmarkReset should only be called from benchmarks in order to prepare the
// registry for the new iteration. This should be used whenever a single
// registry is utilized for the whole benchmark loop.
func (r *CloserRegistry) BenchmarkReset(ctx context.Context) {
r.Close(ctx)
r.Reset()
}
8 changes: 8 additions & 0 deletions pkg/sql/colexec/colexecargs/monitor_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,11 @@ func (r *MonitorRegistry) Reset() {
r.accounts = r.accounts[:0]
r.monitors = r.monitors[:0]
}

// BenchmarkReset should only be called from benchmarks in order to prepare the
// registry for the new iteration. This should be used whenever a single
// registry is utilized for the whole benchmark loop.
func (r *MonitorRegistry) BenchmarkReset(ctx context.Context) {
r.Close(ctx)
r.Reset()
}
7 changes: 5 additions & 2 deletions pkg/sql/colexec/crossjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,11 @@ func BenchmarkCrossJoiner(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

for _, spillForced := range []bool{false, true} {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced
Expand Down Expand Up @@ -476,6 +478,7 @@ func BenchmarkCrossJoiner(b *testing.B) {
cj.Init(ctx)
for b := cj.Next(); b.Length() > 0; b = cj.Next() {
}
afterEachRun()
}
})
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ func runDistinctBenchmarks(
ctx context.Context,
b *testing.B,
distinctConstructor func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error),
afterEachRun func(),
getNumOrderedCols func(nCols int) int,
namePrefix string,
isExternal bool,
Expand Down Expand Up @@ -611,6 +612,7 @@ func runDistinctBenchmarks(
distinct.Init(ctx)
for b := distinct.Next(); b.Length() > 0; b = distinct.Next() {
}
afterEachRun()
}
b.StopTimer()
})
Expand Down Expand Up @@ -643,6 +645,7 @@ func BenchmarkDistinct(b *testing.B) {
ctx,
b,
distinctConstructor,
func() {},
func(nCols int) int {
return int(float64(nCols) * orderedColsFraction[distinctIdx])
},
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colexec/external_distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,11 @@ func BenchmarkExternalDistinct(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

for _, spillForced := range []bool{false, true} {
for _, maintainOrdering := range []bool{false, true} {
Expand Down Expand Up @@ -305,6 +307,7 @@ func BenchmarkExternalDistinct(b *testing.B) {
&monitorRegistry, &closerRegistry,
)
},
afterEachRun,
func(nCols int) int {
return 0
},
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/colexec/external_hash_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,11 @@ func BenchmarkExternalHashAggregator(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

numRows := []int{coldata.BatchSize(), 64 * coldata.BatchSize(), 4096 * coldata.BatchSize()}
groupSizes := []int{1, 2, 32, 128, coldata.BatchSize()}
Expand Down Expand Up @@ -246,8 +248,9 @@ func BenchmarkExternalHashAggregator(b *testing.B) {
// purposes of this benchmark.
return colexecop.NewNoop(op)
},
name: fmt.Sprintf("spilled=%t", spillForced),
order: unordered,
afterEachRun: afterEachRun,
name: fmt.Sprintf("spilled=%t", spillForced),
order: unordered,
},
aggFn, []*types.T{types.Int}, 1 /* numGroupCol */, groupSize,
0 /* distinctProb */, numInputRows, 0, /* chunkSize */
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colexec/external_hash_joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,11 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

nCols := 4
for _, typ := range []*types.T{types.Int, types.Bytes} {
Expand Down Expand Up @@ -289,6 +291,7 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
hj.Init(ctx)
for b := hj.Next(); b.Length() > 0; b = hj.Next() {
}
afterEachRun()
}
})
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colexec/external_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,11 @@ func BenchmarkExternalSort(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

for _, nBatches := range []int{1 << 1, 1 << 4, 1 << 8} {
for _, nCols := range []int{1, 2, 4} {
Expand Down Expand Up @@ -280,6 +282,7 @@ func BenchmarkExternalSort(b *testing.B) {
sorter.Init(ctx)
for out := sorter.Next(); out.Length() != 0; out = sorter.Next() {
}
afterEachRun()
require.Equal(b, spillForced, spilled, fmt.Sprintf(
"expected: spilled=%t\tactual: spilled=%t", spillForced, spilled,
))
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colflow/colbatch_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,11 @@ func BenchmarkColBatchScan(b *testing.B) {
evalCtx := eval.MakeTestingEvalContext(s.ClusterSettings())
defer evalCtx.Stop(ctx)
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Expand Down Expand Up @@ -197,6 +199,7 @@ func BenchmarkColBatchScan(b *testing.B) {
}
}
b.StopTimer()
afterEachRun()
}
})
}
Expand Down

0 comments on commit 2e7d5a6

Please sign in to comment.