Skip to content

Commit 1330e84

Browse files
ymakedaqiSecloud
authored andcommitted
refactor(dbm-services): 切换流程代码优化 TencentBlueKing#8668
1 parent 5c0c723 commit 1330e84

File tree

10 files changed

+292
-14
lines changed

10 files changed

+292
-14
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Package spiderctlcmd TODO
2+
/*
3+
* TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
4+
* Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
5+
* Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at https://opensource.org/licenses/MIT
7+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
8+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
9+
* specific language governing permissions and limitations under the License.
10+
*/
11+
package spiderctlcmd
12+
13+
import (
14+
"fmt"
15+
16+
"github.com/spf13/cobra"
17+
18+
"dbm-services/common/go-pubpkg/logger"
19+
"dbm-services/mysql/db-tools/dbactuator/internal/subcmd"
20+
"dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl"
21+
"dbm-services/mysql/db-tools/dbactuator/pkg/util"
22+
)
23+
24+
// CheckTdbctlWithSpiderRouterAct check tdbctl with spider schema
25+
type CheckTdbctlWithSpiderRouterAct struct {
26+
Service spiderctl.CheckTdbctlWithSpideRouterComp
27+
}
28+
29+
// NewChkTdbctlSpiderRouterCommand create new subcommand
30+
func NewChkTdbctlSpiderRouterCommand() *cobra.Command {
31+
act := CheckTdbctlWithSpiderRouterAct{}
32+
cmd := &cobra.Command{
33+
Use: "check-tdbctl-with-spider-router",
34+
Short: "检查中控和spider的路由是否一致",
35+
Example: fmt.Sprintf(`dbactuator spiderctl cluster-backend-switch %s %s`,
36+
subcmd.CmdBaseExampleStr, subcmd.ToPrettyJson(act.Service.Example()),
37+
),
38+
Run: func(cmd *cobra.Command, args []string) {
39+
util.CheckErr(act.Init())
40+
util.CheckErr(act.Run())
41+
},
42+
}
43+
return cmd
44+
}
45+
46+
// Init prepare run env
47+
func (c *CheckTdbctlWithSpiderRouterAct) Init() (err error) {
48+
if _, err = subcmd.Deserialize(&c.Service.Params); err != nil {
49+
logger.Error("DeserializeAndValidate failed, %v", err)
50+
return err
51+
}
52+
c.Service.GeneralParam = subcmd.GeneralRuntimeParam
53+
return nil
54+
}
55+
56+
// Run Command Run
57+
func (c *CheckTdbctlWithSpiderRouterAct) Run() (err error) {
58+
steps := subcmd.Steps{
59+
{
60+
FunName: "检查集群路由是否和中控一致",
61+
Func: c.Service.Run,
62+
},
63+
}
64+
if err = steps.Run(); err != nil {
65+
return err
66+
}
67+
logger.Info("check tdbctl with spider routers successfully")
68+
return
69+
}

dbm-services/mysql/db-tools/dbactuator/internal/subcmd/spiderctlcmd/check_tdbctl_with_spider_schema.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func NewChkTdbctlSpiderSchaCommand() *cobra.Command {
3131
act := CheckTdbctlWithSpiderSchemaAct{}
3232
cmd := &cobra.Command{
3333
Use: "check-tdbctl-with-spider-schema",
34-
Short: "spider集群后端切换",
34+
Short: "检查中控和spider的schema数量是否一致",
3535
Example: fmt.Sprintf(`dbactuator spiderctl cluster-backend-switch %s %s`,
3636
subcmd.CmdBaseExampleStr, subcmd.ToPrettyJson(act.Service.Example()),
3737
),

dbm-services/mysql/db-tools/dbactuator/internal/subcmd/spiderctlcmd/cmd.go

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func NewSpiderCtlCommand() *cobra.Command {
4747
NewCreateToDBViaCtlCommand(),
4848
NewRenameDropFromViaCtlCommand(),
4949
NewChkTdbctlSpiderSchaCommand(),
50+
NewChkTdbctlSpiderRouterCommand(),
5051
},
5152
},
5253
}

dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/backend_switch.go

+4-12
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ func (r *SpiderClusterBackendSwitchComp) CutOver() (err error) {
446446
if err = r.recordBinLogPos(); err != nil {
447447
return err
448448
}
449-
// flush 到中控生效
449+
// flush 到中控生效,此时才算真正切换,中控更改后端的路由
450450
logger.Info("execute:tdbctl flush routing force")
451451
return r.flushrouting()
452452
}
@@ -495,7 +495,7 @@ func (r *SpiderClusterBackendSwitchComp) CutOverSlave() (err error) {
495495
return r.flushrouting()
496496
}
497497

