From 7444be40295ad7a2487532de88af0b69345503c0 Mon Sep 17 00:00:00 2001 From: 17110595 <64948939@qq.com> Date: Fri, 22 Jun 2018 17:27:14 +0800 Subject: [PATCH] feature_support_multi-endpoints --- core/core.go | 2 +- netmaster/daemon/daemon.go | 8 +++---- netmaster/docknet/docknet_test.go | 4 ++-- netmaster/k8snetwork/networkpolicy_test.go | 4 ++-- netmaster/main.go | 2 +- netmaster/objApi/objapi_test.go | 4 ++-- netplugin/agent/agent.go | 2 +- netplugin/agent/state_event.go | 16 ++++++------- state/consulstatedriver.go | 4 ++-- state/etcdstatedriver.go | 27 +++++++++++++--------- test/integration/npcluster_test.go | 22 +++++++++++++++--- utils/configs.go | 11 +++++---- 12 files changed, 64 insertions(+), 42 deletions(-) diff --git a/core/core.go b/core/core.go index 1b23f8e7c..fff92a6f9 100755 --- a/core/core.go +++ b/core/core.go @@ -83,7 +83,7 @@ type InstanceInfo struct { RouterIP string `json:"router-ip"` FwdMode string `json:"fwd-mode"` ArpMode string `json:"arp-mode"` - DbURL string `json:"db-url"` + DbURL []string `json:"db-url"` PluginMode string `json:"plugin-mode"` HostPvtNW int `json:"host-pvt-nw"` VxlanUDPPort int `json:"vxlan-port"` diff --git a/netmaster/daemon/daemon.go b/netmaster/daemon/daemon.go index 41b4c88a0..58f0a4767 100755 --- a/netmaster/daemon/daemon.go +++ b/netmaster/daemon/daemon.go @@ -50,7 +50,7 @@ type MasterDaemon struct { ListenURL string // URL where netmaster listens for ext requests ControlURL string // URL where netmaster listens for ctrl pkts ClusterStoreDriver string // state store driver name - ClusterStoreURL string // state store endpoint + ClusterStoreURL []string // state store endpoint ClusterMode string // cluster scheduler used docker/kubernetes/mesos etc NetworkMode string // network mode (vlan or vxlan) NetForwardMode string // forwarding mode (bridge or routing) @@ -81,7 +81,7 @@ func (d *MasterDaemon) Init() { // initialize state driver d.stateDriver, err = utils.NewStateDriver(d.ClusterStoreDriver, &core.InstanceInfo{DbURL: d.ClusterStoreURL}) if err != nil { - log.Fatalf("Failed to init state-store: driver %q, URLs %q. Error: %s", d.ClusterStoreDriver, d.ClusterStoreURL, err) + log.Fatalf("Failed to init state-store: driver %q, URLs %v. Error: %s", d.ClusterStoreDriver, d.ClusterStoreURL, err) } // Initialize resource manager @@ -91,9 +91,9 @@ func (d *MasterDaemon) Init() { } // Create an objdb client - d.objdbClient, err = objdb.InitClient(d.ClusterStoreDriver, []string{d.ClusterStoreURL}) + d.objdbClient, err = objdb.InitClient(d.ClusterStoreDriver, d.ClusterStoreURL) if err != nil { - log.Fatalf("Error connecting to state store: driver %q, URLs %q. Err: %v", d.ClusterStoreDriver, d.ClusterStoreURL, err) + log.Fatalf("Error connecting to state store: driver %q, URLs %v. Err: %v", d.ClusterStoreDriver, d.ClusterStoreURL, err) } } diff --git a/netmaster/docknet/docknet_test.go b/netmaster/docknet/docknet_test.go index 17b16aaf1..a8bd8127c 100644 --- a/netmaster/docknet/docknet_test.go +++ b/netmaster/docknet/docknet_test.go @@ -31,9 +31,9 @@ import ( // initStateDriver initialize etcd state driver func initStateDriver() (core.StateDriver, error) { - instInfo := core.InstanceInfo{DbURL: "etcd://127.0.0.1:2379"} + instInfo := core.InstanceInfo{DbURL: []string{"etcd://127.0.0.1:2379"}} - return utils.NewStateDriver(utils.EtcdNameStr, &instInfo) + return utils.NewStateDriver(utils.EtcdNameStr, &instInfo),nil } // getDocknetState gets docknet oper state diff --git a/netmaster/k8snetwork/networkpolicy_test.go b/netmaster/k8snetwork/networkpolicy_test.go index 36ece6f1a..da64c53fc 100644 --- a/netmaster/k8snetwork/networkpolicy_test.go +++ b/netmaster/k8snetwork/networkpolicy_test.go @@ -112,9 +112,9 @@ const ( // initStateDriver initialize etcd state driver func initStateDriver() (core.StateDriver, error) { - instInfo := core.InstanceInfo{DbURL: "etcd://127.0.0.1:2379"} + instInfo := core.InstanceInfo{DbURL: []string{"etcd://127.0.0.1:2379"}} - return utils.NewStateDriver(utils.EtcdNameStr, &instInfo) + return utils.NewStateDriver(utils.EtcdNameStr, &instInfo),nil } // cleanupState cleans up default tenant and other global state diff --git a/netmaster/main.go b/netmaster/main.go index 9c5dfcc14..eff648c07 100755 --- a/netmaster/main.go +++ b/netmaster/main.go @@ -107,7 +107,7 @@ func initNetMaster(ctx *cli.Context) (*daemon.MasterDaemon, error) { ListenURL: externalAddress, ControlURL: internalAddress, ClusterStoreDriver: dbConfigs.StoreDriver, - ClusterStoreURL: dbConfigs.StoreURL, //TODO: support more than one url + ClusterStoreURL: dbConfigs.StoreURL, ClusterMode: netConfigs.Mode, NetworkMode: netConfigs.NetworkMode, NetForwardMode: netConfigs.ForwardMode, diff --git a/netmaster/objApi/objapi_test.go b/netmaster/objApi/objapi_test.go index 60b0cc165..41981c189 100644 --- a/netmaster/objApi/objapi_test.go +++ b/netmaster/objApi/objapi_test.go @@ -53,9 +53,9 @@ var stateStore core.StateDriver // initStateDriver initialize etcd state driver func initStateDriver() (core.StateDriver, error) { - instInfo := core.InstanceInfo{DbURL: "etcd://127.0.0.1:2379"} + instInfo := core.InstanceInfo{DbURL: []string{"etcd://127.0.0.1:2379"}} - return utils.NewStateDriver(utils.EtcdNameStr, &instInfo) + return utils.NewStateDriver(utils.EtcdNameStr, &instInfo),nil } type restAPIFunc func(r *http.Request) (interface{}, error) diff --git a/netplugin/agent/agent.go b/netplugin/agent/agent.go index a6240d760..caa326300 100644 --- a/netplugin/agent/agent.go +++ b/netplugin/agent/agent.go @@ -50,7 +50,7 @@ func NewAgent(pluginConfig *plugin.Config) *Agent { netPlugin := &plugin.NetPlugin{} // init cluster state - err := cluster.Init(pluginConfig.Drivers.State, []string{opts.DbURL}) + err := cluster.Init(pluginConfig.Drivers.State, opts.DbURL) if err != nil { log.Fatalf("Error initializing cluster. Err: %v", err) } diff --git a/netplugin/agent/state_event.go b/netplugin/agent/state_event.go index e6d1db05d..fbce730be 100644 --- a/netplugin/agent/state_event.go +++ b/netplugin/agent/state_event.go @@ -350,11 +350,11 @@ func processEpgEvent(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, ID str func processReinit(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, newCfg *mastercfg.GlobConfig) { // parse store URL - parts := strings.Split(opts.DbURL, "://") - if len(parts) < 2 { - log.Fatalf("Invalid cluster-store-url %s", opts.DbURL) + //parts := strings.Split(opts.DbURL, "://") + if len(opts.DbURL) == 0 { + log.Fatalf("Invalid cluster-store-url %v", opts.DbURL) } - stateStore := parts[0] + stateStore := strings.Split(opts.DbURL[0], "://")[0] // initialize the config pluginConfig := plugin.Config{ Drivers: plugin.Drivers{ @@ -412,11 +412,11 @@ func processGlobalConfigUpdEvent(netPlugin *plugin.NetPlugin, opts core.Instance func processARPModeChange(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, arpMode string) { // parse store URL - parts := strings.Split(opts.DbURL, "://") - if len(parts) < 2 { - log.Fatalf("Invalid cluster-store-url %s", opts.DbURL) + //parts := strings.Split(opts.DbURL, "://") + if len(opts.DbURL) == 0 { + log.Fatalf("Invalid cluster-store-url %v", opts.DbURL) } - stateStore := parts[0] + stateStore := strings.Split(opts.DbURL[0], "://")[0] // initialize the config pluginConfig := plugin.Config{ Drivers: plugin.Drivers{ diff --git a/state/consulstatedriver.go b/state/consulstatedriver.go index eec4f1025..7b1775186 100644 --- a/state/consulstatedriver.go +++ b/state/consulstatedriver.go @@ -48,10 +48,10 @@ func (d *ConsulStateDriver) Init(instInfo *core.InstanceInfo) error { var err error var endpoint *url.URL - if instInfo == nil || instInfo.DbURL == "" { + if instInfo == nil || len(instInfo.DbURL) == 0 { return errors.New("no consul config found") } - endpoint, err = url.Parse(instInfo.DbURL) + endpoint, err = url.Parse(instInfo.DbURL[0]) if err != nil { return err } diff --git a/state/etcdstatedriver.go b/state/etcdstatedriver.go index 8012d046f..4aaf542fd 100644 --- a/state/etcdstatedriver.go +++ b/state/etcdstatedriver.go @@ -55,21 +55,26 @@ func (d *EtcdStateDriver) Init(instInfo *core.InstanceInfo) error { var err error var endpoint *url.URL - if instInfo == nil || instInfo.DbURL == "" { + if instInfo == nil || len(instInfo.DbURL) == 0 { return errors.New("no etcd config found") } - endpoint, err = url.Parse(instInfo.DbURL) - if err != nil { - return err - } - if endpoint.Scheme == "etcd" { - endpoint.Scheme = "http" - } else if endpoint.Scheme != "http" && endpoint.Scheme != "https" { - return core.Errorf("invalid etcd URL scheme %q", endpoint.Scheme) + + for _,dburl := range instInfo.DbURL { + + endpoint, err = url.Parse(dburl) + if err != nil { + return err + } + + if endpoint.Scheme == "etcd" { + endpoint.Scheme = "http" + } else if endpoint.Scheme != "http" && endpoint.Scheme != "https" { + return core.Errorf("invalid etcd URL scheme %q", endpoint.Scheme) + } + } - // TODO: support multi-endpoints etcdConfig := client.Config{ - Endpoints: []string{endpoint.String()}, + Endpoints: instInfo.DbURL, } d.Client, err = client.New(etcdConfig) diff --git a/test/integration/npcluster_test.go b/test/integration/npcluster_test.go index 384878a01..02ecc6c97 100644 --- a/test/integration/npcluster_test.go +++ b/test/integration/npcluster_test.go @@ -18,7 +18,7 @@ package integration import ( "os" "time" - + "strings" log "github.com/Sirupsen/logrus" "github.com/contiv/netplugin/contivmodel" @@ -27,6 +27,9 @@ import ( "github.com/contiv/netplugin/netplugin/agent" "github.com/contiv/netplugin/netplugin/plugin" "github.com/contiv/netplugin/utils/netutils" + "github.com/contiv/netplugin/utils" + "net/url" + "fmt" ) // NPCluster holds a new neplugin/netmaster cluster stats @@ -40,6 +43,7 @@ type NPCluster struct { // NewNPCluster creates a new cluster of netplugin/netmaster func NewNPCluster(its *integTestSuite) (*NPCluster, error) { // get local host name + storeURL := []string{} hostLabel, err := os.Hostname() if err != nil { log.Fatalf("Failed to fetch hostname. Error: %s", err) @@ -51,13 +55,25 @@ func NewNPCluster(its *integTestSuite) (*NPCluster, error) { log.Fatalf("Error getting local address. Err: %v", err) } + + for _, endpoint := range utils.FilterEmpty(strings.Split(its.clusterStoreURL, ",")) { + _, err := url.Parse(endpoint) + if err != nil { + return nil, fmt.Errorf("invalid endpoint: %v", endpoint) + } + // TODO: support multi-endpoints + storeURL = append(storeURL,endpoint) + log.Infof("Using state db endpoints: %v", storeURL) + + } + // create master daemon md := &daemon.MasterDaemon{ ListenURL: "0.0.0.0:9999", ControlURL: "0.0.0.0:9999", ClusterMode: "test", ClusterStoreDriver: its.clusterStoreDriver, - ClusterStoreURL: its.clusterStoreURL, + ClusterStoreURL: storeURL, NetworkMode: its.encap, NetForwardMode: its.fwdMode, NetInfraType: its.fabricMode, @@ -73,7 +89,7 @@ func NewNPCluster(its *integTestSuite) (*NPCluster, error) { CtrlIP: localIP, VtepIP: localIP, UplinkIntf: []string{"eth2"}, - DbURL: its.clusterStoreURL, + DbURL: storeURL, PluginMode: "test", }, } diff --git a/utils/configs.go b/utils/configs.go index 97f546e6c..97800bbbf 100644 --- a/utils/configs.go +++ b/utils/configs.go @@ -73,7 +73,7 @@ func BuildDBFlags(binary string) []cli.Flag { // DBConfigs validated db configs type DBConfigs struct { StoreDriver string - StoreURL string + StoreURL []string } // BuildLogFlags CLI logging flags for given binary @@ -177,8 +177,9 @@ func InitLogging(binary string, ctx *cli.Context) error { // ValidateDBOptions returns error if db options are not valid func ValidateDBOptions(binary string, ctx *cli.Context) (*DBConfigs, error) { var storeDriver string - var storeURL string var storeURLs string + + storeURL := []string{} etcdURLs := ctx.String("etcd") consulURLs := ctx.String("consul") @@ -201,12 +202,12 @@ func ValidateDBOptions(binary string, ctx *cli.Context) (*DBConfigs, error) { return nil, fmt.Errorf("invalid %s %v endpoint: %v", binary, storeDriver, endpoint) } // TODO: support multi-endpoints - storeURL = endpoint + storeURL = append(storeURL,endpoint) logrus.Infof("Using %s state db endpoints: %v: %v", binary, storeDriver, storeURL) - break + } - if storeURL == "" { + if len(storeURL) == 0 { return nil, fmt.Errorf("invalid %s %s endpoints: empty", binary, storeDriver) }