Skip to content

Commit

Permalink
refactor: replace kubectl operation with k8s apiclient call
Browse files Browse the repository at this point in the history
Signed-off-by: Allen Sun <[email protected]>
  • Loading branch information
allencloud committed Oct 4, 2022
1 parent 7b1bb6b commit 011c7d4
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 66 deletions.
17 changes: 7 additions & 10 deletions pkg/runtime/kubernetes/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,16 @@ import (
)

const (
RemoteCmdCopyStatic = "mkdir -p %s && cp -f %s %s"
RemoteApplyYaml = `echo '%s' | kubectl apply -f -`
RemoteCmdGetNetworkInterface = "ls /sys/class/net"
RemoteCmdExistNetworkInterface = "ip addr show %s | egrep \"%s\" || true"
WriteKubeadmConfigCmd = `cd %s && echo '%s' > etc/kubeadm.yml`
DefaultVIP = "10.103.97.2"
DefaultAPIserverDomain = "apiserver.cluster.local"
DefaultRegistryPort = 5000
DockerCertDir = "/etc/docker/certs.d"
RemoteCmdCopyStatic = "mkdir -p %s && cp -f %s %s"
WriteKubeadmConfigCmd = `cd %s && echo '%s' > etc/kubeadm.yml`
DefaultVIP = "10.103.97.2"
DefaultAPIserverDomain = "apiserver.cluster.local"
DefaultRegistryPort = 5000
DockerCertDir = "/etc/docker/certs.d"
)

