Skip to content

Commit

Permalink
colexec: fix some benchmark issues due to a recent change
Browse files Browse the repository at this point in the history
In 481387c, which was supposed to be
a no-op, we introduced a "closer registry" which resulted in some
microbenchmark regressions. This was the case because we would reuse the
same `CloserRegistry` for all iterations of the benchmark. Internally,
it is represented as a single slice, so it would grow quite large, and
only when tearing down the benchmark we would clean it up. This
introduced some extra memory usage and allocations which would
artificially slow down the benchmark and is now fixed (by resetting the
registry after each benchmark run).

This change logically makes sense too since it resembles how we use
these objects on the main query path - the registries are pooled and
reused via `vectorizedFlowCreator`.

We also apply the same change to the monitor registry. All benchmarks
using both registries have been adjusted accordingly.

The observed regression on `BenchmarkExternalSort` is now removed:
```
name                                              old time/op    new time/op    delta
ExternalSort/rows=262144/cols=1/spilled=false-24    4.24ms ± 1%    3.11ms ± 2%  -26.76%  (p=0.000 n=10+9)

name                                              old speed      new speed      delta
ExternalSort/rows=262144/cols=1/spilled=false-24   495MB/s ± 1%   675MB/s ± 2%  +36.54%  (p=0.000 n=10+9)

name                                              old alloc/op   new alloc/op   delta
ExternalSort/rows=262144/cols=1/spilled=false-24    13.7MB ± 0%    13.7MB ± 0%   +0.03%  (p=0.000 n=10+9)

name                                              old allocs/op  new allocs/op  delta
ExternalSort/rows=262144/cols=1/spilled=false-24       347 ± 0%       348 ± 0%   +0.29%  (p=0.000 n=10+10)
```

Release note: None
  • Loading branch information
