diff --git a/hack/docker/redis-tools/redis-tools.sh b/hack/docker/redis-tools/redis-tools.sh index 0c157023b..72fe704b8 100644 --- a/hack/docker/redis-tools/redis-tools.sh +++ b/hack/docker/redis-tools/redis-tools.sh @@ -12,6 +12,7 @@ show_help() { echo "-h, --help show brief help" echo " --data-dir=DIR path to directory holding db data (default: /var/data)" echo " --host=HOST database host" + echo " --port=PORT database port" echo " --user=USERNAME database username" echo " --bucket=BUCKET name of bucket" echo " --location=LOCATION location of backend (:)" @@ -50,6 +51,10 @@ while test $# -gt 0; do export REDIS_HOST=$(echo $1 | sed -e 's/^[^=]*=//g') shift ;; + --port*) + export REDIS_PORT=$(echo $1 | sed -e 's/^[^=]*=//g') + shift + ;; --user*) export REDIS_USER=$(echo $1 | sed -e 's/^[^=]*=//g') shift @@ -108,8 +113,8 @@ case "$op" in # cleanup data dump dir rm -rf * - redis-cli --rdb dump.rdb -h "${REDIS_HOST}" -a "${REDIS_PASSWORD}" - redis-cli -h "${REDIS_HOST}" -a "${REDIS_PASSWORD}" CLUSTER NODES | grep myself > nodes.conf + redis-cli --rdb dump.rdb -h "${REDIS_HOST}" -p "${REDIS_PORT}" -a "${REDIS_PASSWORD}" + redis-cli -h "${REDIS_HOST}" -p "${REDIS_PORT}" -a "${REDIS_PASSWORD}" CLUSTER NODES | grep myself > nodes.conf pwd ls -lh "$SOURCE_DIR" echo "Uploading dump file to the backend......." diff --git a/make.Dockerfile b/make.Dockerfile new file mode 100644 index 000000000..41111ec95 --- /dev/null +++ b/make.Dockerfile @@ -0,0 +1,27 @@ +FROM golang:1.13.3 as go-builder + +RUN apt-get update && apt-get -y upgrade && \ + apt-get install -y ca-certificates git mercurial + +ARG PROJECT_NAME=redis-cluster-operator +ARG REPO_PATH=github.com/ucloud/$PROJECT_NAME +ARG BUILD_PATH=${REPO_PATH}/cmd/manager + +# Build version and commit should be passed in when performing docker build +ARG VERSION=0.1.1 +ARG GIT_SHA=0000000 + +WORKDIR /src +COPY go.mod go.sum ./ +RUN go mod download + +COPY pkg ./ cmd ./ version ./ + +#RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o ${GOBIN}/${PROJECT_NAME} \ +# -ldflags "-X ${REPO_PATH}/version.Version=${VERSION} -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}" \ +# $BUILD_PATH + +COPY . 
./ +RUN apt-get install -y bash git make cmake gcc openssh-client openssh-server +RUN apt-get install -y libc-dev +RUN make build \ No newline at end of file diff --git a/pkg/apis/redis/v1alpha1/default.go b/pkg/apis/redis/v1alpha1/default.go index 32f4dfed7..1583e0c21 100644 --- a/pkg/apis/redis/v1alpha1/default.go +++ b/pkg/apis/redis/v1alpha1/default.go @@ -16,6 +16,7 @@ const ( minClusterReplicas = 1 defaultRedisImage = "redis:5.0.4-alpine" defaultMonitorImage = "oliver006/redis_exporter:latest" + defaultClientPort = 6379 ) func (in *DistributedRedisCluster) DefaultSpec(log logr.Logger) bool { @@ -25,6 +26,16 @@ func (in *DistributedRedisCluster) DefaultSpec(log logr.Logger) bool { update = true } + if in.Spec.ClientPort == 0 { + in.Spec.ClientPort = defaultClientPort + update = true + } + + if in.Spec.GossipPort != (in.Spec.ClientPort + 10000) { + in.Spec.GossipPort = in.Spec.ClientPort + 10000 + update = true + } + if in.Spec.Image == "" { in.Spec.Image = defaultRedisImage update = true diff --git a/pkg/apis/redis/v1alpha1/distributedrediscluster_types.go b/pkg/apis/redis/v1alpha1/distributedrediscluster_types.go index bd78bca00..86c421aec 100644 --- a/pkg/apis/redis/v1alpha1/distributedrediscluster_types.go +++ b/pkg/apis/redis/v1alpha1/distributedrediscluster_types.go @@ -25,18 +25,21 @@ type DistributedRedisClusterSpec struct { ServiceName string `json:"serviceName,omitempty"` Config map[string]string `json:"config,omitempty"` // Set RequiredAntiAffinity to force the master-slave node anti-affinity. - RequiredAntiAffinity bool `json:"requiredAntiAffinity,omitempty"` - Affinity *corev1.Affinity `json:"affinity,omitempty"` - NodeSelector map[string]string `json:"nodeSelector,omitempty"` - ToleRations []corev1.Toleration `json:"toleRations,omitempty"` - SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"` - ContainerSecurityContext *corev1.SecurityContext `json:"containerSecurityContext,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` - Storage *RedisStorage `json:"storage,omitempty"` - Resources *corev1.ResourceRequirements `json:"resources,omitempty"` - PasswordSecret *corev1.LocalObjectReference `json:"passwordSecret,omitempty"` - Monitor *AgentSpec `json:"monitor,omitempty"` - Init *InitSpec `json:"init,omitempty"` + RequiredAntiAffinity bool `json:"requiredAntiAffinity,omitempty"` + Affinity *corev1.Affinity `json:"affinity,omitempty"` + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + ToleRations []corev1.Toleration `json:"toleRations,omitempty"` + SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"` + ContainerSecurityContext *corev1.SecurityContext `json:"containerSecurityContext,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + Storage *RedisStorage `json:"storage,omitempty"` + Resources *corev1.ResourceRequirements `json:"resources,omitempty"` + PasswordSecret *corev1.LocalObjectReference `json:"passwordSecret,omitempty"` + Monitor *AgentSpec `json:"monitor,omitempty"` + Init *InitSpec `json:"init,omitempty"` + HostNetwork bool `json:"hostNetwork,omitempty"` + ClientPort int `json:"clientPort,omitempty"` + GossipPort int `json:"gossipPort,omitempty"` } type AgentSpec struct { diff --git a/pkg/apis/redis/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/redis/v1alpha1/zz_generated.deepcopy.go index 16d8e247e..28d7a9bf0 100644 --- a/pkg/apis/redis/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/redis/v1alpha1/zz_generated.deepcopy.go @@ -213,6 +213,7 @@ func (in 
*DistributedRedisClusterSpec) DeepCopyInto(out *DistributedRedisCluster *out = new(InitSpec) (*in).DeepCopyInto(*out) } + out.HostNetwork = in.HostNetwork return } diff --git a/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go b/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go index cdac41db7..c3192e491 100644 --- a/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go +++ b/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go @@ -232,13 +232,13 @@ func (r *ReconcileDistributedRedisCluster) Reconcile(request reconcile.Request) return reconcile.Result{}, Kubernetes.Wrap(err, "getClusterPassword") } - admin, err := newRedisAdmin(ctx.pods, password, config.RedisConf(), reqLogger) + admin, err := newRedisAdmin(ctx.pods, password, config.RedisConf(), reqLogger, instance.Spec.ClientPort) if err != nil { return reconcile.Result{}, Redis.Wrap(err, "newRedisAdmin") } defer admin.Close() - clusterInfos, err := admin.GetClusterInfos() + clusterInfos, err := admin.GetClusterInfos(ctx.cluster.Spec.ClientPort) if err != nil { if clusterInfos.Status == redisutil.ClusterInfosPartial { return reconcile.Result{}, Redis.Wrap(err, "GetClusterInfos") @@ -331,8 +331,7 @@ func (r *ReconcileDistributedRedisCluster) Reconcile(request reconcile.Request) return reconcile.Result{}, err } } - - newClusterInfos, err := admin.GetClusterInfos() + newClusterInfos, err := admin.GetClusterInfos(ctx.cluster.Spec.ClientPort) if err != nil { if clusterInfos.Status == redisutil.ClusterInfosPartial { return reconcile.Result{}, Redis.Wrap(err, "GetClusterInfos") diff --git a/pkg/controller/distributedrediscluster/helper.go b/pkg/controller/distributedrediscluster/helper.go index 6df04bba7..7773c9841 100644 --- a/pkg/controller/distributedrediscluster/helper.go +++ b/pkg/controller/distributedrediscluster/helper.go @@ -3,6 +3,7 @@ package distributedrediscluster import ( "fmt" "net" + "strconv" "time" "github.com/go-logr/logr" @@ -29,10 +30,10 @@ func getLabels(cluster *redisv1alpha1.DistributedRedisCluster) map[string]string } // newRedisAdmin builds and returns new redis.Admin from the list of pods -func newRedisAdmin(pods []*corev1.Pod, password string, cfg *config.Redis, reqLogger logr.Logger) (redisutil.IAdmin, error) { +func newRedisAdmin(pods []*corev1.Pod, password string, cfg *config.Redis, reqLogger logr.Logger, clientPort int) (redisutil.IAdmin, error) { nodesAddrs := []string{} for _, pod := range pods { - redisPort := redisutil.DefaultRedisPort + redisPort := strconv.Itoa(clientPort) for _, container := range pod.Spec.Containers { if container.Name == "redis" { for _, port := range container.Ports { diff --git a/pkg/controller/distributedrediscluster/sync_handler.go b/pkg/controller/distributedrediscluster/sync_handler.go index 61af25e8b..9015ecd2a 100644 --- a/pkg/controller/distributedrediscluster/sync_handler.go +++ b/pkg/controller/distributedrediscluster/sync_handler.go @@ -161,7 +161,7 @@ func (r *ReconcileDistributedRedisCluster) initRestore(cluster *redisv1alpha1.Di } func (r *ReconcileDistributedRedisCluster) waitForClusterJoin(ctx *syncContext) error { - if infos, err := ctx.admin.GetClusterInfos(); err == nil { + if infos, err := ctx.admin.GetClusterInfos(ctx.cluster.Spec.ClientPort); err == nil { ctx.reqLogger.V(6).Info("debug waitForClusterJoin", "cluster infos", infos) return nil } @@ -180,7 +180,7 @@ func (r *ReconcileDistributedRedisCluster) waitForClusterJoin(ctx *syncContext) // the config as they 
are still empty with unassigned slots. time.Sleep(1 * time.Second) - _, err = ctx.admin.GetClusterInfos() + _, err = ctx.admin.GetClusterInfos(ctx.cluster.Spec.ClientPort) if err != nil { return Requeue.Wrap(err, "wait for cluster join") } @@ -278,7 +278,7 @@ func (r *ReconcileDistributedRedisCluster) scalingDown(ctx *syncContext, current if len(node.Slots) > 0 { return Redis.New(fmt.Sprintf("node %s is not empty! Reshard data away and try again", node.String())) } - if err := admin.ForgetNode(node.ID); err != nil { + if err := admin.ForgetNode(node.ID,ctx.cluster.Spec.ClientPort); err != nil { return Redis.Wrap(err, "ForgetNode") } } @@ -346,7 +346,7 @@ func (r *ReconcileDistributedRedisCluster) resetClusterPassword(ctx *syncContext } podSet := clusterPods(redisClusterPods.Items) - admin, err := newRedisAdmin(podSet, oldPassword, config.RedisConf(), ctx.reqLogger) + admin, err := newRedisAdmin(podSet, oldPassword, config.RedisConf(), ctx.reqLogger, ctx.cluster.Spec.ClientPort) if err != nil { return err } diff --git a/pkg/controller/heal/failednodes.go b/pkg/controller/heal/failednodes.go index c0e70ebf6..d425885f4 100644 --- a/pkg/controller/heal/failednodes.go +++ b/pkg/controller/heal/failednodes.go @@ -17,7 +17,7 @@ func (c *CheckAndHeal) FixFailedNodes(cluster *redisv1alpha1.DistributedRedisClu c.Logger.Info("[FixFailedNodes] Forgetting failed node, this command might fail, this is not an error", "node", id) if !c.DryRun { c.Logger.Info("[FixFailedNodes] try to forget node", "nodeId", id) - if err := admin.ForgetNode(id); err != nil { + if err := admin.ForgetNode(id, cluster.Spec.ClientPort); err != nil { errs = append(errs, err) } } diff --git a/pkg/controller/heal/untrustenodes.go b/pkg/controller/heal/untrustenodes.go index 3d6f45026..ed3c05c29 100644 --- a/pkg/controller/heal/untrustenodes.go +++ b/pkg/controller/heal/untrustenodes.go @@ -43,7 +43,7 @@ func (c *CheckAndHeal) FixUntrustedNodes(cluster *redisv1alpha1.DistributedRedis doneAnAction = true if !c.DryRun { c.Logger.Info("[FixUntrustedNodes] try to forget node", "nodeId", id) - if err := admin.ForgetNode(id); err != nil { + if err := admin.ForgetNode(id,cluster.Spec.ClientPort); err != nil { errs = append(errs, err) } } diff --git a/pkg/controller/redisclusterbackup/sync_handler.go b/pkg/controller/redisclusterbackup/sync_handler.go index ceb867279..29d571c77 100644 --- a/pkg/controller/redisclusterbackup/sync_handler.go +++ b/pkg/controller/redisclusterbackup/sync_handler.go @@ -326,6 +326,7 @@ func (r *ReconcileRedisClusterBackup) backupContainers(backup *redisv1alpha1.Red fmt.Sprintf(`--data-dir=%s`, redisv1alpha1.BackupDumpDir), fmt.Sprintf(`--location=%s`, location), fmt.Sprintf(`--host=%s`, node.IP), + fmt.Sprintf(`--port=%s`, node.Port), fmt.Sprintf(`--folder=%s`, folderName), fmt.Sprintf(`--snapshot=%s-%d`, backup.Name, i), "--", diff --git a/pkg/osm/osm.go b/pkg/osm/osm.go index 988185627..c35f3bda3 100644 --- a/pkg/osm/osm.go +++ b/pkg/osm/osm.go @@ -80,69 +80,75 @@ func NewOSMContext(client client.Client, spec api.Backend, namespace string) (*o if spec.S3 != nil { nc.Provider = s3.Kind - keyID, foundKeyID := config[awsconst.AWS_ACCESS_KEY_ID] key, foundKey := config[awsconst.AWS_SECRET_ACCESS_KEY] - if foundKey && foundKeyID { + if spec.S3.Endpoint == "" || spec.S3.Endpoint == "osm" { nc.Config[s3.ConfigAccessKeyID] = string(keyID) nc.Config[s3.ConfigSecretKey] = string(key) nc.Config[s3.ConfigAuthType] = "accesskey" + nc.Config[s3.ConfigRegion] = spec.S3.Region } else { - nc.Config[s3.ConfigAuthType] = "iam" 
- } - if spec.S3.Endpoint == "" || strings.HasSuffix(spec.S3.Endpoint, ".amazonaws.com") { - // Using s3 and not s3-compatible service like minio or rook, etc. Now, find region - var sess *session.Session - var err error - if nc.Config[s3.ConfigAuthType] == "iam" { - // The aws sdk does not currently support automatically setting the region based on an instances placement. - // This automatically sets region based on ec2 instance metadata when running on EC2. - // ref: https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/setting-region.html#setting-region-order-of-precedence - var c aws.Config - if s, e := session.NewSession(); e == nil { - if region, e := ec2metadata.New(s).Region(); e == nil { - c.WithRegion(region) + if foundKey && foundKeyID { + nc.Config[s3.ConfigAccessKeyID] = string(keyID) + nc.Config[s3.ConfigSecretKey] = string(key) + nc.Config[s3.ConfigAuthType] = "accesskey" + } else { + nc.Config[s3.ConfigAuthType] = "iam" + } + if spec.S3.Endpoint == "" || strings.HasSuffix(spec.S3.Endpoint, ".amazonaws.com") { + // Using s3 and not s3-compatible service like minio or rook, etc. Now, find region + var sess *session.Session + var err error + if nc.Config[s3.ConfigAuthType] == "iam" { + // The aws sdk does not currently support automatically setting the region based on an instances placement. + // This automatically sets region based on ec2 instance metadata when running on EC2. + // ref: https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/setting-region.html#setting-region-order-of-precedence + var c aws.Config + if s, e := session.NewSession(); e == nil { + if region, e := ec2metadata.New(s).Region(); e == nil { + c.WithRegion(region) + } } + sess, err = session.NewSessionWithOptions(session.Options{ + Config: c, + // Support MFA when authing using assumed roles. + SharedConfigState: session.SharedConfigEnable, + AssumeRoleTokenProvider: stscreds.StdinTokenProvider, + }) + } else { + sess, err = session.NewSessionWithOptions(session.Options{ + Config: aws.Config{ + Credentials: credentials.NewStaticCredentials(string(keyID), string(key), ""), + Region: aws.String("us-east-1"), + }, + // Support MFA when authing using assumed roles. + SharedConfigState: session.SharedConfigEnable, + AssumeRoleTokenProvider: stscreds.StdinTokenProvider, + }) } - sess, err = session.NewSessionWithOptions(session.Options{ - Config: c, - // Support MFA when authing using assumed roles. - SharedConfigState: session.SharedConfigEnable, - AssumeRoleTokenProvider: stscreds.StdinTokenProvider, + if err != nil { + return nil, err + } + svc := _s3.New(sess) + out, err := svc.GetBucketLocation(&_s3.GetBucketLocationInput{ + Bucket: types.StringP(spec.S3.Bucket), }) + if err != nil { + return nil, err + } + nc.Config[s3.ConfigRegion] = stringz.Val(types.String(out.LocationConstraint), "us-east-1") } else { - sess, err = session.NewSessionWithOptions(session.Options{ - Config: aws.Config{ - Credentials: credentials.NewStaticCredentials(string(keyID), string(key), ""), - Region: aws.String("us-east-1"), - }, - // Support MFA when authing using assumed roles. 
- SharedConfigState: session.SharedConfigEnable, - AssumeRoleTokenProvider: stscreds.StdinTokenProvider, - }) - } - if err != nil { - return nil, err - } - svc := _s3.New(sess) - out, err := svc.GetBucketLocation(&_s3.GetBucketLocationInput{ - Bucket: types.StringP(spec.S3.Bucket), - }) - if err != nil { - return nil, err - } - nc.Config[s3.ConfigRegion] = stringz.Val(types.String(out.LocationConstraint), "us-east-1") - } else { - nc.Config[s3.ConfigEndpoint] = spec.S3.Endpoint - u, err := url.Parse(spec.S3.Endpoint) - if err != nil { - return nil, err - } - nc.Config[s3.ConfigDisableSSL] = strconv.FormatBool(u.Scheme == "http") + nc.Config[s3.ConfigEndpoint] = spec.S3.Endpoint + u, err := url.Parse(spec.S3.Endpoint) + if err != nil { + return nil, err + } + nc.Config[s3.ConfigDisableSSL] = strconv.FormatBool(u.Scheme == "http") - cacertData, ok := config[awsconst.CA_CERT_DATA] - if ok && u.Scheme == "https" { - nc.Config[s3.ConfigCACertData] = string(cacertData) + cacertData, ok := config[awsconst.CA_CERT_DATA] + if ok && u.Scheme == "https" { + nc.Config[s3.ConfigCACertData] = string(cacertData) + } } } return nc, nil diff --git a/pkg/redisutil/admin.go b/pkg/redisutil/admin.go index af5e1455a..fc0d8f022 100644 --- a/pkg/redisutil/admin.go +++ b/pkg/redisutil/admin.go @@ -38,7 +38,7 @@ type IAdmin interface { // Close the admin connections Close() // GetClusterInfos get node infos for all nodes - GetClusterInfos() (*ClusterInfos, error) + GetClusterInfos(port int) (*ClusterInfos, error) // ClusterManagerNodeIsEmpty Checks whether the node is empty. Node is considered not-empty if it has // some key or if it already knows other nodes ClusterManagerNodeIsEmpty() (bool, error) @@ -56,7 +56,7 @@ type IAdmin interface { // DetachSlave dettach a slave to its master DetachSlave(slave *Node) error // ForgetNode execute the Redis command to force the cluster to forgot the the Node - ForgetNode(id string) error + ForgetNode(id string, port int) error // SetSlots exec the redis command to set slots in a pipeline, provide // and empty nodeID if the set slots commands doesn't take a nodeID in parameter SetSlots(addr string, action string, slots []Slot, nodeID string) error @@ -117,12 +117,12 @@ func (a *Admin) Close() { } // GetClusterInfos return the Nodes infos for all nodes -func (a *Admin) GetClusterInfos() (*ClusterInfos, error) { +func (a *Admin) GetClusterInfos(port int) (*ClusterInfos, error) { infos := NewClusterInfos() clusterErr := NewClusterInfosError() for addr, c := range a.Connections().GetAll() { - nodeinfos, err := a.getInfos(c, addr) + nodeinfos, err := a.getInfos(c, addr, port) if err != nil { a.log.WithValues("err", err).Info("get redis info failed") infos.Status = ClusterInfosPartial @@ -146,7 +146,7 @@ func (a *Admin) GetClusterInfos() (*ClusterInfos, error) { return infos, clusterErr } -func (a *Admin) getInfos(c IClient, addr string) (*NodeInfos, error) { +func (a *Admin) getInfos(c IClient, addr string, port int) (*NodeInfos, error) { resp := c.Cmd("CLUSTER", "NODES") if err := a.Connections().ValidateResp(resp, addr, "unable to retrieve node info"); err != nil { return nil, err @@ -160,7 +160,7 @@ func (a *Admin) getInfos(c IClient, addr string) (*NodeInfos, error) { return nil, fmt.Errorf("wrong format from CLUSTER NODES: %v", err) } - nodeInfos := DecodeNodeInfos(&raw, addr, a.log) + nodeInfos := DecodeNodeInfos(&raw, addr, a.log, port) return nodeInfos, nil } @@ -483,8 +483,8 @@ func (a *Admin) migrateCmdArgs(dest *Node, timeoutStr string, replace bool, keys } // 
ForgetNode used to force other redis cluster node to forget a specific node -func (a *Admin) ForgetNode(id string) error { - infos, _ := a.GetClusterInfos() +func (a *Admin) ForgetNode(id string,port int) error { + infos, _ := a.GetClusterInfos(port) for nodeAddr, nodeinfos := range infos.Infos { if nodeinfos.Node.ID == id { continue diff --git a/pkg/redisutil/clusterinfo.go b/pkg/redisutil/clusterinfo.go index 57b5fd097..3aebc2ebb 100644 --- a/pkg/redisutil/clusterinfo.go +++ b/pkg/redisutil/clusterinfo.go @@ -38,9 +38,9 @@ type ClusterInfos struct { } // NewNodeInfos returns an instance of NodeInfo -func NewNodeInfos() *NodeInfos { +func NewNodeInfos(port int) *NodeInfos { return &NodeInfos{ - Node: NewDefaultNode(), + Node: NewDefaultNode(port), Friends: Nodes{}, } } @@ -72,8 +72,8 @@ func DecodeNodeStartTime(input *string, log logr.Logger) (time.Time, error) { } // DecodeNodeInfos decode from the cmd output the Redis nodes info. Second argument is the node on which we are connected to request info -func DecodeNodeInfos(input *string, addr string, log logr.Logger) *NodeInfos { - infos := NewNodeInfos() +func DecodeNodeInfos(input *string, addr string, log logr.Logger, port int) *NodeInfos { + infos := NewNodeInfos(port) lines := strings.Split(*input, "\n") for _, line := range lines { values := strings.Split(line, " ") @@ -82,7 +82,7 @@ func DecodeNodeInfos(input *string, addr string, log logr.Logger) *NodeInfos { log.V(7).Info(fmt.Sprintf("not enough values in line split, ignoring line: '%s'", line)) continue } else { - node := NewDefaultNode() + node := NewDefaultNode(port) node.ID = values[0] //remove trailing port for cluster internal protocol diff --git a/pkg/redisutil/node.go b/pkg/redisutil/node.go index 9829ad400..97f45386e 100644 --- a/pkg/redisutil/node.go +++ b/pkg/redisutil/node.go @@ -5,18 +5,15 @@ import ( "fmt" "net" "sort" + "strconv" "strings" "time" - corev1 "k8s.io/api/core/v1" - redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1" "github.com/ucloud/redis-cluster-operator/pkg/utils" ) const ( - // DefaultRedisPort define the default Redis Port - DefaultRedisPort = "6379" // RedisMasterRole redis role master RedisMasterRole = "master" // RedisSlaveRole redis role slave @@ -79,26 +76,15 @@ func (n Nodes) String() string { } // NewDefaultNode builds and returns new defaultNode instance -func NewDefaultNode() *Node { +func NewDefaultNode(port int) *Node { return &Node{ - Port: DefaultRedisPort, + Port: strconv.Itoa(port), Slots: []Slot{}, MigratingSlots: map[Slot]string{}, ImportingSlots: map[Slot]string{}, } } -// NewNode builds and returns new Node instance -func NewNode(id, ip string, pod *corev1.Pod) *Node { - node := NewDefaultNode() - node.ID = id - node.IP = ip - node.PodName = pod.Name - node.NodeName = pod.Spec.NodeName - - return node -} - // SetRole from a flags string list set the Node's role func (n *Node) SetRole(flags string) error { n.Role = "" // reset value before setting the new one diff --git a/pkg/resources/configmaps/configmap.go b/pkg/resources/configmaps/configmap.go index 97735cfd2..64f06fb08 100644 --- a/pkg/resources/configmaps/configmap.go +++ b/pkg/resources/configmaps/configmap.go @@ -27,13 +27,13 @@ failover() { echo "Do CLUSTER FAILOVER" masterID=$(cat ${CLUSTER_CONFIG} | grep "myself" | awk '{print $1}') echo "Master: ${masterID}" - slave=$(cat ${CLUSTER_CONFIG} | grep ${masterID} | grep "slave" | awk 'NR==1{print $2}' | sed 's/:6379@16379//') + slave=$(cat ${CLUSTER_CONFIG} | grep ${masterID} | grep "slave" | 
awk 'NR==1{print $2}' | sed 's/:`+strconv.Itoa(cluster.Spec.ClientPort)+`@`+strconv.Itoa(cluster.Spec.GossipPort)+`//') echo "Slave: ${slave}" password=$(cat /data/redis_password) if [[ -z "${password}" ]]; then - redis-cli -h ${slave} CLUSTER FAILOVER + redis-cli -h ${slave} -p `+strconv.Itoa(cluster.Spec.ClientPort)+` CLUSTER FAILOVER else - redis-cli -h ${slave} -a "${password}" CLUSTER FAILOVER + redis-cli -h ${slave} -p `+strconv.Itoa(cluster.Spec.ClientPort)+` -a "${password}" CLUSTER FAILOVER fi echo "Wait for MASTER <-> SLAVE syncFinished" sleep 20 @@ -53,7 +53,7 @@ if [ -f ${CLUSTER_CONFIG} ]; then exit 1 fi echo "Updating my IP to ${POD_IP} in ${CLUSTER_CONFIG}" - sed -i.bak -e "/myself/ s/ .*:6379@16379/ ${POD_IP}:6379@16379/" ${CLUSTER_CONFIG} + sed -i.bak -e "/myself/ s/ .*:`+strconv.Itoa(cluster.Spec.ClientPort)+`@`+strconv.Itoa(cluster.Spec.GossipPort)+`/ ${POD_IP}:`+strconv.Itoa(cluster.Spec.ClientPort)+`@`+strconv.Itoa(cluster.Spec.GossipPort)+`/" ${CLUSTER_CONFIG} fi exec "$@"` diff --git a/pkg/resources/services/service.go b/pkg/resources/services/service.go index af97d33ca..4befff0a9 100644 --- a/pkg/resources/services/service.go +++ b/pkg/resources/services/service.go @@ -9,8 +9,8 @@ import ( // NewHeadLessSvcForCR creates a new headless service for the given Cluster. func NewHeadLessSvcForCR(cluster *redisv1alpha1.DistributedRedisCluster, name string, labels map[string]string) *corev1.Service { - clientPort := corev1.ServicePort{Name: "client", Port: 6379} - gossipPort := corev1.ServicePort{Name: "gossip", Port: 16379} + clientPort := corev1.ServicePort{Name: "client", Port: int32(cluster.Spec.ClientPort)} + gossipPort := corev1.ServicePort{Name: "gossip", Port: int32(cluster.Spec.GossipPort)} svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Labels: labels, @@ -30,8 +30,8 @@ func NewHeadLessSvcForCR(cluster *redisv1alpha1.DistributedRedisCluster, name st func NewSvcForCR(cluster *redisv1alpha1.DistributedRedisCluster, name string, labels map[string]string) *corev1.Service { var ports []corev1.ServicePort - clientPort := corev1.ServicePort{Name: "client", Port: 6379} - gossipPort := corev1.ServicePort{Name: "gossip", Port: 16379} + clientPort := corev1.ServicePort{Name: "client", Port: int32(cluster.Spec.ClientPort)} + gossipPort := corev1.ServicePort{Name: "gossip", Port: int32(cluster.Spec.GossipPort)} if cluster.Spec.Monitor == nil { ports = append(ports, clientPort, gossipPort) } else { diff --git a/pkg/resources/statefulsets/statefulset.go b/pkg/resources/statefulsets/statefulset.go index d69a003e4..c6a7a6b04 100644 --- a/pkg/resources/statefulsets/statefulset.go +++ b/pkg/resources/statefulsets/statefulset.go @@ -3,6 +3,7 @@ package statefulsets import ( "fmt" "sort" + "strconv" "strings" appsv1 "k8s.io/api/apps/v1" @@ -70,6 +71,7 @@ func NewStatefulSetForCR(cluster *redisv1alpha1.DistributedRedisCluster, ssName, redisServerContainer(cluster, password), }, Volumes: volumes, + HostNetwork: spec.HostNetwork, }, }, }, @@ -179,6 +181,7 @@ func getRedisCommand(cluster *redisv1alpha1.DistributedRedisCluster, password *c "/conf/fix-ip.sh", "redis-server", "/conf/redis.conf", + "--port " + strconv.Itoa(cluster.Spec.ClientPort), "--cluster-enabled yes", "--cluster-config-file /data/nodes.conf", } @@ -225,8 +228,22 @@ func mergeRenameCmds(userCmds []string, systemRenameCmdMap map[string]string) [] return cmds } +func createContainerPort(name string, port int32, hostNetwork bool) corev1.ContainerPort { + var containerPort = corev1.ContainerPort{ + Name: name, + 
ContainerPort: port, + Protocol: corev1.ProtocolTCP, + } + + if hostNetwork { + containerPort.HostPort = port + } + + return containerPort +} + func redisServerContainer(cluster *redisv1alpha1.DistributedRedisCluster, password *corev1.EnvVar) corev1.Container { - probeArg := "redis-cli -h $(hostname) ping" + probeArg := "redis-cli -h $(hostname) -p " + strconv.Itoa(cluster.Spec.ClientPort) + " ping" container := corev1.Container{ Name: redisServerName, @@ -234,16 +251,8 @@ func redisServerContainer(cluster *redisv1alpha1.DistributedRedisCluster, passwo ImagePullPolicy: cluster.Spec.ImagePullPolicy, SecurityContext: cluster.Spec.ContainerSecurityContext, Ports: []corev1.ContainerPort{ - { - Name: "client", - ContainerPort: 6379, - Protocol: corev1.ProtocolTCP, - }, - { - Name: "gossip", - ContainerPort: 16379, - Protocol: corev1.ProtocolTCP, - }, + createContainerPort("client", int32(cluster.Spec.ClientPort), cluster.Spec.HostNetwork), + createContainerPort("gossip", int32(cluster.Spec.GossipPort), cluster.Spec.HostNetwork), }, VolumeMounts: volumeMounts(), Command: getRedisCommand(cluster, password), @@ -317,12 +326,9 @@ func redisExporterContainer(cluster *redisv1alpha1.DistributedRedisCluster, pass }, cluster.Spec.Monitor.Args...), Image: cluster.Spec.Monitor.Image, ImagePullPolicy: corev1.PullAlways, + Ports: []corev1.ContainerPort{ - { - Name: "prom-http", - Protocol: corev1.ProtocolTCP, - ContainerPort: cluster.Spec.Monitor.Prometheus.Port, - }, + createContainerPort("prom-http", cluster.Spec.Monitor.Prometheus.Port, cluster.Spec.HostNetwork), }, Env: cluster.Spec.Monitor.Env, Resources: cluster.Spec.Monitor.Resources, diff --git a/test/e2e/drc/drc_test.go b/test/e2e/drc/drc_test.go index 6b335adea..f26748d80 100644 --- a/test/e2e/drc/drc_test.go +++ b/test/e2e/drc/drc_test.go @@ -23,7 +23,7 @@ var _ = Describe("DistributedRedisCluster CRUD", func() { Ω(f.CreateRedisClusterPassword(f.PasswordName(), password)).Should(Succeed()) Ω(f.CreateRedisCluster(drc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(name, f.Namespace(), password) + goredis = e2e.NewGoRedisClient(name, f.Namespace(), password, 6379) Expect(goredis.StuffingData(10, 300000)).NotTo(HaveOccurred()) dbsize, err = goredis.DBSize() Expect(err).NotTo(HaveOccurred()) @@ -41,14 +41,14 @@ var _ = Describe("DistributedRedisCluster CRUD", func() { e2e.DeleteMasterPodForDRC(drc, f.Client) Eventually(e2e.IsDRCPodBeDeleted(f, drc), "5m", "10s").ShouldNot(HaveOccurred()) Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), goredis.Password()) + goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), goredis.Password(), 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) It("should scale up a DistributedRedisCluster", func() { e2e.ScaleUPDRC(drc) Ω(f.UpdateRedisCluster(drc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), goredis.Password()) + goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), goredis.Password(), 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) Context("when the scale up succeeded", func() { @@ -56,7 +56,7 @@ var _ = Describe("DistributedRedisCluster CRUD", func() { e2e.ScaleUPDown(drc) 
Ω(f.UpdateRedisCluster(drc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), goredis.Password()) + goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), goredis.Password(), 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) }) @@ -67,14 +67,14 @@ var _ = Describe("DistributedRedisCluster CRUD", func() { Ω(f.UpdateRedisCluster(drc)).Should(Succeed()) time.Sleep(5 * time.Second) Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), newPassword) + goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), newPassword, 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) It("should update the DistributedRedisCluster minor version", func() { e2e.RollingUpdateDRC(drc) Ω(f.UpdateRedisCluster(drc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), goredis.Password()) + goredis = e2e.NewGoRedisClient(drc.Name, f.Namespace(), goredis.Password(), 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) }) diff --git a/test/e2e/drcb/drcb_test.go b/test/e2e/drcb/drcb_test.go index c04d785da..2a9405326 100644 --- a/test/e2e/drcb/drcb_test.go +++ b/test/e2e/drcb/drcb_test.go @@ -24,7 +24,7 @@ var _ = Describe("Restore DistributedRedisCluster From RedisClusterBackup", func Ω(f.CreateRedisClusterPassword(f.PasswordName(), password)).Should(Succeed()) Ω(f.CreateRedisCluster(drc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, drc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(name, f.Namespace(), password) + goredis = e2e.NewGoRedisClient(name, f.Namespace(), password, 6379) Expect(goredis.StuffingData(10, 300000)).NotTo(HaveOccurred()) dbsize, err = goredis.DBSize() Expect(err).NotTo(HaveOccurred()) @@ -50,7 +50,7 @@ var _ = Describe("Restore DistributedRedisCluster From RedisClusterBackup", func rdrc = e2e.RestoreDRC(drc, drcb) Ω(f.CreateRedisCluster(rdrc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, rdrc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password()) + goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password(), 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) Context("when restore is succeeded", func() { @@ -64,14 +64,14 @@ var _ = Describe("Restore DistributedRedisCluster From RedisClusterBackup", func e2e.DeleteMasterPodForDRC(rdrc, f.Client) Eventually(e2e.IsDRCPodBeDeleted(f, rdrc), "5m", "10s").ShouldNot(HaveOccurred()) Eventually(e2e.IsDistributedRedisClusterProperly(f, rdrc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password()) + goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password(), 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) It("should scale up a DistributedRedisCluster", func() { e2e.ScaleUPDRC(rdrc) Ω(f.UpdateRedisCluster(rdrc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, rdrc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password()) + goredis = 
e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password(), 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) Context("when the scale up succeeded", func() { @@ -79,7 +79,7 @@ var _ = Describe("Restore DistributedRedisCluster From RedisClusterBackup", func e2e.ScaleUPDown(rdrc) Ω(f.UpdateRedisCluster(rdrc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, rdrc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password()) + goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password(), 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) }) @@ -90,14 +90,14 @@ var _ = Describe("Restore DistributedRedisCluster From RedisClusterBackup", func Ω(f.UpdateRedisCluster(rdrc)).Should(Succeed()) time.Sleep(5 * time.Second) Eventually(e2e.IsDistributedRedisClusterProperly(f, rdrc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), newPassword) + goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), newPassword, 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) It("should update the DistributedRedisCluster minor version", func() { e2e.RollingUpdateDRC(rdrc) Ω(f.UpdateRedisCluster(rdrc)).Should(Succeed()) Eventually(e2e.IsDistributedRedisClusterProperly(f, rdrc), "10m", "10s").ShouldNot(HaveOccurred()) - goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password()) + goredis = e2e.NewGoRedisClient(rdrc.Name, f.Namespace(), goredis.Password(), 6379) Expect(e2e.IsDBSizeConsistent(dbsize, goredis)).NotTo(HaveOccurred()) }) }) diff --git a/test/e2e/operator_util.go b/test/e2e/operator_util.go index 55630e9c4..18e67e0aa 100644 --- a/test/e2e/operator_util.go +++ b/test/e2e/operator_util.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "os" + "strconv" "time" "github.com/go-logr/logr" @@ -162,12 +163,12 @@ func IsDistributedRedisClusterProperly(f *Framework, drc *redisv1alpha1.Distribu RenameCommandsFile: renameCommandsFile, RenameCommandsPath: renameCommandsPath, } - redisAdmin, err := NewRedisAdmin(podList.Items, password, redisconf, logger) + redisAdmin, err := NewRedisAdmin(podList.Items, password, redisconf, logger, drc.Spec.ClientPort) if err != nil { f.Logf("NewRedisAdmin err: %s", err) return err } - if _, err := redisAdmin.GetClusterInfos(); err != nil { + if _, err := redisAdmin.GetClusterInfos(drc.Spec.ClientPort); err != nil { f.Logf("DistributedRedisCluster Cluster nodes: %s", err) return err } @@ -198,10 +199,10 @@ func getLabels(cluster *redisv1alpha1.DistributedRedisCluster) map[string]string } // NewRedisAdmin builds and returns new redis.Admin from the list of pods -func NewRedisAdmin(pods []corev1.Pod, password string, cfg *config.Redis, reqLogger logr.Logger) (redisutil.IAdmin, error) { +func NewRedisAdmin(pods []corev1.Pod, password string, cfg *config.Redis, reqLogger logr.Logger, clientPort int) (redisutil.IAdmin, error) { nodesAddrs := []string{} for _, pod := range pods { - redisPort := redisutil.DefaultRedisPort + redisPort := strconv.Itoa(clientPort) for _, container := range pod.Spec.Containers { if container.Name == "redis" { for _, port := range container.Ports { @@ -367,8 +368,8 @@ func IsRedisClusterBackupProperly(f *Framework, drcb *redisv1alpha1.RedisCluster } } -func NewGoRedisClient(svc, namespaces, password string) *GoRedis { - addr := fmt.Sprintf("%s.%s.svc.%s:6379", svc, namespaces, os.Getenv("CLUSTER_DOMAIN")) +func 
NewGoRedisClient(svc, namespaces, password string, port int) *GoRedis {
+	addr := fmt.Sprintf("%s.%s.svc.%s:%d", svc, namespaces, os.Getenv("CLUSTER_DOMAIN"), port)
 	return NewGoRedis(addr, password)
 }
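
For reference, a minimal custom resource exercising the new fields might look like the sketch below. Only hostNetwork, clientPort and gossipPort are introduced by this change; their names follow the json tags added in distributedrediscluster_types.go, while the apiVersion, kind and the remaining fields are assumptions based on the operator's existing CRD and the defaults in default.go. Note that DefaultSpec always rewrites gossipPort to clientPort + 10000, so setting it independently in the spec has no effect.

apiVersion: redis.kun/v1alpha1   # assumed group/version for this operator
kind: DistributedRedisCluster
metadata:
  name: example-drc
spec:
  image: redis:5.0.4-alpine      # defaultRedisImage from default.go
  masterSize: 3                  # assumed pre-existing field
  clusterReplicas: 1
  hostNetwork: true              # pod shares the node's network namespace
  clientPort: 6380               # replaces the previously hard-coded 6379
  # gossipPort is forced to clientPort + 10000 (16380 here) by DefaultSpec

Host networking is the main reason the ports became configurable: two pods on the same node can no longer both bind 6379, so each cluster needs its own client/gossip pair, and createContainerPort mirrors ContainerPort into HostPort when hostNetwork is true so the scheduler accounts for the bound port.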
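
With those values, NewHeadLessSvcForCR would render service ports along the lines of the following sketch (metadata and selector omitted; port names taken from service.go, clusterIP assumed from the "headless" naming):

apiVersion: v1
kind: Service
spec:
  clusterIP: None                # assumed headless, per NewHeadLessSvcForCR
  ports:
  - name: client
    port: 6380                   # int32(cluster.Spec.ClientPort)
  - name: gossip
    port: 16380                  # int32(cluster.Spec.GossipPort)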