Skip to content

Commit a42cff1

Browse files
authored
fix hashbuild / shufflehashbuild / indexbuild / fuzzyfilter reset bug. (#19690)
1 parent e4a3281 commit a42cff1

File tree

11 files changed

+42
-64
lines changed

11 files changed

+42
-64
lines changed

pkg/container/pSpool/sender.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,9 @@ func (ps *PipelineSpool) ReceiveBatch(idx int) (data *batch.Batch, info error) {
106106

107107
next := ps.rs[idx].popNextIndex()
108108
if ps.shardPool[next].dataContent == nil {
109-
ps.csDoneSignal <- struct{}{}
109+
defer func() {
110+
ps.csDoneSignal <- struct{}{}
111+
}()
110112
}
111113
return ps.shardPool[next].dataContent, ps.shardPool[next].errContent
112114
}

pkg/sql/colexec/fuzzyfilter/types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ func (fuzzyFilter *FuzzyFilter) getProbeIdx() int {
107107
}
108108

109109
func (fuzzyFilter *FuzzyFilter) Reset(proc *process.Process, pipelineFailed bool, err error) {
110-
message.FinalizeRuntimeFilter(fuzzyFilter.RuntimeFilterSpec, pipelineFailed, err, proc.GetMessageBoard())
110+
runtimeSucceed := fuzzyFilter.ctr.state > HandleRuntimeFilter
111+
112+
message.FinalizeRuntimeFilter(fuzzyFilter.RuntimeFilterSpec, runtimeSucceed, proc.GetMessageBoard())
111113
ctr := &fuzzyFilter.ctr
112114
ctr.state = Build
113115
ctr.collisionCnt = 0

pkg/sql/colexec/hashbuild/build.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,9 @@ func (hashBuild *HashBuild) Call(proc *process.Process) (vm.CallResult, error) {
9191
panic("wrong joinmap message tag!")
9292
}
9393
message.SendMessage(message.JoinMapMsg{JoinMapPtr: jm, Tag: ap.JoinMapTag}, proc.GetMessageBoard())
94+
ctr.state = SendSucceed
9495

96+
case SendSucceed:
9597
result.Batch = nil
9698
result.Status = vm.ExecStop
9799
analyzer.Output(result.Batch)

pkg/sql/colexec/hashbuild/types.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
BuildHashMap = iota
3131
HandleRuntimeFilter
3232
SendJoinMap
33+
SendSucceed
3334
)
3435

3536
type container struct {
@@ -83,21 +84,15 @@ func (hashBuild *HashBuild) Release() {
8384
}
8485

8586
func (hashBuild *HashBuild) Reset(proc *process.Process, pipelineFailed bool, err error) {
87+
runtimeSucceed := hashBuild.ctr.state > HandleRuntimeFilter
88+
mapSucceed := hashBuild.ctr.state == SendSucceed
89+
90+
hashBuild.ctr.hashmapBuilder.Reset(proc, !mapSucceed)
8691
hashBuild.ctr.state = BuildHashMap
8792
hashBuild.ctr.runtimeFilterIn = false
88-
message.FinalizeRuntimeFilter(hashBuild.RuntimeFilterSpec, pipelineFailed, err, proc.GetMessageBoard())
89-
message.FinalizeJoinMapMessage(proc.GetMessageBoard(), hashBuild.JoinMapTag, false, 0, pipelineFailed, err)
90-
if pipelineFailed || err != nil {
91-
hashBuild.ctr.hashmapBuilder.FreeWithError(proc)
92-
} else {
93-
hashBuild.ctr.hashmapBuilder.Reset(proc)
94-
}
93+
message.FinalizeRuntimeFilter(hashBuild.RuntimeFilterSpec, runtimeSucceed, proc.GetMessageBoard())
94+
message.FinalizeJoinMapMessage(proc.GetMessageBoard(), hashBuild.JoinMapTag, false, 0, mapSucceed)
9595
}
9696
func (hashBuild *HashBuild) Free(proc *process.Process, pipelineFailed bool, err error) {
97-
98-
if pipelineFailed || err != nil {
99-
hashBuild.ctr.hashmapBuilder.FreeWithError(proc)
100-
} else {
101-
hashBuild.ctr.hashmapBuilder.Free(proc)
102-
}
97+
hashBuild.ctr.hashmapBuilder.Free(proc)
10398
}

pkg/sql/colexec/hashmap_util/hashmap_util.go

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (hb *HashmapBuilder) Prepare(Conditions []*plan.Expr, proc *process.Process
7070
hb.vecs = make([][]*vector.Vector, 0)
7171
hb.executor = make([]colexec.ExpressionExecutor, len(Conditions))
7272
hb.keyWidth = 0
73+
hb.InputBatchRowCount = 0
7374
for i, expr := range Conditions {
7475
if _, ok := Conditions[i].Expr.(*pbplan.Expr_Col); !ok {
7576
hb.needDupVec = true
@@ -90,24 +91,25 @@ func (hb *HashmapBuilder) Prepare(Conditions []*plan.Expr, proc *process.Process
9091
return nil
9192
}
9293

93-
func (hb *HashmapBuilder) Reset(proc *process.Process) {
94+
func (hb *HashmapBuilder) Reset(proc *process.Process, hashTableHasNotSent bool) {
95+
if hashTableHasNotSent || hb.InputBatchRowCount == 0 {
96+
hb.FreeHashMapAndBatches(proc)
97+
}
98+
9499
if hb.needDupVec {
95100
for i := range hb.vecs {
96101
for j := range hb.vecs[i] {
97102
hb.vecs[i][j].Free(proc.Mp())
98103
}
99104
}
100105
}
101-
if hb.InputBatchRowCount == 0 {
102-
hb.FreeHashMapAndBatches(proc)
103-
}
104106
hb.InputBatchRowCount = 0
105107
hb.Batches.Reset()
106108
hb.IntHashMap = nil
107109
hb.StrHashMap = nil
108110
hb.vecs = nil
109111
for i := range hb.UniqueJoinKeys {
110-
hb.UniqueJoinKeys[i].CleanOnlyData()
112+
hb.UniqueJoinKeys[i].Free(proc.Mp())
111113
}
112114
hb.UniqueJoinKeys = nil
113115
hb.MultiSels.Free()
@@ -137,8 +139,6 @@ func (hb *HashmapBuilder) Free(proc *process.Process) {
137139
hb.UniqueJoinKeys = nil
138140
}
139141

140-
// hashmap and batches are owned by probe operators
141-
// build operator can only call this when error occurs, or inputbatch rowcount is 0
142142
func (hb *HashmapBuilder) FreeHashMapAndBatches(proc *process.Process) {
143143
if hb.IntHashMap != nil {
144144
hb.IntHashMap.Free()
@@ -151,28 +151,6 @@ func (hb *HashmapBuilder) FreeHashMapAndBatches(proc *process.Process) {
151151
hb.Batches.Clean(proc.Mp())
152152
}
153153

154-
func (hb *HashmapBuilder) FreeWithError(proc *process.Process) {
155-
hb.needDupVec = false
156-
hb.FreeHashMapAndBatches(proc)
157-
hb.MultiSels.Free()
158-
for i := range hb.executor {
159-
if hb.executor[i] != nil {
160-
hb.executor[i].Free()
161-
}
162-
}
163-
hb.executor = nil
164-
for i := range hb.vecs {
165-
for j := range hb.vecs[i] {
166-
hb.vecs[i][j].Free(proc.Mp())
167-
}
168-
}
169-
hb.vecs = nil
170-
for i := range hb.UniqueJoinKeys {
171-
hb.UniqueJoinKeys[i].Free(proc.Mp())
172-
}
173-
hb.UniqueJoinKeys = nil
174-
}
175-
176154
func (hb *HashmapBuilder) evalJoinCondition(proc *process.Process) error {
177155
for idx1 := range hb.Batches.Buf {
178156
tmpVes := make([]*vector.Vector, len(hb.executor))

pkg/sql/colexec/hashmap_util/hashmap_util_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ func TestBuildHashMap(t *testing.T) {
5757
require.NoError(t, err)
5858
require.Less(t, int64(0), hb.GetSize())
5959
require.Less(t, uint64(0), hb.GetGroupCount())
60-
hb.FreeWithError(proc)
60+
hb.Reset(proc, true)
61+
hb.Free(proc)
6162
require.Equal(t, int64(0), proc.Mp().CurrNB())
6263
}
6364

pkg/sql/colexec/indexbuild/types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ func (indexBuild *IndexBuild) Release() {
7474
}
7575

7676
func (indexBuild *IndexBuild) Reset(proc *process.Process, pipelineFailed bool, err error) {
77-
message.FinalizeRuntimeFilter(indexBuild.RuntimeFilterSpec, pipelineFailed, err, proc.GetMessageBoard())
77+
runtimeSucceed := indexBuild.ctr.state > HandleRuntimeFilter
78+
79+
message.FinalizeRuntimeFilter(indexBuild.RuntimeFilterSpec, runtimeSucceed, proc.GetMessageBoard())
7880
indexBuild.ctr.state = ReceiveBatch
7981
if indexBuild.ctr.buf != nil {
8082
indexBuild.ctr.buf.CleanOnlyData()

pkg/sql/colexec/shufflebuild/build.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ func (shuffleBuild *ShuffleBuild) Call(proc *process.Process) (vm.CallResult, er
9898
jm.IncRef(1)
9999
}
100100
message.SendMessage(message.JoinMapMsg{JoinMapPtr: jm, IsShuffle: true, ShuffleIdx: ap.ShuffleIdx, Tag: ap.JoinMapTag}, proc.GetMessageBoard())
101+
ctr.state = SendSucceed
101102

103+
case SendSucceed:
102104
result.Batch = nil
103105
result.Status = vm.ExecStop
104106
analyzer.Output(result.Batch)

pkg/sql/colexec/shufflebuild/types.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
ReceiveBatch = iota
3131
BuildHashMap
3232
SendJoinMap
33+
SendSucceed
3334
)
3435

3536
type container struct {
@@ -81,22 +82,15 @@ func (shuffleBuild *ShuffleBuild) Release() {
8182
}
8283

8384
func (shuffleBuild *ShuffleBuild) Reset(proc *process.Process, pipelineFailed bool, err error) {
84-
shuffleBuild.ctr.state = ReceiveBatch
85-
message.FinalizeRuntimeFilter(shuffleBuild.RuntimeFilterSpec, pipelineFailed, err, proc.GetMessageBoard())
86-
message.FinalizeJoinMapMessage(proc.GetMessageBoard(), shuffleBuild.JoinMapTag, true, shuffleBuild.ShuffleIdx, pipelineFailed, err)
85+
runtimeSucceed := shuffleBuild.ctr.state > ReceiveBatch
86+
mapSucceed := shuffleBuild.ctr.state == SendSucceed
8787

88-
if pipelineFailed || err != nil {
89-
shuffleBuild.ctr.hashmapBuilder.FreeWithError(proc)
90-
} else {
91-
shuffleBuild.ctr.hashmapBuilder.Reset(proc)
92-
}
88+
shuffleBuild.ctr.hashmapBuilder.Reset(proc, !mapSucceed)
89+
shuffleBuild.ctr.state = ReceiveBatch
90+
message.FinalizeRuntimeFilter(shuffleBuild.RuntimeFilterSpec, runtimeSucceed, proc.GetMessageBoard())
91+
message.FinalizeJoinMapMessage(proc.GetMessageBoard(), shuffleBuild.JoinMapTag, true, shuffleBuild.ShuffleIdx, mapSucceed)
9392
}
9493

9594
func (shuffleBuild *ShuffleBuild) Free(proc *process.Process, pipelineFailed bool, err error) {
96-
97-
if pipelineFailed || err != nil {
98-
shuffleBuild.ctr.hashmapBuilder.FreeWithError(proc)
99-
} else {
100-
shuffleBuild.ctr.hashmapBuilder.Free(proc)
101-
}
95+
shuffleBuild.ctr.hashmapBuilder.Free(proc)
10296
}

pkg/vm/message/joinMapMsg.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,8 @@ func ReceiveJoinMap(tag int32, isShuffle bool, shuffleIdx int32, mb *MessageBoar
259259
}
260260
}
261261

262-
func FinalizeJoinMapMessage(mb *MessageBoard, tag int32, isShuffle bool, shuffleIdx int32, pipelineFailed bool, err error) {
263-
if pipelineFailed || err != nil {
262+
func FinalizeJoinMapMessage(mb *MessageBoard, tag int32, isShuffle bool, shuffleIdx int32, sendMapSucceed bool) {
263+
if !sendMapSucceed {
264264
SendMessage(JoinMapMsg{JoinMapPtr: nil, IsShuffle: isShuffle, ShuffleIdx: shuffleIdx, Tag: tag}, mb)
265265
}
266266
}

0 commit comments

Comments
 (0)