Skip to content

Commit a58e89a

Browse files
craig[bot]stevendanna
andcommitted
Merge #118638
118638: jobs: add job_id index to crdb_internal.jobs virtual table r=stevendanna a=stevendanna To produce rows, `crdb_internal.jobs` queries `crdb_internal.system_jobs`. The latter is very expensive in the abscence of a filter on job ID because it does 2 full scans of the job_info table. This is a backportable change that adds a job_id index to the crdb_internal.jobs virtual table, allowing it to produce a query to crdb_internal.system_jobs that takes advantage of system_job's index on id. We should pursue a more complete fix, perhaps by changing these virtual tables to virtual views, allowing the optimizer to solve many of these problems for us. For example, this change does not substantially improve `SHOW CHANGEFEED JOB <id>` without a follow up commit. Before this change, `SHOW JOB 101` on a fresh demo cluster produced the following scans: ``` Scan /Tenant/2/Table/15/1/10{1-2}, /Tenant/2/Table/15/1/10{2-3}, /Tenant/2/Table/15/1/10{3-4}, /Tenant/2/Table/15/1/10{4-5}, /Tenant/2/Table/15/1/9397966460656353{29-30}, /Tenant/2/Table/15/1/93979664607451545{7-8}, /Tenant/2/Table/15/1/93979664613556224{1-2}, /Tenant/2/Table/15/1/93979664625333043{3-4}, /Tenant/2/Table/15/1/93979664629252096{1-2}, /Tenant/2/Table/15/1/93979664633554534{5-6}, /Tenant/2/Table/15/1/93979664639180800{1-2}, /Tenant/2/Table/15/1/93979664643502899{3-4}, /Tenant/2/Table/15/1/93979664647874150{5-6}, /Tenant/2/Table/15/1/93979664652251955{3-4}, /Tenant/2/Table/15/1/93979664656652697{7-8}, /Tenant/2/Table/15/1/9397966468795924{49-50}, /Tenant/2/Table/15/1/93979664688050995{3-4}, /Tenant/2/Table/15/1/93979664688146022{5-6}, /Tenant/2/Table/15/1/93979664694434201{7-8}, /Tenant/2/Table/15/1/93979664816006758{5-6}, /Tenant/2/Table/15/1/93979664825014681{7-8}, /Tenant/2/Table/15/1/93979664834094694{5-6}, /Tenant/2/Table/15/1/93979664842948608{1-2}, /Tenant/2/Table/15/1/93979664852251443{3-4} Scan /Tenant/2/Table/53/{1-2} Scan /Tenant/2/Table/53/{1-2} ``` Afer, it produces ``` Scan /Tenant/2/Table/15/1/10{1-2} Scan /Tenant/2/Table/53/1/101/"legacy_payload"{-/PrefixEnd} Scan /Tenant/2/Table/53/1/101/"legacy_progress"{-/PrefixEnd} ``` Release note: None Epic: None Co-authored-by: Steven Danna <[email protected]>
2 parents 356b76f + ccd6fd1 commit a58e89a

File tree

6 files changed

+110
-43
lines changed

6 files changed

+110
-43
lines changed