498-
// PersistenceRollbackFile TODO
498+
// PersistenceRollbackFile 持久化需要回滚的切换路由的SQL文件
499499
func (r *SpiderClusterBackendSwitchComp) PersistenceRollbackFile() (err error) {
500500
var masterRbSqls, slaveRbSqls, w []string
501501
for _, ins_pair := range r.realSwitchSvrPairs {
@@ -724,24 +724,16 @@ func (c *CutOverCtx) flushrouting() (err error) {
724724
return
725725
}
726726

727-
// me, ok := err.(*mysql.MySQLError)
728-
// if !ok {
729-
// return
730-
// }
731-
// if me.Number == 12028 {
732-
// partFlushed = true
733-
// }
734-
// partFlushed, err
735-
736727
func (c *CutOverCtx) initRollbackRouteFile() (err error) {
737728
fileName := "rollback.sql"
738729
currentPath, err := os.Getwd()
739730
if err != nil {
740731
return
741732
}
733+
var fsInfo os.FileInfo
742734
logger.Info("init rollback route sql file in %s", currentPath)
743735
if cmutil.FileExists(fileName) {
744-
fsInfo, err := os.Stat(fileName)
736+
fsInfo, err = os.Stat(fileName)
745737
if err != nil {
746738
return err
747739
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
3+
* Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
4+
* Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at https://opensource.org/licenses/MIT
6+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
7+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
8+
* specific language governing permissions and limitations under the License.
9+
*/
10+
11+
package spiderctl
12+
13+
import (
14+
"errors"
15+
"fmt"
16+
"sync"
17+
18+
"github.com/samber/lo"
19+
20+
"dbm-services/common/go-pubpkg/logger"
21+
"dbm-services/mysql/db-tools/dbactuator/pkg/components"
22+
"dbm-services/mysql/db-tools/dbactuator/pkg/native"
23+
)
24+
25+
// CheckTdbctlWithSpideRouterComp 检查spider和中控路由是否一致
26+
type CheckTdbctlWithSpideRouterComp struct {
27+
GeneralParam *components.GeneralParam `json:"general"`
28+
Params CheckTdbctlWithSpideRouterParam `json:"extend"`
29+
}
30+
31+
// CheckTdbctlWithSpideRouterParam 检查参数
32+
type CheckTdbctlWithSpideRouterParam struct {
33+
Host string `json:"host" validate:"required,ip"` // 当前实例的主机地址
34+
Port int `json:"port" validate:"required,lt=65536,gte=3306"` // 当前实例的端口
35+
}
36+
37+
// Example subcommand example input
38+
func (c CheckTdbctlWithSpideRouterComp) Example() interface{} {
39+
return CheckTdbctlWithSpideRouterComp{
40+
Params: CheckTdbctlWithSpideRouterParam{
41+
Host: "127.0.0.1",
42+
Port: 26000,
43+
},
44+
}
45+
}
46+
47+
// Run Run
48+
func (c *CheckTdbctlWithSpideRouterComp) Run() (err error) {
49+
user := c.GeneralParam.RuntimeAccountParam.MonitorUser
50+
pwd := c.GeneralParam.RuntimeAccountParam.MonitorPwd
51+
conn, err := native.InsObject{
52+
Host: c.Params.Host,
53+
Port: c.Params.Port,
54+
User: user,
55+
Pwd: pwd,
56+
}.Conn()
57+
if err != nil {
58+
logger.Error("connect to tdbctl failed, err: %s", err.Error())
59+
return err
60+
}
61+
defer conn.Close()
62+
tdbCtlConn := &native.TdbctlDbWork{DbWorker: *conn}
63+
logger.Info("开始检查SpiderMaster路由关系 ...")
64+
mspNodes, err := tdbCtlConn.GetMasterSpiderNodes()
65+
if err != nil {
66+
logger.Error("查询SpiderMaster节点信息失败: %s", err.Error())
67+
return err
68+
}
69+
masterSptRouters, err := tdbCtlConn.GetMasterSptRouters()
70+
if err != nil {
71+
logger.Error("查询主分片节点信息失败: %s", err.Error())
72+
return err
73+
}
74+
tdbCtlmasterSptRouters := lo.SliceToMap(masterSptRouters, func(item native.Server) (string, native.Server) {
75+
return item.ServerName, item
76+
})
77+
err = checkRouter(mspNodes, tdbCtlmasterSptRouters)
78+
if err != nil {
79+
return err
80+
}
81+
sspNodes, err := tdbCtlConn.GetSlaveSpiderNodes()
82+
if err != nil {
83+
logger.Error("查询SpiderSlave节点信息失败: %s", err.Error())
84+
return err
85+
}
86+
if len(sspNodes) == 0 {
87+
return err
88+
}
89+
slaveSptRouters, err := tdbCtlConn.GetSlaveSptRouters()
90+
if err != nil {
91+
logger.Error("查询Slave分片节点信息失败: %s", err.Error())
92+
return err
93+
}
94+
tdbCtlslaveSptRouters := lo.SliceToMap(slaveSptRouters, func(item native.Server) (string, native.Server) {
95+
return item.ServerName, item
96+
})
97+
logger.Info("检查从分片路由关系")
98+
err = checkRouter(sspNodes, tdbCtlslaveSptRouters)
99+
return err
100+
}
101+
102+
func checkRouter(nodes []native.Server, tdbctlRouters map[string]native.Server) (err error) {
103+
var errs []error
104+
wg := sync.WaitGroup{}
105+
errChan := make(chan error)
106+
cChan := make(chan struct{}, 5)
107+
for _, node := range nodes {
108+
wg.Add(1)
109+
cChan <- struct{}{}
110+
go func(spiderNode native.Server) {
111+
logger.Info("开始检查 %s-%s的路由", spiderNode.ServerName, spiderNode.GetEndPoint())
112+
defer func() { wg.Done(); <-cChan }()
113+
sconn, errx := spiderNode.GetConn()
114+
if errx != nil {
115+
logger.Error("connect to spider %s failed, err: %s", spiderNode.GetEndPoint(), err.Error())
116+
errChan <- errx
117+
return
118+
}
119+
defer sconn.Close()
120+
var spiderSptRouters []native.Server
121+
if native.SvrNameIsSlaveSpiderShard(spiderNode.ServerName) {
122+
spiderSptRouters, errx = sconn.GetSlaveSptRouters()
123+
} else {
124+
spiderSptRouters, errx = sconn.GetMasterSptRouters()
125+
}
126+
if errx != nil {
127+
logger.Error("query mysql.servers failed, err: %s", err.Error())
128+
errChan <- errx
129+
return
130+
}
131+
errx = compareRouter(tdbctlRouters, spiderSptRouters)
132+
if errx != nil {
133+
errChan <- fmt.Errorf("[%s-%s]:%w", spiderNode.ServerName, spiderNode.GetEndPoint(), errx)
134+
}
135+
136+
}(node)
137+
}
138+
go func() {
139+
wg.Wait()
140+
close(errChan)
141+
}()
142+
for err = range errChan {
143+
errs = append(errs, err)
144+
}
145+
return errors.Join(errs...)
146+
}
147+
148+
func compareRouter(tdbctlRouters map[string]native.Server, spiderrRouters []native.Server) (err error) {
149+
spiderrRoutersMap := lo.SliceToMap(spiderrRouters, func(item native.Server) (string, native.Server) {
150+
return item.ServerName, item
151+
})
152+
for svrName, rt := range tdbctlRouters {
153+
spiderrRouter, ok := spiderrRoutersMap[svrName]
154+
if !ok {
155+
logger.Error("spider router not found router: %s", svrName)
156+
return err
157+
}
158+
if rt.Host != spiderrRouter.Host || rt.Port != spiderrRouter.Port {
159+
errMsg := "spider router not match tdbctl router:"
160+
errMsg += fmt.Sprintf("tdbctl router:%s %s %d\n", rt.ServerName, rt.Host, rt.Port)
161+
errMsg += fmt.Sprintf("spider router:%s %s %d\n", spiderrRouter.ServerName, spiderrRouter.Host, spiderrRouter.Port)
162+
return fmt.Errorf("%s", errMsg)
163+
}
164+
}
165+
return
166+
}

dbm-services/mysql/db-tools/dbactuator/pkg/native/spider.go

+12
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,15 @@ func (o InsObject) ConnSpiderAdmin() (*SpiderAdminDbWork, error) {
1010
dbwork, err := NewDbWorkerNoPing(o.spiderAdminTcpDsn(), o.User, o.Pwd)
1111
return &SpiderAdminDbWork{DbWorker: *dbwork}, err
1212
}
13+
14+
// GetMasterSptRouters 获取主分片路由
15+
func (t *DbWorker) GetMasterSptRouters() (servers []Server, err error) {
16+
err = t.Queryx(&servers, "select * from mysql.servers where Wrapper='mysql'")
17+
return
18+
}
19+
20+
// GetSlaveSptRouters 获取从分片路由
21+
func (t *DbWorker) GetSlaveSptRouters() (servers []Server, err error) {
22+
err = t.Queryx(&servers, "select * from mysql.servers where Wrapper='mysql_slave'")
23+
return
24+
}

dbm-services/mysql/db-tools/dbactuator/pkg/native/tdbctl.go

+12
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,18 @@ func (t *TdbctlDbWork) SelectServers() (servers []Server, err error) {
176176
return
177177
}
178178

179+
// GetMasterSpiderNodes 获取所有spider节点
180+
func (t *TdbctlDbWork) GetMasterSpiderNodes() (servers []Server, err error) {
181+
err = t.Queryx(&servers, "select * from mysql.servers where Wrapper in ('SPIDER');")
182+
return
183+
}
184+
185+
// GetSlaveSpiderNodes 获取所有spider slave节点
186+
func (t *TdbctlDbWork) GetSlaveSpiderNodes() (servers []Server, err error) {
187+
err = t.Queryx(&servers, "select * from mysql.servers where Wrapper in ('SPIDER_SLAVE');")
188+
return
189+
}
190+
179191
// get_exec_special_node_cmd TODO
180192
func (t *TdbctlDbWork) get_exec_special_node_cmd(serverName string) string {
181193
return fmt.Sprintf("TDBCTL CONNECT NODE %s EXECUTE", serverName)

dbm-ui/backend/flow/consts.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,8 @@ class DBActuatorActionEnum(str, StructuredEnum):
382382
Deploy = EnumField("deploy", _("deploy"))
383383
AppendDeploy = EnumField("append-deploy", _("append-deploy"))
384384
ImportSchemaToTdbctl = EnumField("import-schema-to-tdbctl", _("import-schema-to-tdbctl"))
385-
CheckTdbctlWithSpiderSchema = EnumField("check-tdbctl-with-spider-schema", _("icheck-tdbctl-with-spider-schema"))
385+
CheckTdbctlWithSpiderSchema = EnumField("check-tdbctl-with-spider-schema", _("check-tdbctl-with-spider-schema"))
386+
CheckTdbctlWithSpiderRouter = EnumField("check-tdbctl-with-spider-router", _("check-tdbctl-with-spider-router"))
386387
GetBackupFile = EnumField("find-local-backup", _("find-local-backup"))
387388
RestoreSlave = EnumField("restore-dr", _("restore-dr"))
388389
FastExecuteSqlFile = EnumField("fast-execute-sql-file", _("fast-execute-sql-file"))

dbm-ui/backend/flow/engine/bamboo/scene/spider/append_deploy_ctl_flow.py

+12
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ def run(self):
391391
act_component_code=ExecuteDBActuatorScriptComponent.code,
392392
kwargs=asdict(exec_act_kwargs),
393393
)
394+
394395
# 校验spider节点和tdbctldb表的数量
395396
exec_act_kwargs.cluster = {"ctl_port": ctl_port, "spider_port": leader_spider.port}
396397
exec_act_kwargs.exec_ip = primary_ctl_ip
@@ -411,6 +412,17 @@ def run(self):
411412
act_component_code=ExecuteDBActuatorScriptComponent.code,
412413
kwargs=asdict(exec_act_kwargs),
413414
)
415+
416+
# 校验spider节点和tdbctldb表的数量
417+
exec_act_kwargs.cluster = {"ctl_port": ctl_port}
418+
exec_act_kwargs.exec_ip = primary_ctl_ip
419+
exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_check_router_payload.__name__
420+
migrate_pipeline.add_act(
421+
act_name=_("校验spider和tdbctl节点间路由是否一致"),
422+
act_component_code=ExecuteDBActuatorScriptComponent.code,
423+
kwargs=asdict(exec_act_kwargs),
424+
)
425+
414426
sub_pipelines.append(
415427
migrate_pipeline.build_sub_process(
416428
sub_name=_("[{}]追加部署tdbctl&迁移表结构").format(cluster_obj.immute_domain)

dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py

+13
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,19 @@ def get_check_schema_payload(self, **kwargs):
426426
},
427427
}
428428

429+
def get_check_router_payload(self, **kwargs):
430+
return {
431+
"db_type": DBActuatorTypeEnum.SpiderCtl.value,
432+
"action": DBActuatorActionEnum.CheckTdbctlWithSpiderRouter.value,
433+
"payload": {
434+
"general": {"runtime_account": self.account},
435+
"extend": {
436+
"host": kwargs["ip"],
437+
"port": self.cluster["ctl_port"],
438+
},
439+
},
440+
}
441+
429442
def get_install_spider_ctl_payload(self, **kwargs):
430443
"""
431444
拼接spider-ctl节点安装的payload, ctl是单机单实例, 所以代码兼容多实例传入

0 commit comments

Comments
 (0)