Skip to content

executor: add more explain analyze info for hash join spill #59255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/executor/join/hash_join_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,14 @@ func (w *buildWorkerBase) fetchBuildSideRows(ctx context.Context, hashJoinCtx *h
err := checkAndSpillRowTableIfNeeded(fetcherAndWorkerSyncer, spillHelper)
if err != nil {
hasError = true
errCh <- errors.Trace(err)
return
}

err = triggerIntest(2)
if err != nil {
hasError = true
errCh <- errors.Trace(err)
return
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/executor/join/hash_join_spill_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type hashJoinSpillHelper struct {

canSpillFlag atomic.Bool

round int

spillTriggeredForTest bool
spillRoundForTest int
spillTriggedInBuildingStageForTest bool
Expand Down Expand Up @@ -109,6 +111,8 @@ func newHashJoinSpillHelper(hashJoinExec *HashJoinV2Exec, partitionNum int, prob
helper.probeSpilledRowIdx = append(helper.probeSpilledRowIdx, i)
}

helper.round = 0

// hashJoinExec may be nil in test
if hashJoinExec != nil {
helper.memTracker = hashJoinExec.memTracker
Expand Down Expand Up @@ -389,6 +393,30 @@ func (h *hashJoinSpillHelper) init() {
}
}

// getSpilledPartitionsNum returns the number of entries in the slice produced
// by h.getSpilledPartitions().
func (h *hashJoinSpillHelper) getSpilledPartitionsNum() int {
	spilled := h.getSpilledPartitions()
	return len(spilled)
}

// getBuildSpillBytes returns the result of getSpillBytesImpl applied to
// h.buildRowsInDisk.
func (h *hashJoinSpillHelper) getBuildSpillBytes() int64 {
	buildDisks := h.buildRowsInDisk
	return h.getSpillBytesImpl(buildDisks)
}

// getProbeSpillBytes returns the result of getSpillBytesImpl applied to
// h.probeRowsInDisk.
func (h *hashJoinSpillHelper) getProbeSpillBytes() int64 {
	probeDisks := h.probeRowsInDisk
	return h.getSpillBytesImpl(probeDisks)
}

// getSpillBytesImpl walks the two-level slice disks and adds up
// GetTotalBytesInDisk() of every non-nil element. Nil elements are skipped.
// It returns 0 when disks is nil or contains no non-nil element.
func (*hashJoinSpillHelper) getSpillBytesImpl(disks [][]*chunk.DataInDiskByChunks) int64 {
	var sum int64
	for i := range disks {
		for j := range disks[i] {
			inDisk := disks[i][j]
			if inDisk == nil {
				// Nothing recorded in this slot; it contributes no bytes.
				continue
			}
			sum += inDisk.GetTotalBytesInDisk()
		}
	}
	return sum
}

func (h *hashJoinSpillHelper) spillRowTableImpl(partitionsNeedSpill []int, totalReleasedMemory int64) error {
workerNum := len(h.hashJoinExec.BuildWorkers)
errChannel := make(chan error, workerNum)
Expand Down
36 changes: 35 additions & 1 deletion pkg/executor/join/hash_join_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,23 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/execdetails"
)

func writeBytesStatsToString(buf *bytes.Buffer, convertedBytes []int64) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there is no need to return a string here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there is no need to return a string here.

deleted

buf.WriteString("[")
for i, byte := range convertedBytes {
if i == 0 {
buf.WriteString(fmt.Sprintf("%.2f", util.ByteToGiB(float64(byte))))
} else {
buf.WriteString(fmt.Sprintf(" %.2f", util.ByteToGiB(float64(byte))))
}
}
buf.WriteString("]")
return buf.String()
}

type hashJoinRuntimeStats struct {
fetchAndBuildHashTable time.Duration
hashStat hashStatistic
Expand Down Expand Up @@ -107,6 +121,13 @@ func (s *hashStatistic) String() string {
return fmt.Sprintf("probe_collision:%v, build:%v", s.probeCollision, execdetails.FormatDuration(s.buildTableElapse))
}

// spillStats holds the spill figures that hashJoinRuntimeStatsV2.String prints
// in its "spill:{...}" section. The three per-round slices are indexed by the
// spill round and are filled in by HashJoinV2Exec.collectSpillStats.
type spillStats struct {
// round is raised to lastRound in startBuildAndProbe whenever lastRound is
// larger; String emits the spill section only when it is greater than 0.
round int
// totalSpillBytesPerRound accumulates build + probe spill bytes for each round.
totalSpillBytesPerRound []int64
// partitionNumPerRound stores the most recently collected number of spilled
// partitions for each round (assigned, not accumulated).
partitionNumPerRound []int
// spillBuildBytesPerRound accumulates build-side spill bytes for each round.
spillBuildBytesPerRound []int64
}

type hashJoinRuntimeStatsV2 struct {
concurrent int
probeCollision int64
Expand All @@ -127,6 +148,8 @@ type hashJoinRuntimeStatsV2 struct {
maxBuildHashTableForCurrentRound int64
maxProbeForCurrentRound int64
maxFetchAndProbeForCurrentRound int64

spill spillStats
}

func setMaxValue(addr *int64, currentValue int64) {
Expand Down Expand Up @@ -194,14 +217,25 @@ func (e *hashJoinRuntimeStatsV2) String() string {
buf.WriteString(execdetails.FormatDuration(time.Duration(atomic.LoadInt64(&e.maxFetchAndProbe))))
buf.WriteString(", probe:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxProbe)))
buf.WriteString(", fetch and wait:")
buf.WriteString(", fetch_and_wait:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe - e.maxProbe)))
if e.probeCollision > 0 {
buf.WriteString(", probe_collision:")
buf.WriteString(strconv.FormatInt(e.probeCollision, 10))
}
buf.WriteString("}")
}
if e.spill.round > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that e.spill.round is very large? If so, I don't think all the data should be printed. Instead, some aggregated info may be more readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that e.spill.round is very large? If so, I don't think all the data should be printed. Instead, some aggregated info may be more readable.

It will not be very large.

buf.WriteString(", spill:{round:")
buf.WriteString(strconv.Itoa(e.spill.round))
buf.WriteString(", partition_num_per_round:")
fmt.Fprintf(buf, "%v", e.spill.partitionNumPerRound)
buf.WriteString(", total_spill_GiB_per_round:")
writeBytesStatsToString(buf, e.spill.totalSpillBytesPerRound)
buf.WriteString(", build_spill_GiB_per_round:")
writeBytesStatsToString(buf, e.spill.spillBuildBytesPerRound)
buf.WriteString("}")
}
return buf.String()
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,27 @@ func (e *HashJoinV2Exec) reset() {
}
}

func (e *HashJoinV2Exec) collectSpillStats() {
if e.stats == nil || !e.spillHelper.isSpillTriggered() {
return
}

round := e.spillHelper.round
if len(e.stats.spill.totalSpillBytesPerRound) < round+1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that length + N < round here? If so, a for loop would make sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that length + N < round here? If so, a for loop would make sense.

It's impossible.

e.stats.spill.totalSpillBytesPerRound = append(e.stats.spill.totalSpillBytesPerRound, 0)
e.stats.spill.spillBuildBytesPerRound = append(e.stats.spill.spillBuildBytesPerRound, 0)
e.stats.spill.partitionNumPerRound = append(e.stats.spill.partitionNumPerRound, 0)
}

buildSpillBytes := e.spillHelper.getBuildSpillBytes()
probeSpillBytes := e.spillHelper.getProbeSpillBytes()
spilledPartitionNum := e.spillHelper.getSpilledPartitionsNum()

e.stats.spill.spillBuildBytesPerRound[round] += buildSpillBytes
e.stats.spill.totalSpillBytesPerRound[round] += buildSpillBytes + probeSpillBytes
e.stats.spill.partitionNumPerRound[round] = spilledPartitionNum
}

func (e *HashJoinV2Exec) startBuildAndProbe(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -1067,6 +1088,7 @@ func (e *HashJoinV2Exec) startBuildAndProbe(ctx context.Context) {
e.fetchAndProbeHashTable(ctx)

e.waiterWg.Wait()
e.collectSpillStats()
e.reset()

e.spillHelper.spillRoundForTest = max(e.spillHelper.spillRoundForTest, lastRound)
Expand All @@ -1081,6 +1103,7 @@ func (e *HashJoinV2Exec) startBuildAndProbe(ctx context.Context) {
// No more data to restore
return
}
e.spillHelper.round = restoredPartition.round

if e.memTracker.BytesConsumed() != 0 {
e.isMemoryClearedForTest = false
Expand All @@ -1090,6 +1113,10 @@ func (e *HashJoinV2Exec) startBuildAndProbe(ctx context.Context) {
e.restoredBuildInDisk = restoredPartition.buildSideChunks
e.restoredProbeInDisk = restoredPartition.probeSideChunks

if e.stats != nil && e.stats.spill.round < lastRound {
e.stats.spill.round = lastRound
}

e.inRestore = true
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/chunk/chunk_in_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func (d *DataInDiskByChunks) Add(chk *Chunk) (err error) {
return
}

// GetTotalBytesInDisk returns d.totalDataSize, the total number of bytes in
// disk recorded for this DataInDiskByChunks.
func (d *DataInDiskByChunks) GetTotalBytesInDisk() int64 {
	total := d.totalDataSize
	return total
}

func (d *DataInDiskByChunks) getChunkSize(chkIdx int) int64 {
totalChunkNum := len(d.offsetOfEachChunk)
if chkIdx == totalChunkNum-1 {
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ import (
"google.golang.org/protobuf/protoadapt"
)

// ByteNumOneGiB is the number of bytes in one GiB (2^30 = 1073741824).
const ByteNumOneGiB int64 = 1 << 30

// ByteToGiB converts a byte count to GiB by dividing it by ByteNumOneGiB.
// The result is fractional; no rounding is applied.
func ByteToGiB(bytes float64) float64 {
	const bytesPerGiB = float64(ByteNumOneGiB)
	return bytes / bytesPerGiB
}

// SliceToMap converts slice to map
// nolint:unused
func SliceToMap(slice []string) map[string]any {
Expand Down