Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func (a *Aggregator) processk8s() {
a.processDaemonSet(d)
case k8s.STATEFULSET:
a.processStatefulSet(d)
case k8s.K8SEVENT:
a.processK8SEvent(d)
default:
log.Logger.Warn().Msgf("unknown resource type %s", d.ResourceType)
}
Expand Down Expand Up @@ -1277,7 +1279,6 @@ func (a *Aggregator) processMongoEvent(ctx context.Context, d *l7_req.L7Event) {
return
}

log.Logger.Debug().Str("path", reqDto.Path).Msg("processmongoEvent persisting")
err = a.ds.PersistRequest(reqDto)
if err != nil {
log.Logger.Error().Err(err).Msg("error persisting request")
Expand Down
18 changes: 18 additions & 0 deletions aggregator/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,21 @@ func (a *Aggregator) processStatefulSet(d k8s.K8sResourceMessage) {
go a.ds.PersistStatefulSet(dtoStatefulSet, DELETE)
}
}

func (a *Aggregator) processK8SEvent(d k8s.K8sResourceMessage) {
event := d.Object.(*corev1.Event)

dtoK8SEvent := datastore.K8SEvent{
EventName: event.Name,
Kind: event.InvolvedObject.Kind,
Namespace: event.InvolvedObject.Namespace,
Name: event.InvolvedObject.Name,
Uid: string(event.InvolvedObject.UID),
Reason: event.Reason,
Message: event.Message,
Count: event.Count,
FirstTimestamp: event.FirstTimestamp.UnixMilli(),
LastTimestamp: event.LastTimestamp.UnixMilli(),
}
go a.ds.PersistK8SEvent(dtoK8SEvent)
}
49 changes: 30 additions & 19 deletions datastore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ type BackendDS struct {
dsEventChan chan interface{} // *DaemonSetEvent
ssEventChan chan interface{} // *StatefulSetEvent

k8sEventChan chan interface{} // K8SEvent, kubectl events

// TODO add:
// job
// cronjob
Expand All @@ -181,6 +183,7 @@ const (
connEndpoint = "/connections/"
kafkaEventEndpoint = "/events/kafka/"
healthCheckEndpoint = "/healthcheck/"
k8sEventsEndpoint = "/k8s-events/"

// dist tracing disabled by default temporarily
// traceEventEndpoint = "/dist_tracing/traffic/"
Expand Down Expand Up @@ -286,24 +289,26 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
resourceChanSize := 200

ds := &BackendDS{
ctx: ctx,
host: conf.Host,
c: client,
batchSize: bs,
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
aliveConnPool: newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}),
kafkaEventInfoPool: newKafkaEventPool(func() *KafkaEventInfo { return &KafkaEventInfo{} }, func(r *KafkaEventInfo) {}),
reqChanBuffer: make(chan *ReqInfo, conf.ReqBufferSize),
connChanBuffer: make(chan *ConnInfo, conf.ConnBufferSize),
kafkaChanBuffer: make(chan *KafkaEventInfo, conf.ReqBufferSize),
podEventChan: make(chan interface{}, 5*resourceChanSize),
svcEventChan: make(chan interface{}, 2*resourceChanSize),
rsEventChan: make(chan interface{}, 2*resourceChanSize),
depEventChan: make(chan interface{}, 2*resourceChanSize),
epEventChan: make(chan interface{}, resourceChanSize),
containerEventChan: make(chan interface{}, 5*resourceChanSize),
dsEventChan: make(chan interface{}, resourceChanSize),
ssEventChan: make(chan interface{}, resourceChanSize),
ctx: ctx,
host: conf.Host,
c: client,
batchSize: bs,
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
aliveConnPool: newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}),
kafkaEventInfoPool: newKafkaEventPool(func() *KafkaEventInfo { return &KafkaEventInfo{} }, func(r *KafkaEventInfo) {}),
reqChanBuffer: make(chan *ReqInfo, conf.ReqBufferSize),
connChanBuffer: make(chan *ConnInfo, conf.ConnBufferSize),
kafkaChanBuffer: make(chan *KafkaEventInfo, conf.ReqBufferSize),
podEventChan: make(chan interface{}, 5*resourceChanSize),
svcEventChan: make(chan interface{}, 2*resourceChanSize),
rsEventChan: make(chan interface{}, 2*resourceChanSize),
depEventChan: make(chan interface{}, 2*resourceChanSize),
epEventChan: make(chan interface{}, resourceChanSize),
containerEventChan: make(chan interface{}, 5*resourceChanSize),
dsEventChan: make(chan interface{}, resourceChanSize),
ssEventChan: make(chan interface{}, resourceChanSize),
k8sEventChan: make(chan interface{}, resourceChanSize),

