Skip to content

Commit bce8eb6

Browse files
craig[bot]shailendra-patelasg0451
committed
140021: roachtest: add test for data migration between clusters r=tbg a=shailendra-patel Adds a roachtest to verify cluster behavior when data stores are relocated to different nodes on a new cluster. The test: - Starts a source cluster and runs kv workload - Stops the cluster and copies data directory to destination nodes - Restarts cluster on new nodes with copied volumes - Validates data consistency between source and destination clusters This test improves coverage for scenarios where operators need to physically relocate storage volumes between nodes. Epic: none Fixes: #136622 Release note: None 140539: changefeedccl: fix avro schema bug r=wenyihu6 a=asg0451 Fix bug in avro schema generation introduced by #139655, which caused extraneous fields to appear in the schema. Epic: none Release note: None Co-authored-by: Shailendra Patel <[email protected]> Co-authored-by: Miles Frankel <[email protected]>
3 parents 563e5d3 + 3ab75d6 + d4dfbf8 commit bce8eb6

File tree

5 files changed

+222
-3
lines changed

5 files changed

+222
-3
lines changed

pkg/ccl/changefeedccl/avro/avro.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,10 @@ type EnvelopeOpts struct {
180180
type EnvelopeRecord struct {
181181
Record
182182

183-
// TODO: opts should be private
184-
Opts EnvelopeOpts
185-
Before, After, Rec *DataRecord
183+
// NOTE: this struct gets serialized to json, but we still need to be able
184+
// to access these fields outside this package, so we hide them.
185+
Opts EnvelopeOpts `json:"-"`
186+
Before, After, Rec *DataRecord `json:"-"`
186187
}
187188

188189
// typeToSchema converts a database type to an avro field

pkg/ccl/changefeedccl/encoder_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
gosql "database/sql"
1111
"encoding/base64"
12+
gojson "encoding/json"
1213
"fmt"
1314
"math/rand"
1415
"net/url"
@@ -770,6 +771,41 @@ func TestAvroSchemaNamespace(t *testing.T) {
770771
cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestUseRootUserConnection)
771772
}
772773

