Skip to content

Commit

Permalink
crdwatcher: Reuse watcher.K8sWatcher to watch TracingPolicy
Browse files Browse the repository at this point in the history
Before, to watch TracingPolicy(Namespaced) custom resources, we were reading
Kubernetes config, creating an informer factory and starting it, all in the
WatchTracePolicy function. As now the watcher.K8sWatcher structs initialized
informer factories, we can reuse it instead of configuring everything from
scratch. This commit replaces the WatchTracePolicy function with
AddTracingPolicyInformer, which only adds informers to the K8sWatcher.
Initialization and start of the K8sWatcher happens in main.

Signed-off-by: Anna Kapuscinska <[email protected]>
  • Loading branch information
lambdanis committed Feb 17, 2025
1 parent 2141791 commit 12ec831
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 36 deletions.
9 changes: 6 additions & 3 deletions cmd/tetragon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,12 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu
if err != nil {
return err
}
if option.Config.EnableTracingPolicyCRD {
err := crdwatcher.AddTracingPolicyInformer(ctx, realK8sWatcher, observer.GetSensorManager())
if err != nil {
return err
}
}
} else {
log.Info("Disabling Kubernetes API")
k8sWatcher = watcher.NewFakeK8sWatcher(nil)
Expand Down Expand Up @@ -501,9 +507,6 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu
log.WithField("enabled", option.Config.ExportFilename != "").WithField("fileName", option.Config.ExportFilename).Info("Exporter configuration")
obs.AddListener(pm)
saveInitInfo()
if option.Config.EnableK8s && option.Config.EnableTracingPolicyCRD {
go crdwatcher.WatchTracePolicy(ctx, observer.GetSensorManager())
}

obs.LogPinnedBpf(observerDir)

Expand Down
35 changes: 17 additions & 18 deletions pkg/observer/observertesthelper/observer_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,38 @@ import (
"testing"
"time"

"github.com/cilium/tetragon/pkg/cgrouprate"
"github.com/cilium/tetragon/pkg/defaults"
"github.com/cilium/tetragon/pkg/encoder"
"github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions"
"github.com/cilium/tetragon/pkg/metricsconfig"
"github.com/cilium/tetragon/pkg/observer"
"github.com/cilium/tetragon/pkg/policyfilter"
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/bpf"
"github.com/cilium/tetragon/pkg/btf"
"github.com/cilium/tetragon/pkg/bugtool"
"github.com/cilium/tetragon/pkg/cgrouprate"
"github.com/cilium/tetragon/pkg/defaults"
"github.com/cilium/tetragon/pkg/encoder"
"github.com/cilium/tetragon/pkg/exporter"
tetragonGrpc "github.com/cilium/tetragon/pkg/grpc"
"github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/metricsconfig"
"github.com/cilium/tetragon/pkg/observer"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/policyfilter"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/namespace"
"github.com/cilium/tetragon/pkg/rthooks"
"github.com/cilium/tetragon/pkg/sensors"
"github.com/cilium/tetragon/pkg/sensors/base"
"github.com/cilium/tetragon/pkg/sensors/exec/procevents"
"github.com/cilium/tetragon/pkg/testutils"
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/cilium/tetragon/pkg/watcher"
"github.com/cilium/tetragon/pkg/watcher/crdwatcher"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

var (
Expand Down Expand Up @@ -374,7 +373,7 @@ func getDefaultSensors(tb testing.TB, initialSensor *sensors.Sensor, opts ...Tes
}

func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, opts *testExporterOptions, oo *testObserverOptions) error {
watcher := opts.watcher
k8sWatcher := opts.watcher
processCacheSize := 32768
dataCacheSize := 1024
procCacheGCInterval := defaults.DefaultProcessCacheGCInterval
Expand All @@ -391,7 +390,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op
})

if oo.crd {
crdwatcher.WatchTracePolicy(ctx, sensorManager)
crdwatcher.AddTracingPolicyInformer(ctx, k8sWatcher, sensorManager)
}

if err := btf.InitCachedBTF(option.Config.HubbleLib, ""); err != nil {
Expand All @@ -402,7 +401,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op
procCacheGCInterval = oo.procCacheGCInterval
}

if err := process.InitCache(watcher, processCacheSize, procCacheGCInterval); err != nil {
if err := process.InitCache(k8sWatcher, processCacheSize, procCacheGCInterval); err != nil {
return err
}

Expand All @@ -413,7 +412,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op
var cancelWg sync.WaitGroup

// use an empty hooks runner
hookRunner := (&rthooks.Runner{}).WithWatcher(watcher)
hookRunner := (&rthooks.Runner{}).WithWatcher(k8sWatcher)

// For testing we disable the eventcache and cilium cache by default. If we
// enable these then every tests would need to wait for the 1.5 mimutes needed
Expand Down
36 changes: 21 additions & 15 deletions pkg/watcher/crdwatcher/tracingpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@ import (

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"

"github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1"
"github.com/cilium/tetragon/pkg/k8s/client/clientset/versioned"
"github.com/cilium/tetragon/pkg/k8s/client/informers/externalversions"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/sensors"
"github.com/cilium/tetragon/pkg/tracingpolicy"
k8sconf "github.com/cilium/tetragon/pkg/watcher/conf"
"github.com/cilium/tetragon/pkg/watcher"
)

// Log "missing tracing policy" message once.
Expand Down Expand Up @@ -182,16 +179,18 @@ func updateTracingPolicy(ctx context.Context, log logrus.FieldLogger, s *sensors
}
}

func WatchTracePolicy(ctx context.Context, s *sensors.Manager) {
func AddTracingPolicyInformer(ctx context.Context, w watcher.Watcher, s *sensors.Manager) error {
log := logger.GetLogger()
conf, err := k8sconf.K8sConfig()
if err != nil {
log.WithError(err).Fatal("couldn't get cluster config")
if w == nil {
return fmt.Errorf("k8s watcher not initialized")
}
factory := w.GetCRDInformerFactory()
if factory == nil {
return fmt.Errorf("CRD informer factory not initialized")
}
client := versioned.NewForConfigOrDie(conf)
factory := externalversions.NewSharedInformerFactory(client, 0)

factory.Cilium().V1alpha1().TracingPolicies().Informer().AddEventHandler(
tpInformer := factory.Cilium().V1alpha1().TracingPolicies().Informer()
tpInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addTracingPolicy(ctx, log, s, obj)
Expand All @@ -202,8 +201,13 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) {
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
updateTracingPolicy(ctx, log, s, oldObj, newObj)
}})
err := w.AddInformer("TracingPolicy", tpInformer, nil)
if err != nil {
return err
}

factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer().AddEventHandler(
tpnInformer := factory.Cilium().V1alpha1().TracingPoliciesNamespaced().Informer()
tpnInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addTracingPolicy(ctx, log, s, obj)
Expand All @@ -214,8 +218,10 @@ func WatchTracePolicy(ctx context.Context, s *sensors.Manager) {
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
updateTracingPolicy(ctx, log, s, oldObj, newObj)
}})
err = w.AddInformer("TracingPolicyNamespaced", tpnInformer, nil)
if err != nil {
return err
}

go factory.Start(wait.NeverStop)
factory.WaitForCacheSync(wait.NeverStop)
log.Info("Started watching tracing policies")
return nil
}

0 comments on commit 12ec831

Please sign in to comment.