yuzefovich committed Feb 5, 2025
1 parent 82b1a2b commit 904adf9
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 20 deletions.
22 changes: 16 additions & 6 deletions pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ const (
// aggType is a helper struct that allows tests to test both the ordered and
// hash aggregators at the same time.
type aggType struct {
new func(context.Context, *colexecagg.NewAggregatorArgs) colexecop.ResettableOperator
name string
order ordering
new func(context.Context, *colexecagg.NewAggregatorArgs) colexecop.ResettableOperator
afterEachRun func() // if set, will be called at the end of each benchmark iteration
name string
order ordering
}

var aggTypesWithPartial = []aggType{
Expand Down Expand Up @@ -1155,10 +1156,21 @@ func benchmarkAggregateFunction(
if numSameAggs != 1 {
numSameAggsSuffix = fmt.Sprintf("/numSameAggs=%d", numSameAggs)
}
afterEachRunDefault := func(b *testing.B, op colexecop.Operator) {
if err = op.(colexecop.Closer).Close(ctx); err != nil {
b.Fatal(err)
}
}
b.Run(fmt.Sprintf(
"%s/%s/%s%s/groupSize=%d%s/numInputRows=%d",
fName, agg.name, inputTypesString, numSameAggsSuffix, groupSize, distinctProbString, numInputRows),
func(b *testing.B) {
afterEachRun := afterEachRunDefault
if agg.afterEachRun != nil {
afterEachRun = func(*testing.B, colexecop.Operator) {
agg.afterEachRun()
}
}
// Simulate the scenario when the optimizer has the perfect
// estimate.
estimatedRowCount := uint64(math.Ceil(float64(numInputRows) / float64(groupSize)))
Expand Down Expand Up @@ -1186,9 +1198,7 @@ func benchmarkAggregateFunction(
break
}
}
if err = a.(colexecop.Closer).Close(ctx); err != nil {
b.Fatal(err)
}
afterEachRun(b, a)
source.Reset(ctx)
}
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/colexec/colexecargs/closer_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,11 @@ func (r *CloserRegistry) Reset() {
}
r.toClose = r.toClose[:0]
}

// BenchmarkReset should only be called from benchmarks in order to prepare the
// registry for the new iteration. This should be used whenever a single
// registry is utilized for the whole benchmark loop.
func (r *CloserRegistry) BenchmarkReset(ctx context.Context) {
r.Close(ctx)
r.Reset()
}
8 changes: 8 additions & 0 deletions pkg/sql/colexec/colexecargs/monitor_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,11 @@ func (r *MonitorRegistry) Reset() {
r.accounts = r.accounts[:0]
r.monitors = r.monitors[:0]
}

// BenchmarkReset should only be called from benchmarks in order to prepare the
// registry for the new iteration. This should be used whenever a single
// registry is utilized for the whole benchmark loop.
func (r *MonitorRegistry) BenchmarkReset(ctx context.Context) {
r.Close(ctx)
r.Reset()
}
7 changes: 5 additions & 2 deletions pkg/sql/colexec/crossjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,11 @@ func BenchmarkCrossJoiner(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

for _, spillForced := range []bool{false, true} {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced
Expand Down Expand Up @@ -476,6 +478,7 @@ func BenchmarkCrossJoiner(b *testing.B) {
cj.Init(ctx)
for b := cj.Next(); b.Length() > 0; b = cj.Next() {
}
afterEachRun()
}
})
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ func runDistinctBenchmarks(
ctx context.Context,
b *testing.B,
distinctConstructor func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error),
afterEachRun func(),
getNumOrderedCols func(nCols int) int,
namePrefix string,
isExternal bool,
Expand Down Expand Up @@ -611,6 +612,7 @@ func runDistinctBenchmarks(
distinct.Init(ctx)
for b := distinct.Next(); b.Length() > 0; b = distinct.Next() {
}
afterEachRun()
}
b.StopTimer()
})
Expand Down Expand Up @@ -643,6 +645,7 @@ func BenchmarkDistinct(b *testing.B) {
ctx,
b,
distinctConstructor,
func() {},
func(nCols int) int {
return int(float64(nCols) * orderedColsFraction[distinctIdx])
},
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colexec/external_distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,11 @@ func BenchmarkExternalDistinct(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

for _, spillForced := range []bool{false, true} {
for _, maintainOrdering := range []bool{false, true} {
Expand Down Expand Up @@ -305,6 +307,7 @@ func BenchmarkExternalDistinct(b *testing.B) {
&monitorRegistry, &closerRegistry,
)
},
afterEachRun,
func(nCols int) int {
return 0
},
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/colexec/external_hash_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,11 @@ func BenchmarkExternalHashAggregator(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

numRows := []int{coldata.BatchSize(), 64 * coldata.BatchSize(), 4096 * coldata.BatchSize()}
groupSizes := []int{1, 2, 32, 128, coldata.BatchSize()}
Expand Down Expand Up @@ -246,8 +248,9 @@ func BenchmarkExternalHashAggregator(b *testing.B) {
// purposes of this benchmark.
return colexecop.NewNoop(op)
},
name: fmt.Sprintf("spilled=%t", spillForced),
order: unordered,
afterEachRun: afterEachRun,
name: fmt.Sprintf("spilled=%t", spillForced),
order: unordered,
},
aggFn, []*types.T{types.Int}, 1 /* numGroupCol */, groupSize,
0 /* distinctProb */, numInputRows, 0, /* chunkSize */
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colexec/external_hash_joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,11 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

nCols := 4
for _, typ := range []*types.T{types.Int, types.Bytes} {
Expand Down Expand Up @@ -289,6 +291,7 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
hj.Init(ctx)
for b := hj.Next(); b.Length() > 0; b = hj.Next() {
}
afterEachRun()
}
})
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colexec/external_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,11 @@ func BenchmarkExternalSort(b *testing.B) {
queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */)
defer cleanup()
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

for _, nBatches := range []int{1 << 1, 1 << 4, 1 << 8} {
for _, nCols := range []int{1, 2, 4} {
Expand Down Expand Up @@ -280,6 +282,7 @@ func BenchmarkExternalSort(b *testing.B) {
sorter.Init(ctx)
for out := sorter.Next(); out.Length() != 0; out = sorter.Next() {
}
afterEachRun()
require.Equal(b, spillForced, spilled, fmt.Sprintf(
"expected: spilled=%t\tactual: spilled=%t", spillForced, spilled,
))
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colflow/colbatch_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,11 @@ func BenchmarkColBatchScan(b *testing.B) {
evalCtx := eval.MakeTestingEvalContext(s.ClusterSettings())
defer evalCtx.Stop(ctx)
var monitorRegistry colexecargs.MonitorRegistry
defer monitorRegistry.Close(ctx)
var closerRegistry colexecargs.CloserRegistry
defer closerRegistry.Close(ctx)
afterEachRun := func() {
closerRegistry.BenchmarkReset(ctx)
monitorRegistry.BenchmarkReset(ctx)
}

flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Expand Down Expand Up @@ -197,6 +199,7 @@ func BenchmarkColBatchScan(b *testing.B) {
}
}
b.StopTimer()
afterEachRun()
}
})
}
Expand Down

0 comments on commit 904adf9

Please sign in to comment.