774+
func TestAvroSchemaHasExpectedTopLevelFields(t *testing.T) {
775+
defer leaktest.AfterTest(t)()
776+
defer log.Scope(t).Close(t)
777+
778+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
779+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
780+
sqlDB.Exec(t, `CREATE DATABASE movr`)
781+
sqlDB.Exec(t, `CREATE TABLE movr.drivers (id INT PRIMARY KEY, name STRING)`)
782+
sqlDB.Exec(t,
783+
`INSERT INTO movr.drivers VALUES (1, 'Alice')`,
784+
)
785+
786+
foo := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+
787+
`WITH format=%s`, changefeedbase.OptFormatAvro))
788+
defer closeFeed(t, foo)
789+
790+
assertPayloads(t, foo, []string{
791+
`drivers: {"id":{"long":1}}->{"after":{"drivers":{"id":{"long":1},"name":{"string":"Alice"}}}}`,
792+
})
793+
794+
reg := foo.(*kafkaFeed).registry
795+
796+
schemaJSON := reg.SchemaForSubject(`drivers-value`)
797+
var schema map[string]any
798+
require.NoError(t, gojson.Unmarshal([]byte(schemaJSON), &schema))
799+
var keys []string
800+
for k := range schema {
801+
keys = append(keys, k)
802+
}
803+
require.ElementsMatch(t, keys, []string{"type", "name", "fields"})
804+
}
805+
806+
cdcTest(t, testFn, feedTestForceSink("kafka"), feedTestUseRootUserConnection)
807+
}
808+
773809
func TestTableNameCollision(t *testing.T) {
774810
defer leaktest.AfterTest(t)()
775811
defer log.Scope(t).Close(t)

pkg/cmd/roachtest/tests/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ go_library(
182182
"sqlalchemy_blocklist.go",
183183
"sqlsmith.go",
184184
"status_server.go",
185+
"stop_and_copy.go",
185186
"sysbench.go",
186187
"tlp.go",
187188
"tombstones.go",
@@ -266,6 +267,7 @@ go_library(
266267
"//pkg/storage",
267268
"//pkg/storage/enginepb",
268269
"//pkg/testutils",
270+
"//pkg/testutils/fingerprintutils",
269271
"//pkg/testutils/floatcmp",
270272
"//pkg/testutils/jobutils",
271273
"//pkg/testutils/release",

pkg/cmd/roachtest/tests/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func RegisterTests(r registry.Registry) {
8686
registerKVScalability(r)
8787
registerKVSplits(r)
8888
registerKVRestartImpact(r)
89+
registerKVStopAndCopy(r)
8990
registerKnex(r)
9091
registerLOQRecovery(r)
9192
registerLargeRange(r)
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package tests
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"time"
12+
13+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
14+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
15+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
16+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
17+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
18+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
19+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
20+
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
21+
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
22+
"github.com/cockroachdb/cockroach/pkg/testutils/fingerprintutils"
23+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
24+
"github.com/cockroachdb/errors"
25+
)
26+
27+
const (
28+
nodesPerCluster = 3
29+
dataDir = "/mnt/data1/"
30+
tarFile = dataDir + "cockroach.tar.gz"
31+
)
32+
33+
type multiClusterConfig struct {
34+
srcCluster option.NodeListOption
35+
dstCluster option.NodeListOption
36+
}
37+
38+
func setupClusters(c cluster.Cluster) multiClusterConfig {
39+
return multiClusterConfig{
40+
srcCluster: c.Range(1, nodesPerCluster),
41+
dstCluster: c.Range(nodesPerCluster+1, 2*nodesPerCluster),
42+
}
43+
}
44+
45+
func startCluster(
46+
ctx context.Context, t test.Test, c cluster.Cluster, nodes option.NodeListOption, skipInit bool,
47+
) error {
48+
clusterSetting := install.MakeClusterSettings()
49+
srcStartOpts := option.NewStartOpts(
50+
option.NoBackupSchedule,
51+
option.WithInitTarget(nodes[0]),
52+
)
53+
if skipInit {
54+
srcStartOpts.RoachprodOpts.SkipInit = true
55+
}
56+
57+
t.Status("starting cluster with nodes ", nodes)
58+
return c.StartE(ctx, t.L(), srcStartOpts, clusterSetting, nodes)
59+
}
60+
61+
func copyAndTransferData(
62+
ctx context.Context, t test.Test, c cluster.Cluster, cfg multiClusterConfig,
63+
) error {
64+
t.Status("creating archive and transferring data to destination cluster")
65+
// This command creates a tar archive on the source cluster.
66+
if err := c.RunE(ctx, option.WithNodes(cfg.srcCluster), "tar", "-czf", tarFile, "-C", dataDir, "cockroach"); err != nil {
67+
return errors.Wrapf(err, "creating tar archive")
68+
}
69+
70+
// Fetch IP address of nodes from destination cluster.
71+
dstNodeIps, err := c.InternalIP(ctx, t.L(), cfg.dstCluster)
72+
if err != nil {
73+
return errors.Wrapf(err, "getting destination IPs")
74+
}
75+
76+
// This code copies data in parallel from source cluster nodes to their corresponding destination nodes.
77+
m := t.NewGroup(task.WithContext(ctx))
78+
for idx, n := range cfg.srcCluster {
79+
node := n
80+
dstIP := dstNodeIps[idx]
81+
m.Go(func(ctx context.Context, l *logger.Logger) error {
82+
return c.RunE(ctx, option.WithNodes(option.NodeListOption{node}), "scp", tarFile, fmt.Sprintf("ubuntu@%s:%s", dstIP, dataDir))
83+
})
84+
}
85+
m.Wait()
86+
87+
// This command extracts the tar archive in the destination data directory.
88+
if err = c.RunE(ctx, option.WithNodes(cfg.dstCluster), "tar", "-xzf", tarFile, "-C", dataDir); err != nil {
89+
return errors.Wrapf(err, "extracting tar archive:")
90+
}
91+
return nil
92+
}
93+
94+
func fingerprintDatabases(
95+
ctx context.Context, t test.Test, c cluster.Cluster, node int,
96+
) (map[string]map[string]int64, error) {
97+
dbConn := c.Conn(ctx, t.L(), node)
98+
defer dbConn.Close()
99+
100+
fingerPrints, err := fingerprintutils.FingerprintAllDatabases(ctx, dbConn, false, fingerprintutils.Stripped())
101+
if err != nil {
102+
return nil, err
103+
}
104+
return fingerPrints, nil
105+
}
106+
107+
// registerKVStopAndCopy tests cluster data persistence and recovery when storage data volumes
108+
// are relocated. It:
109+
// 1. Starts a cluster and runs a workload
110+
// 2. Performs a complete cluster shutdown
111+
// 3. Copies the storage data volumes to different nodes
112+
// 4. Restarts the cluster with the relocated data
113+
func registerKVStopAndCopy(r registry.Registry) {
114+
runStopAndCopy := func(ctx context.Context, t test.Test, c cluster.Cluster) {
115+
cfg := setupClusters(c)
116+
117+
if err := startCluster(ctx, t, c, cfg.srcCluster, false); err != nil {
118+
t.Fatal(err)
119+
}
120+
121+
// Initialize the database.
122+
t.Status("running workload on source cluster")
123+
c.Run(ctx, option.WithNodes(c.Node(1)), "./cockroach workload run kv --init --read-percent=0 --splits=1000 --duration=5m {pgurl:1}")
124+
125+
// Calculate the fingerprints for all databases on the source cluster, excluding the system database.
126+
t.Status("computing databases fingerprint on source cluster")
127+
srcFingerprints, err := fingerprintDatabases(ctx, t, c, cfg.srcCluster[0])
128+
if err != nil {
129+
t.Fatal(err)
130+
}
131+
132+
// Shut down the source cluster.
133+
t.Status("stopping source cluster")
134+
c.Stop(ctx, t.L(), option.DefaultStopOpts(), cfg.srcCluster)
135+
136+
if err := copyAndTransferData(ctx, t, c, cfg); err != nil {
137+
t.Fatal(err)
138+
}
139+
140+
// Start the destination cluster.
141+
if err := startCluster(ctx, t, c, cfg.dstCluster, true); err != nil {
142+
t.Fatal(err)
143+
}
144+
145+
// Calculate the fingerprints for all databases on the destination cluster, excluding the system database.
146+
t.Status("computing databases fingerprint on destination cluster")
147+
dstFingerprints, err := fingerprintDatabases(ctx, t, c, cfg.dstCluster[0])
148+
if err != nil {
149+
t.Fatal(err)
150+
}
151+
152+
// Match the database fingerprints of source and destination cluster.
153+
if err := fingerprintutils.CompareMultipleDatabaseFingerprints(srcFingerprints, dstFingerprints); err != nil {
154+
t.Fatal(err)
155+
}
156+
157+
t.Status("checking for replica divergence on destination cluster")
158+
db := c.Conn(ctx, t.L(), cfg.dstCluster[0], option.VirtualClusterName(install.SystemInterfaceName))
159+
defer db.Close()
160+
err = timeutil.RunWithTimeout(ctx, "consistency check", 20*time.Minute,
161+
func(ctx context.Context) error {
162+
return roachtestutil.CheckReplicaDivergenceOnDB(ctx, t.L(), db)
163+
},
164+
)
165+
if err != nil {
166+
t.Fatal(err)
167+
}
168+
}
169+
170+
r.Add(registry.TestSpec{
171+
Name: "stop-and-copy/nodes=3plus3",
172+
Owner: registry.OwnerKV,
173+
Cluster: r.MakeClusterSpec(2*nodesPerCluster, spec.CPU(8)),
174+
CompatibleClouds: registry.AllClouds,
175+
Suites: registry.Suites(registry.Weekly),
176+
Run: runStopAndCopy,
177+
Timeout: defaultTimeout,
178+
})
179+
}

0 commit comments

Comments
 (0)