Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MySQL 8.4.0 for moco-agent #96

Merged
merged 2 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
name: Small Tests
strategy:
matrix:
mysql-version: ["8.0.18", "8.0.25", "8.0.26", "8.0.27", "8.0.28", "8.0.30", "8.0.31", "8.0.32", "8.0.33", "8.0.34", "8.0.35", "8.0.36", "8.0.37"]
mysql-version: ["8.0.28", "8.0.36", "8.0.37", "8.4.0"]
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
MYSQL_VERSION = 8.0.37
MYSQL_VERSION = 8.4.0

# For Go
GOOS := $(shell go env GOOS)
Expand Down
24 changes: 18 additions & 6 deletions server/clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"path/filepath"
"strings"
"time"

mocoagent "github.com/cybozu-go/moco-agent"
Expand Down Expand Up @@ -40,8 +41,13 @@ var _ = Describe("clone", func() {
Expect(err).NotTo(HaveOccurred())
_, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (100), (101), (102), (103)")
Expect(err).NotTo(HaveOccurred())
_, err = donorDB.Exec(`RESET MASTER`)
Expect(err).NotTo(HaveOccurred())
if strings.HasPrefix(MySQLVersion, "8.4") {
_, err = donorDB.Exec(`RESET BINARY LOGS AND GTIDS`)
Expect(err).NotTo(HaveOccurred())
} else {
_, err = donorDB.Exec(`RESET MASTER`)
Expect(err).NotTo(HaveOccurred())
}
_, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (200), (800), (10000), (-3)")
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -99,10 +105,16 @@ var _ = Describe("clone", func() {
By("starting replication")
_, err = donorDB.Exec(`INSERT INTO foo.bar (i) VALUES (9), (999)`)
Expect(err).NotTo(HaveOccurred())
_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
_, err = replicaDB.Exec(`START SLAVE`)
if strings.HasPrefix(MySQLVersion, "8.4") {
_, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
} else {
_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
}
_, err = replicaDB.Exec(`START REPLICA`)
Expect(err).NotTo(HaveOccurred())

Eventually(func() int {
Expand Down
16 changes: 14 additions & 2 deletions server/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,20 @@ func Init(ctx context.Context, db *sqlx.DB, socket string) error {
return err
}

if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil {
return fmt.Errorf("failed to reset master: %w", err)
var version string
err := db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`)
if err != nil {
return fmt.Errorf("failed to get version: %w", err)
}
if version == "8.4" {
if _, err := db.ExecContext(ctx, "RESET BINARY LOGS AND GTIDS"); err != nil {
return fmt.Errorf("failed to reset binary logs and gtids: %w", err)
}

} else {
if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil {
return fmt.Errorf("failed to reset master: %w", err)
}
}
if _, err := db.ExecContext(ctx, "SET GLOBAL super_read_only=ON"); err != nil {
return fmt.Errorf("failed to enable super_read_only: %w", err)
Expand Down
97 changes: 58 additions & 39 deletions server/mysql_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,26 @@ type MySQLPrimaryStatus struct {

// MySQLReplicaStatus defines the observed state of a replica
type MySQLReplicaStatus struct {
LastIOErrno int `db:"Last_IO_Errno"`
LastIOError string `db:"Last_IO_Error"`
LastSQLErrno int `db:"Last_SQL_Errno"`
LastSQLError string `db:"Last_SQL_Error"`
MasterHost string `db:"Master_Host"`
RetrievedGtidSet string `db:"Retrieved_Gtid_Set"`
ExecutedGtidSet string `db:"Executed_Gtid_Set"`
SlaveIORunning string `db:"Slave_IO_Running"`
SlaveSQLRunning string `db:"Slave_SQL_Running"`
LastIOErrno int `db:"Last_IO_Errno"`
LastIOError string `db:"Last_IO_Error"`
LastSQLErrno int `db:"Last_SQL_Errno"`
LastSQLError string `db:"Last_SQL_Error"`
SourceHost string `db:"Source_Host"`
RetrievedGtidSet string `db:"Retrieved_Gtid_Set"`
ExecutedGtidSet string `db:"Executed_Gtid_Set"`
ReplicaIORunning string `db:"Replica_IO_Running"`
ReplicaSQLRunning string `db:"Replica_SQL_Running"`

// All of variables from here are NOT used in MOCO's reconcile
SlaveIOState string `db:"Slave_IO_State"`
MasterUser string `db:"Master_User"`
MasterPort int `db:"Master_Port"`
ReplicaIOState string `db:"Replica_IO_State"`
SourceUser string `db:"Source_User"`
SourcePort int `db:"Source_Port"`
ConnectRetry int `db:"Connect_Retry"`
MasterLogFile string `db:"Master_Log_File"`
ReadMasterLogPos int `db:"Read_Master_Log_Pos"`
SourceLogFile string `db:"Source_Log_File"`
ReadSourceLogPos int `db:"Read_Source_Log_Pos"`
RelayLogFile string `db:"Relay_Log_File"`
RelayLogPos int `db:"Relay_Log_Pos"`
RelayMasterLogFile string `db:"Relay_Master_Log_File"`
RelaySourceLogFile string `db:"Relay_Source_Log_File"`
ReplicateDoDB string `db:"Replicate_Do_DB"`
ReplicateIgnoreDB string `db:"Replicate_Ignore_DB"`
ReplicateDoTable string `db:"Replicate_Do_Table"`
Expand All @@ -64,38 +64,38 @@ type MySQLReplicaStatus struct {
LastErrno int `db:"Last_Errno"`
LastError string `db:"Last_Error"`
SkipCounter int `db:"Skip_Counter"`
ExecMasterLogPos int `db:"Exec_Master_Log_Pos"`
ExecSourceLogPos int `db:"Exec_Source_Log_Pos"`
RelayLogSpace int `db:"Relay_Log_Space"`
UntilCondition string `db:"Until_Condition"`
UntilLogFile string `db:"Until_Log_File"`
UntilLogPos int `db:"Until_Log_Pos"`
MasterSSLAllowed string `db:"Master_SSL_Allowed"`
MasterSSLCAFile string `db:"Master_SSL_CA_File"`
MasterSSLCAPath string `db:"Master_SSL_CA_Path"`
MasterSSLCert string `db:"Master_SSL_Cert"`
MasterSSLCipher string `db:"Master_SSL_Cipher"`
MasterSSLKey string `db:"Master_SSL_Key"`
SecondsBehindMaster sql.NullInt64 `db:"Seconds_Behind_Master"`
MasterSSLVerifyServerCert string `db:"Master_SSL_Verify_Server_Cert"`
SourceSSLAllowed string `db:"Source_SSL_Allowed"`
SourceSSLCAFile string `db:"Source_SSL_CA_File"`
SourceSSLCAPath string `db:"Source_SSL_CA_Path"`
SourceSSLCert string `db:"Source_SSL_Cert"`
SourceSSLCipher string `db:"Source_SSL_Cipher"`
SourceSSLKey string `db:"Source_SSL_Key"`
SecondsBehindSource sql.NullInt64 `db:"Seconds_Behind_Source"`
SourceSSLVerifyServerCert string `db:"Source_SSL_Verify_Server_Cert"`
ReplicateIgnoreServerIds string `db:"Replicate_Ignore_Server_Ids"`
MasterServerID int `db:"Master_Server_Id"`
MasterUUID string `db:"Master_UUID"`
MasterInfoFile string `db:"Master_Info_File"`
SourceServerID int `db:"Source_Server_Id"`
SourceUUID string `db:"Source_UUID"`
SourceInfoFile string `db:"Source_Info_File"`
SQLDelay int `db:"SQL_Delay"`
SQLRemainingDelay sql.NullInt64 `db:"SQL_Remaining_Delay"`
SlaveSQLRunningState string `db:"Slave_SQL_Running_State"`
MasterRetryCount int `db:"Master_Retry_Count"`
MasterBind string `db:"Master_Bind"`
ReplicaSQLRunningState string `db:"Replica_SQL_Running_State"`
SourceRetryCount int `db:"Source_Retry_Count"`
SourceBind string `db:"Source_Bind"`
LastIOErrorTimestamp string `db:"Last_IO_Error_Timestamp"`
LastSQLErrorTimestamp string `db:"Last_SQL_Error_Timestamp"`
MasterSSLCrl string `db:"Master_SSL_Crl"`
MasterSSLCrlpath string `db:"Master_SSL_Crlpath"`
SourceSSLCrl string `db:"Source_SSL_Crl"`
SourceSSLCrlpath string `db:"Source_SSL_Crlpath"`
AutoPosition string `db:"Auto_Position"`
ReplicateRewriteDB string `db:"Replicate_Rewrite_DB"`
ChannelName string `db:"Channel_Name"`
MasterTLSVersion string `db:"Master_TLS_Version"`
Masterpublickeypath string `db:"Master_public_key_path"`
Getmasterpublickey string `db:"Get_master_public_key"`
SourceTLSVersion string `db:"Source_TLS_Version"`
Sourcepublickeypath string `db:"Source_public_key_path"`
GetSourcepublickey string `db:"Get_Source_public_key"`
NetworkNamespace string `db:"Network_Namespace"`
}

Expand All @@ -120,18 +120,37 @@ func (a *Agent) GetMySQLCloneStateStatus(ctx context.Context) (*MySQLCloneStateS
return status, nil
}

func (a *Agent) IsMySQL84(ctx context.Context) (bool, error) {
var version string
err := a.db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`)
if err != nil {
return false, fmt.Errorf("failed to get version: %w", err)
}
return version == "8.4", nil
}

func (a *Agent) GetMySQLPrimaryStatus(ctx context.Context) (*MySQLPrimaryStatus, error) {
status := &MySQLPrimaryStatus{}
if err := a.db.GetContext(ctx, status, `SHOW MASTER STATUS`); err != nil {
return nil, fmt.Errorf("failed to show master status: %w", err)
isMySQL84, err := a.IsMySQL84(ctx)
if err != nil {
return nil, err
}
if isMySQL84 {
if err := a.db.GetContext(ctx, status, `SHOW BINARY LOG STATUS`); err != nil {
return nil, fmt.Errorf("failed to show binary log status: %w", err)
}
} else {
if err := a.db.GetContext(ctx, status, `SHOW MASTER STATUS`); err != nil {
return nil, fmt.Errorf("failed to show master status: %w", err)
}
}
return status, nil
}

func (a *Agent) GetMySQLReplicaStatus(ctx context.Context) (*MySQLReplicaStatus, error) {
status := &MySQLReplicaStatus{}
if err := a.db.GetContext(ctx, status, `SHOW SLAVE STATUS`); err != nil {
return nil, fmt.Errorf("failed to show slave status: %w", err)
if err := a.db.GetContext(ctx, status, `SHOW REPLICA STATUS`); err != nil {
return nil, fmt.Errorf("failed to show replica status: %w", err)
}
return status, nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var tmpBaseDir = path.Join(os.TempDir(), "moco-agent-test-server")

var MySQLVersion = func() string {
if ver := os.Getenv("MYSQL_VERSION"); ver == "" {
os.Setenv("MYSQL_VERSION", "8.0.28")
os.Setenv("MYSQL_VERSION", "8.4.0")
}
return os.Getenv("MYSQL_VERSION")
}()
Expand Down
2 changes: 1 addition & 1 deletion server/mysqld_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (a *Agent) MySQLDReady(w http.ResponseWriter, r *http.Request) {
return
}

if replicaStatus.SlaveIORunning != "Yes" || replicaStatus.SlaveSQLRunning != "Yes" {
if replicaStatus.ReplicaIORunning != "Yes" || replicaStatus.ReplicaSQLRunning != "Yes" {
a.logger.Info("replication threads are stopped")
http.Error(w, "replication thread are stopped", http.StatusServiceUnavailable)
return
Expand Down
29 changes: 21 additions & 8 deletions server/mysqld_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"time"

mocoagent "github.com/cybozu-go/moco-agent"
Expand Down Expand Up @@ -105,10 +106,16 @@ var _ = Describe("health", func() {
_, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (?), (?), (?), (?)", items...)
Expect(err).NotTo(HaveOccurred())

_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
_, err = replicaDB.Exec(`START SLAVE`)
if strings.HasPrefix(MySQLVersion, "8.4") {
_, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
} else {
_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
}
_, err = replicaDB.Exec(`START REPLICA`)
Expect(err).NotTo(HaveOccurred())

By("checking readiness")
Expand Down Expand Up @@ -173,10 +180,16 @@ var _ = Describe("health", func() {
_, err = donorDB.Exec("SET GLOBAL read_only=0")
Expect(err).NotTo(HaveOccurred())

_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
_, err = replicaDB.Exec(`START SLAVE`)
if strings.HasPrefix(MySQLVersion, "8.4") {
_, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
} else {
_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
}
_, err = replicaDB.Exec(`START REPLICA`)
Expect(err).NotTo(HaveOccurred())

By("checking readiness")
Expand Down