pkg/bench/rttanalysis/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ go_test(
4343
"generate_objects_bench_test.go",
4444
"grant_revoke_bench_test.go",
4545
"grant_revoke_role_bench_test.go",
46+
"jobs_test.go",
4647
"orm_queries_bench_test.go",
4748
"rtt_analysis_test.go",
4849
"system_bench_test.go",
@@ -60,12 +61,15 @@ go_test(
6061
shard_count = 16,
6162
deps = [
6263
"//pkg/base",
64+
"//pkg/jobs/jobspb",
6365
"//pkg/security/securityassets",
6466
"//pkg/security/securitytest",
67+
"//pkg/security/username",
6568
"//pkg/server",
6669
"//pkg/testutils/serverutils",
6770
"//pkg/testutils/sqlutils",
6871
"//pkg/testutils/testcluster",
72+
"//pkg/util/protoutil",
6973
"//pkg/util/randutil",
7074
],
7175
)

pkg/bench/rttanalysis/jobs_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2024 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package rttanalysis
12+
13+
import (
14+
"encoding/hex"
15+
"fmt"
16+
"testing"
17+
18+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
19+
"github.com/cockroachdb/cockroach/pkg/security/username"
20+
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
21+
)
22+
23+
func BenchmarkJobs(b *testing.B) { reg.Run(b) }
24+
func init() {
25+
payloadBytes, err := protoutil.Marshal(&jobspb.Payload{
26+
Details: jobspb.WrapPayloadDetails(jobspb.ImportDetails{}),
27+
UsernameProto: username.RootUserName().EncodeProto(),
28+
})
29+
if err != nil {
30+
panic(err)
31+
}
32+
33+
progressBytes, err := protoutil.Marshal(&jobspb.Progress{
34+
Details: jobspb.WrapProgressDetails(jobspb.ImportProgress{}),
35+
Progress: &jobspb.Progress_FractionCompleted{FractionCompleted: 1},
36+
})
37+
if err != nil {
38+
panic(err)
39+
}
40+
41+
setupQueries := []string{
42+
fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) (SELECT id, 'legacy_progress', '\\x%s' FROM generate_series(1000, 3000) as id)",
43+
hex.EncodeToString(progressBytes)),
44+
45+
fmt.Sprintf("INSERT INTO system.job_info(job_id, info_key, value) (SELECT id, 'legacy_payload', '\\x%s' FROM generate_series(1000, 3000) as id)",
46+
hex.EncodeToString(payloadBytes)),
47+
"INSERT INTO system.jobs(id, status, created, job_type) (SELECT id, 'succeeded', now(), 'IMPORT' FROM generate_series(1000, 3000) as id)",
48+
}
49+
50+
reg.Register("Jobs", []RoundTripBenchTestCase{
51+
{
52+
SetupEx: setupQueries,
53+
Reset: "DELETE FROM system.jobs WHERE id >= 1000 AND id <= 3000; DELETE FROM system.job_info WHERE job_id >= 1000 AND job_id <= 3000",
54+
Name: "show job",
55+
Stmt: "SHOW JOB 2000",
56+
},
57+
})
58+
}

pkg/bench/rttanalysis/testdata/benchmark_expectations

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ exp,benchmark
6161
23,Grant/grant_all_on_3_tables
6262
19,GrantRole/grant_1_role
6363
25,GrantRole/grant_2_roles
64+
3,Jobs/show_job
6465
3,ORMQueries/activerecord_type_introspection_query
6566
0,ORMQueries/asyncpg_types
6667
0,ORMQueries/column_descriptions_json_agg

