diff --git a/aggregator/data.go b/aggregator/data.go
index c87217e..1747ec2 100644
--- a/aggregator/data.go
+++ b/aggregator/data.go
@@ -229,9 +229,9 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
 
     // set distinct mutex for every live process
     for pid := range a.liveProcesses {
+        a.muIndex.Add(1)
         a.muArray[a.muIndex.Load()] = &sync.RWMutex{}
         sockMaps[pid].mu = a.muArray[a.muIndex.Load()]
-        a.muIndex.Add(1)
         a.getAlreadyExistingSockets(pid)
     }
 
@@ -455,15 +455,14 @@ func (a *Aggregator) processExec(d *proc.ProcEvent) {
 
     a.liveProcesses[d.Pid] = struct{}{}
 
-    // create lock on demand
-    a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))] = &sync.RWMutex{}
-    a.muIndex.Add(1)
-
     // if duplicate exec event comes, underlying mutex will be changed
     // if first assigned mutex is locked and another exec event comes, mutex will be changed
    // and unlock of unlocked mutex now is a possibility
     // to avoid this case, if a socket map already has a mutex, don't change it
     if a.clusterInfo.SocketMaps[d.Pid].mu == nil {
+        // create lock on demand
+        a.muIndex.Add(1)
+        a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))] = &sync.RWMutex{}
         a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))]
     }
 }
@@ -687,8 +686,8 @@ func (a *Aggregator) processHttp2Frames() {
                 return
             }
 
-            req.StartTime = d.EventReadTime
             req.Latency = d.WriteTimeNs - req.Latency
+            req.StartTime = d.EventReadTime
             req.Completed = true
             req.FromIP = skInfo.Saddr
             req.ToIP = skInfo.Daddr
@@ -712,6 +711,11 @@ func (a *Aggregator) processHttp2Frames() {
                 return
             }
 
+            if d.WriteTimeNs < req.Latency {
+                // ignore
+                return
+            }
+
             a.ds.PersistRequest(req)
         }
 
diff --git a/datastore/backend.go b/datastore/backend.go
index b1c2546..cad499e 100644
--- a/datastore/backend.go
+++ b/datastore/backend.go
@@ -150,6 +150,10 @@ type BackendDS struct {
 
     traceInfoPool *poolutil.Pool[*TraceInfo]
 
+    metricsExport         bool
+    gpuMetricsExport      bool
+    metricsExportInterval int
+
     podEventChan       chan interface{} // *PodEvent
     svcEventChan       chan interface{} // *SvcEvent
     depEventChan       chan interface{} // *DepEvent
@@ -277,32 +281,38 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
     if err != nil {
         bs = defaultBatchSize
     }
-
     resourceChanSize := 200
 
     ds := &BackendDS{
-        ctx:                 ctx,
-        host:                conf.Host,
-        c:                   client,
-        batchSize:           bs,
-        reqInfoPool:         newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
-        aliveConnPool:       newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}),
-        traceInfoPool:       newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}),
-        reqChanBuffer:       make(chan *ReqInfo, conf.ReqBufferSize),
-        connChanBuffer:      make(chan *ConnInfo, conf.ConnBufferSize),
-        podEventChan:        make(chan interface{}, 5*resourceChanSize),
-        svcEventChan:        make(chan interface{}, 2*resourceChanSize),
-        rsEventChan:         make(chan interface{}, 2*resourceChanSize),
-        depEventChan:        make(chan interface{}, 2*resourceChanSize),
-        epEventChan:         make(chan interface{}, resourceChanSize),
-        containerEventChan:  make(chan interface{}, 5*resourceChanSize),
-        dsEventChan:         make(chan interface{}, resourceChanSize),
-        traceEventQueue:     list.New(),
+        ctx:                   ctx,
+        host:                  conf.Host,
+        c:                     client,
+        batchSize:             bs,
+        reqInfoPool:           newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
+        aliveConnPool:         newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}),
+        traceInfoPool:         newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}),
+        reqChanBuffer:         make(chan *ReqInfo, conf.ReqBufferSize),
+        connChanBuffer:        make(chan *ConnInfo, conf.ConnBufferSize),
+        podEventChan:          make(chan interface{}, 5*resourceChanSize),
+        svcEventChan:          make(chan interface{}, 2*resourceChanSize),
+        rsEventChan:           make(chan interface{}, 2*resourceChanSize),
+        depEventChan:          make(chan interface{}, 2*resourceChanSize),
+        epEventChan:           make(chan interface{}, resourceChanSize),
+        containerEventChan:    make(chan interface{}, 5*resourceChanSize),
+        dsEventChan:           make(chan interface{}, resourceChanSize),
+        traceEventQueue:       list.New(),
+        metricsExport:         conf.MetricsExport,
+        gpuMetricsExport:      conf.GpuMetricsExport,
+        metricsExportInterval: conf.MetricsExportInterval,
     }
 
-    go ds.sendReqsInBatch(bs)
-    go ds.sendConnsInBatch(bs)
-    go ds.sendTraceEventsInBatch(10 * bs)
+    return ds
+}
+
+func (ds *BackendDS) Start() {
+    go ds.sendReqsInBatch(ds.batchSize)
+    go ds.sendConnsInBatch(ds.batchSize)
+    go ds.sendTraceEventsInBatch(10 * ds.batchSize)
 
     // events are resynced every 60 seconds on k8s informers
     // resourceBatchSize ~ burst size, if more than resourceBatchSize events are sent in a moment, blocking can occur
@@ -321,17 +331,17 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
 
     // send node-exporter and nvidia-gpu metrics
     go func() {
-        if !(conf.MetricsExport || conf.GpuMetricsExport) {
+        if !(ds.metricsExport || ds.gpuMetricsExport) {
             return
         }
 
         var nodeMetrics, gpuMetrics bool
-        if conf.MetricsExport {
+        if ds.metricsExport {
            go ds.exportNodeMetrics()
             nodeMetrics = true // by default
         }
 
-        if conf.GpuMetricsExport {
+        if ds.gpuMetricsExport {
             err := ds.exportGpuMetrics()
             if err != nil {
                 log.Logger.Error().Msgf("error exporting gpu metrics: %v", err)
@@ -340,7 +350,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
             }
         }
 
-        t := time.NewTicker(time.Duration(conf.MetricsExportInterval) * time.Second)
+        t := time.NewTicker(time.Duration(ds.metricsExportInterval) * time.Second)
         for {
             select {
             case <-ds.ctx.Done():
@@ -380,8 +390,6 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
         // ds.reqInfoPool.Done()
         log.Logger.Info().Msg("backend datastore stopped")
     }()
-
-    return ds
 }
 
 func (b *BackendDS) enqueueTraceInfo(traceInfo *TraceInfo) {
@@ -478,7 +486,7 @@ func (b *BackendDS) sendMetricsToBackend(r io.Reader) {
         return
     }
 
-    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+    ctx, cancel := context.WithTimeout(b.ctx, 10*time.Second)
     defer cancel()
 
     resp, err := b.c.Do(req.WithContext(ctx))
@@ -980,7 +988,7 @@ type nodeExportLogger struct {
 }
 
 func (l nodeExportLogger) Log(keyvals ...interface{}) error {
-    l.logger.Debug().Msg(fmt.Sprint(keyvals...))
+    // l.logger.Debug().Msg(fmt.Sprint(keyvals...))
     return nil
 }
 
diff --git a/ebpf/collector.go b/ebpf/collector.go
index b2d1b79..ac611cd 100644
--- a/ebpf/collector.go
+++ b/ebpf/collector.go
@@ -168,12 +168,6 @@ func (e *EbpfCollector) close() {
     log.Logger.Info().Msg("ebpf collector closed")
 }
 
-// in order to prevent the memory peak at the beginning
-// we'll attach to processes one by one
-func (e *EbpfCollector) ListenForEncryptedReqs(pid uint32) {
-    e.tlsAttachQueue <- pid
-}
-
 // we check the size of the executable before reading it into memory
 // because it can be very large
 // otherwise we can get stuck to memory limit defined in k8s
@@ -191,6 +185,7 @@ func (e *EbpfCollector) AttachUprobesForEncrypted() {
         e.mu.Unlock()
 
         go func(pid uint32) {
+            log.Logger.Debug().Str("ctx", "tls-uprobes").Uint32("pid", pid).Msg("attaching uprobes for encrypted connections")
             // attach to libssl uprobes if process is using libssl
             errs := e.AttachSslUprobesOnProcess("/proc", pid)
             if len(errs) > 0 {
@@ -221,7 +216,6 @@
 
             }
         }(pid)
-
     }
 }
diff --git a/ebpf/l7_req/bpf_bpfeb.o b/ebpf/l7_req/bpf_bpfeb.o
index 8a1ecdb..e0ea293 100644
Binary files a/ebpf/l7_req/bpf_bpfeb.o and b/ebpf/l7_req/bpf_bpfeb.o differ
diff --git a/ebpf/l7_req/bpf_bpfel.o b/ebpf/l7_req/bpf_bpfel.o
index 51f4045..e77105b 100644
Binary files a/ebpf/l7_req/bpf_bpfel.o and b/ebpf/l7_req/bpf_bpfel.o differ
diff --git a/ebpf/proc/bpf_bpfeb.o b/ebpf/proc/bpf_bpfeb.o
new file mode 100644
index 0000000..5394728
Binary files /dev/null and b/ebpf/proc/bpf_bpfeb.o differ
diff --git a/ebpf/proc/bpf_bpfel.o b/ebpf/proc/bpf_bpfel.o
new file mode 100644
index 0000000..6e1509e
Binary files /dev/null and b/ebpf/proc/bpf_bpfel.o differ
diff --git a/ebpf/tcp_state/bpf_bpfeb.o b/ebpf/tcp_state/bpf_bpfeb.o
index 7bc41b7..ca6d31b 100644
Binary files a/ebpf/tcp_state/bpf_bpfeb.o and b/ebpf/tcp_state/bpf_bpfeb.o differ
diff --git a/ebpf/tcp_state/bpf_bpfel.o b/ebpf/tcp_state/bpf_bpfel.o
index 58cec2b..a78ef10 100644
Binary files a/ebpf/tcp_state/bpf_bpfel.o and b/ebpf/tcp_state/bpf_bpfel.o differ
diff --git a/main.go b/main.go
index dcbeebb..c2f9b64 100644
--- a/main.go
+++ b/main.go
@@ -67,7 +67,6 @@ func main() {
         ReqBufferSize:  40000, // TODO: get from a conf file
         ConnBufferSize: 1000,  // TODO: get from a conf file
     })
-    go dsBackend.SendHealthCheck(ebpfEnabled, metricsEnabled, distTracingEnabled, k8sVersion)
 
     // deploy ebpf programs
     var ec *ebpf.EbpfCollector
@@ -81,6 +80,8 @@ func main() {
         go ec.ListenEvents()
     }
 
+    dsBackend.Start()
+    go dsBackend.SendHealthCheck(ebpfEnabled, metricsEnabled, distTracingEnabled, k8sVersion)
     go http.ListenAndServe(":8181", nil)
 
     <-k8sCollector.Done()
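Note on the datastore lifecycle change above: NewBackendDS no longer launches the sender goroutines itself; it only constructs the BackendDS, and the new Start() method spawns the batch senders and metric exporters. main.go now calls Start() after the eBPF collector is set up, and only then sends the health check. The sketch below shows that caller-side contract under stated assumptions: the import paths, literal values, and the health-check flags (assumed to be booleans plus a version string) are illustrative, not taken from this patch.

// Sketch of the caller-side wiring implied by this patch: construct the
// datastore, start its goroutines explicitly, then send the health check.
// Import paths and literal values are assumptions for illustration only.
package main

import (
	"context"

	"github.com/ddosify/alaz/config"    // assumed module path
	"github.com/ddosify/alaz/datastore" // assumed module path
)

func main() {
	ctx := context.Background()

	dsBackend := datastore.NewBackendDS(ctx, config.BackendDSConfig{
		Host:                  "https://backend.example", // placeholder
		MetricsExport:         true,
		GpuMetricsExport:      false,
		MetricsExportInterval: 10, // seconds; the exporter multiplies this by time.Second
		ReqBufferSize:         40000,
		ConnBufferSize:        1000,
	})

	// After this patch NewBackendDS only builds the struct; nothing is sent
	// until Start() launches sendReqsInBatch, sendConnsInBatch, the trace
	// sender and the optional metric exporters.
	dsBackend.Start()

	// Health check goes out after Start(), mirroring the reordering in main.go.
	ebpfEnabled, metricsEnabled, distTracingEnabled := true, true, false // placeholder flags
	k8sVersion := "v1.28.0"                                              // placeholder version
	go dsBackend.SendHealthCheck(ebpfEnabled, metricsEnabled, distTracingEnabled, k8sVersion)

	select {} // block forever; a real caller waits on its collectors instead
}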