Skip to content

Commit

Permalink
add docker cri support
Browse files Browse the repository at this point in the history
  • Loading branch information
jzwlqx committed Jan 8, 2024
1 parent 37ab880 commit 581cea8
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 20 deletions.
61 changes: 41 additions & 20 deletions pkg/exporter/nettop/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
internalapi "k8s.io/cri-api/pkg/apis"
v1 "k8s.io/cri-api/pkg/apis/runtime/v1"

log "github.com/sirupsen/logrus"
Expand All @@ -25,9 +24,8 @@ var (
pidCache = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval)
ipCache = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval)

control = make(chan struct{})
lock sync.Mutex
criClient internalapi.RuntimeService
control = make(chan struct{})
lock sync.Mutex

defaultEntity = &Entity{}
)
Expand Down Expand Up @@ -183,6 +181,10 @@ func StartCache(ctx context.Context, sidecarMode bool) error {
if err := initCriClient(runtimeEndpoints); err != nil {
return err
}
if err := initCriInfo(); err != nil {
return err
}

if err := initDefaultEntity(sidecarMode); err != nil {
return err
}
Expand Down Expand Up @@ -263,13 +265,41 @@ func contextDone(ctx context.Context) bool {
}
}

type CRIInfo struct {
Version string
RuntimeName string
RuntimeVersion string
}

func getSandboxInfoSpec(sandboxStatus *v1.PodSandboxStatusResponse) (*sandboxInfoSpec, error) {
if criInfo.RuntimeName == "docker" {
return getSandboxInfoSpecForDocker(sandboxStatus.Status.Id)
}

infoString := sandboxStatus.Info["info"]
if infoString == "" {
return nil, fmt.Errorf("sandbox status does not contains \"info\" field")
}
info := &sandboxInfoSpec{}
if err := json.Unmarshal([]byte(infoString), info); err != nil {
return nil, fmt.Errorf("failed unmarsh info to struct, err: %v", err)
}

return info, nil
}

func cacheNetTopology(ctx context.Context) error {
lock.Lock()
defer lock.Unlock()

addEntityToCache(defaultEntity)

sandboxList, err := criClient.ListPodSandbox(nil)
sandboxList, err := criClient.ListPodSandbox(&v1.PodSandboxFilter{
State: &v1.PodSandboxStateValue{
State: v1.PodSandboxState_SANDBOX_READY,
},
})

if err != nil {
return fmt.Errorf("failed list pod sandboxes: %w", err)
}
Expand All @@ -286,30 +316,20 @@ func cacheNetTopology(ctx context.Context) error {
namespace := sandbox.Metadata.Namespace
name := sandbox.Metadata.Name

if sandbox.State != v1.PodSandboxState_SANDBOX_READY {
log.Infof("sandbox %s/%s is not ready ,skip", namespace, name)
continue
}

sandboxStatus, err := criClient.PodSandboxStatus(sandbox.Id, true)
if err != nil {
log.Errorf("failed get sandbox status for %s/%s, err: %v", namespace, name, err)
log.Errorf("sandbox: %s/%s failed get status err: %v", namespace, name, err)
continue
}

if sandboxStatus.Status == nil || sandboxStatus.Info == nil {
if sandboxStatus.Status == nil {
log.Errorf("sandbox %s/%s: invalid sandbox status", sandbox.Metadata.Namespace, sandbox.Metadata.Name)
continue
}

infoString := sandboxStatus.Info["info"]
if infoString == "" {
log.Errorf("sandbox status does not contains \"info\" field, sandbox id %s", sandbox.Id)
continue
}
info := sandboxInfoSpec{}
if err := json.Unmarshal([]byte(infoString), &info); err != nil {
log.Errorf("failed unmarsh info to struct, err: %v", err)
info, err := getSandboxInfoSpec(sandboxStatus)
if err != nil {
log.Errorf("failed get sandbox info: %v", err)
continue
}

Expand All @@ -323,6 +343,7 @@ func cacheNetTopology(ctx context.Context) error {
var pids []int
if podCgroupPath != "" {
pids = tasksInsidePodCgroup(podCgroupPath)
log.Debugf("found %d pids under cgroup %s", len(pids), podCgroupPath)
}

status := sandboxStatus.Status
Expand Down
143 changes: 143 additions & 0 deletions pkg/exporter/nettop/docker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package nettop

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"

log "github.com/sirupsen/logrus"
)

var (
dockerhttpc *http.Client
dockerInfo *slimDockerInfo
)

const dockersock = "/var/run/docker.sock"

type slimDocker struct {
State slimDockerState `json:"State"`
HostConfig slimDockerHostConfig `json:"HostConfig"`
}

type slimDockerState struct {
Status string `json:"Status"`
Pid int `json:"Pid"`
}

type slimDockerHostConfig struct {
CgroupParent string `json:"CgroupParent"`
}

type slimDockerInfo struct {
CgroupDriver string `json:"CgroupDriver"`
}

func initializeDockerClient() {
if dockerhttpc != nil {
return
}
dockerhttpc = &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", dockersock)
},
},
}

}

func initializeDockerInfo() error {
info, err := dockerHTTPRequest("/info")
if err != nil {
return err
}
dockerInfo = &slimDockerInfo{}
if err := json.Unmarshal(info, dockerInfo); err != nil {
return fmt.Errorf("failed unmarshal %s to slimDockerInfo: %w", string(info), err)
}
return nil
}

func dockerHTTPRequest(path string) ([]byte, error) {

path = strings.TrimPrefix(path, "/")

url := fmt.Sprintf("http://localhost/%s", path)

resp, err := dockerhttpc.Get(url)
if err != nil {
return nil, fmt.Errorf("failed request docker %s: %w", url, err)
}

data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed read docker response: %w", err)
}

if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed request docker %s, status code: %d, body: %s", url, resp.StatusCode, string(data))
}

