diff --git a/pkg/sql/colexec/aggregators_test.go b/pkg/sql/colexec/aggregators_test.go index 89cfd844d71..c4cc8498c0d 100644 --- a/pkg/sql/colexec/aggregators_test.go +++ b/pkg/sql/colexec/aggregators_test.go @@ -66,9 +66,10 @@ const ( // aggType is a helper struct that allows tests to test both the ordered and // hash aggregators at the same time. type aggType struct { - new func(context.Context, *colexecagg.NewAggregatorArgs) colexecop.ResettableOperator - name string - order ordering + new func(context.Context, *colexecagg.NewAggregatorArgs) colexecop.ResettableOperator + afterEachRun func() // if set, will be called at the end of each benchmark iteration + name string + order ordering } var aggTypesWithPartial = []aggType{ @@ -1155,10 +1156,21 @@ func benchmarkAggregateFunction( if numSameAggs != 1 { numSameAggsSuffix = fmt.Sprintf("/numSameAggs=%d", numSameAggs) } + afterEachRunDefault := func(b *testing.B, op colexecop.Operator) { + if err = op.(colexecop.Closer).Close(ctx); err != nil { + b.Fatal(err) + } + } b.Run(fmt.Sprintf( "%s/%s/%s%s/groupSize=%d%s/numInputRows=%d", fName, agg.name, inputTypesString, numSameAggsSuffix, groupSize, distinctProbString, numInputRows), func(b *testing.B) { + afterEachRun := afterEachRunDefault + if agg.afterEachRun != nil { + afterEachRun = func(*testing.B, colexecop.Operator) { + agg.afterEachRun() + } + } // Simulate the scenario when the optimizer has the perfect // estimate. estimatedRowCount := uint64(math.Ceil(float64(numInputRows) / float64(groupSize))) @@ -1186,9 +1198,7 @@ func benchmarkAggregateFunction( break } } - if err = a.(colexecop.Closer).Close(ctx); err != nil { - b.Fatal(err) - } + afterEachRun(b, a) source.Reset(ctx) } }, diff --git a/pkg/sql/colexec/colexecargs/closer_registry.go b/pkg/sql/colexec/colexecargs/closer_registry.go index ce68edcc6af..76caee7e0dc 100644 --- a/pkg/sql/colexec/colexecargs/closer_registry.go +++ b/pkg/sql/colexec/colexecargs/closer_registry.go @@ -61,3 +61,11 @@ func (r *CloserRegistry) Reset() { } r.toClose = r.toClose[:0] } + +// BenchmarkReset should only be called from benchmarks in order to prepare the +// registry for the new iteration. This should be used whenever a single +// registry is utilized for the whole benchmark loop. +func (r *CloserRegistry) BenchmarkReset(ctx context.Context) { + r.Close(ctx) + r.Reset() +} diff --git a/pkg/sql/colexec/colexecargs/monitor_registry.go b/pkg/sql/colexec/colexecargs/monitor_registry.go index c761d14e5ae..0199e4f704a 100644 --- a/pkg/sql/colexec/colexecargs/monitor_registry.go +++ b/pkg/sql/colexec/colexecargs/monitor_registry.go @@ -272,3 +272,11 @@ func (r *MonitorRegistry) Reset() { r.accounts = r.accounts[:0] r.monitors = r.monitors[:0] } + +// BenchmarkReset should only be called from benchmarks in order to prepare the +// registry for the new iteration. This should be used whenever a single +// registry is utilized for the whole benchmark loop. +func (r *MonitorRegistry) BenchmarkReset(ctx context.Context) { + r.Close(ctx) + r.Reset() +} diff --git a/pkg/sql/colexec/crossjoiner_test.go b/pkg/sql/colexec/crossjoiner_test.go index 128a898d2e1..6fe908b2984 100644 --- a/pkg/sql/colexec/crossjoiner_test.go +++ b/pkg/sql/colexec/crossjoiner_test.go @@ -427,9 +427,11 @@ func BenchmarkCrossJoiner(b *testing.B) { queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */) defer cleanup() var monitorRegistry colexecargs.MonitorRegistry - defer monitorRegistry.Close(ctx) var closerRegistry colexecargs.CloserRegistry - defer closerRegistry.Close(ctx) + afterEachRun := func() { + closerRegistry.BenchmarkReset(ctx) + monitorRegistry.BenchmarkReset(ctx) + } for _, spillForced := range []bool{false, true} { flowCtx.Cfg.TestingKnobs.ForceDiskSpill = spillForced @@ -476,6 +478,7 @@ func BenchmarkCrossJoiner(b *testing.B) { cj.Init(ctx) for b := cj.Next(); b.Length() > 0; b = cj.Next() { } + afterEachRun() } }) } diff --git a/pkg/sql/colexec/distinct_test.go b/pkg/sql/colexec/distinct_test.go index 645984dd8fa..92908e3e2f9 100644 --- a/pkg/sql/colexec/distinct_test.go +++ b/pkg/sql/colexec/distinct_test.go @@ -466,6 +466,7 @@ func runDistinctBenchmarks( ctx context.Context, b *testing.B, distinctConstructor func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error), + afterEachRun func(), getNumOrderedCols func(nCols int) int, namePrefix string, isExternal bool, @@ -611,6 +612,7 @@ func runDistinctBenchmarks( distinct.Init(ctx) for b := distinct.Next(); b.Length() > 0; b = distinct.Next() { } + afterEachRun() } b.StopTimer() }) @@ -643,6 +645,7 @@ func BenchmarkDistinct(b *testing.B) { ctx, b, distinctConstructor, + func() {}, func(nCols int) int { return int(float64(nCols) * orderedColsFraction[distinctIdx]) }, diff --git a/pkg/sql/colexec/external_distinct_test.go b/pkg/sql/colexec/external_distinct_test.go index bd6f5296726..87f722e35cf 100644 --- a/pkg/sql/colexec/external_distinct_test.go +++ b/pkg/sql/colexec/external_distinct_test.go @@ -274,9 +274,11 @@ func BenchmarkExternalDistinct(b *testing.B) { queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */) defer cleanup() var monitorRegistry colexecargs.MonitorRegistry - defer monitorRegistry.Close(ctx) var closerRegistry colexecargs.CloserRegistry - defer closerRegistry.Close(ctx) + afterEachRun := func() { + closerRegistry.BenchmarkReset(ctx) + monitorRegistry.BenchmarkReset(ctx) + } for _, spillForced := range []bool{false, true} { for _, maintainOrdering := range []bool{false, true} { @@ -305,6 +307,7 @@ func BenchmarkExternalDistinct(b *testing.B) { &monitorRegistry, &closerRegistry, ) }, + afterEachRun, func(nCols int) int { return 0 }, diff --git a/pkg/sql/colexec/external_hash_aggregator_test.go b/pkg/sql/colexec/external_hash_aggregator_test.go index 05a05d628a1..b8b68ce73ec 100644 --- a/pkg/sql/colexec/external_hash_aggregator_test.go +++ b/pkg/sql/colexec/external_hash_aggregator_test.go @@ -213,9 +213,11 @@ func BenchmarkExternalHashAggregator(b *testing.B) { queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */) defer cleanup() var monitorRegistry colexecargs.MonitorRegistry - defer monitorRegistry.Close(ctx) var closerRegistry colexecargs.CloserRegistry - defer closerRegistry.Close(ctx) + afterEachRun := func() { + closerRegistry.BenchmarkReset(ctx) + monitorRegistry.BenchmarkReset(ctx) + } numRows := []int{coldata.BatchSize(), 64 * coldata.BatchSize(), 4096 * coldata.BatchSize()} groupSizes := []int{1, 2, 32, 128, coldata.BatchSize()} @@ -246,8 +248,9 @@ func BenchmarkExternalHashAggregator(b *testing.B) { // purposes of this benchmark. return colexecop.NewNoop(op) }, - name: fmt.Sprintf("spilled=%t", spillForced), - order: unordered, + afterEachRun: afterEachRun, + name: fmt.Sprintf("spilled=%t", spillForced), + order: unordered, }, aggFn, []*types.T{types.Int}, 1 /* numGroupCol */, groupSize, 0 /* distinctProb */, numInputRows, 0, /* chunkSize */ diff --git a/pkg/sql/colexec/external_hash_joiner_test.go b/pkg/sql/colexec/external_hash_joiner_test.go index a028ab09988..169852957cf 100644 --- a/pkg/sql/colexec/external_hash_joiner_test.go +++ b/pkg/sql/colexec/external_hash_joiner_test.go @@ -234,9 +234,11 @@ func BenchmarkExternalHashJoiner(b *testing.B) { queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */) defer cleanup() var monitorRegistry colexecargs.MonitorRegistry - defer monitorRegistry.Close(ctx) var closerRegistry colexecargs.CloserRegistry - defer closerRegistry.Close(ctx) + afterEachRun := func() { + closerRegistry.BenchmarkReset(ctx) + monitorRegistry.BenchmarkReset(ctx) + } nCols := 4 for _, typ := range []*types.T{types.Int, types.Bytes} { @@ -289,6 +291,7 @@ func BenchmarkExternalHashJoiner(b *testing.B) { hj.Init(ctx) for b := hj.Next(); b.Length() > 0; b = hj.Next() { } + afterEachRun() } }) } diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index 36e0f47c03a..aaaf18dd4fe 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -227,9 +227,11 @@ func BenchmarkExternalSort(b *testing.B) { queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(b, false /* inMem */) defer cleanup() var monitorRegistry colexecargs.MonitorRegistry - defer monitorRegistry.Close(ctx) var closerRegistry colexecargs.CloserRegistry - defer closerRegistry.Close(ctx) + afterEachRun := func() { + closerRegistry.BenchmarkReset(ctx) + monitorRegistry.BenchmarkReset(ctx) + } for _, nBatches := range []int{1 << 1, 1 << 4, 1 << 8} { for _, nCols := range []int{1, 2, 4} { @@ -280,6 +282,7 @@ func BenchmarkExternalSort(b *testing.B) { sorter.Init(ctx) for out := sorter.Next(); out.Length() != 0; out = sorter.Next() { } + afterEachRun() require.Equal(b, spillForced, spilled, fmt.Sprintf( "expected: spilled=%t\tactual: spilled=%t", spillForced, spilled, )) diff --git a/pkg/sql/colflow/colbatch_scan_test.go b/pkg/sql/colflow/colbatch_scan_test.go index 51709f0cb81..fea883a9028 100644 --- a/pkg/sql/colflow/colbatch_scan_test.go +++ b/pkg/sql/colflow/colbatch_scan_test.go @@ -159,9 +159,11 @@ func BenchmarkColBatchScan(b *testing.B) { evalCtx := eval.MakeTestingEvalContext(s.ClusterSettings()) defer evalCtx.Stop(ctx) var monitorRegistry colexecargs.MonitorRegistry - defer monitorRegistry.Close(ctx) var closerRegistry colexecargs.CloserRegistry - defer closerRegistry.Close(ctx) + afterEachRun := func() { + closerRegistry.BenchmarkReset(ctx) + monitorRegistry.BenchmarkReset(ctx) + } flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, @@ -197,6 +199,7 @@ func BenchmarkColBatchScan(b *testing.B) { } } b.StopTimer() + afterEachRun() } }) }