Skip to content

Commit 3ab75d6

Browse files
roachtest: add test for data migration between clusters
Adds a roachtest to verify cluster behavior when data stores are relocated to different nodes on a new cluster. The test: - Starts a source cluster and runs kv workload - Stops the cluster and copies data directory to destination nodes - Restarts cluster on new nodes with copied volumes - Validates data consistency between source and destination clusters This test improves coverage for scenarios where operators need to physically relocate storage volumes between nodes. Epic: none Fixes: #136622 Release note: None
1 parent 6e7f555 commit 3ab75d6

File tree

3 files changed

+182
-0
lines changed

3 files changed

+182
-0
lines changed

pkg/cmd/roachtest/tests/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ go_library(
180180
"sqlalchemy_blocklist.go",
181181
"sqlsmith.go",
182182
"status_server.go",
183+
"stop_and_copy.go",
183184
"sysbench.go",
184185
"tlp.go",
185186
"tombstones.go",
@@ -265,6 +266,7 @@ go_library(
265266
"//pkg/storage",
266267
"//pkg/storage/enginepb",
267268
"//pkg/testutils",
269+
"//pkg/testutils/fingerprintutils",
268270
"//pkg/testutils/floatcmp",
269271
"//pkg/testutils/jobutils",
270272
"//pkg/testutils/release",

pkg/cmd/roachtest/tests/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func RegisterTests(r registry.Registry) {
8686
registerKVScalability(r)
8787
registerKVSplits(r)
8888
registerKVRestartImpact(r)
89+
registerKVStopAndCopy(r)
8990
registerKnex(r)
9091
registerLOQRecovery(r)
9192
registerLargeRange(r)
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package tests
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"time"
12+
13+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
14+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
15+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
16+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
17+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
18+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
19+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
20+
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
21+
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
22+
"github.com/cockroachdb/cockroach/pkg/testutils/fingerprintutils"
23+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
24+
"github.com/cockroachdb/errors"
25+
)
26+
27+
const (
28+
nodesPerCluster = 3
29+
dataDir = "/mnt/data1/"
30+
tarFile = dataDir + "cockroach.tar.gz"
31+
)
32+
33+
type multiClusterConfig struct {
34+
srcCluster option.NodeListOption
35+
dstCluster option.NodeListOption
36+
}
37+
38+
func setupClusters(c cluster.Cluster) multiClusterConfig {
39+
return multiClusterConfig{
40+
srcCluster: c.Range(1, nodesPerCluster),
41+
dstCluster: c.Range(nodesPerCluster+1, 2*nodesPerCluster),
42+
}
43+
}
44+
45+
func startCluster(
46+
ctx context.Context, t test.Test, c cluster.Cluster, nodes option.NodeListOption, skipInit bool,
47+
) error {
48+
clusterSetting := install.MakeClusterSettings()
49+
srcStartOpts := option.NewStartOpts(
50+
option.NoBackupSchedule,
51+
option.WithInitTarget(nodes[0]),
52+
)
53+
if skipInit {
54+
srcStartOpts.RoachprodOpts.SkipInit = true
55+
}
56+
57+
t.Status("starting cluster with nodes ", nodes)
58+
return c.StartE(ctx, t.L(), srcStartOpts, clusterSetting, nodes)
59+
}
60+
61+
func copyAndTransferData(
62+
ctx context.Context, t test.Test, c cluster.Cluster, cfg multiClusterConfig,
63+
) error {
64+
t.Status("creating archive and transferring data to destination cluster")
65+
// This command creates a tar archive on the source cluster.
66+
if err := c.RunE(ctx, option.WithNodes(cfg.srcCluster), "tar", "-czf", tarFile, "-C", dataDir, "cockroach"); err != nil {
67+
return errors.Wrapf(err, "creating tar archive")
68+
}
69+
70+
// Fetch IP address of nodes from destination cluster.
71+
dstNodeIps, err := c.InternalIP(ctx, t.L(), cfg.dstCluster)
72+
if err != nil {
73+
return errors.Wrapf(err, "getting destination IPs")
74+
}
75+
76+
// This code copies data in parallel from source cluster nodes to their corresponding destination nodes.
77+
m := t.NewGroup(task.WithContext(ctx))
78+
for idx, n := range cfg.srcCluster {
79+
node := n
80+
dstIP := dstNodeIps[idx]
81+
m.Go(func(ctx context.Context, l *logger.Logger) error {
82+
return c.RunE(ctx, option.WithNodes(option.NodeListOption{node}), "scp", tarFile, fmt.Sprintf("ubuntu@%s:%s", dstIP, dataDir))
83+
})
84+
}
85+
m.Wait()
86+
87+
// This command extracts the tar archive in the destination data directory.
88+
if err = c.RunE(ctx, option.WithNodes(cfg.dstCluster), "tar", "-xzf", tarFile, "-C", dataDir); err != nil {
89+
return errors.Wrapf(err, "extracting tar archive:")
90+
}
91+
return nil
92+
}
93+
94+
func fingerprintDatabases(
95+
ctx context.Context, t test.Test, c cluster.Cluster, node int,
96+
) (map[string]map[string]int64, error) {
97+
dbConn := c.Conn(ctx, t.L(), node)
98+
defer dbConn.Close()
99+
100+
fingerPrints, err := fingerprintutils.FingerprintAllDatabases(ctx, dbConn, false, fingerprintutils.Stripped())
101+
if err != nil {
102+
return nil, err
103+
}
104+
return fingerPrints, nil
105+
}
106+
107+
// registerKVStopAndCopy tests cluster data persistence and recovery when storage data volumes
108+
// are relocated. It:
109+
// 1. Starts a cluster and runs a workload
110+
// 2. Performs a complete cluster shutdown
111+
// 3. Copies the storage data volumes to different nodes
112+
// 4. Restarts the cluster with the relocated data
113+
func registerKVStopAndCopy(r registry.Registry) {
114+
runStopAndCopy := func(ctx context.Context, t test.Test, c cluster.Cluster) {
115+
cfg := setupClusters(c)
116+
117+
if err := startCluster(ctx, t, c, cfg.srcCluster, false); err != nil {
118+
t.Fatal(err)
119+
}
120+
121+
// Initialize the database.
122+
t.Status("running workload on source cluster")
123+
c.Run(ctx, option.WithNodes(c.Node(1)), "./cockroach workload run kv --init --read-percent=0 --splits=1000 --duration=5m {pgurl:1}")
124+
125+
// Calculate the fingerprints for all databases on the source cluster, excluding the system database.
126+
t.Status("computing databases fingerprint on source cluster")
127+
srcFingerprints, err := fingerprintDatabases(ctx, t, c, cfg.srcCluster[0])
128+
if err != nil {
129+
t.Fatal(err)
130+
}
131+
132+
// Shut down the source cluster.
133+
t.Status("stopping source cluster")
134+
c.Stop(ctx, t.L(), option.DefaultStopOpts(), cfg.srcCluster)
135+
136+
if err := copyAndTransferData(ctx, t, c, cfg); err != nil {
137+
t.Fatal(err)
138+
}
139+
140+
// Start the destination cluster.
141+
if err := startCluster(ctx, t, c, cfg.dstCluster, true); err != nil {
142+
t.Fatal(err)
143+
}
144+
145+
// Calculate the fingerprints for all databases on the destination cluster, excluding the system database.
146+
t.Status("computing databases fingerprint on destination cluster")
147+
dstFingerprints, err := fingerprintDatabases(ctx, t, c, cfg.dstCluster[0])
148+
if err != nil {
149+
t.Fatal(err)
150+
}
151+
152+
// Match the database fingerprints of source and destination cluster.
153+
if err := fingerprintutils.CompareMultipleDatabaseFingerprints(srcFingerprints, dstFingerprints); err != nil {
154+
t.Fatal(err)
155+
}
156+
157+
t.Status("checking for replica divergence on destination cluster")
158+
db := c.Conn(ctx, t.L(), cfg.dstCluster[0], option.VirtualClusterName(install.SystemInterfaceName))
159+
defer db.Close()
160+
err = timeutil.RunWithTimeout(ctx, "consistency check", 20*time.Minute,
161+
func(ctx context.Context) error {
162+
return roachtestutil.CheckReplicaDivergenceOnDB(ctx, t.L(), db)
163+
},
164+
)
165+
if err != nil {
166+
t.Fatal(err)
167+
}
168+
}
169+
170+
r.Add(registry.TestSpec{
171+
Name: "stop-and-copy/nodes=3plus3",
172+
Owner: registry.OwnerKV,
173+
Cluster: r.MakeClusterSpec(2*nodesPerCluster, spec.CPU(8)),
174+
CompatibleClouds: registry.AllClouds,
175+
Suites: registry.Suites(registry.Weekly),
176+
Run: runStopAndCopy,
177+
Timeout: defaultTimeout,
178+
})
179+
}

0 commit comments

Comments
 (0)