return data, nil
}

func getSandboxInfoSpecForDocker(id string) (*sandboxInfoSpec, error) {
if dockerhttpc == nil {
initializeDockerClient()
}

if dockerInfo == nil {
if err := initializeDockerInfo(); err != nil {
return nil, fmt.Errorf("failed get docker info: %w", err)
}
}

b, err := dockerHTTPRequest(fmt.Sprintf("/containers/%s/json", id))
if err != nil {
return nil, fmt.Errorf("failed read container detail for %s from docker docker: %w", id, err)
}

docker := &slimDocker{}
if err := json.Unmarshal(b, docker); err != nil {
return nil, fmt.Errorf("failed unmarsh docker response for %s to slimDocker: %w", id, err)
}

return buildSandboxInfoSpecForDocker(docker)

}

func adjustCgroupParent(rawCgroupParent string) string {
if dockerInfo.CgroupDriver == "cgroupfs" {
return rawCgroupParent
}

arr := strings.Split(rawCgroupParent, "-")

if len(arr) == 2 {
//guaranteed pod, cgroup parent: kubepods-podfd9ea419_d65a_454e_9697_f7312cf47af7.slice
return fmt.Sprintf("/%[1]s.slice/%[1]s-%[2]s", arr[0], arr[1])
} else if len(arr) == 3 {
//besteffort and burstable pod, cgroup parent: kubepods-burstable-podf4c0cdd8e0920b18d85d50904cb0f13d.slice
return fmt.Sprintf("/%[1]s.slice/%[1]s-%[2]s.slice/%[1]s-%[2]s-%[3]s", arr[0], arr[1], arr[2])
}

log.Errorf("invalid cgroup parent path: %s", rawCgroupParent)
return ""
}

func buildSandboxInfoSpecForDocker(docker *slimDocker) (*sandboxInfoSpec, error) {
cgroupParent := adjustCgroupParent(docker.HostConfig.CgroupParent)
return &sandboxInfoSpec{
Pid: docker.State.Pid,
Config: Config{
Linux: Linux{
CgroupParent: cgroupParent,
},
},
}, nil
}

0 comments on commit 581cea8

Please sign in to comment.