pkg/sql/crdb_internal.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,7 @@ const (
921921
WITH
922922
latestpayload AS (SELECT job_id, value FROM system.job_info AS payload WHERE info_key = 'legacy_payload' ORDER BY written DESC),
923923
latestprogress AS (SELECT job_id, value FROM system.job_info AS progress WHERE info_key = 'legacy_progress' ORDER BY written DESC)
924-
SELECT
924+
SELECT
925925
DISTINCT(id), status, created, payload.value AS payload, progress.value AS progress,
926926
created_by_type, created_by_id, claim_session_id, claim_instance_id, num_runs, last_run, job_type
927927
FROM system.jobs AS j
@@ -1101,6 +1101,7 @@ const (
11011101
// user is not allowed to see.
11021102
jobsQFrom = ` FROM crdb_internal.system_jobs`
11031103
jobsBackoffArgs = `(SELECT $1::FLOAT AS initial_delay, $2::FLOAT AS max_delay) args`
1104+
jobIDFilter = ` WHERE id = $3`
11041105
jobsStatusFilter = ` WHERE status = $3`
11051106
jobsTypeFilter = ` WHERE job_type = $3`
11061107
jobsQuery = jobsQSelect + `, last_run::timestamptz, COALESCE(num_runs, 0), ` + jobs.NextRunClause +
@@ -1133,11 +1134,18 @@ CREATE TABLE crdb_internal.jobs (
11331134
num_runs INT,
11341135
execution_errors STRING[],
11351136
execution_events JSONB,
1137+
INDEX(job_id),
11361138
INDEX(status),
11371139
INDEX(job_type)
11381140
)`,
11391141
comment: `decoded job metadata from crdb_internal.system_jobs (KV scan)`,
11401142
indexes: []virtualIndex{{
1143+
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
1144+
q := jobsQuery + jobIDFilter
1145+
targetID := tree.MustBeDInt(unwrappedConstraint)
1146+
return makeJobsTableRows(ctx, p, addRow, q, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay(), targetID)
1147+
},
1148+
}, {
11411149
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
11421150
q := jobsQuery + jobsStatusFilter
11431151
targetStatus := tree.MustBeDString(unwrappedConstraint)
@@ -1146,8 +1154,8 @@ CREATE TABLE crdb_internal.jobs (
11461154
}, {
11471155
populate: func(ctx context.Context, unwrappedConstraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
11481156
q := jobsQuery + jobsTypeFilter
1149-
targetStatus := tree.MustBeDString(unwrappedConstraint)
1150-
return makeJobsTableRows(ctx, p, addRow, q, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay(), targetStatus)
1157+
targetType := tree.MustBeDString(unwrappedConstraint)
1158+
return makeJobsTableRows(ctx, p, addRow, q, p.execCfg.JobRegistry.RetryInitialDelay(), p.execCfg.JobRegistry.RetryMaxDelay(), targetType)
11511159
},
11521160
}},
11531161
populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {

pkg/sql/delegate/show_changefeed_jobs.go

Lines changed: 35 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -23,59 +23,53 @@ func (d *delegator) delegateShowChangefeedJobs(n *tree.ShowChangefeedJobs) (tree
2323
// Note: changefeed_details may contain sensitive credentials in sink_uri. This information is redacted when marshaling
2424
// to JSON in ChangefeedDetails.MarshalJSONPB.
2525
const (
26-
// In 23.1, we can use the job_type column to filter jobs.
27-
queryTarget23_1 = `
28-
crdb_internal.system_jobs
29-
WHERE job_type = 'CHANGEFEED'
30-
`
3126
baseSelectClause = `
3227
WITH payload AS (
33-
SELECT
34-
id,
28+
SELECT
29+
id,
3530
crdb_internal.pb_to_json(
36-
'cockroach.sql.jobs.jobspb.Payload',
31+
'cockroach.sql.jobs.jobspb.Payload',
3732
payload, false, true
38-
)->'changefeed' AS changefeed_details
39-
FROM
40-
%s
41-
)
42-
SELECT
43-
job_id,
44-
description,
45-
user_name,
46-
status,
47-
running_status,
48-
created,
49-
started,
50-
finished,
51-
modified,
52-
high_water_timestamp,
53-
error,
33+
)->'changefeed' AS changefeed_details
34+
FROM
35+
crdb_internal.system_jobs
36+
WHERE job_type = 'CHANGEFEED'%s
37+
)
38+
SELECT
39+
job_id,
40+
description,
41+
user_name,
42+
status,
43+
running_status,
44+
created,
45+
started,
46+
finished,
47+
modified,
48+
high_water_timestamp,
49+
error,
5450
replace(
55-
changefeed_details->>'sink_uri',
51+
changefeed_details->>'sink_uri',
5652
'\u0026', '&'
57-
) AS sink_uri,
53+
) AS sink_uri,
5854
ARRAY (
59-
SELECT
55+
SELECT
6056
concat(
61-
database_name, '.', schema_name, '.',
57+
database_name, '.', schema_name, '.',
6258
name
63-
)
64-
FROM
65-
crdb_internal.tables
66-
WHERE
59+
)
60+
FROM
61+
crdb_internal.tables
62+
WHERE
6763
table_id = ANY (descriptor_ids)
68-
) AS full_table_names,
64+
) AS full_table_names,
6965
changefeed_details->'opts'->>'topics' AS topics,
70-
COALESCE(changefeed_details->'opts'->>'format','json') AS format
71-
FROM
72-
crdb_internal.jobs
66+
COALESCE(changefeed_details->'opts'->>'format','json') AS format
67+
FROM
68+
crdb_internal.jobs
7369
INNER JOIN payload ON id = job_id`
7470
)
7571

76-
selectClause := fmt.Sprintf(baseSelectClause, queryTarget23_1)
77-
78-
var whereClause, orderbyClause string
72+
var whereClause, innerWhereClause, orderbyClause string
7973
if n.Jobs == nil {
8074
// The query intends to present:
8175
// - first all the running jobs sorted in order of start time,
@@ -87,8 +81,10 @@ FROM
8781
} else {
8882
// Limit the jobs displayed to the select statement in n.Jobs.
8983
whereClause = fmt.Sprintf(`WHERE job_id in (%s)`, n.Jobs.String())
84+
innerWhereClause = fmt.Sprintf(` AND id in (%s)`, n.Jobs.String())
9085
}
9186

87+
selectClause := fmt.Sprintf(baseSelectClause, innerWhereClause)
9288
sqlStmt := fmt.Sprintf("%s %s %s", selectClause, whereClause, orderbyClause)
9389

9490
return d.parse(sqlStmt)

0 commit comments

Comments
 (0)