Skip to content

Commit

Permalink
Merge pull request #169 from for-zero-one/api-informer
Browse files Browse the repository at this point in the history
add informer for pod list
  • Loading branch information
BSWANG authored Jan 31, 2024
2 parents 626d3b6 + c2e4673 commit 972390c
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/controller/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (s *Server) Run(agentPort int, httpPort int) {
done := make(chan struct{})
go s.RunAgentServer(agentPort, done)
go s.RunHTTPServer(httpPort, done)
go s.controller.Run(done)

signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM)
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/service/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type Node struct {
}

func (c *controller) PodList(ctx context.Context) ([]*Pod, error) {
if c.podInformer != nil {
return c.podListWithInformer(ctx)
}

pods, err := c.k8sClient.CoreV1().Pods("").List(ctx, v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("list pods failed: %v", err)
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/service/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

type ControllerService interface {
rpc.ControllerRegisterServiceServer
Run(done <-chan struct{})
GetAgentList() []*rpc.AgentInfo
Capture(ctx context.Context, capture *CaptureArgs) (int, error)
CaptureList(ctx context.Context) (map[int][]*CaptureTaskResult, error)
Expand Down Expand Up @@ -63,6 +64,10 @@ func NewControllerService() (ControllerService, error) {
return nil, fmt.Errorf("error create k8s client, err: %v", err)
}

if os.Getenv("KUBERNETES_RESOURCE_INFORMER") == "enable" {
ctrl.InitInformer()
}

if promEndpoint, ok := os.LookupEnv("PROMETHEUS_ENDPOINT"); ok {
promClient, err := api.NewClient(api.Config{
Address: promEndpoint,
Expand Down Expand Up @@ -100,6 +105,7 @@ func NewControllerService() (ControllerService, error) {

type controller struct {
rpc.UnimplementedControllerRegisterServiceServer
ControllerInformer
diagnostor diagnose.Controller
k8sClient *kubernetes.Clientset
taskWatcher sync.Map
Expand All @@ -109,3 +115,7 @@ type controller struct {
Namespace string
ConfigMapName string
}

func (c *controller) Run(stop <-chan struct{}) {
c.RunInformer(stop)
}
70 changes: 70 additions & 0 deletions pkg/controller/service/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package service

import (
"context"
"fmt"
"time"

"github.com/samber/lo"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
v1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
)

type ControllerInformer struct {
k8sInformerFactory informers.SharedInformerFactory
podInformer v1.PodInformer
}

func (c *ControllerInformer) podListWithInformer(_ context.Context) ([]*Pod, error) {
pods, err := c.podInformer.Lister().Pods("").List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("list pods failed: %v", err)
}
return lo.Map[*corev1.Pod, *Pod](pods, func(pod *corev1.Pod, idx int) *Pod {
return &Pod{
Name: pod.Name,
Namespace: pod.Namespace,
Nodename: pod.Spec.NodeName,
Labels: pod.Labels,
}
}), nil
}

func (c *controller) InitInformer() {
if c.k8sClient == nil {
return
}

c.k8sInformerFactory = informers.NewSharedInformerFactory(c.k8sClient, time.Minute*1)
c.podInformer = c.k8sInformerFactory.Core().V1().Pods()
_ = c.podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
"nodeName": func(obj interface{}) ([]string, error) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return []string{}, nil
}

if pod.Status.Phase == corev1.PodRunning {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
},
})
}

func (c *controller) RunInformer(stop <-chan struct{}) {
if c.k8sInformerFactory == nil {
return
}

c.k8sInformerFactory.Start(stop)
log.Infof("start informer cache sync.")
if !cache.WaitForCacheSync(stop, c.podInformer.Informer().HasSynced) {
log.Errorf("failed to sync pod info")
}
log.Infof("informer cache sync finish.")
}

0 comments on commit 972390c

Please sign in to comment.