Skip to content

Commit 1d35377

Browse files
craig[bot]shailendra-patel
andcommitted
Merge #129771
129771: roachtest: add operation for pausing ldr job. r=vidit-bhat,nameisbhaskar a=shailendra-patel On DRT clusters we run chaos operation. Adding support for a new operation, this operation will pause a LDR job running for sometime. This will help uncover issue like replication lag and time ldr takes to recover from it. Epic: none Release note: None Co-authored-by: Shailendra Patel <[email protected]>
2 parents 6318706 + 3a2d5c8 commit 1d35377

File tree

5 files changed

+105
-0
lines changed

5 files changed

+105
-0
lines changed

pkg/cmd/roachtest/operations/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
"manual_compaction.go",
1212
"network_partition.go",
1313
"node_kill.go",
14+
"pause_job.go",
1415
"register.go",
1516
"resize.go",
1617
"utils.go",
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright 2024 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package operations
12+
13+
import (
14+
"context"
15+
"fmt"
16+
"time"
17+
18+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
19+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/operation"
20+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
21+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
22+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestflags"
23+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
24+
)
25+
26+
type resumePausedJob struct {
27+
jobId string
28+
}
29+
30+
func (r *resumePausedJob) Cleanup(ctx context.Context, o operation.Operation, c cluster.Cluster) {
31+
conn := c.Conn(ctx, o.L(), 1, option.VirtualClusterName(roachtestflags.VirtualCluster))
32+
defer conn.Close()
33+
34+
resumeJobStmt := fmt.Sprintf("RESUME JOB %s", r.jobId)
35+
_, err := conn.ExecContext(ctx, resumeJobStmt)
36+
if err != nil {
37+
o.Fatal(err)
38+
}
39+
}
40+
41+
func pauseLDRJob(
42+
ctx context.Context, o operation.Operation, c cluster.Cluster,
43+
) registry.OperationCleanup {
44+
conn := c.Conn(ctx, o.L(), 1, option.VirtualClusterName(roachtestflags.VirtualCluster))
45+
defer conn.Close()
46+
47+
//fetch running ldr jobs
48+
jobs, err := conn.QueryContext(ctx, "(WITH x AS (SHOW JOBS) SELECT job_id FROM x WHERE job_type = 'LOGICAL REPLICATION' AND status = 'running')")
49+
if err != nil {
50+
o.Fatal(err)
51+
}
52+
53+
var jobIds []string
54+
for jobs.Next() {
55+
var jobId string
56+
if err := jobs.Scan(&jobId); err != nil {
57+
o.Fatal(err)
58+
}
59+
jobIds = append(jobIds, jobId)
60+
}
61+
62+
//pick a random ldr job
63+
rng, _ := randutil.NewPseudoRand()
64+
jobId := jobIds[rng.Intn(len(jobIds))]
65+
66+
o.Status(fmt.Sprintf("pausing LDR job %s", jobId))
67+
pauseJobStmt := fmt.Sprintf("PAUSE JOB %s WITH REASON = 'roachtest operation'", jobId)
68+
_, err = conn.ExecContext(ctx, pauseJobStmt)
69+
if err != nil {
70+
o.Fatal(err)
71+
}
72+
73+
o.Status(fmt.Sprintf("paused LDR job %s", jobId))
74+
return &resumePausedJob{
75+
jobId: jobId,
76+
}
77+
}
78+
79+
func registerPauseLDRJob(r registry.Registry) {
80+
r.AddOperation(registry.OperationSpec{
81+
Name: "pause-ldr",
82+
Owner: registry.OwnerDisasterRecovery,
83+
Timeout: 15 * time.Minute,
84+
CompatibleClouds: registry.AllClouds,
85+
Dependencies: []registry.OperationDependency{registry.OperationRequiresLDRJobRunning},
86+
Run: pauseLDRJob,
87+
})
88+
}

pkg/cmd/roachtest/operations/register.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ func RegisterOperations(r registry.Registry) {
2323
registerBackupRestore(r)
2424
registerManualCompaction(r)
2525
registerResize(r)
26+
registerPauseLDRJob(r)
2627
}

pkg/cmd/roachtest/registry/operation_spec.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
OperationRequiresPopulatedDatabase
3131
OperationRequiresZeroUnavailableRanges
3232
OperationRequiresZeroUnderreplicatedRanges
33+
OperationRequiresLDRJobRunning
3334
)
3435

3536
// OperationCleanup specifies an operation that

pkg/cmd/roachtest/roachtestutil/operations/dependency.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,20 @@ func CheckDependencies(
8080
if count != 0 {
8181
return false, nil
8282
}
83+
case registry.OperationRequiresLDRJobRunning:
84+
conn := c.Conn(ctx, l, 1, option.VirtualClusterName("system"))
85+
defer conn.Close()
86+
87+
jobsCur, err := conn.QueryContext(ctx, "(WITH x AS (SHOW JOBS) SELECT job_id FROM x WHERE job_type = 'LOGICAL REPLICATION' AND status = 'running' limit 1)")
88+
if err != nil {
89+
return false, err
90+
}
91+
jobsCur.Next()
92+
var jobId string
93+
_ = jobsCur.Scan(&jobId)
94+
if jobId == "" {
95+
return false, nil
96+
}
8397
default:
8498
panic(fmt.Sprintf("unknown operation dependency %d", dep))
8599
}

0 commit comments

Comments
 (0)