diff --git a/dbm-services/mysql/db-remote-service/pkg/mysql_rpc/mysql_rpc.go b/dbm-services/mysql/db-remote-service/pkg/mysql_rpc/mysql_rpc.go deleted file mode 100644 index 42337c6330..0000000000 --- a/dbm-services/mysql/db-remote-service/pkg/mysql_rpc/mysql_rpc.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package mysql_rpc mysql rpc -package mysql_rpc diff --git a/dbm-services/mysql/db-remote-service/pkg/rpc_core/execute_cmds_on_addr.go b/dbm-services/mysql/db-remote-service/pkg/rpc_core/execute_cmds_on_addr.go index cffb888f73..0cb6891cf4 100644 --- a/dbm-services/mysql/db-remote-service/pkg/rpc_core/execute_cmds_on_addr.go +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_core/execute_cmds_on_addr.go @@ -8,7 +8,7 @@ import ( "github.com/pkg/errors" ) -func (c *RPCWrapper) executeOneAddr(address string) (res []cmdResult, err error) { +func (c *RPCWrapper) executeOneAddr(address string) (res []CmdResultType, err error) { db, err := c.MakeConnection(address, c.user, c.password, c.connectTimeout, c.timezone) if err != nil { @@ -48,7 +48,7 @@ func (c *RPCWrapper) executeOneAddr(address string) (res []cmdResult, err error) slog.String("address", address), slog.String("command", command), ) res = append( - res, cmdResult{ + res, CmdResultType{ Cmd: command, RowsAffected: 0, TableData: nil, @@ -61,7 +61,7 @@ func (c *RPCWrapper) executeOneAddr(address string) (res []cmdResult, err error) continue } res = append( - res, cmdResult{ + res, CmdResultType{ Cmd: command, TableData: tableData, RowsAffected: 0, @@ -77,7 +77,7 @@ func (c *RPCWrapper) executeOneAddr(address string) (res []cmdResult, err error) slog.String("address", address), slog.String("command", command), ) res = append( - res, cmdResult{ + res, CmdResultType{ Cmd: command, TableData: nil, RowsAffected: 0, @@ -90,7 +90,7 @@ func (c *RPCWrapper) executeOneAddr(address string) (res []cmdResult, err error) continue } res = append( - res, cmdResult{ + res, CmdResultType{ Cmd: command, TableData: nil, RowsAffected: rowsAffected, @@ -101,7 +101,7 @@ func (c *RPCWrapper) executeOneAddr(address string) (res []cmdResult, err error) err = errors.Errorf("commands[%d]: %s not support", idx, command) slog.Error("dispatch command", slog.String("error", err.Error())) res = append( - res, cmdResult{Cmd: command, TableData: nil, RowsAffected: 0, ErrorMsg: err.Error()}, + res, CmdResultType{Cmd: command, TableData: nil, RowsAffected: 0, ErrorMsg: err.Error()}, ) if !c.force { return res, err diff --git a/dbm-services/mysql/db-remote-service/pkg/rpc_core/init.go b/dbm-services/mysql/db-remote-service/pkg/rpc_core/init.go index d14a12f9f5..7bc691574c 100644 --- a/dbm-services/mysql/db-remote-service/pkg/rpc_core/init.go +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_core/init.go @@ -2,15 +2,15 @@ package rpc_core type tableDataType []map[string]interface{} -type cmdResult struct { +type CmdResultType struct { Cmd string `json:"cmd"` TableData tableDataType `json:"table_data"` RowsAffected int64 `json:"rows_affected"` ErrorMsg string `json:"error_msg"` } -type oneAddressResult struct { - Address string `json:"address"` - CmdResults []cmdResult `json:"cmd_results"` - ErrorMsg string `json:"error_msg"` +type OneAddressResultType struct { + Address string `json:"address"` + CmdResults []CmdResultType `json:"cmd_results"` + ErrorMsg string `json:"error_msg"` } diff --git a/dbm-services/mysql/db-remote-service/pkg/rpc_core/interface.go b/dbm-services/mysql/db-remote-service/pkg/rpc_core/interface.go index c3f1f93571..8ba0c357f2 100644 --- a/dbm-services/mysql/db-remote-service/pkg/rpc_core/interface.go +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_core/interface.go @@ -1,8 +1,6 @@ package rpc_core import ( - "dbm-services/mysql/db-remote-service/pkg/parser" - "github.com/jmoiron/sqlx" ) @@ -15,9 +13,9 @@ type RPCEmbedInterface interface { timeout int, timezone string, ) (*sqlx.DB, error) - ParseCommand(command string) (*parser.ParseQueryBase, error) - IsQueryCommand(*parser.ParseQueryBase) bool - IsExecuteCommand(*parser.ParseQueryBase) bool + ParseCommand(command string) (*ParseQueryBase, error) + IsQueryCommand(*ParseQueryBase) bool + IsExecuteCommand(*ParseQueryBase) bool User() string Password() string } diff --git a/dbm-services/mysql/db-remote-service/pkg/parser/parser.go b/dbm-services/mysql/db-remote-service/pkg/rpc_core/parser.go similarity index 93% rename from dbm-services/mysql/db-remote-service/pkg/parser/parser.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_core/parser.go index 39cfb2d392..edd56b9d72 100644 --- a/dbm-services/mysql/db-remote-service/pkg/parser/parser.go +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_core/parser.go @@ -1,5 +1,5 @@ // Package parser sql解析 -package parser +package rpc_core // ParseQueryBase query result base field type ParseQueryBase struct { diff --git a/dbm-services/mysql/db-remote-service/pkg/rpc_core/rpc_core.go b/dbm-services/mysql/db-remote-service/pkg/rpc_core/rpc_core.go deleted file mode 100644 index 1e5669c967..0000000000 --- a/dbm-services/mysql/db-remote-service/pkg/rpc_core/rpc_core.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package rpc_core rpc 核心实现 -package rpc_core diff --git a/dbm-services/mysql/db-remote-service/pkg/rpc_core/run.go b/dbm-services/mysql/db-remote-service/pkg/rpc_core/run.go index d6268fb337..943b7a99d2 100644 --- a/dbm-services/mysql/db-remote-service/pkg/rpc_core/run.go +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_core/run.go @@ -8,8 +8,8 @@ import ( ) // Run 执行 -func (c *RPCWrapper) Run() (res []oneAddressResult) { - addrResChan := make(chan oneAddressResult) +func (c *RPCWrapper) Run() (res []OneAddressResultType) { + addrResChan := make(chan OneAddressResultType) tokenBulkChan := make(chan struct{}, config.RuntimeConfig.Concurrent) slog.Debug("init bulk chan", slog.Int("concurrent", config.RuntimeConfig.Concurrent)) @@ -27,7 +27,7 @@ func (c *RPCWrapper) Run() (res []oneAddressResult) { if err != nil { errMsg = err.Error() } - addrResChan <- oneAddressResult{ + addrResChan <- OneAddressResultType{ Address: address, CmdResults: addrRes, ErrorMsg: errMsg, diff --git a/dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_complex_rpc/init.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_complex_rpc/init.go new file mode 100644 index 0000000000..91fd8abbea --- /dev/null +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_complex_rpc/init.go @@ -0,0 +1,7 @@ +package mysql_complex_rpc + +import "github.com/gin-gonic/gin" + +func Handler(c *gin.Context) { + +} diff --git a/dbm-services/mysql/db-remote-service/pkg/mysql_rpc/embed.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_rpc/embed.go similarity index 92% rename from dbm-services/mysql/db-remote-service/pkg/mysql_rpc/embed.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_rpc/embed.go index 4505cce297..af0407b6e7 100644 --- a/dbm-services/mysql/db-remote-service/pkg/mysql_rpc/embed.go +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_rpc/embed.go @@ -12,7 +12,7 @@ package mysql_rpc import ( "dbm-services/mysql/db-remote-service/pkg/config" - "dbm-services/mysql/db-remote-service/pkg/parser" + "dbm-services/mysql/db-remote-service/pkg/rpc_core" "fmt" "log/slog" "net/url" @@ -92,8 +92,8 @@ func (c *MySQLRPCEmbed) MakeConnection(address string, user string, password str } // ParseCommand mysql 解析命令 -func (c *MySQLRPCEmbed) ParseCommand(command string) (*parser.ParseQueryBase, error) { - return &parser.ParseQueryBase{ +func (c *MySQLRPCEmbed) ParseCommand(command string) (*rpc_core.ParseQueryBase, error) { + return &rpc_core.ParseQueryBase{ QueryId: 0, Command: command, ErrorCode: 0, @@ -102,12 +102,12 @@ func (c *MySQLRPCEmbed) ParseCommand(command string) (*parser.ParseQueryBase, er } // IsQueryCommand mysql 解析命令 -func (c *MySQLRPCEmbed) IsQueryCommand(pc *parser.ParseQueryBase) bool { +func (c *MySQLRPCEmbed) IsQueryCommand(pc *rpc_core.ParseQueryBase) bool { return isQueryCommand(pc.Command) } // IsExecuteCommand mysql 解析命令 -func (c *MySQLRPCEmbed) IsExecuteCommand(pc *parser.ParseQueryBase) bool { +func (c *MySQLRPCEmbed) IsExecuteCommand(pc *rpc_core.ParseQueryBase) bool { return !isQueryCommand(pc.Command) } diff --git a/dbm-services/mysql/db-remote-service/pkg/mysql_rpc/init.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_rpc/init.go similarity index 100% rename from dbm-services/mysql/db-remote-service/pkg/mysql_rpc/init.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_rpc/init.go diff --git a/dbm-services/mysql/db-remote-service/pkg/proxy_rpc/proxy_rpc.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/proxy_rpc/proxy_rpc.go similarity index 86% rename from dbm-services/mysql/db-remote-service/pkg/proxy_rpc/proxy_rpc.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/proxy_rpc/proxy_rpc.go index f3302cae59..a7d692a02b 100644 --- a/dbm-services/mysql/db-remote-service/pkg/proxy_rpc/proxy_rpc.go +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/proxy_rpc/proxy_rpc.go @@ -3,7 +3,7 @@ package proxy_rpc import ( "dbm-services/mysql/db-remote-service/pkg/config" - "dbm-services/mysql/db-remote-service/pkg/parser" + "dbm-services/mysql/db-remote-service/pkg/rpc_core" "fmt" "log/slog" "strings" @@ -27,8 +27,8 @@ type ProxyRPCEmbed struct { } // ParseCommand proxy 解析命令 -func (c *ProxyRPCEmbed) ParseCommand(command string) (*parser.ParseQueryBase, error) { - return &parser.ParseQueryBase{ +func (c *ProxyRPCEmbed) ParseCommand(command string) (*rpc_core.ParseQueryBase, error) { + return &rpc_core.ParseQueryBase{ QueryId: 0, Command: command, ErrorCode: 0, @@ -73,7 +73,7 @@ func (c *ProxyRPCEmbed) MakeConnection(address string, user string, password str } // IsQueryCommand proxy 解析命令 -func (c *ProxyRPCEmbed) IsQueryCommand(pc *parser.ParseQueryBase) bool { +func (c *ProxyRPCEmbed) IsQueryCommand(pc *rpc_core.ParseQueryBase) bool { for _, ele := range proxyQueryParseCommands { if strings.HasPrefix(strings.ToLower(pc.Command), ele) { return true @@ -84,7 +84,7 @@ func (c *ProxyRPCEmbed) IsQueryCommand(pc *parser.ParseQueryBase) bool { } // IsExecuteCommand proxy 解析命令 -func (c *ProxyRPCEmbed) IsExecuteCommand(pc *parser.ParseQueryBase) bool { +func (c *ProxyRPCEmbed) IsExecuteCommand(pc *rpc_core.ParseQueryBase) bool { for _, ele := range proxyExecuteParseCommands { if strings.HasPrefix(strings.ToLower(pc.Command), ele) { return true diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/client.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/client.go similarity index 100% rename from dbm-services/mysql/db-remote-service/pkg/redis_rpc/client.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/client.go diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/common.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/common.go similarity index 100% rename from dbm-services/mysql/db-remote-service/pkg/redis_rpc/common.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/common.go diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/init.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/init.go similarity index 100% rename from dbm-services/mysql/db-remote-service/pkg/redis_rpc/init.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/init.go diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/redis_cli.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/redis_cli.go similarity index 100% rename from dbm-services/mysql/db-remote-service/pkg/redis_rpc/redis_cli.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/redis_cli.go diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/redis_rpc.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/redis_rpc.go similarity index 100% rename from dbm-services/mysql/db-remote-service/pkg/redis_rpc/redis_rpc.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/redis_rpc.go diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/twemproxy_rpc.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/twemproxy_rpc.go similarity index 100% rename from dbm-services/mysql/db-remote-service/pkg/redis_rpc/twemproxy_rpc.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/twemproxy_rpc.go diff --git a/dbm-services/mysql/db-remote-service/pkg/redis_rpc/webconsole.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/webconsole.go similarity index 100% rename from dbm-services/mysql/db-remote-service/pkg/redis_rpc/webconsole.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc/webconsole.go diff --git a/dbm-services/mysql/db-remote-service/pkg/sqlserver_rpc/sqlserver_rpc.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/sqlserver_rpc/sqlserver_rpc.go similarity index 86% rename from dbm-services/mysql/db-remote-service/pkg/sqlserver_rpc/sqlserver_rpc.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/sqlserver_rpc/sqlserver_rpc.go index 48e0730fed..fb9edafae7 100644 --- a/dbm-services/mysql/db-remote-service/pkg/sqlserver_rpc/sqlserver_rpc.go +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/sqlserver_rpc/sqlserver_rpc.go @@ -2,13 +2,13 @@ package sqlserver_rpc import ( "context" + "dbm-services/mysql/db-remote-service/pkg/rpc_core" "fmt" "log/slog" "strings" "time" "dbm-services/mysql/db-remote-service/pkg/config" - "dbm-services/mysql/db-remote-service/pkg/parser" _ "github.com/denisenkom/go-mssqldb" // go-mssqldb TODO "github.com/jmoiron/sqlx" @@ -39,8 +39,8 @@ type SqlserverRPCEmbed struct { } // ParseCommand sqlserver 解析命令 -func (c *SqlserverRPCEmbed) ParseCommand(command string) (*parser.ParseQueryBase, error) { - return &parser.ParseQueryBase{ +func (c *SqlserverRPCEmbed) ParseCommand(command string) (*rpc_core.ParseQueryBase, error) { + return &rpc_core.ParseQueryBase{ QueryId: 0, Command: command, ErrorCode: 0, @@ -76,7 +76,7 @@ func (c *SqlserverRPCEmbed) MakeConnection(address string, user string, password } // IsQueryCommand sqlserver 解析命令 -func (c *SqlserverRPCEmbed) IsQueryCommand(pc *parser.ParseQueryBase) bool { +func (c *SqlserverRPCEmbed) IsQueryCommand(pc *rpc_core.ParseQueryBase) bool { for _, ele := range sqlserverQueryParseCommands { if strings.HasPrefix(strings.ToLower(pc.Command), ele) { return true @@ -87,7 +87,7 @@ func (c *SqlserverRPCEmbed) IsQueryCommand(pc *parser.ParseQueryBase) bool { } // IsExecuteCommand sqlserver 解析命令 -func (c *SqlserverRPCEmbed) IsExecuteCommand(pc *parser.ParseQueryBase) bool { +func (c *SqlserverRPCEmbed) IsExecuteCommand(pc *rpc_core.ParseQueryBase) bool { for _, ele := range sqlserverExecuteParseCommands { if strings.HasPrefix(strings.ToLower(pc.Command), ele) { return true diff --git a/dbm-services/mysql/db-remote-service/pkg/webconsole_rpc/init.go b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/webconsole_rpc/init.go similarity index 82% rename from dbm-services/mysql/db-remote-service/pkg/webconsole_rpc/init.go rename to dbm-services/mysql/db-remote-service/pkg/rpc_implement/webconsole_rpc/init.go index bbf95a1c34..092de33c72 100644 --- a/dbm-services/mysql/db-remote-service/pkg/webconsole_rpc/init.go +++ b/dbm-services/mysql/db-remote-service/pkg/rpc_implement/webconsole_rpc/init.go @@ -2,7 +2,7 @@ package webconsole_rpc import ( "dbm-services/mysql/db-remote-service/pkg/config" - "dbm-services/mysql/db-remote-service/pkg/mysql_rpc" + "dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_rpc" ) type WebConsoleRPC struct { diff --git a/dbm-services/mysql/db-remote-service/pkg/service/handler_parser.go b/dbm-services/mysql/db-remote-service/pkg/service/handler_parser.go deleted file mode 100644 index 004ac07141..0000000000 --- a/dbm-services/mysql/db-remote-service/pkg/service/handler_parser.go +++ /dev/null @@ -1,39 +0,0 @@ -package service - -//type parseRequest struct { -// Statements string `form:"statements" json:"statements"` -//} - -//// parseHandler parser 服务 -//func parseHandler(c *gin.Context) { -// var req parseRequest -// if err := c.ShouldBindJSON(&req); err != nil { -// c.JSON( -// http.StatusBadRequest, gin.H{ -// "code": 1, -// "data": "", -// "msg": err.Error(), -// }, -// ) -// return -// } -// -// resp, err := parser.Parse(req.Statements) -// if err != nil { -// c.JSON( -// http.StatusInternalServerError, gin.H{ -// "code": 1, -// "data": "", -// "msg": err.Error(), -// }, -// ) -// } -// -// c.JSON( -// http.StatusOK, gin.H{ -// "code": 0, -// "data": resp, -// "msg": "", -// }, -// ) -//} diff --git a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/handler_rpc.go b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/handler_rpc.go deleted file mode 100644 index 23cd9d7e1f..0000000000 --- a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/handler_rpc.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package handler_rpc proxy rpc -package handler_rpc diff --git a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/init.go b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/init.go index 41ab23f62c..292440940c 100644 --- a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/init.go +++ b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/init.go @@ -15,6 +15,7 @@ type queryRequest struct { // TrimSpace delete space around address func (r *queryRequest) TrimSpace() { + r.Timezone = strings.TrimSpace(r.Timezone) for idx, val := range r.Addresses { r.Addresses[idx] = strings.TrimSpace(val) } diff --git a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/mysql.go b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/mysql.go index 9b6d04a8b5..8b255fb18a 100644 --- a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/mysql.go +++ b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/mysql.go @@ -1,6 +1,8 @@ package handler_rpc -import "dbm-services/mysql/db-remote-service/pkg/mysql_rpc" +import ( + "dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_rpc" +) // MySQLRPCHandler mysql 请求响应 var MySQLRPCHandler = generalHandler(&mysql_rpc.MySQLRPCEmbed{}) diff --git a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/mysql_complex.go b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/mysql_complex.go new file mode 100644 index 0000000000..753d975f58 --- /dev/null +++ b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/mysql_complex.go @@ -0,0 +1,115 @@ +package handler_rpc + +import ( + "dbm-services/mysql/db-remote-service/pkg/config" + "dbm-services/mysql/db-remote-service/pkg/rpc_core" + "dbm-services/mysql/db-remote-service/pkg/rpc_implement/mysql_rpc" + "fmt" + "log/slog" + "net/http" + "strings" + "sync" + + "github.com/gin-gonic/gin" +) + +func MySQLComplexHandler(c *gin.Context) { + var postRequests []queryRequest + if err := c.ShouldBindJSON(&postRequests); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "data": "", + "error": err.Error(), + }) + return + } + slog.Info( + "enter mysql complex rpc handler", + slog.Any("original post requests", postRequests), + ) + + var allDupAddr []string + for _, postReq := range postRequests { + postReq.TrimSpace() + if len(postReq.Timezone) == 0 { + postReq.Timezone = config.RuntimeConfig.Timezone + } + if postReq.ConnectTimeout <= 0 { + postReq.ConnectTimeout = 2 + } + if postReq.QueryTimeout <= 0 { + postReq.QueryTimeout = 600 + } + + dupAddrs := findDuplicateAddresses(postReq.Addresses) + slog.Info("duplicate address", slog.String("addresses", strings.Join(dupAddrs, ","))) + + if len(dupAddrs) > 0 { + allDupAddr = append(allDupAddr, dupAddrs...) + } + } + if len(allDupAddr) > 0 { + c.JSON( + http.StatusBadRequest, gin.H{ + "code": 1, + "data": "", + "msg": fmt.Sprintf("duplicate addresses %s in some sub request", allDupAddr), + }, + ) + } + + slog.Info( + "enter mysql complex rpc handler", + slog.Any("fill default post requests", postRequests), + ) + + var respCollect []rpc_core.OneAddressResultType + var respChan = make(chan rpc_core.OneAddressResultType) + var quitChange = make(chan int) + var bucketChan = make(chan int, 30) + go func() { + wg := sync.WaitGroup{} + wg.Add(len(postRequests)) + + for _, postReq := range postRequests { + bucketChan <- 1 + go func(postReq queryRequest) { + defer func() { + <-bucketChan + wg.Done() + }() + rpcWrapper := rpc_core.NewRPCWrapper( + postReq.Addresses, postReq.Cmds, + config.RuntimeConfig.MySQLAdminUser, config.RuntimeConfig.MySQLAdminPassword, + postReq.ConnectTimeout, postReq.QueryTimeout, postReq.Timezone, postReq.Force, + &mysql_rpc.MySQLRPCEmbed{}, + ) + + for _, r := range rpcWrapper.Run() { + slog.Info("send response", slog.Any("result", r)) + respChan <- r + } + }(postReq) + } + + wg.Wait() + quitChange <- 1 + }() + + for { + select { + case r := <-respChan: + slog.Info("collected response", slog.Any("response", r)) + respCollect = append(respCollect, r) + case <-quitChange: + slog.Info("finish", slog.Any("response", respCollect)) + c.JSON( + http.StatusOK, gin.H{ + "code": 0, + "data": respCollect, + "msg": "", + }) + return + } + } +} diff --git a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/proxy.go b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/proxy.go index fc2cfceb1a..c950784f31 100644 --- a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/proxy.go +++ b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/proxy.go @@ -1,7 +1,7 @@ package handler_rpc import ( - "dbm-services/mysql/db-remote-service/pkg/proxy_rpc" + "dbm-services/mysql/db-remote-service/pkg/rpc_implement/proxy_rpc" ) // ProxyRPCHandler proxy 请求响应 diff --git a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/redis.go b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/redis.go index d5f153c46a..6e61091969 100644 --- a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/redis.go +++ b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/redis.go @@ -1,9 +1,11 @@ package handler_rpc -import "dbm-services/mysql/db-remote-service/pkg/redis_rpc" +import ( + redis_rpc2 "dbm-services/mysql/db-remote-service/pkg/rpc_implement/redis_rpc" +) // RedisRPCHandler TODO -var RedisRPCHandler = redis_rpc.NewRedisRPCEmbed().DoCommand +var RedisRPCHandler = redis_rpc2.NewRedisRPCEmbed().DoCommand // TwemproxyRPCHandler TODO -var TwemproxyRPCHandler = redis_rpc.NewTwemproxyRPCEmbed().DoCommand +var TwemproxyRPCHandler = redis_rpc2.NewTwemproxyRPCEmbed().DoCommand diff --git a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/sqlserver.go b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/sqlserver.go index 758c97741b..ede9479511 100644 --- a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/sqlserver.go +++ b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/sqlserver.go @@ -1,7 +1,7 @@ package handler_rpc import ( - "dbm-services/mysql/db-remote-service/pkg/sqlserver_rpc" + "dbm-services/mysql/db-remote-service/pkg/rpc_implement/sqlserver_rpc" ) // SqlserverRPCHandler TODO diff --git a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/webconsole.go b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/webconsole.go index 8144c73468..5eef2148f6 100644 --- a/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/webconsole.go +++ b/dbm-services/mysql/db-remote-service/pkg/service/handler_rpc/webconsole.go @@ -1,5 +1,7 @@ package handler_rpc -import "dbm-services/mysql/db-remote-service/pkg/webconsole_rpc" +import ( + "dbm-services/mysql/db-remote-service/pkg/rpc_implement/webconsole_rpc" +) var WebConsoleRPCHandler = generalHandler(&webconsole_rpc.WebConsoleRPC{}) diff --git a/dbm-services/mysql/db-remote-service/pkg/service/router.go b/dbm-services/mysql/db-remote-service/pkg/service/router.go index daf135bafa..727257fec2 100644 --- a/dbm-services/mysql/db-remote-service/pkg/service/router.go +++ b/dbm-services/mysql/db-remote-service/pkg/service/router.go @@ -10,6 +10,8 @@ import ( func RegisterRouter(engine *gin.Engine) { mysqlGroup := engine.Group("/mysql") mysqlGroup.POST("/rpc", handler_rpc.MySQLRPCHandler) + // 复杂接口的设计和其他的都不一样, 所以只能直接添加 handler, 不太好复用 rpc interface + mysqlGroup.POST("/complex-rpc", handler_rpc.MySQLComplexHandler) proxyGroup := engine.Group("/proxy-admin") proxyGroup.POST("/rpc", handler_rpc.ProxyRPCHandler) diff --git a/dbm-services/mysql/db-remote-service/pkg/service/service.go b/dbm-services/mysql/db-remote-service/pkg/service/service.go deleted file mode 100644 index 7396d459be..0000000000 --- a/dbm-services/mysql/db-remote-service/pkg/service/service.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package service 服务 -package service