func (k *Runtime) ConfigKubeadmOnMaster0() error {
if err := k.LoadFromClusterfile(k.Config.ClusterFileKubeConfig); err != nil {
if err := k.LoadFromClusterfile(k.KubeadmConfigFromClusterfile); err != nil {
return fmt.Errorf("failed to load kubeadm config from clusterfile: %v", err)
}
// TODO handle the kubeadm config, like kubeproxy config
Expand Down
48 changes: 23 additions & 25 deletions pkg/runtime/kubernetes/join_masters.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"strings"
"sync"

"github.com/pkg/errors"
"github.com/sealerio/sealer/common"
"github.com/sealerio/sealer/pkg/clustercert"
"github.com/sealerio/sealer/pkg/ipvs"
Expand All @@ -32,6 +31,8 @@ import (
"github.com/sealerio/sealer/utils/ssh"
versionUtils "github.com/sealerio/sealer/utils/version"
"github.com/sealerio/sealer/utils/yaml"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -71,7 +72,6 @@ rm -rf /var/lib/etcd && rm -rf /var/etcd
RemoteRemoveRegistryCerts = "rm -rf " + DockerCertDir + "/%s*"
RemoveLvscareStaticPod = "rm -rf /etc/kubernetes/manifests/kube-sealyun-lvscare*"
CreateLvscareStaticPod = "mkdir -p /etc/kubernetes/manifests && echo '%s' > /etc/kubernetes/manifests/kube-sealyun-lvscare.yaml"
KubeDeleteNode = "kubectl delete node %s"
// TODO check kubernetes certs
RemoteCheckCerts = "kubeadm alpha certs check-expiration"
)
Expand Down Expand Up @@ -389,30 +389,32 @@ func (k *Runtime) deleteMasters(masters []net.IP) error {
return eg.Wait()
}

func (k *Runtime) isHostName(master, host net.IP) (string, error) {
hostString, err := k.CmdToString(master, "kubectl get nodes | grep -v NAME | awk '{print $1}'", ",")
// isHostInExistingCluster checks if the input host is contained in the existing cluster.
// It gets all nodes within existing cluster,
// judges each node name with the hostname of input host node
// and finially get the comparison result.
func (k *Runtime) isHostInExistingCluster(host net.IP) (string, error) {
nodes, err := k.ListNodes()
if err != nil {
return "", err
}
if len(nodes.Items) == 0 {
return "", fmt.Errorf("no node gotten by k8s client")
}

hostName, err := k.CmdToString(host, "hostname", "")
if err != nil {
return "", err
}
hosts := strings.Split(hostString, ",")
var name string
for _, h := range hosts {
if strings.TrimSpace(h) == "" {
continue
} else {
hh := strings.ToLower(h)
fromH := strings.ToLower(hostName)
if hh == fromH {
name = h
break
}

for _, nodeItem := range nodes.Items {
hh := strings.ToLower(nodeItem.Name)
fromH := strings.ToLower(hostName)
if hh == fromH {
return hh, nil
}
}
return name, nil
return "", fmt.Errorf("failed to get node: no node's hostname matches hostname of host(%s)", host)
}

func (k *Runtime) deleteMaster(master net.IP) error {
Expand Down Expand Up @@ -449,19 +451,15 @@ func (k *Runtime) deleteMaster(master net.IP) error {
}

if len(masterIPs) > 0 {
hostname, err := k.isHostName(k.cluster.GetMaster0IP(), master)
hostname, err := k.isHostInExistingCluster(master)
if err != nil {
return err
}
master0SSH, err := k.getHostSSHClient(k.cluster.GetMaster0IP())
if err != nil {
return fmt.Errorf("failed to get master0 ssh client: %v", err)
}

if err := master0SSH.CmdAsync(k.cluster.GetMaster0IP(), fmt.Sprintf(KubeDeleteNode, strings.TrimSpace(hostname))); err != nil {
return fmt.Errorf("failed to delete node %s: %v", hostname, err)
if err := k.DeleteNode(strings.TrimSpace(hostname)); err != nil {
return err
}
}

lvsImage := k.RegConfig.Repo() + "/fanux/lvscare:latest"
yaml := ipvs.LvsStaticPodYaml(k.getVIP(), masterIPs, lvsImage)
eg, _ := errgroup.WithContext(context.Background())
Expand Down
14 changes: 5 additions & 9 deletions pkg/runtime/kubernetes/join_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,17 @@ func (k *Runtime) deleteNode(node net.IP) error {
if err := ssh.CmdAsync(node, remoteCleanCmds...); err != nil {
return err
}
//remove node

// remove node
if len(k.cluster.GetMasterIPList()) > 0 {
hostname, err := k.isHostName(k.cluster.GetMaster0IP(), node)
hostname, err := k.isHostInExistingCluster(node)
if err != nil {
return err
}
ssh, err := k.getHostSSHClient(k.cluster.GetMaster0IP())
if err != nil {
return fmt.Errorf("failed to get master0 ssh client(%s): %v", k.cluster.GetMaster0IP(), err)
}
if err := ssh.CmdAsync(k.cluster.GetMaster0IP(), fmt.Sprintf(KubeDeleteNode, strings.TrimSpace(hostname))); err != nil {
return fmt.Errorf("failed to delete node %s: %v", hostname, err)
if err := k.DeleteNode(strings.TrimSpace(hostname)); err != nil {
return err
}
}

return nil
}

Expand Down
63 changes: 41 additions & 22 deletions pkg/runtime/kubernetes/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,53 @@ package kubernetes
import (
"context"
"fmt"
"strings"
"sync"

"github.com/sealerio/sealer/pkg/registry"
"github.com/sealerio/sealer/pkg/runtime/kubernetes/kubeadm"
"github.com/sealerio/sealer/utils"
versionUtils "github.com/sealerio/sealer/utils/version"

"net"
"path/filepath"
"strings"
"sync"
"time"

"github.com/sealerio/sealer/common"
"github.com/sealerio/sealer/pkg/client/k8s"
"github.com/sealerio/sealer/pkg/registry"
"github.com/sealerio/sealer/pkg/runtime"
"github.com/sealerio/sealer/pkg/runtime/kubernetes/kubeadm"
v2 "github.com/sealerio/sealer/types/api/v2"
"github.com/sealerio/sealer/utils"
"github.com/sealerio/sealer/utils/platform"
"github.com/sealerio/sealer/utils/ssh"
strUtils "github.com/sealerio/sealer/utils/strings"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta2"
versionUtils "github.com/sealerio/sealer/utils/version"

"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta2"
)

type Config struct {
Vlog int
VIP string
RegConfig *registry.Config
// Clusterfile: the absolute path, we need to read kubeadm config from Clusterfile
ClusterFileKubeConfig *kubeadm.KubeadmConfig
APIServerDomain string
}

// Runtime struct is the runtime interface for kubernetes
type Runtime struct {
*sync.Mutex
// TODO: remove field cluster from runtime pkg
// just make runtime to use essential data from cluster, rather than the whole cluster scope.
cluster *v2.Cluster

// The KubeadmConfig used to setup the final cluster.
// Its data is from KubeadmConfig input from Clusterfile and KubeadmConfig in ClusterImage.
*kubeadm.KubeadmConfig
*Config
// *Config
KubeadmConfigFromClusterfile *kubeadm.KubeadmConfig
APIServerDomain string
Vlog int
VIP string

// RegConfig contains the embedded registry configuration of cluster
RegConfig *registry.Config
*k8s.Client
}

// NewDefaultRuntime arg "clusterfileKubeConfig" is the Clusterfile path/name, runtime need read kubeadm config from it
Expand All @@ -67,13 +75,24 @@ func NewDefaultRuntime(cluster *v2.Cluster, clusterfileKubeConfig *kubeadm.Kubea
func newKubernetesRuntime(cluster *v2.Cluster, clusterFileKubeConfig *kubeadm.KubeadmConfig) (runtime.Interface, error) {
k := &Runtime{
cluster: cluster,
Config: &Config{
/*Config: &Config{
ClusterFileKubeConfig: clusterFileKubeConfig,
APIServerDomain: DefaultAPIserverDomain,
},
KubeadmConfig: &kubeadm.KubeadmConfig{},
},*/
KubeadmConfigFromClusterfile: clusterFileKubeConfig,
KubeadmConfig: &kubeadm.KubeadmConfig{},
APIServerDomain: DefaultAPIserverDomain,
}
k.Config.RegConfig = registry.GetConfig(k.getImageMountDir(), k.cluster.GetMaster0IP())

var err error
if k.Client, err = k8s.Newk8sClient(); err != nil {
// In current design, as runtime controls all cluster operations including run, join, delete
// and so on, then when executing run operation, it will definitely fail when creating k8s client
// since no k8s cluster is setup. While when join and delete operation, the cluster already exists,
// we can make it to create k8s client. Therefore just throw a warn log to move on.
logrus.Warnf("failed to create k8s client: %v", err)
}

k.RegConfig = registry.GetConfig(k.getImageMountDir(), k.cluster.GetMaster0IP())
k.setCertSANS(append(
[]string{"127.0.0.1", k.getAPIServerDomain(), k.getVIP().String()},
k.cluster.GetMasterIPStrList()...),
Expand Down Expand Up @@ -239,7 +258,7 @@ func (k *Runtime) getDNSDomain() string {
}

func (k *Runtime) getAPIServerDomain() string {
return k.Config.APIServerDomain
return k.APIServerDomain
}

func (k *Runtime) getKubeVersion() string {
Expand Down Expand Up @@ -369,8 +388,8 @@ func (k *Runtime) MergeKubeadmConfig() error {
if k.getKubeVersion() != "" {
return nil
}
if k.Config.ClusterFileKubeConfig != nil {
if err := k.LoadFromClusterfile(k.Config.ClusterFileKubeConfig); err != nil {
if k.KubeadmConfigFromClusterfile != nil {
if err := k.LoadFromClusterfile(k.KubeadmConfigFromClusterfile); err != nil {
return fmt.Errorf("failed to load kubeadm config from clusterfile: %v", err)
}
}
Expand Down

0 comments on commit 011c7d4

Please sign in to comment.