From 581cea828e48a3f146841422d97045337dd5fa25 Mon Sep 17 00:00:00 2001 From: "jizhong.jiangjz" Date: Mon, 8 Jan 2024 16:12:49 +0800 Subject: [PATCH] add docker cri support --- pkg/exporter/nettop/cache.go | 61 ++++++++++----- pkg/exporter/nettop/docker.go | 143 ++++++++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+), 20 deletions(-) create mode 100644 pkg/exporter/nettop/docker.go diff --git a/pkg/exporter/nettop/cache.go b/pkg/exporter/nettop/cache.go index 41211f0c..ba0a7b23 100644 --- a/pkg/exporter/nettop/cache.go +++ b/pkg/exporter/nettop/cache.go @@ -10,7 +10,6 @@ import ( "github.com/vishvananda/netlink" "golang.org/x/sys/unix" - internalapi "k8s.io/cri-api/pkg/apis" v1 "k8s.io/cri-api/pkg/apis/runtime/v1" log "github.com/sirupsen/logrus" @@ -25,9 +24,8 @@ var ( pidCache = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval) ipCache = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval) - control = make(chan struct{}) - lock sync.Mutex - criClient internalapi.RuntimeService + control = make(chan struct{}) + lock sync.Mutex defaultEntity = &Entity{} ) @@ -183,6 +181,10 @@ func StartCache(ctx context.Context, sidecarMode bool) error { if err := initCriClient(runtimeEndpoints); err != nil { return err } + if err := initCriInfo(); err != nil { + return err + } + if err := initDefaultEntity(sidecarMode); err != nil { return err } @@ -263,13 +265,41 @@ func contextDone(ctx context.Context) bool { } } +type CRIInfo struct { + Version string + RuntimeName string + RuntimeVersion string +} + +func getSandboxInfoSpec(sandboxStatus *v1.PodSandboxStatusResponse) (*sandboxInfoSpec, error) { + if criInfo.RuntimeName == "docker" { + return getSandboxInfoSpecForDocker(sandboxStatus.Status.Id) + } + + infoString := sandboxStatus.Info["info"] + if infoString == "" { + return nil, fmt.Errorf("sandbox status does not contains \"info\" field") + } + info := &sandboxInfoSpec{} + if err := json.Unmarshal([]byte(infoString), info); err != nil { + return nil, fmt.Errorf("failed unmarsh info to struct, err: %v", err) + } + + return info, nil +} + func cacheNetTopology(ctx context.Context) error { lock.Lock() defer lock.Unlock() addEntityToCache(defaultEntity) - sandboxList, err := criClient.ListPodSandbox(nil) + sandboxList, err := criClient.ListPodSandbox(&v1.PodSandboxFilter{ + State: &v1.PodSandboxStateValue{ + State: v1.PodSandboxState_SANDBOX_READY, + }, + }) + if err != nil { return fmt.Errorf("failed list pod sandboxes: %w", err) } @@ -286,30 +316,20 @@ func cacheNetTopology(ctx context.Context) error { namespace := sandbox.Metadata.Namespace name := sandbox.Metadata.Name - if sandbox.State != v1.PodSandboxState_SANDBOX_READY { - log.Infof("sandbox %s/%s is not ready ,skip", namespace, name) - continue - } - sandboxStatus, err := criClient.PodSandboxStatus(sandbox.Id, true) if err != nil { - log.Errorf("failed get sandbox status for %s/%s, err: %v", namespace, name, err) + log.Errorf("sandbox: %s/%s failed get status err: %v", namespace, name, err) continue } - if sandboxStatus.Status == nil || sandboxStatus.Info == nil { + if sandboxStatus.Status == nil { log.Errorf("sandbox %s/%s: invalid sandbox status", sandbox.Metadata.Namespace, sandbox.Metadata.Name) continue } - infoString := sandboxStatus.Info["info"] - if infoString == "" { - log.Errorf("sandbox status does not contains \"info\" field, sandbox id %s", sandbox.Id) - continue - } - info := sandboxInfoSpec{} - if err := json.Unmarshal([]byte(infoString), &info); err != nil { - log.Errorf("failed unmarsh info to struct, err: %v", err) + info, err := getSandboxInfoSpec(sandboxStatus) + if err != nil { + log.Errorf("failed get sandbox info: %v", err) continue } @@ -323,6 +343,7 @@ func cacheNetTopology(ctx context.Context) error { var pids []int if podCgroupPath != "" { pids = tasksInsidePodCgroup(podCgroupPath) + log.Debugf("found %d pids under cgroup %s", len(pids), podCgroupPath) } status := sandboxStatus.Status diff --git a/pkg/exporter/nettop/docker.go b/pkg/exporter/nettop/docker.go new file mode 100644 index 00000000..e21c2071 --- /dev/null +++ b/pkg/exporter/nettop/docker.go @@ -0,0 +1,143 @@ +package nettop + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "strings" + + log "github.com/sirupsen/logrus" +) + +var ( + dockerhttpc *http.Client + dockerInfo *slimDockerInfo +) + +const dockersock = "/var/run/docker.sock" + +type slimDocker struct { + State slimDockerState `json:"State"` + HostConfig slimDockerHostConfig `json:"HostConfig"` +} + +type slimDockerState struct { + Status string `json:"Status"` + Pid int `json:"Pid"` +} + +type slimDockerHostConfig struct { + CgroupParent string `json:"CgroupParent"` +} + +type slimDockerInfo struct { + CgroupDriver string `json:"CgroupDriver"` +} + +func initializeDockerClient() { + if dockerhttpc != nil { + return + } + dockerhttpc = &http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", dockersock) + }, + }, + } + +} + +func initializeDockerInfo() error { + info, err := dockerHTTPRequest("/info") + if err != nil { + return err + } + dockerInfo = &slimDockerInfo{} + if err := json.Unmarshal(info, dockerInfo); err != nil { + return fmt.Errorf("failed unmarshal %s to slimDockerInfo: %w", string(info), err) + } + return nil +} + +func dockerHTTPRequest(path string) ([]byte, error) { + + path = strings.TrimPrefix(path, "/") + + url := fmt.Sprintf("http://localhost/%s", path) + + resp, err := dockerhttpc.Get(url) + if err != nil { + return nil, fmt.Errorf("failed request docker %s: %w", url, err) + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed read docker response: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed request docker %s, status code: %d, body: %s", url, resp.StatusCode, string(data)) + } + + return data, nil +} + +func getSandboxInfoSpecForDocker(id string) (*sandboxInfoSpec, error) { + if dockerhttpc == nil { + initializeDockerClient() + } + + if dockerInfo == nil { + if err := initializeDockerInfo(); err != nil { + return nil, fmt.Errorf("failed get docker info: %w", err) + } + } + + b, err := dockerHTTPRequest(fmt.Sprintf("/containers/%s/json", id)) + if err != nil { + return nil, fmt.Errorf("failed read container detail for %s from docker docker: %w", id, err) + } + + docker := &slimDocker{} + if err := json.Unmarshal(b, docker); err != nil { + return nil, fmt.Errorf("failed unmarsh docker response for %s to slimDocker: %w", id, err) + } + + return buildSandboxInfoSpecForDocker(docker) + +} + +func adjustCgroupParent(rawCgroupParent string) string { + if dockerInfo.CgroupDriver == "cgroupfs" { + return rawCgroupParent + } + + arr := strings.Split(rawCgroupParent, "-") + + if len(arr) == 2 { + //guaranteed pod, cgroup parent: kubepods-podfd9ea419_d65a_454e_9697_f7312cf47af7.slice + return fmt.Sprintf("/%[1]s.slice/%[1]s-%[2]s", arr[0], arr[1]) + } else if len(arr) == 3 { + //besteffort and burstable pod, cgroup parent: kubepods-burstable-podf4c0cdd8e0920b18d85d50904cb0f13d.slice + return fmt.Sprintf("/%[1]s.slice/%[1]s-%[2]s.slice/%[1]s-%[2]s-%[3]s", arr[0], arr[1], arr[2]) + } + + log.Errorf("invalid cgroup parent path: %s", rawCgroupParent) + return "" +} + +func buildSandboxInfoSpecForDocker(docker *slimDocker) (*sandboxInfoSpec, error) { + cgroupParent := adjustCgroupParent(docker.HostConfig.CgroupParent) + return &sandboxInfoSpec{ + Pid: docker.State.Pid, + Config: Config{ + Linux: Linux{ + CgroupParent: cgroupParent, + }, + }, + }, nil +}