Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/sql/appstatspb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (t *TransactionStatistics) Add(other *TransactionStatistics) {
t.RowsRead.Add(other.RowsRead, t.Count, other.Count)
t.BytesRead.Add(other.BytesRead, t.Count, other.Count)
t.RowsWritten.Add(other.RowsWritten, t.Count, other.Count)
t.KVCPUTimeNanos.Add(other.KVCPUTimeNanos, t.Count, other.Count)

t.ExecStats.Add(other.ExecStats)

Expand Down Expand Up @@ -181,6 +182,7 @@ func (s *StatementStatistics) Add(other *StatementStatistics) {
s.BytesRead.Add(other.BytesRead, s.Count, other.Count)
s.RowsRead.Add(other.RowsRead, s.Count, other.Count)
s.RowsWritten.Add(other.RowsWritten, s.Count, other.Count)
s.KVCPUTimeNanos.Add(other.KVCPUTimeNanos, s.Count, other.Count)
s.Nodes = util.CombineUnique(s.Nodes, other.Nodes)
s.KVNodeIDs = util.CombineUnique(s.KVNodeIDs, other.KVNodeIDs)
s.Regions = util.CombineUnique(s.Regions, other.Regions)
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/appstatspb/app_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ message StatementStatistics {
// system.statement_hints table.
optional int64 stmt_hints_count = 37 [(gogoproto.nullable) = false];

// kv_cpu_time_nanos is the CPU time in nanoseconds spent by KV. This is
// representative of the "synchronous" portion of work performed by KV
// (ex/ not replication related work).
optional NumericStat kv_cpu_time_nanos = 38 [(gogoproto.nullable) = false, (gogoproto.customname) = "KVCPUTimeNanos"];

// Note: be sure to update `sql/app_stats.go` when adding/removing fields here!

reserved 13, 14, 17, 18, 19, 20;
Expand Down Expand Up @@ -188,6 +193,11 @@ message TransactionStatistics {

// RowsWritten collects the number of rows written to disk.
optional NumericStat rows_written = 10 [(gogoproto.nullable) = false];

// kv_cpu_time_nanos is the CPU time in nanoseconds spent by KV. This is
// representative of the "synchronous" portion of work performed by KV
// (ex/ not replication related work).
optional NumericStat kv_cpu_time_nanos = 12 [(gogoproto.nullable) = false, (gogoproto.customname) = "KVCPUTimeNanos"];
}


Expand Down
9 changes: 4 additions & 5 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1757,15 +1757,14 @@ type connExecutor struct {
// client to send statements while holding the transaction open.
idleLatency time.Duration

// rowsRead and bytesRead are separate from QueryLevelStats because they are
// accumulated independently since they are always collected, as opposed to
// QueryLevelStats which are sampled.
// rowsRead, bytesRead, kvCPUTimeNanos, and rowsWritten are separate from accumulatedStats
// since they are always collected as opposed to QueryLevelStats which are sampled.
rowsRead int64
bytesRead int64

// rowsWritten tracks the number of rows written (modified) by all
// statements in this txn so far.
rowsWritten int64
rowsWritten int64
kvCPUTimeNanos time.Duration

// rowsWrittenLogged and rowsReadLogged indicates whether we have
// already logged an event about reaching written/read rows setting,
Expand Down
11 changes: 8 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/hlc"

// This import is needed here to properly inject tree.ValidateJSONPath from
// pkg/util/jsonpath/parser/parse.go.
_ "github.com/cockroachdb/cockroach/pkg/util/jsonpath/parser"
Expand Down Expand Up @@ -2951,6 +2952,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.extraTxnState.rowsRead += stats.rowsRead
ex.extraTxnState.bytesRead += stats.bytesRead
ex.extraTxnState.rowsWritten += stats.rowsWritten
ex.extraTxnState.kvCPUTimeNanos += stats.kvCPUTimeNanos

if ppInfo := getPausablePortalInfo(planner); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
// We need to ensure that we're using the planner bound to the first-time
Expand Down Expand Up @@ -3339,8 +3341,8 @@ type topLevelQueryStats struct {
// client receiving the PGWire protocol messages (as well as construcing
// those messages).
clientTime time.Duration
// kvCPUTime is the CPU time consumed by KV operations during query execution.
kvCPUTime time.Duration
// kvCPUTimeNanos is the CPU time consumed by KV operations during query execution.
kvCPUTimeNanos time.Duration
// NB: when adding another field here, consider whether
// forwardInnerQueryStats method needs an adjustment.
}
Expand All @@ -3353,7 +3355,7 @@ func (s *topLevelQueryStats) add(other *topLevelQueryStats) {
s.indexRowsWritten += other.indexRowsWritten
s.networkEgressEstimate += other.networkEgressEstimate
s.clientTime += other.clientTime
s.kvCPUTime += other.kvCPUTime
s.kvCPUTimeNanos += other.kvCPUTimeNanos
}

// execWithDistSQLEngine converts a plan to a distributed SQL physical plan and
Expand Down Expand Up @@ -4199,6 +4201,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.bytesRead = 0
ex.extraTxnState.rowsWritten = 0
ex.extraTxnState.kvCPUTimeNanos = 0

if ex.server.cfg.TestingKnobs.BeforeRestart != nil {
ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.state.mu.autoRetryReason)
Expand Down Expand Up @@ -4230,6 +4233,7 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{}
ex.extraTxnState.idleLatency = 0
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.kvCPUTimeNanos = 0
ex.extraTxnState.bytesRead = 0
ex.extraTxnState.rowsWritten = 0
ex.extraTxnState.rowsWrittenLogged = false
Expand Down Expand Up @@ -4342,6 +4346,7 @@ func (ex *connExecutor) recordTransactionFinish(
RowsRead: ex.extraTxnState.rowsRead,
RowsWritten: ex.extraTxnState.rowsWritten,
BytesRead: ex.extraTxnState.bytesRead,
KVCPUTimeNanos: ex.extraTxnState.kvCPUTimeNanos,
Priority: ex.state.mu.priority,
// TODO(107318): add isolation level
// TODO(107318): add qos
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,7 @@ func forwardInnerQueryStats(f metadataForwarder, stats topLevelQueryStats) {
meta.Metrics.RowsWritten = stats.rowsWritten
meta.Metrics.IndexRowsWritten = stats.indexRowsWritten
meta.Metrics.IndexBytesWritten = stats.indexBytesWritten
meta.Metrics.KVCPUTime = int64(stats.kvCPUTime)
meta.Metrics.KVCPUTime = int64(stats.kvCPUTimeNanos)
// stats.networkEgressEstimate and stats.clientTime are ignored since they
// only matter at the "true" top-level query (and actually should be zero
// here anyway).
Expand Down Expand Up @@ -1676,7 +1676,7 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra
r.stats.rowsWritten += meta.Metrics.RowsWritten
r.stats.indexRowsWritten += meta.Metrics.IndexRowsWritten
r.stats.indexBytesWritten += meta.Metrics.IndexBytesWritten
r.stats.kvCPUTime += time.Duration(meta.Metrics.KVCPUTime)
r.stats.kvCPUTimeNanos += time.Duration(meta.Metrics.KVCPUTime)

if sm, ok := r.scanStageEstimateMap[meta.Metrics.StageID]; ok {
sm.rowsRead += uint64(meta.Metrics.RowsRead)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (ex *connExecutor) recordStatementSummary(
).
PlanGist(planner.instrumentation.planGist.String(), planner.instrumentation.planGist.Hash()).
LatencyRecorder(ex.statsCollector).
QueryLevelStats(stats.bytesRead, stats.rowsRead, stats.rowsWritten).
QueryLevelStats(stats.bytesRead, stats.rowsRead, stats.rowsWritten, stats.kvCPUTimeNanos.Nanoseconds()).
ExecStats(queryLevelStats).
// TODO(mgartner): Use a slice of struct{uint64, uint64} instead of
// converting to strings.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (g *routineGenerator) startInternal(ctx context.Context, txn *kv.Txn) (err
statsBuilder.StatementError(err)
return err
}
statsBuilder.QueryLevelStats(queryStats.bytesRead, queryStats.rowsRead, queryStats.rowsWritten)
statsBuilder.QueryLevelStats(queryStats.bytesRead, queryStats.rowsRead, queryStats.rowsWritten, queryStats.kvCPUTimeNanos.Nanoseconds())
forwardInnerQueryStats(g.p.routineMetadataForwarder, queryStats)
if openCursor {
return cursorHelper.createCursor(g.p)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func TestSQLStatsJsonEncoding(t *testing.T) {
"mean": {{.Float}},
"sqDiff": {{.Float}}
},
"kvCPUTimeNanos": {
"mean": {{.Float}},
"sqDiff": {{.Float}}
},
"nodes": [{{joinInts .IntArray}}],
"kvNodeIds": [{{joinInt32s .Int32Array}}],
"regions": [{{joinStrings .StringArray}}],
Expand Down Expand Up @@ -470,6 +474,10 @@ func TestSQLStatsJsonEncoding(t *testing.T) {
"rowsWritten": {
"mean": {{.Float}},
"sqDiff": {{.Float}}
},
"kvCPUTimeNanos": {
"mean": {{.Float}},
"sqDiff": {{.Float}}
}
},
"execution_statistics": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (t *innerTxnStats) jsonFields() jsonFields {
{"bytesRead", (*numericStats)(&t.BytesRead)},
{"rowsRead", (*numericStats)(&t.RowsRead)},
{"rowsWritten", (*numericStats)(&t.RowsWritten)},
{"kvCPUTimeNanos", (*numericStats)(&t.KVCPUTimeNanos)},
}
}

Expand Down Expand Up @@ -335,6 +336,7 @@ func (s *innerStmtStats) jsonFields() jsonFields {
{"bytesRead", (*numericStats)(&s.BytesRead)},
{"rowsRead", (*numericStats)(&s.RowsRead)},
{"rowsWritten", (*numericStats)(&s.RowsWritten)},
{"kvCPUTimeNanos", (*numericStats)(&s.KVCPUTimeNanos)},
{"nodes", (*int64Array)(&s.Nodes)},
{"kvNodeIds", (*int32Array)(&s.KVNodeIDs)},
{"regions", (*stringArray)(&s.Regions)},
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (s *Container) RecordStatement(ctx context.Context, value *sqlstats.Recorde
stats.mu.data.BytesRead.Record(stats.mu.data.Count, float64(value.BytesRead))
stats.mu.data.RowsRead.Record(stats.mu.data.Count, float64(value.RowsRead))
stats.mu.data.RowsWritten.Record(stats.mu.data.Count, float64(value.RowsWritten))
stats.mu.data.KVCPUTimeNanos.Record(stats.mu.data.Count, float64(value.KVCPUTimeNanos))
stats.mu.data.LastExecTimestamp = s.getTimeNow()
stats.mu.data.Nodes = util.CombineUnique(stats.mu.data.Nodes, value.Nodes)
stats.mu.data.KVNodeIDs = util.CombineUnique(stats.mu.data.KVNodeIDs, value.KVNodeIDs)
Expand Down Expand Up @@ -239,6 +240,7 @@ func (s *Container) RecordTransaction(ctx context.Context, value *sqlstats.Recor
stats.mu.data.RowsRead.Record(stats.mu.data.Count, float64(value.RowsRead))
stats.mu.data.RowsWritten.Record(stats.mu.data.Count, float64(value.RowsWritten))
stats.mu.data.BytesRead.Record(stats.mu.data.Count, float64(value.BytesRead))
stats.mu.data.KVCPUTimeNanos.Record(stats.mu.data.Count, float64(value.KVCPUTimeNanos.Nanoseconds()))

if value.CollectedExecStats {
stats.mu.data.ExecStats.Count++
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func TestContainer_Add(t *testing.T) {
BytesRead: 100,
RowsRead: 20,
RowsWritten: 5,
KVCPUTimeNanos: 30,
Failed: true,
Generic: true,
AppliedStmtHints: true,
Expand All @@ -137,6 +138,7 @@ func TestContainer_Add(t *testing.T) {
RowsRead: 20,
RowsWritten: 5,
BytesRead: 100,
KVCPUTimeNanos: 30 * time.Nanosecond,
}
require.NoError(t, src.RecordTransaction(ctx, txnStats))

Expand All @@ -160,6 +162,7 @@ func TestContainer_Add(t *testing.T) {
BytesRead: 60,
RowsRead: 70,
RowsWritten: 80,
KVCPUTimeNanos: 90,
Failed: true,
Generic: true,
AppliedStmtHints: true,
Expand All @@ -177,6 +180,7 @@ func TestContainer_Add(t *testing.T) {
RowsRead: 20,
RowsWritten: 5,
BytesRead: 100,
KVCPUTimeNanos: 90 * time.Nanosecond,
}
require.NoError(t, dest.RecordStatement(ctx, reducedStmtStats))
require.NoError(t, dest.RecordTransaction(ctx, reducedTxnStats))
Expand Down Expand Up @@ -221,6 +225,7 @@ func verifyStmtStatsMultiple(
require.InEpsilon(t, float64(stmtStats.BytesRead), destStmtStats.mu.data.BytesRead.Mean, epsilon)
require.InEpsilon(t, float64(stmtStats.RowsRead), destStmtStats.mu.data.RowsRead.Mean, epsilon)
require.InEpsilon(t, float64(stmtStats.RowsWritten), destStmtStats.mu.data.RowsWritten.Mean, epsilon)
require.InEpsilon(t, float64(stmtStats.KVCPUTimeNanos), destStmtStats.mu.data.KVCPUTimeNanos.Mean, epsilon)
}

// verifyStmtStatsReduced verifies that statement statistics have been properly
Expand All @@ -241,6 +246,7 @@ func verifyStmtStatsReduced(
require.InEpsilon(t, float64(stmtStats.BytesRead)/cnt, destStmtStats.mu.data.BytesRead.Mean, epsilon)
require.InEpsilon(t, float64(stmtStats.RowsRead)/cnt, destStmtStats.mu.data.RowsRead.Mean, epsilon)
require.InEpsilon(t, float64(stmtStats.RowsWritten)/cnt, destStmtStats.mu.data.RowsWritten.Mean, epsilon)
require.InEpsilon(t, float64(stmtStats.KVCPUTimeNanos)/cnt, destStmtStats.mu.data.KVCPUTimeNanos.Mean, epsilon)
}

// verifyTxnStatsMultiple verifies that transaction statistics have been recorded
Expand All @@ -259,6 +265,7 @@ func verifyTxnStatsMultiple(
require.InEpsilon(t, float64(txnStats.RowsRead), destTxnStats.mu.data.RowsRead.Mean, epsilon)
require.InEpsilon(t, float64(txnStats.RowsWritten), destTxnStats.mu.data.RowsWritten.Mean, epsilon)
require.InEpsilon(t, float64(txnStats.BytesRead), destTxnStats.mu.data.BytesRead.Mean, epsilon)
require.InEpsilon(t, float64(txnStats.KVCPUTimeNanos.Nanoseconds()), destTxnStats.mu.data.KVCPUTimeNanos.Mean, epsilon)
}

// verifyTxnStatsReduced verifies that transaction statistics have been properly
Expand All @@ -277,6 +284,7 @@ func verifyTxnStatsReduced(
require.InEpsilon(t, float64(txnStats.RowsRead)/cnt, destTxnStats.mu.data.RowsRead.Mean, epsilon)
require.InEpsilon(t, float64(txnStats.RowsWritten)/cnt, destTxnStats.mu.data.RowsWritten.Mean, epsilon)
require.InEpsilon(t, float64(txnStats.BytesRead)/cnt, destTxnStats.mu.data.BytesRead.Mean, epsilon)
require.InEpsilon(t, float64(txnStats.KVCPUTimeNanos.Nanoseconds())/cnt, destTxnStats.mu.data.KVCPUTimeNanos.Mean, epsilon)
}

func testMonitor(
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/sqlstats/ssprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type RecordedStmtStats struct {
BytesRead int64
RowsRead int64
RowsWritten int64
KVCPUTimeNanos int64
Nodes []int64
KVNodeIDs []int32
StatementType tree.StatementType
Expand Down Expand Up @@ -123,6 +124,7 @@ type RecordedTxnStats struct {
RowsRead int64
RowsWritten int64
BytesRead int64
KVCPUTimeNanos time.Duration
Priority roachpb.UserPriority
TxnErr error
Application string
Expand Down Expand Up @@ -233,14 +235,15 @@ func (b *RecordedStatementStatsBuilder) LatencyRecorder(
}

func (b *RecordedStatementStatsBuilder) QueryLevelStats(
bytesRead int64, rowsRead int64, rowsWritten int64,
bytesRead int64, rowsRead int64, rowsWritten int64, kvCPUTime int64,
) *RecordedStatementStatsBuilder {
if b == nil {
return b
}
b.stmtStats.BytesRead = bytesRead
b.stmtStats.RowsRead = rowsRead
b.stmtStats.RowsWritten = rowsWritten
b.stmtStats.KVCPUTimeNanos = kvCPUTime
return b
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/ui/workspaces/cluster-ui/src/api/statementsApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type Statistics = {
runLat: NumericStat;
svcLat: NumericStat;
regions: string[];
kvCPUTimeNanos?: NumericStat;
};

type ExecStats = {
Expand Down Expand Up @@ -253,6 +254,7 @@ export function convertStatementRawFormatToAggregatedStatistics(
service_lat: s.statistics.statistics.svcLat,
sql_type: s.metadata.stmtType,
regions: s.statistics.statistics.regions,
kv_cpu_time_nanos: s.statistics.statistics.kvCPUTimeNanos,
},
};
}
16 changes: 16 additions & 0 deletions pkg/ui/workspaces/cluster-ui/src/barCharts/barCharts.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ const cpuBars = [
),
];

const kvCPUTimeBars = [
bar(
"kv-cpu-time",
(d: StatementStatistics) => d.stats.kv_cpu_time_nanos?.mean,
),
];

const maxMemUsageBars = [
bar(
"max-mem-usage",
Expand Down Expand Up @@ -87,6 +94,9 @@ const contentionStdDev = bar(cx("contention-dev"), (d: StatementStatistics) =>
const cpuStdDev = bar(cx("cpu-dev"), (d: StatementStatistics) =>
stdDevLong(d.stats.exec_stats.cpu_sql_nanos, d.stats.exec_stats.count),
);
const kvCPUTimeStdDev = bar(cx("kv-cpu-time-dev"), (d: StatementStatistics) =>
stdDevLong(d.stats.kv_cpu_time_nanos, d.stats.count),
);
const maxMemUsageStdDev = bar(
cx("max-mem-usage-dev"),
(d: StatementStatistics) =>
Expand Down Expand Up @@ -123,6 +133,12 @@ export const cpuBarChart = barChartFactory(
v => Duration(v),
cpuStdDev,
);
export const kvCPUTimeBarChart = barChartFactory(
"grey",
kvCPUTimeBars,
v => Duration(v),
kvCPUTimeStdDev,
);
export const maxMemUsageBarChart = barChartFactory(
"grey",
maxMemUsageBars,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,13 @@ export class StatementDetails extends React.Component<
Duration,
)}
/>
<SummaryCardItem
label="KV CPU Time"
value={formatNumberForDisplay(
stats?.kv_cpu_time_nanos?.mean,
Duration,
)}
/>
<SummaryCardItem
label="Client Wait Time"
value={formatNumberForDisplay(stats?.idle_lat.mean, duration)}
Expand Down
Loading
Loading