Skip to content

Commit 3bb8fdb

Browse files
Merge pull request #732 from cybozu-go/issue-731
issue-731: Cannot replicate because the master purged required binary logs
2 parents 0c4d537 + c0128a6 commit 3bb8fdb

File tree

4 files changed

+77
-8
lines changed

4 files changed

+77
-8
lines changed

clustering/operations.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,14 @@ func (p *managerProcess) configureReplica(ctx context.Context, ss *StatusSet, in
560560
}
561561
}
562562

563+
// If the binlog required by the replica instance no longer exists on the new primary instance, some data may be missing when `CHANGE MASTER TO` is executed.
564+
// Use SubtractGTID to ensure no data is missing when switching: abort if the primary has purged any GTIDs that this replica has not yet executed.
565+
if sub, err := op.SubtractGTID(ctx, ss.MySQLStatus[ss.Primary].GlobalVariables.PurgedGTID, ss.MySQLStatus[index].GlobalVariables.ExecutedGTID); err != nil {
566+
return false, err
567+
} else if sub != "" {
568+
return false, fmt.Errorf("new primary %d does not have binlog containing transactions %s required for instance %d", ss.Primary, sub, index)
569+
}
570+
563571
ai := dbop.AccessInfo{
564572
Host: ss.Cluster.PodHostname(ss.Primary),
565573
Port: constants.MySQLPort,

e2e/replication_test.go

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"strconv"
99
"strings"
10+
"time"
1011

1112
mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
1213
. "github.com/onsi/ginkgo"
@@ -116,6 +117,7 @@ var _ = Context("replication", func() {
116117

117118
It("should switch the primary if requested", func() {
118119
kubectlSafe(nil, "moco", "-n", "repl", "switchover", "test")
120+
time.Sleep(10 * time.Second)
119121
Eventually(func() int {
120122
cluster, err := getCluster("repl", "test")
121123
if err != nil {
@@ -142,6 +144,61 @@ var _ = Context("replication", func() {
142144
}).Should(Succeed())
143145
})
144146

147+
It("should switch the primary even with replication delays and new primary has initialized", func() {
148+
kubectlSafe(nil, "moco", "-n", "repl", "mysql", "-u", "moco-admin", "test", "--index", "2", "--",
149+
"-e", "STOP REPLICA SQL_THREAD")
150+
kubectlSafe(nil, "moco", "-n", "donor", "mysql", "-u", "moco-admin", "single", "--",
151+
"-D", "test", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('eee')")
152+
kubectlSafe(nil, "delete", "pvc", "-n", "repl", "--wait=false", "mysql-data-moco-test-0")
153+
kubectlSafe(nil, "delete", "pod", "-n", "repl", "--grace-period=1", "moco-test-0")
154+
time.Sleep(60 * time.Second)
155+
Eventually(func() error {
156+
cluster, err := getCluster("repl", "test")
157+
if err != nil {
158+
return err
159+
}
160+
for _, cond := range cluster.Status.Conditions {
161+
if cond.Type != mocov1beta2.ConditionAvailable {
162+
continue
163+
}
164+
if cond.Status == metav1.ConditionTrue {
165+
return nil
166+
}
167+
return fmt.Errorf("cluster is not available: %s", cond.Status)
168+
}
169+
return errors.New("no available condition")
170+
}).Should(Succeed())
171+
172+
kubectlSafe(nil, "moco", "-n", "repl", "switchover", "test")
173+
time.Sleep(10 * time.Second)
174+
Eventually(func() int {
175+
cluster, err := getCluster("repl", "test")
176+
if err != nil {
177+
return 0
178+
}
179+
return cluster.Status.CurrentPrimaryIndex
180+
}).ShouldNot(Equal(1))
181+
time.Sleep(10 * time.Second)
182+
kubectlSafe(nil, "moco", "-n", "repl", "mysql", "-u", "moco-admin", "test", "--index", "2", "--",
183+
"-e", "START REPLICA SQL_THREAD")
184+
Eventually(func() error {
185+
cluster, err := getCluster("repl", "test")
186+
if err != nil {
187+
return err
188+
}
189+
for _, cond := range cluster.Status.Conditions {
190+
if cond.Type != mocov1beta2.ConditionHealthy {
191+
continue
192+
}
193+
if cond.Status == metav1.ConditionTrue {
194+
return nil
195+
}
196+
return fmt.Errorf("cluster is not healthy: %s", cond.Status)
197+
}
198+
return errors.New("no health condition")
199+
}).Should(Succeed())
200+
})
201+
145202
It("should be able to scale out the cluster", func() {
146203
Eventually(func() error {
147204
cluster, err := getCluster("repl", "test")
@@ -177,12 +234,12 @@ var _ = Context("replication", func() {
177234

178235
It("should detect errant transactions", func() {
179236
Eventually(func() error {
180-
_, err := kubectl(nil, "moco", "-n", "repl", "mysql", "-u", "moco-admin", "--index", "0", "test", "--",
237+
_, err := kubectl(nil, "moco", "-n", "repl", "mysql", "-u", "moco-admin", "--index", "2", "test", "--",
181238
"-e", "SET GLOBAL read_only=0")
182239
if err != nil {
183240
return err
184241
}
185-
_, err = kubectl(nil, "moco", "-n", "repl", "mysql", "-u", "moco-admin", "--index", "0", "test", "--",
242+
_, err = kubectl(nil, "moco", "-n", "repl", "mysql", "-u", "moco-admin", "--index", "2", "test", "--",
186243
"-e", "CREATE DATABASE errant")
187244
return err
188245
}).Should(Succeed())
@@ -258,11 +315,11 @@ var _ = Context("replication", func() {
258315
cluster, err := getCluster("repl", "test")
259316
Expect(err).NotTo(HaveOccurred())
260317

261-
kubectlSafe(nil, "delete", "-n", "repl", "--wait=false", "pvc", "mysql-data-moco-test-0")
262-
kubectlSafe(nil, "delete", "-n", "repl", "--grace-period=1", "pod", cluster.PodName(0))
318+
kubectlSafe(nil, "delete", "-n", "repl", "--wait=false", "pvc", "mysql-data-moco-test-2")
319+
kubectlSafe(nil, "delete", "-n", "repl", "--grace-period=1", "pod", cluster.PodName(2))
263320

264321
Eventually(func() error {
265-
out, err := kubectl(nil, "-n", "repl", "get", "pod", cluster.PodName(0), "-o", "json")
322+
out, err := kubectl(nil, "-n", "repl", "get", "pod", cluster.PodName(2), "-o", "json")
266323
if err != nil {
267324
return err
268325
}
@@ -277,7 +334,7 @@ var _ = Context("replication", func() {
277334
}
278335
if cond.Reason == "Unschedulable" {
279336
fmt.Println("re-deleting pending pod...")
280-
_, err := kubectl(nil, "delete", "-n", "repl", "--grace-period=1", "pod", cluster.PodName(0))
337+
_, err := kubectl(nil, "delete", "-n", "repl", "--grace-period=1", "pod", cluster.PodName(2))
281338
if err != nil {
282339
return fmt.Errorf("failed to delete pod: %w", err)
283340
}
@@ -322,7 +379,7 @@ var _ = Context("replication", func() {
322379

323380
Eventually(func() error {
324381
_, err := kubectl(nil, "moco", "-n", "repl", "mysql", "-u", "moco-writable", "test", "--",
325-
"-D", "test", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('eee')")
382+
"-D", "test", "--init_command=SET autocommit=1", "-e", "INSERT INTO t (data) VALUES ('fff')")
326383
return err
327384
}).Should(Succeed())
328385
})
@@ -341,7 +398,7 @@ var _ = Context("replication", func() {
341398
}
342399
count, _ := strconv.Atoi(strings.TrimSpace(string(out)))
343400
return count
344-
}).Should(Equal(5))
401+
}).Should(Equal(6))
345402
})
346403

347404
It("should delete clusters", func() {

pkg/dbop/status_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ var _ = Describe("status", func() {
2828
Expect(err).NotTo(HaveOccurred())
2929
Expect(status).NotTo(BeNil())
3030
Expect(status.GlobalVariables.ExecutedGTID).To(BeEmpty())
31+
Expect(status.GlobalVariables.PurgedGTID).To(BeEmpty())
3132
Expect(status.GlobalVariables.ReadOnly).To(BeTrue())
3233
Expect(status.GlobalVariables.SuperReadOnly).To(BeTrue())
3334
Expect(status.GlobalVariables.WaitForSlaveCount).To(Equal(1))
@@ -43,6 +44,7 @@ var _ = Describe("status", func() {
4344
Expect(err).NotTo(HaveOccurred())
4445
Expect(status).NotTo(BeNil())
4546
Expect(status.GlobalVariables.ExecutedGTID).NotTo(BeEmpty())
47+
Expect(status.GlobalVariables.PurgedGTID).To(BeEmpty())
4648
Expect(status.GlobalVariables.ReadOnly).To(BeFalse())
4749
Expect(status.GlobalVariables.SuperReadOnly).To(BeFalse())
4850
Expect(status.GlobalVariables.WaitForSlaveCount).To(Equal(1))

pkg/dbop/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type MySQLInstanceStatus struct {
2323
var statusGlobalVars = []string{
2424
"@@server_uuid",
2525
"@@gtid_executed",
26+
"@@gtid_purged",
2627
"@@read_only",
2728
"@@super_read_only",
2829
"@@rpl_semi_sync_master_wait_for_slave_count",
@@ -34,6 +35,7 @@ var statusGlobalVars = []string{
3435
type GlobalVariables struct {
3536
UUID string `db:"@@server_uuid"`
3637
ExecutedGTID string `db:"@@gtid_executed"`
38+
PurgedGTID string `db:"@@gtid_purged"`
3739
ReadOnly bool `db:"@@read_only"`
3840
SuperReadOnly bool `db:"@@super_read_only"`
3941
WaitForSlaveCount int `db:"@@rpl_semi_sync_master_wait_for_slave_count"`

0 commit comments

Comments
 (0)