diff --git a/bpf/netns.c b/bpf/netns.c new file mode 100644 index 00000000..3ecf221e --- /dev/null +++ b/bpf/netns.c @@ -0,0 +1,4 @@ + +SEC("tracepoint/task/newtask") +int netns(){ +} diff --git a/pkg/exporter/cmd/list_probe.go b/pkg/exporter/cmd/list_probe.go index b83bbee3..c871b3ae 100644 --- a/pkg/exporter/cmd/list_probe.go +++ b/pkg/exporter/cmd/list_probe.go @@ -25,7 +25,6 @@ var ( indent := " " for _, s := range l { fmt.Printf("%s%s\n", indent, s) - } } }, diff --git a/pkg/exporter/cmd/root.go b/pkg/exporter/cmd/root.go index d5d4a1a9..bc692708 100644 --- a/pkg/exporter/cmd/root.go +++ b/pkg/exporter/cmd/root.go @@ -5,8 +5,6 @@ import ( log "github.com/sirupsen/logrus" - "github.com/alibaba/kubeskoop/pkg/exporter/nettop" - "github.com/spf13/cobra" ) @@ -16,7 +14,6 @@ var ( Use: "inspector", Short: "network inspection tool", PersistentPreRun: func(cmd *cobra.Command, args []string) { - nettop.Init(sidecar) if debug { log.SetLevel(log.DebugLevel) } else { diff --git a/pkg/exporter/cmd/server.go b/pkg/exporter/cmd/server.go index 3b8d05eb..61de430c 100644 --- a/pkg/exporter/cmd/server.go +++ b/pkg/exporter/cmd/server.go @@ -57,7 +57,11 @@ var ( } // nolint - go nettop.StartCache(insp.ctx) + if err := nettop.StartCache(insp.ctx, sidecar); err != nil { + log.Errorf("failed start cache: %v", err) + return + } + defer nettop.StopCache() // config hot reload process diff --git a/pkg/exporter/nettop/cache.go b/pkg/exporter/nettop/cache.go index 456b06c1..dd61e7f5 100644 --- a/pkg/exporter/nettop/cache.go +++ b/pkg/exporter/nettop/cache.go @@ -2,116 +2,184 @@ package nettop import ( "context" + "encoding/base64" + "encoding/json" "fmt" "os" + "strings" + "sync" "time" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + v1 "k8s.io/cri-api/pkg/apis/runtime/v1" + log "github.com/sirupsen/logrus" "github.com/patrickmn/go-cache" "github.com/vishvananda/netns" ) -const ( - hostNetwork = "hostNetwork" - unknowNetwork = "unknow" -) - var ( cacheUpdateInterval = 10 * time.Second - podCache = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval) nsCache = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval) pidCache = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval) + ipCache = cache.New(20*cacheUpdateInterval, 20*cacheUpdateInterval) control = make(chan struct{}) + lock sync.Mutex + + defaultEntity = &Entity{} ) -type netnsMeta struct { - inum int - mountPath string - pids []int - isHostNetwork bool -} +func podNameFromServiceAccountToken() (string, error) { + token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") + if err != nil { + return "", fmt.Errorf("failed get pod token, err: %w", err) + } + arr := strings.Split(string(token), ".") + if len(arr) != 3 { + return "", fmt.Errorf("invalid serviceaccount token format") + } -type podMeta struct { - name string - namespace string - sandbox string - pid int - nspath string - app string // app label from cri response - ip string // ip addr from cri response - labels map[string]string -} + data, err := base64.RawStdEncoding.DecodeString(arr[1]) + if err != nil { + return "", fmt.Errorf("failed decode serviceaccount token: %w", err) + } -type Entity struct { - netnsMeta - podMeta - pids []int -} + s := struct { + K8s struct { + Pod struct { + Name string `json:"name"` + } `json:"pod"` + } `json:"kubernetes.io"` + }{} -func (e *Entity) GetIP() string { - return e.podMeta.ip + if err := json.Unmarshal(data, &s); err != nil { + return "", fmt.Errorf("failed unmarshal serviceaccount token: %w", err) + } + return s.K8s.Pod.Name, nil } -func (e *Entity) GetAppLabel() string { - if e.netnsMeta.isHostNetwork { - return hostNetwork +func currentPodInfo() (string, string, error) { + namespace, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err != nil { + return "", "", fmt.Errorf("failed get namespace in sidecar mode, err: %v", err) } - return e.podMeta.app -} -func (e *Entity) GetLabel(labelkey string) (string, bool) { - if e.podMeta.labels != nil { - if v, ok := e.podMeta.labels[labelkey]; ok { - return v, true + name, err := podNameFromServiceAccountToken() + if err != nil { + log.Warnf("failed get pod name from /var/run/secrets/kubernetes.io/serviceaccount/token, fallback to hostname") + + name, err := os.ReadFile("/etc/hostname") + if err != nil { + return "", "", fmt.Errorf("failed get namespace in sidecar mode, err: %v", err) } + + return string(namespace), string(name), nil } - return "", false + return string(namespace), name, nil } -func (e *Entity) GetPodName() string { - if env := os.Getenv("INSPECTOR_PODNAME"); env != "" { - return env +func initDefaultEntity(sidecarMode bool) error { + self := os.Getpid() + hostNetNSId, err := getNsInumByPid(self) + if err != nil { + return fmt.Errorf("failed get host nsnum id, err: %w", err) } - if e.netnsMeta.isHostNetwork { - return hostNetwork + ipList, err := hostIPList() + if err != nil { + return err } - if e.podMeta.name != "" { - return e.podMeta.name + //add host network + defaultEntity = &Entity{ + netnsMeta: netnsMeta{ + inum: hostNetNSId, + mountPath: fmt.Sprintf("/proc/%d/ns/net", self), + isHostNetwork: !sidecarMode, + ipList: ipList, + }, + initPid: self, } - return unknowNetwork -} + if sidecarMode { + namespace, name, err := currentPodInfo() + if err != nil { + return fmt.Errorf("failed get current pod info: %w", err) + } -func (e *Entity) GetPodNamespace() string { - if env := os.Getenv("INSPECTOR_PODNAMESPACE"); env != "" { - return env + podIP := "" + if len(ipList) > 0 { + podIP = ipList[0] + } + + defaultEntity.podMeta = podMeta{ + namespace: namespace, + name: name, + ip: podIP, + } } - if e.netnsMeta.isHostNetwork { - return hostNetwork + return nil +} + +func hostIPList() ([]string, error) { + links, err := netlink.LinkList() + if err != nil { + return nil, fmt.Errorf("failed get host link list: %w", err) } - if e.podMeta.namespace != "" { - return e.podMeta.namespace + var ret []string + + for _, link := range links { + addrs, err := netlink.AddrList(link, unix.AF_INET) + if err != nil { + log.Errorf("failed get addr from link %s: %v", link.Attrs().Name, err) + continue + } + for _, addr := range addrs { + if !addr.IP.IsGlobalUnicast() { + continue + } + ret = append(ret, addr.IP.String()) + } } + return ret, nil +} + +type netnsMeta struct { + inum int + mountPath string + isHostNetwork bool + ipList []string +} - return unknowNetwork +type podMeta struct { + name string + namespace string + ip string } -func (e *Entity) GetMeta(name string) (string, error) { - switch name { - case "ip": - return e.GetIP(), nil - case "netns": - return fmt.Sprintf("ns%d", e.GetNetns()), nil - default: - return "", fmt.Errorf("unkonw or unsupported meta %s", name) - } +type Entity struct { + netnsMeta + podMeta + initPid int + pids []int +} + +func (e *Entity) GetIP() string { + return e.podMeta.ip +} + +func (e *Entity) GetPodName() string { + return e.podMeta.name +} + +func (e *Entity) GetPodNamespace() string { + return e.podMeta.namespace } func (e *Entity) IsHostNetwork() bool { @@ -126,15 +194,8 @@ func (e *Entity) GetNetnsMountPoint() string { return e.netnsMeta.mountPath } -func (e *Entity) GetPodSandboxID() string { - return e.podMeta.sandbox -} - func (e *Entity) GetNsHandle() (netns.NsHandle, error) { - if len(e.pids) != 0 { - return netns.GetFromPid(e.pids[0]) - } - + //TODO check whether we should close the opened file return netns.GetFromPath(e.netnsMeta.mountPath) } @@ -147,27 +208,44 @@ func (e *Entity) GetNetNsFd() (int, error) { return int(h), nil } -// GetPid return a random pid of entify, if no process in netns,return 0 +// GetPid return a random initPid of entify, if no process in netns,return 0 func (e *Entity) GetPid() int { - if len(e.pids) == 0 { - return 0 - } - return e.pids[0] + return e.initPid } func (e *Entity) GetPids() []int { return e.pids } -func StartCache(ctx context.Context) error { - log.Infof("nettop cache loop start, interval: %d", cacheUpdateInterval) - return cacheDaemonLoop(ctx, control) +func StartCache(ctx context.Context, sidecarMode bool) error { + if err := initCriClient(runtimeEndpoints); err != nil { + return err + } + if err := initCriInfo(); err != nil { + return err + } + + if err := initDefaultEntity(sidecarMode); err != nil { + return err + } + if sidecarMode { + return nil + } + + if err := cachePodsWithTimeout(cacheUpdateInterval); err != nil { + return fmt.Errorf("failed cache pods, err: %v", err) + } + + go cacheDaemonLoop(ctx, control) + return nil } func StopCache() { control <- struct{}{} } -func cacheDaemonLoop(_ context.Context, control chan struct{}) error { +func cacheDaemonLoop(_ context.Context, control chan struct{}) { + log.Infof("nettop cache loop start") + t := time.NewTicker(cacheUpdateInterval) defer t.Stop() @@ -175,178 +253,167 @@ func cacheDaemonLoop(_ context.Context, control chan struct{}) error { select { case <-control: log.Info("cache daemon loop exit of control signal") - return nil case <-t.C: - go cacheProcess() + if err := cachePodsWithTimeout(cacheUpdateInterval); err != nil { + log.Errorf("failed cache pods: %v", err) + } } - } } -func cacheProcess() { +func cachePodsWithTimeout(timeout time.Duration) error { start := time.Now() - ctx, cancelf := context.WithTimeout(context.Background(), cacheUpdateInterval) - defer cancelf() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var err error cacheDone := make(chan struct{}) go func(done chan struct{}) { - err := cacheNetTopology() - if err != nil { - log.Errorf("failed cache process, err: %v", err) - } - done <- struct{}{} + err = cacheNetTopology(ctx) + close(done) }(cacheDone) select { case <-ctx.Done(): log.Infof("cache process time exceeded, latency: %fs", time.Since(start).Seconds()) - return + return fmt.Errorf("timeout process pods") case <-cacheDone: log.Infof("cache process finished, latency: %fs", time.Since(start).Seconds()) + return err } +} +func addEntityToCache(e *Entity) { + nsCache.Set(fmt.Sprintf("%d", e.inum), e, 3*cacheUpdateInterval) + for _, ip := range e.ipList { + ipCache.Set(ip, e, 3*cacheUpdateInterval) + } + for _, pid := range e.pids { + pidCache.Set(fmt.Sprintf("%d", pid), e, 3*cacheUpdateInterval) + } } -func SyncNetTopology() error { - return cacheNetTopology() +func contextDone(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} + +type CRIInfo struct { + Version string + RuntimeName string + RuntimeVersion string +} + +func getSandboxInfoSpec(sandboxStatus *v1.PodSandboxStatusResponse) (*sandboxInfoSpec, error) { + if criInfo.RuntimeName == "docker" { + return getSandboxInfoSpecForDocker(sandboxStatus.Status.Id) + } + + infoString := sandboxStatus.Info["info"] + if infoString == "" { + return nil, fmt.Errorf("sandbox status does not contains \"info\" field") + } + info := &sandboxInfoSpec{} + if err := json.Unmarshal([]byte(infoString), info); err != nil { + return nil, fmt.Errorf("failed unmarsh info to struct, err: %v", err) + } + + return info, nil } -func cacheNetTopology() error { - // get all process - pids, err := getAllPids() +func cacheNetTopology(ctx context.Context) error { + lock.Lock() + defer lock.Unlock() + + addEntityToCache(defaultEntity) + + sandboxList, err := criClient.ListPodSandbox(&v1.PodSandboxFilter{ + State: &v1.PodSandboxStateValue{ + State: v1.PodSandboxState_SANDBOX_READY, + }, + }) + if err != nil { - log.Warnf("cache pids failed %s", err) - return err + return fmt.Errorf("failed list pod sandboxes: %w", err) } - log.Debug("finished get all pids") - // get all netns by process - netnsMap := map[int]netnsMeta{} - for _, pid := range pids { - nsinum, err := getNsInumByPid(pid) + for _, sandbox := range sandboxList { + + if contextDone(ctx) { + return fmt.Errorf("timeout") + } + if sandbox.Metadata == nil { + log.Errorf("invalid sandbox who has no metadata, id %s", sandbox.Id) + } + + namespace := sandbox.Metadata.Namespace + name := sandbox.Metadata.Name + + sandboxStatus, err := criClient.PodSandboxStatus(sandbox.Id, true) if err != nil { - log.Warnf("get ns inum of %d failed %s", pid, err) + log.Errorf("sandbox: %s/%s failed get status err: %v", namespace, name, err) continue } - if v, ok := netnsMap[nsinum]; !ok { - nsm := netnsMeta{ - inum: nsinum, - pids: []int{pid}, - } - if pid == 1 { - nsm.isHostNetwork = true - } - netnsMap[nsinum] = nsm - } else { - v.pids = append(v.pids, pid) - if pid == 1 { - v.isHostNetwork = true - } - netnsMap[nsinum] = v + if sandboxStatus.Status == nil { + log.Errorf("sandbox %s/%s: invalid sandbox status", sandbox.Metadata.Namespace, sandbox.Metadata.Name) + continue } - } + info, err := getSandboxInfoSpec(sandboxStatus) + if err != nil { + log.Errorf("failed get sandbox info: %v", err) + continue + } - log.Debug("finished get all netns") + netnsNum, err := getNsInumByPid(info.Pid) + if err != nil { + log.Errorf("failed get netns for initPid %d, err: %v", info.Pid, err) + continue + } - // get netns mount point aka cni presentation - namedns, err := findNsfsMountpoint() - if err != nil { - log.Warnf("get nsfs mount point failed %s", err) - } else { - for _, mp := range namedns { - nsinum, err := getNsInumByNsfsMountPoint(mp) - if err != nil { - log.Warnf("get ns inum from %s point failed %s", mp, err) - continue - } - if v, ok := netnsMap[nsinum]; !ok { - // in rund case, netns does not have any live process - netnsMap[nsinum] = netnsMeta{ - inum: nsinum, - mountPath: mp, - } - } else { - v.mountPath = mp - netnsMap[nsinum] = v - } + podCgroupPath := info.Config.Linux.CgroupParent + var pids []int + if podCgroupPath != "" { + pids = tasksInsidePodCgroup(podCgroupPath) + log.Debugf("found %d pids under cgroup %s", len(pids), podCgroupPath) } - } - log.Debug("finished get all nsfs mount point") + status := sandboxStatus.Status - var podMap map[string]podMeta - if !sidecarEnabled { - // get pod meta info - podMap, err = getPodMetas(rcrisvc) - if err != nil { - log.Warnf("get pod meta failed %s", err) - return err + if netnsNum == defaultEntity.inum { + log.Infof("skip host network pod %s/%s", namespace, name) + continue } - // if use docker, get docker sandbox - if top.Crimeta != nil && top.Crimeta.RuntimeName == "docker" { - for sandbox, pm := range podMap { - if pm.nspath == "" && pm.pid == 0 { - pid, err := getPidForContainerBySock(sandbox) - if err != nil { - log.Warnf("get docker container error, sandbox: %s, err: %v", sandbox, err) - continue - } - pm.pid = pid - } - podMap[sandbox] = pm - } + if sandboxStatus.Status.Network == nil || sandboxStatus.Status.Network.Ip == "" { + log.Errorf("sanbox %s/%s: invalid sandbox status, no ip", sandbox.Metadata.Namespace, sandbox.Metadata.Name) + continue } - } - // combine netns and pod cache, - for nsinum, nsmeta := range netnsMap { - ent := &Entity{ - netnsMeta: nsmeta, - pids: nsmeta.pids, + e := &Entity{ + netnsMeta: netnsMeta{ + inum: netnsNum, + mountPath: fmt.Sprintf("/proc/%d/ns/net", info.Pid), + isHostNetwork: false, + ipList: []string{status.Network.Ip}, + }, + podMeta: podMeta{ + name: name, + namespace: namespace, + ip: sandboxStatus.Status.Network.Ip, + }, + initPid: info.Pid, + pids: pids, } - log.Debugf("try associate pod with netns %d (%s)", nsinum, nsmeta.mountPath) - for sandbox, pm := range podMap { - // 1. use cri infospec/nspath to match - if pm.nspath != "" && pm.nspath == nsmeta.mountPath { - ent.podMeta = pm - log.Debugf("associate pod %s with mount point %d", pm.name, nsmeta.inum) - podCache.Set(sandbox, ent, 3*cacheUpdateInterval) - for _, pid := range nsmeta.pids { - pidCache.Set(fmt.Sprintf("%d", pid), ent, 3*cacheUpdateInterval) - } - continue - } - // 2. use pid nsinum to match - pidns, err := getNsInumByPid(pm.pid) - if err == nil { - if nsinum == pidns { - ent.podMeta = pm - log.Debugf("associate pod %s with netns %d", pm.name, nsmeta.inum) - podCache.Set(sandbox, ent, 3*cacheUpdateInterval) - for _, pid := range nsmeta.pids { - pidCache.Set(fmt.Sprintf("%d", pid), ent, 3*cacheUpdateInterval) - } - } - } else { - // 3. try to use pid to match - for _, pid := range nsmeta.pids { - if pm.pid == pid { - ent.podMeta = pm - log.Debugf("associate pod pid, pod: %s, netns %d", pm.name, nsmeta.inum) - podCache.Set(sandbox, ent, 3*cacheUpdateInterval) - for _, pid := range nsmeta.pids { - pidCache.Set(fmt.Sprintf("%d", pid), ent, 3*cacheUpdateInterval) - } - } - } - } - } - nsCache.Set(fmt.Sprintf("%d", nsinum), ent, 3*cacheUpdateInterval) + addEntityToCache(e) } log.Debug("finished cache process") diff --git a/pkg/exporter/nettop/cgroup.go b/pkg/exporter/nettop/cgroup.go new file mode 100644 index 00000000..e67be2c6 --- /dev/null +++ b/pkg/exporter/nettop/cgroup.go @@ -0,0 +1,72 @@ +package nettop + +import ( + "fmt" + "io/fs" + "os" + "path/filepath" + "strconv" + "strings" + + log "github.com/sirupsen/logrus" +) + +var ( + cgroupRoot = "" +) + +func init() { + root, err := lookupCgroupRoot() + if err != nil { + log.Errorf("failed lookup cgroup root: %v", err) + return + } + cgroupRoot = root +} + +func lookupCgroupRoot() (string, error) { + // TODO lookup from /proc/mount + return "/sys/fs/cgroup", nil +} + +func tasksInsidePodCgroup(path string) []int { + //TODO watch file changes by inotify + if cgroupRoot == "" || path == "" { + return nil + } + base := filepath.Join(cgroupRoot, "memory", path) + m := make(map[int]int) + err := filepath.Walk(base, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() && strings.HasSuffix(path, "/tasks") { + tasks, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("failed read cgroup tasks %s: %w", path, err) + } + for _, s := range strings.Split(string(tasks), "\n") { + s = strings.TrimSpace(s) + if s == "" { + continue + } + i, err := strconv.Atoi(s) + if err != nil { + return fmt.Errorf("invalid tasks pid format in %s : %w", path, err) + } + m[i] = 1 + } + } + return nil + }) + + if err != nil { + log.Errorf("failed list tasks: %v", err) + } + + var ret []int + for k := range m { + ret = append(ret, k) + } + return ret +} diff --git a/pkg/exporter/nettop/cri.go b/pkg/exporter/nettop/cri.go index cd2265fc..a354c27e 100644 --- a/pkg/exporter/nettop/cri.go +++ b/pkg/exporter/nettop/cri.go @@ -22,67 +22,72 @@ import ( runtimeapiV1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" ) +var ( + criClient internalapi.RuntimeService + criInfo *CRIInfo +) + const ( unixProtocol = "unix" maxMsgSize = 1024 * 1024 * 16 kubeAPIVersion = "0.1.0" ) -var ( - rcrisvc internalapi.RuntimeService -) - -func (c *CriMeta) Update() error { - criclient, crisock, err := getCriClient(runtimeEndpoints) - if err != nil { - return err - } - - c.RuntimeSock = crisock - rcrisvc = criclient +var runtimeEndpoints = []string{"/var/run/dockershim.sock", "/run/containerd/containerd.sock", "/run/k3s/containerd/containerd.sock"} - version, err := rcrisvc.Version(kubeAPIVersion) - if err != nil { - return err +func initCriClient(eps []string) (err error) { + if criClient != nil { + return } - c.RuntimeName = version.RuntimeName - c.RuntimeVersion = version.RuntimeVersion - c.Version = version.RuntimeApiVersion - return nil -} - -// remoteRuntimeService is a gRPC implementation of internalapi.RuntimeService. -type remoteRuntimeService struct { - timeout time.Duration - runtimeClient runtimeapi.RuntimeServiceClient - runtimeClientV1alpha2 runtimeapiV1alpha2.RuntimeServiceClient -} - -func getCriClient(eps []string) (internalapi.RuntimeService, string, error) { if sock, ok := os.LookupEnv("RUNTIME_SOCK"); ok { - if _, err := os.Stat(sock); os.IsNotExist(err) { - return nil, "", fmt.Errorf("cannot find cri sock %s", sock) + if _, err = os.Stat(sock); os.IsNotExist(err) { + return fmt.Errorf("cannot find cri sock %s", sock) } - client, err := NewRemoteRuntimeService(sock, 10*time.Second) + criClient, err = NewRemoteRuntimeService(sock, 10*time.Second) if err != nil { - return nil, "", fmt.Errorf("connect cri sock %s error: %w", sock, err) + return fmt.Errorf("connect cri sock %s error: %w", sock, err) } - return client, sock, nil + return } for _, candidate := range eps { if _, err := os.Stat(candidate); os.IsNotExist(err) { continue } - client, err := NewRemoteRuntimeService(candidate, 10*time.Second) + criClient, err = NewRemoteRuntimeService(candidate, 10*time.Second) if err != nil { continue } - return client, candidate, nil + return } - return nil, "", fmt.Errorf("cannot find valid cri sock in %s", strings.Join(eps, ",")) + return fmt.Errorf("cannot find valid cri sock in %s", strings.Join(eps, ",")) +} + +func initCriInfo() error { + if criInfo != nil { + return nil + } + + version, err := criClient.Version(kubeAPIVersion) + if err != nil { + return fmt.Errorf("failed get runtime version: %w", err) + } + criInfo = &CRIInfo{ + Version: version.RuntimeApiVersion, + RuntimeName: version.RuntimeName, + RuntimeVersion: version.RuntimeVersion, + } + log.Infof("cri info: version=%s runtime=%s runtimeVersion=%s", criInfo.Version, criInfo.RuntimeName, criInfo.RuntimeVersion) + return nil +} + +// remoteRuntimeService is a gRPC implementation of internalapi.RuntimeService. +type remoteRuntimeService struct { + timeout time.Duration + runtimeClient runtimeapi.RuntimeServiceClient + runtimeClientV1alpha2 runtimeapiV1alpha2.RuntimeServiceClient } // useV1API returns true if the v1 CRI API should be used instead of v1alpha2. @@ -594,6 +599,7 @@ func (r *remoteRuntimeService) determineAPIVersion(conn *grpc.ClientConn) error if _, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{}); err == nil { log.Warn("Using CRI v1 runtime API") } else if status.Code(err) == codes.Unimplemented { + log.Warn("Using CRI v1alpha2 runtime API") r.runtimeClientV1alpha2 = runtimeapiV1alpha2.NewRuntimeServiceClient(conn) } else { return fmt.Errorf("unable to determine runtime API version: %w", err) diff --git a/pkg/exporter/nettop/docker.go b/pkg/exporter/nettop/docker.go index 31562c5b..8f81841c 100644 --- a/pkg/exporter/nettop/docker.go +++ b/pkg/exporter/nettop/docker.go @@ -4,20 +4,24 @@ import ( "context" "encoding/json" "fmt" - io "io" + "io" "net" "net/http" + "strings" log "github.com/sirupsen/logrus" ) var ( dockerhttpc *http.Client + dockerInfo *slimDockerInfo ) +const dockersock = "/var/run/docker.sock" + type slimDocker struct { - ID string `json:"Id,omitempty"` - State slimDockerState `json:"State"` + State slimDockerState `json:"State"` + HostConfig slimDockerHostConfig `json:"HostConfig"` } type slimDockerState struct { @@ -25,38 +29,117 @@ type slimDockerState struct { Pid int `json:"Pid"` } -func getPidForContainerBySock(id string) (int, error) { - // logger.Infof("start get pid of %s", id) - dockersock := "/var/run/docker.sock" - if dockerhttpc == nil { - dockerhttpc = &http.Client{ - Transport: &http.Transport{ - DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { - return net.Dial("unix", dockersock) - }, +type slimDockerHostConfig struct { + CgroupParent string `json:"CgroupParent"` +} + +type slimDockerInfo struct { + CgroupDriver string `json:"CgroupDriver"` +} + +func initializeDockerClient() { + if dockerhttpc != nil { + return + } + dockerhttpc = &http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", dockersock) }, - } + }, } - url := fmt.Sprintf("http://localhost/containers/%s/json", id) - response, err := dockerhttpc.Get(url) +} + +func initializeDockerInfo() error { + info, err := dockerHTTPRequest("/info") if err != nil { - log.Errorf("failed get docker response, err: %v", err) - return 0, err + return err + } + dockerInfo = &slimDockerInfo{} + if err := json.Unmarshal(info, dockerInfo); err != nil { + return fmt.Errorf("failed unmarshal %s to slimDockerInfo: %w", string(info), err) } + return nil +} + +func dockerHTTPRequest(path string) ([]byte, error) { + + path = strings.TrimPrefix(path, "/") - b, err := io.ReadAll(response.Body) + url := fmt.Sprintf("http://localhost/%s", path) + + resp, err := dockerhttpc.Get(url) if err != nil { - log.Errorf("failed get docker response, err: %v", err) - return 0, err + return nil, fmt.Errorf("failed request docker %s: %w", url, err) } - sd := &slimDocker{} - err = json.Unmarshal(b, &sd) + defer resp.Body.Close() + + data, err := io.ReadAll(resp.Body) if err != nil { - log.Errorf("failed get docker response, err: %v", err) - return 0, err + return nil, fmt.Errorf("failed read docker response: %w", err) } - log.Infof("finish get pid, sandbox: %s, pid: %d", id, sd.State.Pid) - return sd.State.Pid, nil + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed request docker %s, status code: %d, body: %s", url, resp.StatusCode, string(data)) + } + + return data, nil +} + +func getSandboxInfoSpecForDocker(id string) (*sandboxInfoSpec, error) { + if dockerhttpc == nil { + initializeDockerClient() + } + + if dockerInfo == nil { + if err := initializeDockerInfo(); err != nil { + return nil, fmt.Errorf("failed get docker info: %w", err) + } + } + + b, err := dockerHTTPRequest(fmt.Sprintf("/containers/%s/json", id)) + if err != nil { + return nil, fmt.Errorf("failed read container detail for %s from docker docker: %w", id, err) + } + + docker := &slimDocker{} + if err := json.Unmarshal(b, docker); err != nil { + return nil, fmt.Errorf("failed unmarsh docker response for %s to slimDocker: %w", id, err) + } + + return buildSandboxInfoSpecForDocker(docker) + +} + +func adjustCgroupParent(rawCgroupParent string) string { + if dockerInfo.CgroupDriver == "cgroupfs" { + return rawCgroupParent + } + + arr := strings.Split(rawCgroupParent, "-") + + if len(arr) == 2 { + //guaranteed pod, cgroup parent: kubepods-podfd9ea419_d65a_454e_9697_f7312cf47af7.slice + return fmt.Sprintf("/%[1]s.slice/%[1]s-%[2]s", arr[0], arr[1]) + } else if len(arr) == 3 { + //besteffort and burstable pod, cgroup parent: kubepods-burstable-podf4c0cdd8e0920b18d85d50904cb0f13d.slice + return fmt.Sprintf("/%[1]s.slice/%[1]s-%[2]s.slice/%[1]s-%[2]s-%[3]s", arr[0], arr[1], arr[2]) + } + + log.Errorf("invalid cgroup parent path: %s", rawCgroupParent) + return "" +} + +func buildSandboxInfoSpecForDocker(docker *slimDocker) (*sandboxInfoSpec, error) { + cgroupParent := adjustCgroupParent(docker.HostConfig.CgroupParent) + return &sandboxInfoSpec{ + Pid: docker.State.Pid, + Config: Config{ + Linux: Linux{ + CgroupParent: cgroupParent, + }, + }, + }, nil } diff --git a/pkg/exporter/nettop/interface.go b/pkg/exporter/nettop/interface.go index 210c0e99..c672acdd 100644 --- a/pkg/exporter/nettop/interface.go +++ b/pkg/exporter/nettop/interface.go @@ -2,8 +2,7 @@ package nettop import ( "fmt" - - "github.com/vishvananda/netns" + "os" ) var ( @@ -14,7 +13,7 @@ var ( func GetEntityByNetns(nsinum int) (*Entity, error) { // if use nsinum 0, represent node level metrics if nsinum == 0 { - return GetHostnetworlEntity() + return GetHostNetworkEntity() } v, found := nsCache.Get(fmt.Sprintf("%d", nsinum)) if found { @@ -23,25 +22,8 @@ func GetEntityByNetns(nsinum int) (*Entity, error) { return nil, fmt.Errorf("entify for netns %d not found", nsinum) } -func GetHostnetworlEntity() (*Entity, error) { - return GetEntityByPid(1) -} - -func GetHostnetworkNetnsFd() (int, error) { - nsh, err := netns.GetFromPid(1) - if err != nil { - return 0, err - } - - return int(nsh), nil -} - -func GetEntityByPod(sandbox string) (*Entity, error) { - v, found := podCache.Get(sandbox) - if found { - return v.(*Entity), nil - } - return nil, fmt.Errorf("entify for pod %s not found", sandbox) +func GetHostNetworkEntity() (*Entity, error) { + return defaultEntity, nil } func GetEntityByPid(pid int) (*Entity, error) { @@ -68,6 +50,14 @@ func GetAllEntity() []*Entity { return res } -func GetEntityNetnsByPid(pid int) (int, error) { - return getNsInumByPid(pid) +func GetNodeName() string { + if os.Getenv("INSPECTOR_NODENAME") != "" { + return os.Getenv("INSPECTOR_NODENAME") + } + node, err := os.Hostname() + if err != nil { + return "Unknow" + } + + return node } diff --git a/pkg/exporter/nettop/libnettop.pb.go b/pkg/exporter/nettop/libnettop.pb.go deleted file mode 100644 index 8c978bad..00000000 --- a/pkg/exporter/nettop/libnettop.pb.go +++ /dev/null @@ -1,341 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.12 -// source: libnettop.proto - -package nettop - -import ( - reflect "reflect" - sync "sync" - - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type Kernel struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Release string `protobuf:"bytes,1,opt,name=release,proto3" json:"release,omitempty"` - Version string `protobuf:"bytes,2,opt,name=version,proto3" json:"version,omitempty"` - Architecture string `protobuf:"bytes,3,opt,name=architecture,proto3" json:"architecture,omitempty"` -} - -func (x *Kernel) Reset() { - *x = Kernel{} - if protoimpl.UnsafeEnabled { - mi := &file_libnettop_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Kernel) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Kernel) ProtoMessage() {} - -func (x *Kernel) ProtoReflect() protoreflect.Message { - mi := &file_libnettop_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Kernel.ProtoReflect.Descriptor instead. -func (*Kernel) Descriptor() ([]byte, []int) { - return file_libnettop_proto_rawDescGZIP(), []int{0} -} - -func (x *Kernel) GetRelease() string { - if x != nil { - return x.Release - } - return "" -} - -func (x *Kernel) GetVersion() string { - if x != nil { - return x.Version - } - return "" -} - -func (x *Kernel) GetArchitecture() string { - if x != nil { - return x.Architecture - } - return "" -} - -type CriMeta struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` - RuntimeName string `protobuf:"bytes,2,opt,name=runtime_name,json=runtimeName,proto3" json:"runtime_name,omitempty"` - RuntimeVersion string `protobuf:"bytes,3,opt,name=runtime_version,json=runtimeVersion,proto3" json:"runtime_version,omitempty"` - RuntimeSock string `protobuf:"bytes,4,opt,name=runtime_sock,json=runtimeSock,proto3" json:"runtime_sock,omitempty"` -} - -func (x *CriMeta) Reset() { - *x = CriMeta{} - if protoimpl.UnsafeEnabled { - mi := &file_libnettop_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *CriMeta) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*CriMeta) ProtoMessage() {} - -func (x *CriMeta) ProtoReflect() protoreflect.Message { - mi := &file_libnettop_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use CriMeta.ProtoReflect.Descriptor instead. -func (*CriMeta) Descriptor() ([]byte, []int) { - return file_libnettop_proto_rawDescGZIP(), []int{1} -} - -func (x *CriMeta) GetVersion() string { - if x != nil { - return x.Version - } - return "" -} - -func (x *CriMeta) GetRuntimeName() string { - if x != nil { - return x.RuntimeName - } - return "" -} - -func (x *CriMeta) GetRuntimeVersion() string { - if x != nil { - return x.RuntimeVersion - } - return "" -} - -func (x *CriMeta) GetRuntimeSock() string { - if x != nil { - return x.RuntimeSock - } - return "" -} - -type NodeMeta struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - NodeName string `protobuf:"bytes,1,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` - Kernel *Kernel `protobuf:"bytes,2,opt,name=kernel,proto3" json:"kernel,omitempty"` - Crimeta *CriMeta `protobuf:"bytes,3,opt,name=crimeta,proto3" json:"crimeta,omitempty"` -} - -func (x *NodeMeta) Reset() { - *x = NodeMeta{} - if protoimpl.UnsafeEnabled { - mi := &file_libnettop_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *NodeMeta) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*NodeMeta) ProtoMessage() {} - -func (x *NodeMeta) ProtoReflect() protoreflect.Message { - mi := &file_libnettop_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use NodeMeta.ProtoReflect.Descriptor instead. -func (*NodeMeta) Descriptor() ([]byte, []int) { - return file_libnettop_proto_rawDescGZIP(), []int{2} -} - -func (x *NodeMeta) GetNodeName() string { - if x != nil { - return x.NodeName - } - return "" -} - -func (x *NodeMeta) GetKernel() *Kernel { - if x != nil { - return x.Kernel - } - return nil -} - -func (x *NodeMeta) GetCrimeta() *CriMeta { - if x != nil { - return x.Crimeta - } - return nil -} - -var File_libnettop_proto protoreflect.FileDescriptor - -var file_libnettop_proto_rawDesc = []byte{ - 0x0a, 0x0f, 0x6c, 0x69, 0x62, 0x6e, 0x65, 0x74, 0x74, 0x6f, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x06, 0x6e, 0x65, 0x74, 0x74, 0x6f, 0x70, 0x22, 0x60, 0x0a, 0x06, 0x4b, 0x65, 0x72, - 0x6e, 0x65, 0x6c, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x72, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x12, 0x18, 0x0a, - 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, - 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x0a, 0x0c, 0x61, 0x72, 0x63, 0x68, 0x69, - 0x74, 0x65, 0x63, 0x74, 0x75, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x61, - 0x72, 0x63, 0x68, 0x69, 0x74, 0x65, 0x63, 0x74, 0x75, 0x72, 0x65, 0x22, 0x92, 0x01, 0x0a, 0x07, - 0x43, 0x72, 0x69, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x6e, 0x61, 0x6d, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, - 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x5f, - 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x72, - 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, - 0x0c, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x6f, 0x63, 0x6b, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0b, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x6f, 0x63, 0x6b, - 0x22, 0x7a, 0x0a, 0x08, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x1b, 0x0a, 0x09, - 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x26, 0x0a, 0x06, 0x6b, 0x65, 0x72, - 0x6e, 0x65, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6e, 0x65, 0x74, 0x74, - 0x6f, 0x70, 0x2e, 0x4b, 0x65, 0x72, 0x6e, 0x65, 0x6c, 0x52, 0x06, 0x6b, 0x65, 0x72, 0x6e, 0x65, - 0x6c, 0x12, 0x29, 0x0a, 0x07, 0x63, 0x72, 0x69, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6e, 0x65, 0x74, 0x74, 0x6f, 0x70, 0x2e, 0x43, 0x72, 0x69, 0x4d, - 0x65, 0x74, 0x61, 0x52, 0x07, 0x63, 0x72, 0x69, 0x6d, 0x65, 0x74, 0x61, 0x42, 0x0a, 0x5a, 0x08, - 0x2e, 0x3b, 0x6e, 0x65, 0x74, 0x74, 0x6f, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_libnettop_proto_rawDescOnce sync.Once - file_libnettop_proto_rawDescData = file_libnettop_proto_rawDesc -) - -func file_libnettop_proto_rawDescGZIP() []byte { - file_libnettop_proto_rawDescOnce.Do(func() { - file_libnettop_proto_rawDescData = protoimpl.X.CompressGZIP(file_libnettop_proto_rawDescData) - }) - return file_libnettop_proto_rawDescData -} - -var file_libnettop_proto_msgTypes = make([]protoimpl.MessageInfo, 3) -var file_libnettop_proto_goTypes = []interface{}{ - (*Kernel)(nil), // 0: nettop.Kernel - (*CriMeta)(nil), // 1: nettop.CriMeta - (*NodeMeta)(nil), // 2: nettop.NodeMeta -} -var file_libnettop_proto_depIdxs = []int32{ - 0, // 0: nettop.NodeMeta.kernel:type_name -> nettop.Kernel - 1, // 1: nettop.NodeMeta.crimeta:type_name -> nettop.CriMeta - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name -} - -func init() { file_libnettop_proto_init() } -func file_libnettop_proto_init() { - if File_libnettop_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_libnettop_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Kernel); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_libnettop_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CriMeta); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_libnettop_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NodeMeta); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_libnettop_proto_rawDesc, - NumEnums: 0, - NumMessages: 3, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_libnettop_proto_goTypes, - DependencyIndexes: file_libnettop_proto_depIdxs, - MessageInfos: file_libnettop_proto_msgTypes, - }.Build() - File_libnettop_proto = out.File - file_libnettop_proto_rawDesc = nil - file_libnettop_proto_goTypes = nil - file_libnettop_proto_depIdxs = nil -} diff --git a/pkg/exporter/nettop/libnettop.proto b/pkg/exporter/nettop/libnettop.proto deleted file mode 100644 index a8bda701..00000000 --- a/pkg/exporter/nettop/libnettop.proto +++ /dev/null @@ -1,23 +0,0 @@ -syntax="proto3"; - -package nettop; -option go_package = ".;nettop"; - -message Kernel { - string release = 1; - string version = 2; - string architecture = 3; -} - -message CriMeta { - string version = 1; - string runtime_name = 2; - string runtime_version = 3; - string runtime_sock = 4; -} - -message NodeMeta { - string node_name = 1; - Kernel kernel = 2; - CriMeta crimeta = 3; -} diff --git a/pkg/exporter/nettop/netns.go b/pkg/exporter/nettop/netns.go index add7a281..29a9aebb 100644 --- a/pkg/exporter/nettop/netns.go +++ b/pkg/exporter/nettop/netns.go @@ -5,33 +5,8 @@ import ( "os" "strconv" "strings" - "syscall" ) -func getAllPids() ([]int, error) { - d, err := os.Open("/proc") - if err != nil { - return nil, err - } - defer d.Close() - - names, err := d.Readdirnames(-1) - if err != nil { - return nil, err - } - - pidlist := []int{} - for _, name := range names { - pid, err := strconv.ParseInt(name, 10, 64) - if err != nil { - continue - } - pidlist = append(pidlist, int(pid)) - } - - return pidlist, nil -} - func getNsInumByPid(pid int) (int, error) { d, err := os.Open(fmt.Sprintf("/proc/%d/ns", pid)) if err != nil { @@ -68,61 +43,3 @@ func getNsInumByPid(pid int) (int, error) { return 0, fmt.Errorf("net namespace of %d not found", pid) } - -func findNsfsMountpoint() (mounts []string, err error) { - output, err := os.ReadFile("/proc/mounts") - if err != nil { - return nil, err - } - - // /proc/mounts has 6 fields per line, one mount per line, e.g. - for _, line := range strings.Split(string(output), "\n") { - parts := strings.Split(line, " ") - if len(parts) == 6 { - switch parts[2] { - case "nsfs": - mounts = append(mounts, parts[1]) - } - } - } - - return -} - -func getNsInumByNsfsMountPoint(file string) (int, error) { - fileinfo, err := os.Stat(file) - if os.IsNotExist(err) || err != nil { - return 0, err - } - if fileinfo.Mode()&os.ModeSymlink != 0 { - target, err := os.Readlink(file) - if err != nil { - return 0, err - } - - fields := strings.SplitN(target, ":", 2) - if len(fields) != 2 { - return 0, fmt.Errorf("failed to parse namespace type and inode from %q", target) - } - - if fields[0] == "net" { - inode, err := strconv.ParseUint(strings.Trim(fields[1], "[]"), 10, 32) - if err != nil { - return 0, fmt.Errorf("failed to parse inode from %q: %w", fields[1], err) - } - - return int(inode), nil - } - } else { - stat, ok := fileinfo.Sys().(*syscall.Stat_t) - if !ok { - return 0, fmt.Errorf("cannot parse file stat %s", file) - } - - if stat.Ino != 0 { - return int(stat.Ino), nil - } - } - - return 0, fmt.Errorf("cannot find valid inode of %s", file) -} diff --git a/pkg/exporter/nettop/netns_test.go b/pkg/exporter/nettop/netns_test.go deleted file mode 100644 index f21b9b9b..00000000 --- a/pkg/exporter/nettop/netns_test.go +++ /dev/null @@ -1,8 +0,0 @@ -package nettop - -import ( - "testing" -) - -func TestFindNsInum(_ *testing.T) { -} diff --git a/pkg/exporter/nettop/nodemeta.go b/pkg/exporter/nettop/nodemeta.go deleted file mode 100644 index 3dcca9d8..00000000 --- a/pkg/exporter/nettop/nodemeta.go +++ /dev/null @@ -1,95 +0,0 @@ -package nettop - -import ( - fmt "fmt" - "os" - "strings" - "unsafe" - - log "github.com/sirupsen/logrus" - - "golang.org/x/sys/unix" -) - -//go:generate protoc --go_out=. ./libnettop.proto - -var ( - top = metacache{} - runtimeEndpoints = []string{"/var/run/dockershim.sock", "/run/containerd/containerd.sock", "/run/k3s/containerd/containerd.sock"} - - sidecarEnabled bool -) - -type metacache struct { - NodeMeta -} - -func Init(sidecar bool) { - top.NodeName = getNodeName() - kr, err := getKernelRelease() - if err != nil { - log.Errorf("failed to get node kernel info %v", err) - } else { - top.Kernel = kr - } - if !sidecar { - // use empty cri meta, fulfillize it when updated - c := &CriMeta{} - err = c.Update() - if err != nil { - log.Errorf("update cri meta failed %v", err) - } - - top.Crimeta = c - } - - sidecarEnabled = sidecar -} - -func getNodeName() string { - if os.Getenv("INSPECTOR_NODENAME") != "" { - return os.Getenv("INSPECTOR_NODENAME") - } - node, err := os.Hostname() - if err != nil { - return "Unknow" - } - - return node -} - -func getKernelRelease() (*Kernel, error) { - k := &Kernel{} - var uname unix.Utsname - if err := unix.Uname(&uname); err != nil { - return k, fmt.Errorf("uname failed: %w", err) - } - k.Version = unix.ByteSliceToString(uname.Version[:]) - k.Release = unix.ByteSliceToString(uname.Release[:]) - k.Architecture = strings.TrimRight(string((*[65]byte)(unsafe.Pointer(&uname.Machine))[:]), "\000") - return k, nil -} - -func GetRuntimeName() string { - return top.NodeMeta.GetCrimeta().GetRuntimeName() -} - -func GetRuntimeVersion() string { - return top.NodeMeta.GetCrimeta().GetRuntimeVersion() -} - -func GetRuntimeSock() string { - return top.NodeMeta.GetCrimeta().GetRuntimeSock() -} - -func GetRuntimeAPIVersion() string { - return top.NodeMeta.GetCrimeta().GetVersion() -} - -func GetNodeName() string { - return top.GetNodeName() -} - -func GetKernelRelease() string { - return top.Kernel.Release -} diff --git a/pkg/exporter/nettop/pod.go b/pkg/exporter/nettop/pod.go index 39553b6a..429889ec 100644 --- a/pkg/exporter/nettop/pod.go +++ b/pkg/exporter/nettop/pod.go @@ -1,86 +1,34 @@ package nettop -import ( - "encoding/json" - - log "github.com/sirupsen/logrus" - - internalapi "k8s.io/cri-api/pkg/apis" - runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" -) - -type NamespacesSpec struct { - Type string `json:"type"` - Path string `json:"path"` +// from containerd sandbox status +//type NamespacesSpec struct { +// Type string `json:"type"` +// Path string `json:"path"` +//} + +//type LinuxSpec struct { +// NamespaceSpec []NamespacesSpec `json:"namespaces"` +//} + +//type RuntimeSpec struct { +// Linux LinuxSpec `json:"linux"` +//} + +//type RuntimeOptions struct { +// SystemdCGroup bool `json:"systemd_cgroup"` +//} + +type Linux struct { + CgroupParent string `json:"cgroup_parent"` } -type LinuxSpec struct { - NamespaceSpec []NamespacesSpec `json:"namespaces"` +type Config struct { + Linux Linux `json:"linux"` } -type RuntimeSpec struct { - Linux LinuxSpec `json:"linux"` -} - -type InfoSpec struct { - Pid int `json:"pid"` - RuntimeSpec RuntimeSpec `json:"runtimeSpec"` -} - -func getPodMetas(client internalapi.RuntimeService) (map[string]podMeta, error) { - if client == nil { - return nil, nil - } - // only list live pods - filter := runtimeapi.PodSandboxFilter{ - State: &runtimeapi.PodSandboxStateValue{ - State: runtimeapi.PodSandboxState_SANDBOX_READY, - }, - } - listresponse, err := client.ListPodSandbox(&filter) - if err != nil { - return nil, err - } - resMap := make(map[string]podMeta) - for _, sandbox := range listresponse { - status, err := client.PodSandboxStatus(sandbox.GetId(), true) - if err != nil { - log.Debugf("get pod sandbox %s status failed with %s", sandbox.GetId(), err) - continue - } - pm := podMeta{ - sandbox: sandbox.GetId(), - name: status.GetStatus().GetMetadata().GetName(), - namespace: status.GetStatus().GetMetadata().GetNamespace(), - ip: status.GetStatus().GetNetwork().GetIp(), - labels: status.GetStatus().GetLabels(), - } - - if v, ok := status.GetStatus().GetLabels()["app"]; ok { - pm.app = v - } - - // get process pid - info := status.GetInfo()["info"] - if info != "" { - infospec := InfoSpec{} - err := json.Unmarshal([]byte(info), &infospec) - if err != nil { - log.Warnf("parse info spec %s failed with %v", pm.name, err) - continue - } - pm.pid = infospec.Pid - if infospec.RuntimeSpec.Linux.NamespaceSpec != nil && len(infospec.RuntimeSpec.Linux.NamespaceSpec) > 0 { - for _, ns := range infospec.RuntimeSpec.Linux.NamespaceSpec { - if ns.Type == "network" { - pm.nspath = ns.Path - } - } - } - } - - resMap[sandbox.GetId()] = pm - - } - return resMap, nil +type sandboxInfoSpec struct { + Pid int `json:"pid"` + //RuntimeSpec RuntimeSpec `json:"runtimeSpec"` + //RuntimeOptions RuntimeOptions `json:"runtimeOptions"` + Config Config `json:"config"` } diff --git a/pkg/exporter/probe/legacy.go b/pkg/exporter/probe/legacy.go index f14fa626..51c4b203 100644 --- a/pkg/exporter/probe/legacy.go +++ b/pkg/exporter/probe/legacy.go @@ -95,7 +95,7 @@ func (l *legacyBatchMetrics) Collect(metrics chan<- prometheus.Metric) { func LagacyEventLabels(netns uint32) []Label { et, err := nettop.GetEntityByNetns(int(netns)) - if err != nil { + if err != nil || et == nil { log.Infof("nettop get entity failed, netns: %d, err: %v", netns, err) return nil } diff --git a/pkg/exporter/probe/tracepacketloss/packetloss.go b/pkg/exporter/probe/tracepacketloss/packetloss.go index 8af00dcc..6ead4975 100644 --- a/pkg/exporter/probe/tracepacketloss/packetloss.go +++ b/pkg/exporter/probe/tracepacketloss/packetloss.go @@ -293,7 +293,7 @@ func (p *packetLossProbe) loadAndAttachBPF() error { func (p *packetLossProbe) perfLoop() { for { - anothor_loop: + anotherLoop: record, err := p.perfReader.Read() if err != nil { if errors.Is(err, ringbuf.ErrClosed) { @@ -335,7 +335,7 @@ func (p *packetLossProbe) perfLoop() { strs := []string{} for _, sym := range stacks { if _, ok := ignoreSymbolList[sym.GetName()]; ok { - goto anothor_loop + goto anotherLoop } if _, ok := uselessSymbolList[sym.GetName()]; ok { continue