disttask: print total count in log (#56759)
ref #56733
lance6716 authored Oct 22, 2024
1 parent ef390c2 commit 44f1c14
Showing 1 changed file with 36 additions and 1 deletion.
37 changes: 36 additions & 1 deletion pkg/ddl/backfilling_operators.go
@@ -241,7 +241,10 @@ func NewWriteIndexToExternalStoragePipeline(
 	srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil)
 	scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize)
 	writeOp := NewWriteExternalStoreOperator(
-		ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSizePerIndex, reorgMeta)
+		ctx, copCtx, sessPool, jobID, subtaskID,
+		tbl, indexes, extStore, srcChkPool, writerCnt,
+		onClose, memSizePerIndex, reorgMeta,
+	)
 	sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, nil, rowCntListener)

 	operator.Compose[TableScanTask](srcOp, scanOp)
@@ -466,6 +469,8 @@ func (*TableScanTaskSource) String() string {
 // TableScanOperator scans table records in given key ranges from kv store.
 type TableScanOperator struct {
 	*operator.AsyncOperator[TableScanTask, IndexRecordChunk]
+	logger *zap.Logger
+	totalCount *atomic.Int64
 }

 // NewTableScanOperator creates a new TableScanOperator.
@@ -478,6 +483,7 @@ func NewTableScanOperator(
 	cpMgr *ingest.CheckpointManager,
 	hintBatchSize int,
 ) *TableScanOperator {
+	totalCount := new(atomic.Int64)
 	pool := workerpool.NewWorkerPool(
 		"TableScanOperator",
 		util.DDL,
@@ -491,13 +497,22 @@ func NewTableScanOperator(
 				srcChkPool: srcChkPool,
 				cpMgr: cpMgr,
 				hintBatchSize: hintBatchSize,
+				totalCount: totalCount,
 			}
 		})
 	return &TableScanOperator{
 		AsyncOperator: operator.NewAsyncOperator[TableScanTask, IndexRecordChunk](ctx, pool),
+		logger: logutil.Logger(ctx),
+		totalCount: totalCount,
 	}
 }

+// Close implements operator.Operator interface.
+func (o *TableScanOperator) Close() error {
+	o.logger.Info("table scan operator total count", zap.Int64("count", o.totalCount.Load()))
+	return o.AsyncOperator.Close()
+}
+
 type tableScanWorker struct {
 	ctx *OperatorCtx
 	copCtx copr.CopContext
@@ -507,6 +522,7 @@ type tableScanWorker struct {

 	cpMgr *ingest.CheckpointManager
 	hintBatchSize int
+	totalCount *atomic.Int64
 }

 func (w *tableScanWorker) HandleTask(task TableScanTask, sender func(IndexRecordChunk)) {
@@ -561,6 +577,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecordChunk))
 			if w.cpMgr != nil {
 				w.cpMgr.UpdateTotalKeys(task.ID, srcChk.NumRows(), done)
 			}
+			w.totalCount.Add(int64(srcChk.NumRows()))
 			sender(idxResult)
 		}
 		return rs.Close()
@@ -587,6 +604,8 @@ func (w *tableScanWorker) recycleChunk(chk *chunk.Chunk) {
 // WriteExternalStoreOperator writes index records to external storage.
 type WriteExternalStoreOperator struct {
 	*operator.AsyncOperator[IndexRecordChunk, IndexWriteResult]
+	logger *zap.Logger
+	totalCount *atomic.Int64
 }

 // NewWriteExternalStoreOperator creates a new WriteExternalStoreOperator.
@@ -615,6 +634,7 @@ func NewWriteExternalStoreOperator(
 		}
 	}

+	totalCount := new(atomic.Int64)
 	pool := workerpool.NewWorkerPool(
 		"WriteExternalStoreOperator",
 		util.DDL,
@@ -644,14 +664,24 @@
 					writers: writers,
 					srcChunkPool: srcChunkPool,
 					reorgMeta: reorgMeta,
+					totalCount: totalCount,
 				},
 			}
 		})
 	return &WriteExternalStoreOperator{
 		AsyncOperator: operator.NewAsyncOperator[IndexRecordChunk, IndexWriteResult](ctx, pool),
+		logger: logutil.Logger(ctx),
+		totalCount: totalCount,
 	}
 }

+// Close implements operator.Operator interface.
+func (o *WriteExternalStoreOperator) Close() error {
+	o.logger.Info("write external storage operator total count",
+		zap.Int64("count", o.totalCount.Load()))
+	return o.AsyncOperator.Close()
+}
+
 // IndexWriteResult contains the result of writing index records to ingest engine.
 type IndexWriteResult struct {
 	ID int
@@ -798,6 +828,8 @@ type indexIngestBaseWorker struct {

 	writers []ingest.Writer
 	srcChunkPool chan *chunk.Chunk
+	// only available in global sort
+	totalCount *atomic.Int64
 }

 func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResult, error) {
@@ -818,6 +850,9 @@ func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResult, error) {
 		logutil.Logger(w.ctx).Info("finish a index ingest task", zap.Int("id", rs.ID))
 		return result, nil
 	}
+	if w.totalCount != nil {
+		w.totalCount.Add(int64(count))
+	}
 	result.Added = count
 	result.Next = nextKey
 	if ResultCounterForTest != nil {
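
The change follows a simple pattern: the operator constructor allocates one shared *atomic.Int64, hands it to every pooled worker, each worker adds the number of rows it processed, and the operator logs the accumulated total exactly once in Close(). The sketch below is a minimal, self-contained illustration of that pattern, not the TiDB code itself; the names (counterOp, worker, handle) and the three-worker setup are made up for the example, while the shared atomic counter and the zap logging call mirror what the diff adds.

package main

import (
	"sync"
	"sync/atomic"

	"go.uber.org/zap"
)

// worker stands in for a pooled worker (e.g. tableScanWorker); it adds the
// rows it processed to a counter shared with the owning operator.
type worker struct {
	totalCount *atomic.Int64
}

func (w *worker) handle(batch []int) {
	// real work would happen here
	w.totalCount.Add(int64(len(batch)))
}

// counterOp stands in for an operator that owns the workers, the shared
// counter and a logger.
type counterOp struct {
	logger     *zap.Logger
	totalCount *atomic.Int64
	workers    []*worker
}

func newCounterOp(logger *zap.Logger, workerCnt int) *counterOp {
	totalCount := new(atomic.Int64)
	workers := make([]*worker, workerCnt)
	for i := range workers {
		workers[i] = &worker{totalCount: totalCount}
	}
	return &counterOp{logger: logger, totalCount: totalCount, workers: workers}
}

// Close logs the accumulated total once, as the operators in the diff do.
func (o *counterOp) Close() error {
	o.logger.Info("operator total count", zap.Int64("count", o.totalCount.Load()))
	return nil
}

func main() {
	op := newCounterOp(zap.NewExample(), 3)
	var wg sync.WaitGroup
	for _, w := range op.workers {
		wg.Add(1)
		go func(w *worker) {
			defer wg.Done()
			w.handle([]int{1, 2, 3}) // each worker "processes" three rows
		}(w)
	}
	wg.Wait()
	_ = op.Close() // logs {"level":"info","msg":"operator total count","count":9}
}

Because the counter is a pointer shared by all workers, concurrent Add calls and the single Load in Close observe the same value without any extra locking.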