metricsExport: conf.MetricsExport,
gpuMetricsExport: conf.GpuMetricsExport,
metricsExportInterval: conf.MetricsExportInterval,
Expand Down Expand Up @@ -336,6 +341,7 @@ func (ds *BackendDS) Start() {
go ds.sendEventsInBatch(ds.containerEventChan, containerEndpoint, eventsInterval)
go ds.sendEventsInBatch(ds.dsEventChan, dsEndpoint, eventsInterval)
go ds.sendEventsInBatch(ds.ssEventChan, ssEndpoint, eventsInterval)
go ds.sendEventsInBatch(ds.k8sEventChan, k8sEventsEndpoint, eventsInterval)

// send node-exporter and nvidia-gpu metrics
go func() {
Expand Down Expand Up @@ -546,7 +552,7 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s
return
}

// if endpoint == reqEndpoint {
// if endpoint == k8sEventsEndpoint {
// log.Logger.Debug().Str("endpoint", endpoint).Any("payload", payload).Msg("sending batch to backend")
// }
err = b.DoRequest(httpReq)
Expand Down Expand Up @@ -947,6 +953,11 @@ func (b *BackendDS) PersistContainer(c Container, eventType string) error {
return nil
}

func (b *BackendDS) PersistK8SEvent(ev K8SEvent) error {
b.k8sEventChan <- &ev
return nil
}

type HealthCheckAction string

const (
Expand Down
1 change: 1 addition & 0 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type DataStore interface {
PersistContainer(c Container, eventType string) error
PersistDaemonSet(ds DaemonSet, eventType string) error
PersistStatefulSet(ss StatefulSet, eventType string) error
PersistK8SEvent(ev K8SEvent) error

PersistRequest(request *Request) error

Expand Down
13 changes: 13 additions & 0 deletions datastore/dto.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ type StatefulSet struct {
Namespace string // Namespace
}

type K8SEvent struct {
EventName string // this field is unique
Kind string
Namespace string
Name string
Uid string
Reason string
Message string
Count int32
FirstTimestamp int64
LastTimestamp int64
}

type Deployment struct {
UID string // Deployment UID
Name string // Deployment Name
Expand Down
1 change: 1 addition & 0 deletions ebpf/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (e *EbpfCollector) close() {
log.Logger.Info().Msg("closing ebpf uprobes")

for pid := range e.sslWriteUprobes {
log.Logger.Debug().Uint32("pid", pid).Msg("closing sslWriteUprobes")
e.sslWriteUprobes[pid].Close()
}
log.Logger.Info().Msg("closed sslWriteUprobes")
Expand Down
13 changes: 13 additions & 0 deletions k8s/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
CONTAINER = "Container"
DAEMONSET = "DaemonSet"
STATEFULSET = "StatefulSet"
K8SEVENT = "Event"
)

const (
Expand All @@ -61,13 +62,19 @@ type K8sCollector struct {
daemonsetInformer appsv1.DaemonSetInformer
statefulSetInformer appsv1.StatefulSetInformer

eventInformer v1.EventInformer

Events chan interface{}
}

func (k *K8sCollector) Init(events chan interface{}) error {
log.Logger.Info().Msg("k8sCollector initializing...")
k.Events = events

// k8s events
k.eventInformer = k.informersFactory.Core().V1().Events()
k.watchers[K8SEVENT] = k.eventInformer.Informer()

// Pod
k.podInformer = k.informersFactory.Core().V1().Pods()
k.watchers[POD] = k.podInformer.Informer()
Expand Down Expand Up @@ -99,6 +106,12 @@ func (k *K8sCollector) Init(events chan interface{}) error {
defer runtime.HandleCrash()

// Add event handlers
k.watchers[K8SEVENT].AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: getOnAddK8SEventFunc(k.Events),
UpdateFunc: getOnUpdateK8SEventFunc(k.Events),
DeleteFunc: getOnDeleteK8SEventFunc(k.Events),
})

k.watchers[POD].AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: getOnAddPodFunc(k.Events),
UpdateFunc: getOnUpdatePodFunc(k.Events),
Expand Down
31 changes: 31 additions & 0 deletions k8s/k8sevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package k8s

func getOnAddK8SEventFunc(ch chan interface{}) func(interface{}) {
return func(obj interface{}) {
ch <- K8sResourceMessage{
ResourceType: K8SEVENT,
EventType: ADD,
Object: obj,
}
}
}

func getOnUpdateK8SEventFunc(ch chan interface{}) func(interface{}, interface{}) {
return func(oldObj, newObj interface{}) {
ch <- K8sResourceMessage{
ResourceType: K8SEVENT,
EventType: UPDATE,
Object: newObj,
}
}
}

func getOnDeleteK8SEventFunc(ch chan interface{}) func(interface{}) {
return func(obj interface{}) {
ch <- K8sResourceMessage{
ResourceType: K8SEVENT,
EventType: DELETE,
Object: obj,
}
}
}
1 change: 1 addition & 0 deletions resources/alaz.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ rules:
- deployments
- daemonsets
- statefulsets
- events
verbs:
- "get"
- "list"
Expand Down