From cf3c667e9226973e149017cf0da7fe156f9f23c9 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Mon, 22 Jan 2024 14:40:31 +0000 Subject: [PATCH 01/16] make bpf progs objects --- ebpf/bpf.go | 11 ++ ebpf/deploy.go | 56 ++++----- ebpf/l7_req/{main.go => l7.go} | 150 ++++++++++++------------ ebpf/proc/{main.go => proc.go} | 122 +++++++++++--------- ebpf/tcp_state/{main.go => tcp.go} | 178 +++++++++++++---------------- main.go | 3 +- 6 files changed, 264 insertions(+), 256 deletions(-) create mode 100644 ebpf/bpf.go rename ebpf/l7_req/{main.go => l7.go} (85%) rename ebpf/proc/{main.go => proc.go} (54%) rename ebpf/tcp_state/{main.go => tcp.go} (52%) diff --git a/ebpf/bpf.go b/ebpf/bpf.go new file mode 100644 index 0000000..4ebe786 --- /dev/null +++ b/ebpf/bpf.go @@ -0,0 +1,11 @@ +package ebpf + +import "context" + +type Program interface { + Load() // Load bpf program to kernel + Attach() // attach links to programs, in case error process must exit + InitMaps() // initialize bpf map readers, must be called before Consume + Consume(ctx context.Context, ch chan interface{}) // consume bpf events, publishes to chan provided + Close() // release resources +} diff --git a/ebpf/deploy.go b/ebpf/deploy.go index b222fbc..893e91d 100644 --- a/ebpf/deploy.go +++ b/ebpf/deploy.go @@ -41,7 +41,7 @@ type EbpfCollector struct { ebpfProcEvents chan interface{} tlsAttachQueue chan uint32 - // TODO: objectify l7_req and tcp_state + bpfPrograms map[string]Program sslWriteUprobes map[uint32]link.Link sslReadEnterUprobes map[uint32]link.Link @@ -59,6 +59,13 @@ type EbpfCollector struct { func NewEbpfCollector(parentCtx context.Context) *EbpfCollector { ctx, _ := context.WithCancel(parentCtx) + bpfPrograms := make(map[string]Program) + + // initialize bpfPrograms + bpfPrograms["tcp_state_prog"] = tcp_state.InitTcpStateProg(nil) + bpfPrograms["l7_prog"] = l7_req.InitL7Prog(nil) + bpfPrograms["proc_prog"] = proc.InitProcProg(nil) + return &EbpfCollector{ ctx: ctx, done: make(chan struct{}), @@ -72,6 +79,7 @@ func NewEbpfCollector(parentCtx context.Context) *EbpfCollector { goTlsReadUprobes: make(map[uint32]link.Link), goTlsReadUretprobes: make(map[uint32][]link.Link), tlsAttachQueue: make(chan uint32, 10), + bpfPrograms: bpfPrograms, } } @@ -87,41 +95,35 @@ func (e *EbpfCollector) EbpfProcEvents() chan interface{} { return e.ebpfProcEvents } -func (e *EbpfCollector) Deploy() { - // load programs and convert them to user space structs - go e.AttachUprobesForEncrypted() - - tcp_state.LoadBpfObjects() - l7_req.LoadBpfObjects() - proc.LoadBpfObjects() - - // function to version to program +func (e *EbpfCollector) Init() { + for _, p := range e.bpfPrograms { + p.Load() + p.Attach() + p.InitMaps() + } - wg := sync.WaitGroup{} - wg.Add(3) - go func() { - defer wg.Done() - tcp_state.DeployAndWait(e.ctx, e.ebpfEvents) - }() go func() { - defer wg.Done() - l7_req.DeployAndWait(e.ctx, e.ebpfEvents) + <-e.ctx.Done() + e.close() + close(e.done) }() - go func() { - defer wg.Done() - proc.DeployAndWait(e.ctx, e.ebpfProcEvents) - }() - wg.Wait() +} - log.Logger.Info().Msg("reading ebpf maps stopped") - e.close() - close(e.done) +func (e *EbpfCollector) ListenEvents() { + go e.bpfPrograms["tcp_state_prog"].Consume(e.ctx, e.ebpfEvents) + go e.bpfPrograms["l7_prog"].Consume(e.ctx, e.ebpfEvents) + go e.bpfPrograms["proc_prog"].Consume(e.ctx, e.ebpfProcEvents) - // go listenDebugMsgs() + go e.AttachUprobesForEncrypted() } func (e *EbpfCollector) close() { log.Logger.Info().Msg("closing ebpf links") + + for _, p := range e.bpfPrograms 
{ + p.Close() + } + close(e.ebpfEvents) close(e.ebpfProcEvents) diff --git a/ebpf/l7_req/main.go b/ebpf/l7_req/l7.go similarity index 85% rename from ebpf/l7_req/main.go rename to ebpf/l7_req/l7.go index 521a923..0b6954b 100644 --- a/ebpf/l7_req/main.go +++ b/ebpf/l7_req/l7.go @@ -281,11 +281,59 @@ func (e *L7Event) Type() string { var L7BpfProgsAndMaps bpfObjects -func LoadBpfObjects() { +type L7ProgConfig struct { + TrafficBpfMapSize uint32 // specified in terms of os page size + L7EventsBpfMapSize uint32 // specified in terms of os page size + LogsBpfMapSize uint32 +} + +var defaultConfig *L7ProgConfig = &L7ProgConfig{ + TrafficBpfMapSize: 4096, + L7EventsBpfMapSize: 4096, + LogsBpfMapSize: 4, +} + +type L7Prog struct { + // links represent a program attached to a hook + links map[string]link.Link // key : hook name + + l7Events *perf.Reader + logs *perf.Reader + traffic *perf.Reader // ingress-egress calls + + l7EventsMapSize uint32 + trafficMapSize uint32 + logsMapsSize uint32 +} + +func InitL7Prog(conf *L7ProgConfig) *L7Prog { + if conf == nil { + conf = defaultConfig + } + + return &L7Prog{ + links: map[string]link.Link{}, + l7EventsMapSize: conf.L7EventsBpfMapSize, + trafficMapSize: conf.TrafficBpfMapSize, + logsMapsSize: conf.LogsBpfMapSize, + } +} + +func (l7p *L7Prog) Close() { + for hookName, link := range l7p.links { + log.Logger.Info().Msgf("unattach %s", hookName) + link.Close() + } + L7BpfProgsAndMaps.Close() +} + +// Loads bpf programs into kernel +func (l7p *L7Prog) Load() { // Allow the current process to lock memory for eBPF resources. if err := rlimit.RemoveMemlock(); err != nil { log.Logger.Fatal().Err(err).Msg("failed to remove memlock limit") } + // Load pre-compiled programs and maps into the kernel. L7BpfProgsAndMaps = bpfObjects{} if err := loadBpfObjects(&L7BpfProgsAndMaps, nil); err != nil { @@ -293,130 +341,84 @@ func LoadBpfObjects() { } } -// returns when program is detached -func DeployAndWait(parentCtx context.Context, ch chan interface{}) { - ctx, _ := context.WithCancel(parentCtx) - defer L7BpfProgsAndMaps.Close() - - // link programs +func (l7p *L7Prog) Attach() { l, err := link.Tracepoint("syscalls", "sys_enter_read", L7BpfProgsAndMaps.bpfPrograms.SysEnterRead, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sys_enter_read tracepoint") } - log.Logger.Info().Msg("sys_enter_read linked") - defer func() { - log.Logger.Info().Msg("closing sys_enter_read tracepoint") - l.Close() - }() + l7p.links["syscalls/sys_enter_read"] = l l1, err := link.Tracepoint("syscalls", "sys_enter_write", L7BpfProgsAndMaps.bpfPrograms.SysEnterWrite, nil) if err != nil { - log.Logger.Warn().Str("verifier log", string(L7BpfProgsAndMaps.bpfPrograms.SysEnterWrite.VerifierLog)).Msg("verifier log") log.Logger.Fatal().Err(err).Msg("link sys_enter_write tracepoint") } - log.Logger.Info().Msg("sys_enter_write linked") - defer func() { - log.Logger.Info().Msg("closing sys_enter_write tracepoint") - l1.Close() - }() + l7p.links["syscalls/sys_enter_write"] = l1 l2, err := link.Tracepoint("syscalls", "sys_exit_read", L7BpfProgsAndMaps.bpfPrograms.SysExitRead, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sys_exit_read tracepoint") } - log.Logger.Info().Msg("sys_exit_read linked") - defer func() { - log.Logger.Info().Msg("closing sys_exit_read tracepoint") - l2.Close() - }() + l7p.links["syscalls/sys_exit_read"] = l2 l3, err := link.Tracepoint("syscalls", "sys_enter_sendto", L7BpfProgsAndMaps.bpfPrograms.SysEnterSendto, nil) if err != nil { 
log.Logger.Fatal().Err(err).Msg("link sys_enter_sendto tracepoint") } - log.Logger.Info().Msg("sys_enter_sendto linked") - defer func() { - log.Logger.Info().Msg("closing sys_enter_sendto tracepoint") - l3.Close() - }() + l7p.links["syscalls/sys_enter_sendto"] = l3 l4, err := link.Tracepoint("syscalls", "sys_enter_recvfrom", L7BpfProgsAndMaps.bpfPrograms.SysEnterRecvfrom, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sys_enter_recvfrom tracepoint") } - log.Logger.Info().Msg("sys_enter_recvfrom linked") - defer func() { - log.Logger.Info().Msg("closing sys_enter_recvfrom tracepoint") - l4.Close() - }() + l7p.links["syscalls/sys_enter_recvfrom"] = l4 l5, err := link.Tracepoint("syscalls", "sys_exit_recvfrom", L7BpfProgsAndMaps.bpfPrograms.SysExitRecvfrom, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sys_exit_recvfrom tracepoint") } - log.Logger.Info().Msg("sys_exit_recvfrom linked") - defer func() { - log.Logger.Info().Msg("closing sys_exit_recvfrom tracepoint") - l5.Close() - }() + l7p.links["syscalls/sys_exit_recvfrom"] = l5 l6, err := link.Tracepoint("syscalls", "sys_exit_sendto", L7BpfProgsAndMaps.bpfPrograms.SysExitSendto, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sys_exit_sendto tracepoint") } - log.Logger.Info().Msg("sys_exit_sendto linked") - defer func() { - log.Logger.Info().Msg("closing sys_exit_sendto tracepoint") - l6.Close() - }() + l7p.links["syscalls/sys_exit_sendto"] = l6 l7, err := link.Tracepoint("syscalls", "sys_exit_write", L7BpfProgsAndMaps.bpfPrograms.SysExitWrite, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sys_exit_write tracepoint") } - log.Logger.Info().Msg("sys_exit_write linked") - defer func() { - log.Logger.Info().Msg("closing sys_exit_write tracepoint") - l7.Close() - }() + l7p.links["syscalls/sys_exit_write"] = l7 +} +func (l7p *L7Prog) InitMaps() { // initialize perf event readers - l7Events, err := perf.NewReader(L7BpfProgsAndMaps.L7Events, 4096*os.Getpagesize()) + var err error + l7p.l7Events, err = perf.NewReader(L7BpfProgsAndMaps.L7Events, int(l7p.l7EventsMapSize)*os.Getpagesize()) if err != nil { log.Logger.Fatal().Err(err).Msg("error creating perf event array reader") } - defer func() { - log.Logger.Info().Msg("closing l7 events perf event array reader") - l7Events.Close() - }() - logs, err := perf.NewReader(L7BpfProgsAndMaps.LogMap, 4*os.Getpagesize()) + l7p.logs, err = perf.NewReader(L7BpfProgsAndMaps.LogMap, int(l7p.logsMapsSize)*os.Getpagesize()) if err != nil { log.Logger.Fatal().Err(err).Msg("error creating perf event array reader") } - defer func() { - log.Logger.Info().Msg("closing l7 events perf event array reader") - logs.Close() - }() - distTraceCalls, err := perf.NewReader(L7BpfProgsAndMaps.IngressEgressCalls, 4096*os.Getpagesize()) + l7p.traffic, err = perf.NewReader(L7BpfProgsAndMaps.IngressEgressCalls, int(l7p.trafficMapSize)*os.Getpagesize()) if err != nil { log.Logger.Fatal().Err(err).Msg("error creating perf reader") } - defer func() { - log.Logger.Info().Msg("closing distTraceCalls perf reader") - distTraceCalls.Close() - }() +} - logsDone := make(chan struct{}, 1) - readDone := make(chan struct{}) - read2Done := make(chan struct{}) +// returns when program is detached +func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) { + stop := make(chan struct{}) go func() { var logMessage []byte var funcName []byte read := func() { - record, err := logs.Read() + record, err := l7p.logs.Read() if err != nil { log.Logger.Warn().Err(err).Msg("error reading from perf 
array") } @@ -511,7 +513,7 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) { } for { select { - case <-logsDone: + case <-stop: return default: read() @@ -522,7 +524,7 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) { go func() { var record perf.Record read := func() { - err := l7Events.ReadInto(&record) + err := l7p.l7Events.ReadInto(&record) if err != nil { log.Logger.Warn().Err(err).Msg("error reading from perf array") } @@ -582,7 +584,7 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) { } for { select { - case <-readDone: + case <-stop: return default: read() @@ -593,7 +595,7 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) { go func() { var record perf.Record read := func() { - err := distTraceCalls.ReadInto(&record) + err := l7p.traffic.ReadInto(&record) if err != nil { log.Logger.Warn().Err(err).Msg("error reading from dist trace calls") } @@ -624,7 +626,7 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) { } for { select { - case <-read2Done: + case <-stop: return default: read() @@ -633,9 +635,7 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) { }() <-ctx.Done() // wait for context to be cancelled - readDone <- struct{}{} - read2Done <- struct{}{} - logsDone <- struct{}{} + close(stop) // defers will clean up } diff --git a/ebpf/proc/main.go b/ebpf/proc/proc.go similarity index 54% rename from ebpf/proc/main.go rename to ebpf/proc/proc.go index 4a16c7a..bf1268a 100644 --- a/ebpf/proc/main.go +++ b/ebpf/proc/proc.go @@ -3,7 +3,6 @@ package proc import ( "context" "os" - "time" "unsafe" "github.com/ddosify/alaz/log" @@ -60,7 +59,42 @@ func (e ProcEvent) Type() string { var objs bpfObjects -func LoadBpfObjects() { +type ProcProgConfig struct { + ProcEventsMapSize uint32 // specified in terms of os page size +} + +var defaultConfig *ProcProgConfig = &ProcProgConfig{ + ProcEventsMapSize: 16, +} + +type ProcProg struct { + // links represent a program attached to a hook + links map[string]link.Link // key : hook name + ProcEvents *perf.Reader + ProcEventsMapSize uint32 +} + +func InitProcProg(conf *ProcProgConfig) *ProcProg { + if conf == nil { + conf = defaultConfig + } + + return &ProcProg{ + links: map[string]link.Link{}, + ProcEventsMapSize: conf.ProcEventsMapSize, + } +} + +func (pp *ProcProg) Close() { + for hookName, link := range pp.links { + log.Logger.Info().Msgf("unattach %s", hookName) + link.Close() + } + objs.Close() +} + +// Loads bpf programs into kernel +func (pp *ProcProg) Load() { // Allow the current process to lock memory for eBPF resources. 
if err := rlimit.RemoveMemlock(); err != nil { log.Logger.Fatal().Err(err).Msg("failed to remove memlock limit") @@ -72,78 +106,56 @@ func LoadBpfObjects() { log.Logger.Fatal().Err(err).Msg("loading objects") } } - -// returns when program is detached -func DeployAndWait(parentCtx context.Context, ch chan interface{}) { - ctx, _ := context.WithCancel(parentCtx) - defer objs.Close() - - time.Sleep(1 * time.Second) - +func (pp *ProcProg) Attach() { l, err := link.Tracepoint("sched", "sched_process_exit", objs.bpfPrograms.SchedProcessExit, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sched_process_exit tracepoint") } - defer func() { - log.Logger.Info().Msg("closing sched_process_exit tracepoint") - l.Close() - }() + pp.links["sched/sched_process_exit"] = l l1, err := link.Tracepoint("sched", "sched_process_exec", objs.bpfPrograms.SchedProcessExec, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sched_process_exec tracepoint") } - defer func() { - log.Logger.Info().Msg("closing sched_process_exec tracepoint") - l1.Close() - }() + pp.links["sched/sched_process_exec"] = l1 +} - pEvents, err := perf.NewReader(objs.ProcEvents, 16*os.Getpagesize()) +func (pp *ProcProg) InitMaps() { + var err error + pp.ProcEvents, err = perf.NewReader(objs.ProcEvents, 16*os.Getpagesize()) if err != nil { log.Logger.Fatal().Err(err).Msg("error creating ringbuf reader") } - defer func() { - log.Logger.Info().Msg("closing pExitEvents ringbuf reader") - pEvents.Close() - }() - - // go listenDebugMsgs() - - readDone := make(chan struct{}) - go func() { - for { - read := func() { - record, err := pEvents.Read() - if err != nil { - log.Logger.Warn().Err(err).Msg("error reading from pExitEvents") - } - - if record.RawSample == nil || len(record.RawSample) == 0 { - log.Logger.Debug().Msgf("read sample l7-event nil or empty") - return - } - - bpfEvent := (*PEvent)(unsafe.Pointer(&record.RawSample[0])) +} - go func() { - ch <- &ProcEvent{ - Pid: bpfEvent.Pid, - Type_: ProcEventConversion(bpfEvent.Type_).String(), - } - }() +func (pp *ProcProg) Consume(ctx context.Context, ch chan interface{}) { + for { + read := func() { + record, err := pp.ProcEvents.Read() + if err != nil { + log.Logger.Warn().Err(err).Msg("error reading from proc events map") } - select { - case <-readDone: + if record.RawSample == nil || len(record.RawSample) == 0 { + log.Logger.Debug().Msgf("read sample l7-event nil or empty") return - default: - read() } + bpfEvent := (*PEvent)(unsafe.Pointer(&record.RawSample[0])) + + go func() { + ch <- &ProcEvent{ + Pid: bpfEvent.Pid, + Type_: ProcEventConversion(bpfEvent.Type_).String(), + } + }() } - }() - <-ctx.Done() // wait for context to be cancelled - readDone <- struct{}{} - // defers will clean up + select { + case <-ctx.Done(): + return + default: + read() + } + } } diff --git a/ebpf/tcp_state/main.go b/ebpf/tcp_state/tcp.go similarity index 52% rename from ebpf/tcp_state/main.go rename to ebpf/tcp_state/tcp.go index 2fbf28a..4419a30 100644 --- a/ebpf/tcp_state/main.go +++ b/ebpf/tcp_state/tcp.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "time" "unsafe" "github.com/ddosify/alaz/log" @@ -59,7 +58,7 @@ func (e TcpStateConversion) String() string { const mapKey uint32 = 0 // padding to match the kernel struct -type TcpEvent struct { +type BpfTcpEvent struct { Fd uint64 Timestamp uint64 Type uint32 @@ -90,7 +89,45 @@ func (e TcpConnectEvent) Type() string { var objs bpfObjects -func LoadBpfObjects() { +var TcpState *TcpStateProg + +type TcpStateConfig struct { + BpfMapSize uint32 
// specified in terms of os page size +} + +var defaultConfig *TcpStateConfig = &TcpStateConfig{ + BpfMapSize: 64, +} + +func InitTcpStateProg(conf *TcpStateConfig) *TcpStateProg { + if conf == nil { + conf = defaultConfig + } + + return &TcpStateProg{ + links: map[string]link.Link{}, + tcpConnectMapSize: conf.BpfMapSize, + } +} + +type TcpStateProg struct { + // links represent a program attached to a hook + links map[string]link.Link // key : hook name + + tcpConnectMapSize uint32 + tcpConnectEvents *perf.Reader +} + +func (tsp *TcpStateProg) Close() { + for hookName, link := range tsp.links { + log.Logger.Info().Msgf("unattach %s", hookName) + link.Close() + } + objs.Close() +} + +// Loads bpf programs into kernel +func (tsp *TcpStateProg) Load() { // Allow the current process to lock memory for eBPF resources. if err := rlimit.RemoveMemlock(); err != nil { log.Logger.Fatal().Err(err).Msg("failed to remove memlock limit") @@ -103,127 +140,72 @@ func LoadBpfObjects() { } } -// returns when program is detached -func DeployAndWait(parentCtx context.Context, ch chan interface{}) { - ctx, _ := context.WithCancel(parentCtx) - defer objs.Close() - - time.Sleep(1 * time.Second) - +func (tsp *TcpStateProg) Attach() { l, err := link.Tracepoint("sock", "inet_sock_set_state", objs.bpfPrograms.InetSockSetState, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link inet_sock_set_state tracepoint") } - defer func() { - log.Logger.Info().Msg("closing inet_sock_set_state tracepoint") - l.Close() - }() + tsp.links["sock/inet_sock_set_state"] = l l1, err := link.Tracepoint("syscalls", "sys_enter_connect", objs.bpfPrograms.SysEnterConnect, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sys_enter_connect tracepoint") } - defer func() { - log.Logger.Info().Msg("closing sys_enter_connect tracepoint") - l1.Close() - }() + tsp.links["syscalls/sys_enter_connect"] = l1 l2, err := link.Tracepoint("syscalls", "sys_exit_connect", objs.bpfPrograms.SysEnterConnect, nil) if err != nil { log.Logger.Fatal().Err(err).Msg("link sys_exit_connect tracepoint") } - defer func() { - log.Logger.Info().Msg("closing sys_exit_connect tracepoint") - l2.Close() - }() - - // initialize perf event readers - tcpListenEvents, err := perf.NewReader(objs.TcpListenEvents, 64*os.Getpagesize()) - if err != nil { - log.Logger.Fatal().Err(err).Msg("error creating perf event array reader") - } - defer func() { - log.Logger.Info().Msg("closing tcpListenEvents perf event reader") - tcpListenEvents.Close() - }() + tsp.links["syscalls/sys_exit_connect"] = l2 +} - tcpConnectEvents, err := perf.NewReader(objs.TcpConnectEvents, 64*os.Getpagesize()) +func (tsp *TcpStateProg) InitMaps() { + var err error + tsp.tcpConnectEvents, err = perf.NewReader(objs.TcpConnectEvents, int(tsp.tcpConnectMapSize)*os.Getpagesize()) if err != nil { log.Logger.Fatal().Err(err).Msg("error creating perf event array reader") } - defer func() { - log.Logger.Info().Msg("closing tcpConnectEvents perf event reader") - tcpConnectEvents.Close() - }() - - // go listenDebugMsgs() - - readDone := make(chan struct{}) - go func() { - for { - read := func() { - record, err := tcpConnectEvents.Read() - if err != nil { - log.Logger.Warn().Err(err).Msg("error reading from perf array") - } - - if record.LostSamples != 0 { - log.Logger.Warn().Msgf("lost samples tcp-connect %d", record.LostSamples) - } +} - if record.RawSample == nil || len(record.RawSample) == 0 { - return - } +// returns when program is detached +func (tsp *TcpStateProg) Consume(ctx context.Context, ch chan 
interface{}) { + for { + read := func() { + record, err := tsp.tcpConnectEvents.Read() + if err != nil { + log.Logger.Warn().Err(err).Msg("error reading from tcp connect event map") + } - bpfEvent := (*TcpEvent)(unsafe.Pointer(&record.RawSample[0])) - - go func() { - ch <- &TcpConnectEvent{ - Pid: bpfEvent.Pid, - Fd: bpfEvent.Fd, - Timestamp: bpfEvent.Timestamp, - Type_: TcpStateConversion(bpfEvent.Type).String(), - SPort: bpfEvent.SPort, - DPort: bpfEvent.DPort, - SAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.SAddr[0], bpfEvent.SAddr[1], bpfEvent.SAddr[2], bpfEvent.SAddr[3]), - DAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.DAddr[0], bpfEvent.DAddr[1], bpfEvent.DAddr[2], bpfEvent.DAddr[3]), - } - }() + if record.LostSamples != 0 { + log.Logger.Warn().Msgf("lost samples tcp-connect %d", record.LostSamples) } - select { - case <-readDone: + if record.RawSample == nil || len(record.RawSample) == 0 { return - default: - read() } + bpfEvent := (*BpfTcpEvent)(unsafe.Pointer(&record.RawSample[0])) + + go func() { + ch <- &TcpConnectEvent{ + Pid: bpfEvent.Pid, + Fd: bpfEvent.Fd, + Timestamp: bpfEvent.Timestamp, + Type_: TcpStateConversion(bpfEvent.Type).String(), + SPort: bpfEvent.SPort, + DPort: bpfEvent.DPort, + SAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.SAddr[0], bpfEvent.SAddr[1], bpfEvent.SAddr[2], bpfEvent.SAddr[3]), + DAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.DAddr[0], bpfEvent.DAddr[1], bpfEvent.DAddr[2], bpfEvent.DAddr[3]), + } + }() } - }() - - <-ctx.Done() // wait for context to be cancelled - readDone <- struct{}{} - // defers will clean up -} - -func listenDebugMsgs() { - printsPath := "/sys/kernel/debug/tracing/trace_pipe" - - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - fd, err := os.Open(printsPath) - if err != nil { - log.Logger.Warn().Err(err).Msg("error opening trace_pipe") - } - defer fd.Close() - - buf := make([]byte, 1024) - for range ticker.C { - n, err := fd.Read(buf) - if err != nil { - log.Logger.Error().Err(err).Msg("error reading from trace_pipe") + select { + case <-ctx.Done(): + log.Logger.Info().Msg("stop consuming tcp events...") + return + default: + read() } - log.Logger.Debug().Msgf("read %d bytes: %s\n", n, buf[:n]) } } diff --git a/main.go b/main.go index 9ab5b4e..0acd468 100644 --- a/main.go +++ b/main.go @@ -63,7 +63,8 @@ func main() { var ec *ebpf.EbpfCollector if ebpfEnabled { ec = ebpf.NewEbpfCollector(ctx) - go ec.Deploy() + ec.Init() + go ec.ListenEvents() a := aggregator.NewAggregator(ctx, kubeEvents, ec, dsBackend) a.Run() From abaffaeb63af0ce077c4e422bbf6276c8a6c4e1b Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 23 Jan 2024 13:40:36 +0000 Subject: [PATCH 02/16] change filename --- ebpf/{deploy.go => collector.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename ebpf/{deploy.go => collector.go} (100%) diff --git a/ebpf/deploy.go b/ebpf/collector.go similarity index 100% rename from ebpf/deploy.go rename to ebpf/collector.go From 6cbfb29d8782462889e0d756f056f85767ea4156 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Wed, 24 Jan 2024 10:43:19 +0000 Subject: [PATCH 03/16] start simulator, mock k8s events --- aggregator/data.go | 45 ++++--- datastore/datastore.go | 48 -------- ebpf/collector.go | 4 + go.mod | 2 +- go.sum | 3 +- main.go | 2 +- main_benchmark_test.go | 274 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 309 insertions(+), 69 deletions(-) create mode 100644 main_benchmark_test.go diff --git a/aggregator/data.go b/aggregator/data.go index f5add87..b7e001e 100644 --- a/aggregator/data.go 
+++ b/aggregator/data.go @@ -43,9 +43,10 @@ type Aggregator struct { ctx context.Context // listen to events from different sources - k8sChan <-chan interface{} - ebpfChan <-chan interface{} - ebpfProcChan <-chan interface{} + k8sChan <-chan interface{} + ebpfChan <-chan interface{} + ebpfProcChan <-chan interface{} + tlsAttachSignalChan chan uint32 ec *ebpf.EbpfCollector @@ -154,7 +155,11 @@ func containsSQLKeywords(input string) bool { return re.MatchString(strings.ToUpper(input)) } -func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, ec *ebpf.EbpfCollector, ds datastore.DataStore) *Aggregator { +func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, + events chan interface{}, + procEvents chan interface{}, + tlsAttachSignalChan chan uint32, + ds datastore.DataStore) *Aggregator { ctx, _ := context.WithCancel(parentCtx) clusterInfo := &ClusterInfo{ PodIPToPodUid: map[string]types.UID{}, @@ -163,18 +168,18 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, ec *eb } a := &Aggregator{ - ctx: ctx, - k8sChan: k8sChan, - ebpfChan: ec.EbpfEvents(), - ebpfProcChan: ec.EbpfProcEvents(), - ec: ec, - clusterInfo: clusterInfo, - ds: ds, - h2Ch: make(chan *l7_req.L7Event, 1000000), - h2Parsers: make(map[string]*http2Parser), - h2Frames: make(map[string]*FrameArrival), - liveProcesses: make(map[uint32]struct{}), - rateLimiters: make(map[uint32]*rate.Limiter), + ctx: ctx, + k8sChan: k8sChan, + ebpfChan: events, + ebpfProcChan: procEvents, + clusterInfo: clusterInfo, + ds: ds, + tlsAttachSignalChan: tlsAttachSignalChan, + h2Ch: make(chan *l7_req.L7Event, 1000000), + h2Parsers: make(map[string]*http2Parser), + h2Frames: make(map[string]*FrameArrival), + liveProcesses: make(map[uint32]struct{}), + rateLimiters: make(map[uint32]*rate.Limiter), } go a.clearSocketLines(ctx) @@ -399,8 +404,12 @@ func (a *Aggregator) processExit(pid uint32) { a.rateLimitMu.Unlock() } +func (a *Aggregator) signalTlsAttachment(pid uint32) { + a.tlsAttachSignalChan <- pid +} + func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) { - go a.ec.ListenForEncryptedReqs(d.Pid) + go a.signalTlsAttachment(d.Pid) if d.Type_ == tcp_state.EVENT_TCP_ESTABLISHED { // filter out localhost connections if d.SAddr == "127.0.0.1" || d.DAddr == "127.0.0.1" { @@ -1037,7 +1046,7 @@ func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap { a.clusterInfo.PidToSocketMap[pid] = sockMap a.clusterInfo.mu.Unlock() // unlock for writing - go a.ec.ListenForEncryptedReqs(pid) + go a.signalTlsAttachment(pid) } return sockMap } diff --git a/datastore/datastore.go b/datastore/datastore.go index 78623ca..83358cf 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -2,7 +2,6 @@ package datastore import ( "github.com/ddosify/alaz/ebpf/l7_req" - "github.com/ddosify/alaz/log" ) type DataStore interface { @@ -18,50 +17,3 @@ type DataStore interface { PersistTraceEvent(trace *l7_req.TraceEvent) error } - -type MockDataStore struct { -} - -func (m *MockDataStore) PersistPod(pod Pod, eventType string) error { - log.Logger.Debug().Str("pod", pod.Name).Msg("PersistPod") - return nil -} - -func (m *MockDataStore) PersistService(service Service, eventType string) error { - log.Logger.Debug().Str("service", service.Name).Msg("PersistService") - return nil -} - -func (m *MockDataStore) PersistReplicaSet(rs ReplicaSet, eventType string) error { - log.Logger.Debug().Str("replicaset", rs.Name).Msg("PersistReplicaSet") - return nil -} - -func (m *MockDataStore) PersistDeployment(d 
Deployment, eventType string) error { - log.Logger.Debug().Str("deployment", d.Name).Msg("PersistDeployment") - return nil -} - -func (m *MockDataStore) PersistEndpoints(e Endpoints, eventType string) error { - log.Logger.Debug().Str("endpoints", e.Name).Msg("PersistEndpoints") - return nil -} - -func (m *MockDataStore) PersistContainer(c Container, eventType string) error { - log.Logger.Debug().Str("container", c.Name).Msg("PersistContainer") - return nil -} - -func (m *MockDataStore) PersistDaemonSet(ds DaemonSet, eventType string) error { - log.Logger.Debug().Str("daemonset", ds.Name).Msg("PersistDaemonSet") - return nil -} - -func (m *MockDataStore) PersistRequest(request *Request) error { - log.Logger.Debug().Bool("isTls", request.Tls).Str("path", request.Path).Msg("PersistRequest") - return nil -} - -func (m *MockDataStore) PersistTraceEvent(trace *l7_req.TraceEvent) error { - return nil -} diff --git a/ebpf/collector.go b/ebpf/collector.go index 893e91d..7264157 100644 --- a/ebpf/collector.go +++ b/ebpf/collector.go @@ -95,6 +95,10 @@ func (e *EbpfCollector) EbpfProcEvents() chan interface{} { return e.ebpfProcEvents } +func (e *EbpfCollector) TlsAttachQueue() chan uint32 { + return e.tlsAttachQueue +} + func (e *EbpfCollector) Init() { for _, p := range e.bpfPrograms { p.Load() diff --git a/go.mod b/go.mod index 8e581d8..f655d98 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 require ( github.com/alecthomas/kingpin/v2 v2.3.2 github.com/cilium/ebpf v0.10.1-0.20230626090016-654491c8a500 + github.com/cilium/fake v0.6.1 github.com/go-kit/log v0.2.1 github.com/hashicorp/go-retryablehttp v0.7.4 github.com/prometheus/client_golang v1.16.0 @@ -77,7 +78,6 @@ require ( github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/safchain/ethtool v0.3.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/stretchr/testify v1.8.4 // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect go.ddosify.com/ddosify v1.0.5 go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 84d9edb..4b5adde 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cilium/ebpf v0.10.1-0.20230626090016-654491c8a500 h1:eAn1/gEVvcamZLoF4JKznmG2zKABsF7mRisyfQtwa3Q= github.com/cilium/ebpf v0.10.1-0.20230626090016-654491c8a500/go.mod h1:WE7CZAnqOL2RouJ4f1uyNhqr2P4CCvXFIqdRDUgWsVs= +github.com/cilium/fake v0.6.1 h1:cLkNx1nkF0b0pPW79JaQxaI5oG2/rBzRKpp0YUg1fTA= +github.com/cilium/fake v0.6.1/go.mod h1:V9lCbbcsnSf3vB6sdOP7Q0bsUUJ/jyHPZxnFAw5nPUc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -218,7 +220,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod 
h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= diff --git a/main.go b/main.go index 0acd468..5939654 100644 --- a/main.go +++ b/main.go @@ -66,7 +66,7 @@ func main() { ec.Init() go ec.ListenEvents() - a := aggregator.NewAggregator(ctx, kubeEvents, ec, dsBackend) + a := aggregator.NewAggregator(ctx, kubeEvents, ec.EbpfEvents(), ec.EbpfProcEvents(), ec.TlsAttachQueue(), dsBackend) a.Run() } diff --git a/main_benchmark_test.go b/main_benchmark_test.go new file mode 100644 index 0000000..be3e804 --- /dev/null +++ b/main_benchmark_test.go @@ -0,0 +1,274 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "os" + "os/signal" + "runtime" + "runtime/debug" + "syscall" + "testing" + "time" + + "github.com/cilium/fake" + "github.com/ddosify/alaz/aggregator" + "github.com/ddosify/alaz/datastore" + "github.com/ddosify/alaz/ebpf/l7_req" + "github.com/ddosify/alaz/k8s" + "github.com/ddosify/alaz/log" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +func TestMain(m *testing.M) { + // TODO: read simulation config from a file + + // TODO: this code gets mem profile at exit + // we need to get it periodically with top output too + // memProfFile, err := os.Create("memprof.out") + // if err != nil { + // log.Logger.Fatal().Err(err).Msg("could not create memory profile") + // } + // defer memProfFile.Close() // error handling omitted for example + // defer func() { + // pprof.Lookup("allocs").WriteTo(memProfFile, 0) + // // if you want to check live heap objects: + // // runtime.GC() // get up-to-date statistics + // // pprof.Lookup("heap").WriteTo(memProfFile, 0) + // }() + + log.Logger.Info().Msg("simulation starts...") + conf := &SimulatorConfig{ + // TODO: get these from a config file + kubeEventsBufferSize: 1000, + ebpfEventsBufferSize: 100000, + ebpfProcEventsBufferSize: 100, + tlsAttachQueueBufferSize: 10, + } + go start(conf) + + <-time.After(10 * time.Second) + PrintMemUsage() + syscall.Kill(syscall.Getpid(), syscall.SIGINT) +} + +func PrintMemUsage() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + // For info on each, see: https://golang.org/pkg/runtime/#MemStats + fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v\n", m.NumGC) +} + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} + +func start(conf *SimulatorConfig) { + // TODO: call this func from another test, and after some time send a sigkill + // measure memory and cpu resources + + sim := CreateSimulator(conf) + sim.Setup() + + debug.SetGCPercent(80) + ctx, cancel := context.WithCancel(context.Background()) + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) + go func() { + <-c + signal.Stop(c) + cancel() + }() + + go sim.Simulate() + + a := aggregator.NewAggregator(ctx, sim.getKubeEvents(), sim.getEbpfEvents(), + sim.getEbpfProcEvents(), sim.getTlsAttachQueue(), sim.getDataStore()) + a.Run() + + go http.ListenAndServe(":8181", nil) + + <-ctx.Done() + log.Logger.Info().Msg("simulation finished") +} + +type Simulator struct { + kubeEvents chan interface{} // will be sent k8s events + // mockCollector ? 
+ ebpfEvents chan interface{} + ebpfProcEvents chan interface{} + tlsAttachQueue chan uint32 + + mockDs datastore.DataStore +} + +type SimulatorConfig struct { + // number of processes + // pod and services + // k8s IPs must match with tcp and l7 events produced + // tcp and l7 events rates + // http, http2, grpc, postgres, rabbitmq calls + // outbound calls + + kubeEventsBufferSize int + ebpfEventsBufferSize int + ebpfProcEventsBufferSize int + tlsAttachQueueBufferSize int +} + +func CreateSimulator(conf *SimulatorConfig) *Simulator { + return &Simulator{ + kubeEvents: make(chan interface{}, conf.kubeEventsBufferSize), + ebpfEvents: make(chan interface{}, conf.ebpfEventsBufferSize), + ebpfProcEvents: make(chan interface{}, conf.ebpfProcEventsBufferSize), + tlsAttachQueue: make(chan uint32, conf.tlsAttachQueueBufferSize), + mockDs: &MockDataStore{}, + } +} + +func (s *Simulator) getKubeEvents() chan interface{} { + return s.kubeEvents +} + +func (s *Simulator) getEbpfEvents() chan interface{} { + return s.ebpfEvents +} + +func (s *Simulator) getEbpfProcEvents() chan interface{} { + return s.ebpfProcEvents +} + +func (s *Simulator) getTlsAttachQueue() chan uint32 { + return s.tlsAttachQueue +} + +type FakePod struct { + Name string + IP string + Image string +} + +func (s *Simulator) Setup() { + // Create Kubernetes Workloads + // K8sResourceMessage + + podCount := 30 + + pods := make(map[string]*FakePod) + + for i := 0; i < podCount; i++ { + // TODO: namespace + podName := fake.Name() + podIP := fake.IP() + mainContainerImage := fake.Name() + + pods[podName] = &FakePod{ + Name: podName, + IP: podIP, + Image: mainContainerImage, + } + } + + for _, p := range pods { + s.PodCreateEvent(p.Name, p.IP, p.Image) + } + + // create services + // then create traffic between pods and services + + s.ServiceCreateEvent("my-service", "10.123.42.99", types.UID("uid-service")) + +} + +func (s *Simulator) PodCreateEvent(name string, ip string, image string) { + obj := &corev1.Pod{} + obj.Name = name + obj.Status.PodIP = ip + obj.Spec.Containers = make([]corev1.Container, 0) + obj.Spec.Containers = append(obj.Spec.Containers, corev1.Container{ + Image: image, + }) + s.kubeEvents <- k8s.K8sResourceMessage{ + ResourceType: k8s.POD, + EventType: k8s.ADD, + Object: obj, + } +} + +func (s *Simulator) ServiceCreateEvent(name string, ip string, uid types.UID) { + obj := &corev1.Service{} + obj.Spec.ClusterIP = ip + obj.Name = name + obj.UID = uid + + s.kubeEvents <- k8s.K8sResourceMessage{ + ResourceType: k8s.SERVICE, + EventType: k8s.ADD, + Object: obj, + } +} + +func (s *Simulator) Simulate() { + // TODO: create traffic at various rates + // tcp events and l7 events +} + +func (s *Simulator) getDataStore() datastore.DataStore { + return s.mockDs +} + +type MockDataStore struct { + // TODO: mimic backend speed and timeouts +} + +func (m *MockDataStore) PersistPod(pod datastore.Pod, eventType string) error { + log.Logger.Info().Str("pod", pod.Name).Msg("PersistPod") + return nil +} + +func (m *MockDataStore) PersistService(service datastore.Service, eventType string) error { + log.Logger.Info().Str("service", service.Name).Msg("PersistService") + return nil +} + +func (m *MockDataStore) PersistReplicaSet(rs datastore.ReplicaSet, eventType string) error { + log.Logger.Info().Str("replicaset", rs.Name).Msg("PersistReplicaSet") + return nil +} + +func (m *MockDataStore) PersistDeployment(d datastore.Deployment, eventType string) error { + log.Logger.Info().Str("deployment", d.Name).Msg("PersistDeployment") + return 
nil +} + +func (m *MockDataStore) PersistEndpoints(e datastore.Endpoints, eventType string) error { + log.Logger.Info().Str("endpoints", e.Name).Msg("PersistEndpoints") + return nil +} + +func (m *MockDataStore) PersistContainer(c datastore.Container, eventType string) error { + log.Logger.Info().Str("container", c.Name).Msg("PersistContainer") + return nil +} + +func (m *MockDataStore) PersistDaemonSet(ds datastore.DaemonSet, eventType string) error { + log.Logger.Info().Str("daemonset", ds.Name).Msg("PersistDaemonSet") + return nil +} + +func (m *MockDataStore) PersistRequest(request *datastore.Request) error { + log.Logger.Info().Bool("isTls", request.Tls).Str("path", request.Path).Msg("PersistRequest") + return nil +} + +func (m *MockDataStore) PersistTraceEvent(trace *l7_req.TraceEvent) error { + log.Logger.Info().Msg("PersistTraceEvent") + return nil +} From 4beace4dd0ff114b44efe8b01f82f6138dbb0587 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 30 Jan 2024 16:06:34 +0000 Subject: [PATCH 04/16] make buffer sizes configurable, generate traffic at given rate, measure memory metrics --- main_benchmark_test.go | 461 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 407 insertions(+), 54 deletions(-) diff --git a/main_benchmark_test.go b/main_benchmark_test.go index be3e804..25a51cd 100644 --- a/main_benchmark_test.go +++ b/main_benchmark_test.go @@ -3,12 +3,12 @@ package main import ( "context" "fmt" + "math/rand" "net/http" - "os" - "os/signal" - "runtime" - "runtime/debug" - "syscall" + "runtime/metrics" + + "sync" + "sync/atomic" "testing" "time" @@ -16,14 +16,35 @@ import ( "github.com/ddosify/alaz/aggregator" "github.com/ddosify/alaz/datastore" "github.com/ddosify/alaz/ebpf/l7_req" + "github.com/ddosify/alaz/ebpf/tcp_state" "github.com/ddosify/alaz/k8s" "github.com/ddosify/alaz/log" + "golang.org/x/time/rate" + "github.com/prometheus/procfs" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + + "github.com/stretchr/testify/assert" ) -func TestMain(m *testing.M) { +var testDuration = 10 // in seconds +var memProfInterval = 5 // in seconds +var podCount = 100 +var serviceCount = 50 +var edgeCount = 20 +var edgeRate = 50000 // events per second on a single edge + +var kubeEventsBufferSize = 1000 +var ebpfEventsBufferSize = 8000 +var ebpfProcEventsBufferSize = 100 +var tlsAttachQueueBufferSize = 10 + +// expected total request count = testDuration * edgeCount * edgeRate +// 60 * 10 * 1000 = 600000 ≈ 600k +// 60 * 10 * 5000 = 3000000 ≈ 3m + +func TestSimulation(t *testing.T) { // TODO: read simulation config from a file // TODO: this code gets mem profile at exit @@ -43,51 +64,162 @@ func TestMain(m *testing.M) { log.Logger.Info().Msg("simulation starts...") conf := &SimulatorConfig{ // TODO: get these from a config file - kubeEventsBufferSize: 1000, - ebpfEventsBufferSize: 100000, - ebpfProcEventsBufferSize: 100, - tlsAttachQueueBufferSize: 10, + kubeEventsBufferSize: kubeEventsBufferSize, + ebpfEventsBufferSize: ebpfEventsBufferSize, + ebpfProcEventsBufferSize: ebpfProcEventsBufferSize, + tlsAttachQueueBufferSize: tlsAttachQueueBufferSize, } - go start(conf) - <-time.After(10 * time.Second) - PrintMemUsage() - syscall.Kill(syscall.Getpid(), syscall.SIGINT) + ctx, cancel := context.WithCancel(context.Background()) + + sim := CreateSimulator(conf) + var totalReqProcessed uint32 + go func() { + totalReqProcessed = sim.start(ctx, conf) + }() + + go func() { + t := time.NewTicker(time.Duration(memProfInterval) * time.Second) + for range t.C { + PrintMemUsage() + 
} + }() + + <-time.After(time.Duration(testDuration) * time.Second) // test duration + cancel() + <-sim.simDone // wait for simulator to stop + + time.Sleep(1 * time.Second) // wait for totalReqProcessed to be set + expectedTotalReqProcessed := uint32(testDuration * edgeCount * edgeRate) + errorMargin := 10 + + l := expectedTotalReqProcessed * uint32(100-errorMargin) / 100 + assert.GreaterOrEqual(t, totalReqProcessed, l, "actual request count is less than expected") + + <-time.After(time.Duration(memProfInterval) * time.Second) // time interval for retrival of mem usage after simulation stops } +var memMetrics = []metrics.Sample{ + // Cumulative sum of memory allocated to the heap by the + // application. + {Name: "/gc/heap/allocs:bytes"}, + // Memory occupied by live objects and dead objects that have not + // yet been marked free by the garbage collector. + // AKA HeapInUse + {Name: "/memory/classes/heap/objects:bytes"}, + // Count of completed GC cycles generated by the Go runtime. + {Name: "/gc/cycles/automatic:gc-cycles"}, + // Count of all completed GC cycles. + {Name: "/gc/cycles/total:gc-cycles"}, + // GOGC + {Name: "/gc/gogc:percent"}, + // GOMEMLIMIT + {Name: "/gc/gomemlimit:bytes"}, + // Memory that is completely free and eligible to be returned to + // the underlying system, but has not been. This metric is the + // runtime's estimate of free address space that is backed by + // physical memory. Btw even if goruntime release a memory block, OS will reclaim it at an appropiate moment + // not immediately. Most likely in case of a memory pressure in system. + {Name: "/memory/classes/heap/free:bytes"}, + // Memory that is completely free and has been returned to the + // underlying system. This metric is the runtime's estimate of free + // address space that is still mapped into the process, but is not + // backed by physical memory. + // can be recognized as rate of mem page transactions between process and OS. + {Name: "/memory/classes/heap/released:bytes"}, + // Memory that is reserved for heap objects but is not currently + // used to hold heap objects. + {Name: "/memory/classes/heap/unused:bytes"}, + // All memory mapped by the Go runtime into the current process + // as read-write. Note that this does not include memory mapped + // by code called via cgo or via the syscall package. Sum of all + // metrics in /memory/classes. + {Name: "/memory/classes/total:bytes"}, + // Memory allocated from the heap that is reserved for stack space, + // whether or not it is currently in-use. Currently, this + // represents all stack memory for goroutines. It also includes all + // OS thread stacks in non-cgo programs. Note that stacks may be + // allocated differently in the future, and this may change. + {Name: "/memory/classes/heap/stacks:bytes"}, + // Count of live goroutines + {Name: "/sched/goroutines:goroutines"}, +} + +// RES can be summarized as +// Instructions and static variables belong to executable are mapped on RAM (Pss_File in smaps_rollup output) +// StackInUse +// HeapInUse reported by go runtime +// Memory that are eligible to be returned to OS, but not has been by go runtime. (/memory/classes/heap/free:bytes) +// Memory that has been reserved for heap objects but unused. (/memory/classes/heap/unused:bytes) +// LazyFree pages that are returned to OS with madvise syscall but not yet reclaimed by OS. 
+ func PrintMemUsage() { - var m runtime.MemStats - runtime.ReadMemStats(&m) - // For info on each, see: https://golang.org/pkg/runtime/#MemStats - fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) - fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) - fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) - fmt.Printf("\tNumGC = %v\n", m.NumGC) + + // Memory statistics are recorded after a GC run. + // Trigger GC to have latest state of heap. + // runtime.GC() // triggered each time PrintMemUsage called, preventing us observing the normal GC behaviour. + metrics.Read(memMetrics) + + HeapInUse := bToMb(memMetrics[1].Value.Uint64()) + HeapFree := bToMb(memMetrics[6].Value.Uint64()) + HeapUnused := bToMb(memMetrics[8].Value.Uint64()) + Stack := bToMb(memMetrics[10].Value.Uint64()) + LiveGoroutines := memMetrics[11].Value.Uint64() + + fmt.Printf("Total bytes allocated: %v", bToMb(memMetrics[0].Value.Uint64())) + fmt.Printf("\tIn-use bytes: %v", HeapInUse) + // fmt.Printf("\tAutomatic gc cycles: %v", (memMetrics[2].Value.Uint64())) + fmt.Printf("\tTotal gc cycles: %v", (memMetrics[3].Value.Uint64())) + // fmt.Printf("\tGOGC percent: %v", (memMetrics[4].Value.Uint64())) + // fmt.Printf("\tGOMEMLIMIT: %v\n", bToMb(memMetrics[5].Value.Uint64())) + // fmt.Printf("\tHeapFree: %v", HeapFree) + // fmt.Printf("\tHeapReleased: %v", bToMb(memMetrics[7].Value.Uint64())) + // fmt.Printf("\tHeapUnused: %v", HeapUnused) + // fmt.Printf("\tTotal: %v", bToMb(memMetrics[9].Value.Uint64())) + // fmt.Printf("\tStack: %v", Stack) + fmt.Printf("\tLiveGoroutines: %v", LiveGoroutines) + + proc, err := procfs.Self() + if err != nil { + log.Logger.Fatal().Err(err) + } + smapRollup, err := proc.ProcSMapsRollup() + if err != nil { + log.Logger.Fatal().Err(err) + } + + // Anonymous pages of process that are mapped on RAM. Includes heap area. + Anonymous := bToMb(smapRollup.Anonymous) + // Resident Set Size, total size of memory that process has mapped on RAM. 
+ Rss := bToMb(smapRollup.Rss) + // Pss_File := Rss - Anonymous // estimating instructions and static variables belongs to the executable + + fmt.Printf("\tAnonymous: %v", Anonymous) + fmt.Printf("\tRss: %v", Rss) + + goRuntimeMetrics := (HeapInUse + HeapFree + HeapUnused + Stack) + var diff uint64 + if Anonymous > goRuntimeMetrics { + diff = Anonymous - goRuntimeMetrics + } else { + diff = goRuntimeMetrics - Anonymous + } + fmt.Printf("\tDiff %d\n", diff) + } func bToMb(b uint64) uint64 { return b / 1024 / 1024 } -func start(conf *SimulatorConfig) { +func (sim *Simulator) start(ctx context.Context, conf *SimulatorConfig) uint32 { // TODO: call this func from another test, and after some time send a sigkill // measure memory and cpu resources - sim := CreateSimulator(conf) sim.Setup() - debug.SetGCPercent(80) - ctx, cancel := context.WithCancel(context.Background()) - - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) - go func() { - <-c - signal.Stop(c) - cancel() - }() - - go sim.Simulate() + // debug.SetGCPercent(80) + go sim.Simulate(ctx) a := aggregator.NewAggregator(ctx, sim.getKubeEvents(), sim.getEbpfEvents(), sim.getEbpfProcEvents(), sim.getTlsAttachQueue(), sim.getDataStore()) @@ -95,8 +227,11 @@ func start(conf *SimulatorConfig) { go http.ListenAndServe(":8181", nil) - <-ctx.Done() + <-sim.simDone // wait for simulation to stop to print metrics log.Logger.Info().Msg("simulation finished") + totalReqProcessed := sim.getDataStore().(*MockDataStore).ReqCount.Load() + log.Logger.Info().Uint32("totalReq", totalReqProcessed).Str("tReq", ToText(totalReqProcessed)).Msg("totalReqCount") + return totalReqProcessed } type Simulator struct { @@ -107,6 +242,11 @@ type Simulator struct { tlsAttachQueue chan uint32 mockDs datastore.DataStore + + pods map[string]*FakePod + services map[string]*FakeService + + simDone chan struct{} } type SimulatorConfig struct { @@ -130,6 +270,9 @@ func CreateSimulator(conf *SimulatorConfig) *Simulator { ebpfProcEvents: make(chan interface{}, conf.ebpfProcEventsBufferSize), tlsAttachQueue: make(chan uint32, conf.tlsAttachQueueBufferSize), mockDs: &MockDataStore{}, + pods: map[string]*FakePod{}, + services: map[string]*FakeService{}, + simDone: make(chan struct{}), } } @@ -153,44 +296,72 @@ type FakePod struct { Name string IP string Image string + Uid types.UID + + // + Pid uint32 + Fds map[uint64]struct{} + OpenConnections map[uint64]uint64 // fd -> timestamp +} + +type FakeService struct { + Name string + IP string + UID types.UID } func (s *Simulator) Setup() { // Create Kubernetes Workloads // K8sResourceMessage - podCount := 30 - - pods := make(map[string]*FakePod) - for i := 0; i < podCount; i++ { // TODO: namespace podName := fake.Name() - podIP := fake.IP() + podIP := fake.IP(fake.WithIPv4()) mainContainerImage := fake.Name() - - pods[podName] = &FakePod{ - Name: podName, - IP: podIP, - Image: mainContainerImage, + uid := types.UID(fake.Name()) + pid := rand.Uint32() + + s.pods[podName] = &FakePod{ + Name: podName, + IP: podIP, + Image: mainContainerImage, + Uid: uid, + Pid: pid, + Fds: map[uint64]struct{}{}, + OpenConnections: map[uint64]uint64{}, } } - for _, p := range pods { - s.PodCreateEvent(p.Name, p.IP, p.Image) + for _, p := range s.pods { + s.PodCreateEvent(p.Name, p.IP, p.Image, p.Uid) } // create services // then create traffic between pods and services - s.ServiceCreateEvent("my-service", "10.123.42.99", types.UID("uid-service")) + for i := 0; i < serviceCount; i++ { + // TODO: namespace + 
svcName := fake.Name() + svcIP := fake.IP(fake.WithIPv4()) + s.services[svcName] = &FakeService{ + Name: svcName, + IP: svcIP, + UID: types.UID(fake.Name()), + } + } + + for _, svc := range s.services { + s.ServiceCreateEvent(svc.Name, svc.IP, svc.UID) + } } -func (s *Simulator) PodCreateEvent(name string, ip string, image string) { +func (s *Simulator) PodCreateEvent(name string, ip string, image string, uid types.UID) { obj := &corev1.Pod{} obj.Name = name obj.Status.PodIP = ip + obj.UID = uid obj.Spec.Containers = make([]corev1.Container, 0) obj.Spec.Containers = append(obj.Spec.Containers, corev1.Container{ Image: image, @@ -215,9 +386,163 @@ func (s *Simulator) ServiceCreateEvent(name string, ip string, uid types.UID) { } } -func (s *Simulator) Simulate() { +func (sim *Simulator) Simulate(ctx context.Context) { // TODO: create traffic at various rates // tcp events and l7 events + podKeys := make([]string, 0) + svcKeys := make([]string, 0) + + for name, _ := range sim.pods { + n := name + podKeys = append(podKeys, n) + } + + for name, _ := range sim.services { + n := name + svcKeys = append(svcKeys, n) + } + + ec := edgeCount + // retryLimit changed to 1 on aggregator + // processL7 exiting, stop retrying... // retry blocks workers + + wg := &sync.WaitGroup{} + for ec > 0 { + ec-- + + // select one pod and service + // TODO: these randoms conflict ???? + + pod := sim.pods[podKeys[rand.Intn(len(podKeys))]] + svc := sim.services[svcKeys[rand.Intn(len(svcKeys))]] + + // get a unique fd + var fd uint64 + for { + fd = rand.Uint64() + if _, ok := pod.Fds[fd]; !ok { + pod.Fds[fd] = struct{}{} + break + } + } + + tx := rand.Uint64() + pod.OpenConnections[fd] = tx + cc := &ConnectionConfig{ + Pid: pod.Pid, + Fd: fd, + Saddr: pod.IP, + Daddr: svc.IP, + Tx: tx, + PodName: pod.Name, + SvcName: svc.Name, + } + + sim.constructSockets([]*ConnectionConfig{cc}) + wg.Add(1) + // simulate traffic + go func(wg *sync.WaitGroup) { + sim.httpTraffic(ctx, &Traffic{ + pod: pod, + fd: fd, + svc: svc, + rate: rate.NewLimiter(rate.Limit(edgeRate), edgeRate), // 1000 events per second + protocol: l7_req.L7_PROTOCOL_HTTP, + }) + wg.Done() + }(wg) + } + + wg.Wait() + close(sim.simDone) +} + +type ConnectionConfig struct { + Pid uint32 // source pid + Fd uint64 + Saddr string // podIP + Daddr string // svcIP + Tx uint64 // timestamp of connection start + + PodName string + SvcName string +} + +// podName -> Pid + +func (sim *Simulator) constructSockets(cc []*ConnectionConfig) { + for _, c := range cc { + sim.tcpEstablish(c.Pid, c.Fd, c.Saddr, c.Daddr, c.Tx) + } +} + +type Traffic struct { + pod *FakePod + fd uint64 + svc *FakeService + rate *rate.Limiter + protocol string +} + +func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) { + // connStartTx := pod.OpenConnections[fd] + + httpPayload := `GET /user HTTP1.1 + ` + + payload := [1024]uint8{} + for i, b := range []uint8(httpPayload) { + payload[i] = b + } + + log.Logger.Warn().Any("payload", payload) + + for { + // time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + select { + case <-ctx.Done(): + return + default: + if t.rate.Allow() { + sim.ebpfEvents <- &l7_req.L7Event{ + Fd: t.fd, + Pid: t.pod.Pid, + Status: 200, + Duration: 50, + Protocol: t.protocol, + Tls: false, + Method: "", + Payload: payload, + PayloadSize: uint32(len(httpPayload)), + PayloadReadComplete: true, + Failed: false, + WriteTimeNs: t.pod.OpenConnections[t.fd] + 10, + + // tracing purposes + Tid: 0, + Seq: 0, + EventReadTime: 0, + } + } + } + } + +} + +// saddr is 
matched with podIP +// {pid,fd} duo is used to socketLine struct +// socketInfo corresponding to requests timestamp is retrieved +func (sim *Simulator) tcpEstablish(srcPid uint32, fd uint64, saddr string, daddr string, tx uint64) { + sim.ebpfEvents <- &tcp_state.TcpConnectEvent{ + Fd: fd, + Timestamp: tx, + Type_: tcp_state.EVENT_TCP_ESTABLISHED, + Pid: srcPid, + SPort: 0, + DPort: 0, + SAddr: saddr, + DAddr: daddr, + } } func (s *Simulator) getDataStore() datastore.DataStore { @@ -226,15 +551,16 @@ func (s *Simulator) getDataStore() datastore.DataStore { type MockDataStore struct { // TODO: mimic backend speed and timeouts + ReqCount atomic.Uint32 } func (m *MockDataStore) PersistPod(pod datastore.Pod, eventType string) error { - log.Logger.Info().Str("pod", pod.Name).Msg("PersistPod") + log.Logger.Debug().Str("pod", pod.Name).Msg("PersistPod") return nil } func (m *MockDataStore) PersistService(service datastore.Service, eventType string) error { - log.Logger.Info().Str("service", service.Name).Msg("PersistService") + log.Logger.Debug().Str("service", service.Name).Msg("PersistService") return nil } @@ -264,11 +590,38 @@ func (m *MockDataStore) PersistDaemonSet(ds datastore.DaemonSet, eventType strin } func (m *MockDataStore) PersistRequest(request *datastore.Request) error { - log.Logger.Info().Bool("isTls", request.Tls).Str("path", request.Path).Msg("PersistRequest") + // TODO: mimic latency + m.ReqCount.Add(1) return nil } func (m *MockDataStore) PersistTraceEvent(trace *l7_req.TraceEvent) error { + // TODO: mimic latency log.Logger.Info().Msg("PersistTraceEvent") return nil } + +type Magnitude struct { + Magnitude uint32 + Symbol string +} + +func (m *Magnitude) ToText(number uint32) string { + return fmt.Sprintf("%.1f%s", float64(number)/float64(m.Magnitude), m.Symbol) +} + +func ToText(number uint32) string { + list := []Magnitude{ + // Magnitude{1000000000000, "T"}, + Magnitude{1000000000, "B"}, + Magnitude{1000000, "M"}, + Magnitude{1000, "K"}, + } + for _, m := range list { + if m.Magnitude < uint32(number) { + return m.ToText(uint32(number)) + } + } + return fmt.Sprintf("%d", number) + +} From 5574d8ab2b54187a4812bda79ad23206a907320a Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Thu, 1 Feb 2024 08:20:01 +0000 Subject: [PATCH 05/16] read simulation configs from file --- go.mod | 5 +- go.sum | 1 + main.go | 4 +- main_benchmark_test.go | 274 ++++++++++++++++++++++------------------ testconfig/config1.json | 15 +++ 5 files changed, 176 insertions(+), 123 deletions(-) create mode 100644 testconfig/config1.json diff --git a/go.mod b/go.mod index f655d98..86b383c 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/prometheus/common v0.44.0 github.com/prometheus/node_exporter v1.6.1 github.com/rs/zerolog v1.29.1 + github.com/stretchr/testify v1.8.4 golang.org/x/arch v0.5.0 golang.org/x/mod v0.12.0 inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a @@ -21,6 +22,8 @@ require ( ) +require github.com/pmezard/go-difflib v1.0.0 // indirect + require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/beevik/ntp v0.3.0 // indirect @@ -74,7 +77,7 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible github.com/prometheus-community/go-runit v0.1.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect - github.com/prometheus/procfs v0.11.0 // indirect + github.com/prometheus/procfs v0.11.0 github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/safchain/ethtool v0.3.0 // indirect github.com/spf13/pflag 
v1.0.5 // indirect diff --git a/go.sum b/go.sum index 4b5adde..9a6486f 100644 --- a/go.sum +++ b/go.sum @@ -220,6 +220,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= diff --git a/main.go b/main.go index 5939654..16ba471 100644 --- a/main.go +++ b/main.go @@ -51,11 +51,11 @@ func main() { metricsEnabled, _ := strconv.ParseBool(os.Getenv("METRICS_ENABLED")) // datastore backend - dsBackend := datastore.NewBackendDS(ctx, config.BackendConfig{ + dsBackend := datastore.NewBackendDS(ctx, config.BackendDSConfig{ Host: os.Getenv("BACKEND_HOST"), - Port: os.Getenv("BACKEND_PORT"), MetricsExport: metricsEnabled, MetricsExportInterval: 10, + ReqBufferSize: 40000, // TODO: get from a conf file }) go dsBackend.SendHealthCheck(ebpfEnabled, metricsEnabled, k8sVersion) diff --git a/main_benchmark_test.go b/main_benchmark_test.go index 25a51cd..6d300ae 100644 --- a/main_benchmark_test.go +++ b/main_benchmark_test.go @@ -2,9 +2,13 @@ package main import ( "context" + "encoding/json" "fmt" + "io" "math/rand" "net/http" + "net/http/httptest" + "os" "runtime/metrics" "sync" @@ -14,11 +18,12 @@ import ( "github.com/cilium/fake" "github.com/ddosify/alaz/aggregator" + "github.com/ddosify/alaz/config" "github.com/ddosify/alaz/datastore" "github.com/ddosify/alaz/ebpf/l7_req" "github.com/ddosify/alaz/ebpf/tcp_state" "github.com/ddosify/alaz/k8s" - "github.com/ddosify/alaz/log" + "github.com/rs/zerolog" "golang.org/x/time/rate" "github.com/prometheus/procfs" @@ -28,21 +33,48 @@ import ( "github.com/stretchr/testify/assert" ) -var testDuration = 10 // in seconds -var memProfInterval = 5 // in seconds -var podCount = 100 -var serviceCount = 50 -var edgeCount = 20 -var edgeRate = 50000 // events per second on a single edge +type SimulatorConfig struct { + // number of processes + // pod and services + // k8s IPs must match with tcp and l7 events produced + // tcp and l7 events rates + // http, http2, grpc, postgres, rabbitmq calls + // outbound calls -var kubeEventsBufferSize = 1000 -var ebpfEventsBufferSize = 8000 -var ebpfProcEventsBufferSize = 100 -var tlsAttachQueueBufferSize = 10 + // edgeCount * edgeRate should be smaller than ebpfEventsBufferSize + + TestDuration int `json:"testDuration"` + MemProfInterval int `json:"memProfInterval"` + PodCount int `json:"podCount"` + ServiceCount int `json:"serviceCount"` + EdgeCount int `json:"edgeCount"` + EdgeRate int `json:"edgeRate"` + KubeEventsBufferSize int `json:"kubeEventsBufferSize"` + EbpfEventsBufferSize int `json:"ebpfEventsBufferSize"` + EbpfProcEventsBufferSize int `json:"ebpfProcEventsBufferSize"` + TlsAttachQueueBufferSize int `json:"tlsAttachQueueBufferSize"` + DsReqBufferSize int `json:"dsReqBufferSize"` +} -// expected total request count = testDuration * edgeCount * edgeRate -// 60 * 10 * 1000 = 600000 ≈ 600k -// 60 * 10 * 5000 = 
3000000 ≈ 3m +func readSimulationConfig(path string) (*SimulatorConfig, error) { + var conf SimulatorConfig + f, err := os.Open(path) + if err != nil { + return nil, err + } + bytes, err := io.ReadAll(f) + if err != nil { + return nil, err + } + err = json.Unmarshal(bytes, &conf) + if err != nil { + return nil, err + } + + return &conf, nil +} + +var simLog zerolog.Logger func TestSimulation(t *testing.T) { // TODO: read simulation config from a file @@ -51,7 +83,7 @@ func TestSimulation(t *testing.T) { // we need to get it periodically with top output too // memProfFile, err := os.Create("memprof.out") // if err != nil { - // log.Logger.Fatal().Err(err).Msg("could not create memory profile") + // simLog.Fatal().Err(err).Msg("could not create memory profile") // } // defer memProfFile.Close() // error handling omitted for example // defer func() { @@ -61,42 +93,55 @@ func TestSimulation(t *testing.T) { // // pprof.Lookup("heap").WriteTo(memProfFile, 0) // }() - log.Logger.Info().Msg("simulation starts...") - conf := &SimulatorConfig{ - // TODO: get these from a config file - kubeEventsBufferSize: kubeEventsBufferSize, - ebpfEventsBufferSize: ebpfEventsBufferSize, - ebpfProcEventsBufferSize: ebpfProcEventsBufferSize, - tlsAttachQueueBufferSize: tlsAttachQueueBufferSize, + simLog = zerolog.New(os.Stdout).With().Timestamp().Logger() + + conf, err := readSimulationConfig("testconfig/config1.json") + if err != nil { + simLog.Fatal().Err(err).Msg("could not read simulation config") } + simLog.Info().Msg("simulation starts...") + ctx, cancel := context.WithCancel(context.Background()) - sim := CreateSimulator(conf) - var totalReqProcessed uint32 - go func() { - totalReqProcessed = sim.start(ctx, conf) - }() + sim := CreateSimulator(ctx, conf) - go func() { - t := time.NewTicker(time.Duration(memProfInterval) * time.Second) + go func(ctx context.Context) { + t := time.NewTicker(time.Duration(conf.MemProfInterval) * time.Second) for range t.C { - PrintMemUsage() + select { + case <-ctx.Done(): + return + default: + PrintMemUsage() + } } + }(ctx) + + go func() { + <-time.After(time.Duration(conf.TestDuration) * time.Second) // test duration + cancel() + simLog.Info().Msg("context canceled") }() - <-time.After(time.Duration(testDuration) * time.Second) // test duration - cancel() - <-sim.simDone // wait for simulator to stop + sim.start(ctx, conf) + + totalReqReadyToBeSent := sim.getDataStore().(*MockDataStore).ReadyToBeSendReq.Load() + putIntoBackendQueue := sim.getDataStore().(*MockDataStore).SendToBackendQueueReq.Load() + + simLog.Info().Str("totalReqReadyToBeSent", ToText(totalReqReadyToBeSent)).Msg("totalReqReadyToBeSent") + simLog.Info().Str("putIntoBackendQueue", ToText(putIntoBackendQueue)).Msg("putIntoBackendQueue") - time.Sleep(1 * time.Second) // wait for totalReqProcessed to be set - expectedTotalReqProcessed := uint32(testDuration * edgeCount * edgeRate) + expectedTotalReqProcessed := uint32(conf.TestDuration * conf.EdgeCount * conf.EdgeRate) errorMargin := 10 + simLog.Info().Str("expectedTotalReqProcessed", ToText(expectedTotalReqProcessed)).Msg("expectedTotalReqProcessed") + l := expectedTotalReqProcessed * uint32(100-errorMargin) / 100 - assert.GreaterOrEqual(t, totalReqProcessed, l, "actual request count is less than expected") + assert.GreaterOrEqual(t, totalReqReadyToBeSent, l, "actual request count is less than expected") + assert.GreaterOrEqual(t, putIntoBackendQueue, l, "actual request count is less than expected") - <-time.After(time.Duration(memProfInterval) * time.Second) // 
time interval for retrival of mem usage after simulation stops + // <-time.After(time.Duration(2*conf.MemProfInterval) * time.Second) // time interval for retrival of mem usage after simulation stops } var memMetrics = []metrics.Sample{ @@ -172,20 +217,20 @@ func PrintMemUsage() { fmt.Printf("\tTotal gc cycles: %v", (memMetrics[3].Value.Uint64())) // fmt.Printf("\tGOGC percent: %v", (memMetrics[4].Value.Uint64())) // fmt.Printf("\tGOMEMLIMIT: %v\n", bToMb(memMetrics[5].Value.Uint64())) - // fmt.Printf("\tHeapFree: %v", HeapFree) - // fmt.Printf("\tHeapReleased: %v", bToMb(memMetrics[7].Value.Uint64())) - // fmt.Printf("\tHeapUnused: %v", HeapUnused) + fmt.Printf("\tHeapFree: %v", HeapFree) + fmt.Printf("\tHeapReleased: %v", bToMb(memMetrics[7].Value.Uint64())) + fmt.Printf("\tHeapUnused: %v", HeapUnused) // fmt.Printf("\tTotal: %v", bToMb(memMetrics[9].Value.Uint64())) - // fmt.Printf("\tStack: %v", Stack) + fmt.Printf("\tStack: %v", Stack) fmt.Printf("\tLiveGoroutines: %v", LiveGoroutines) proc, err := procfs.Self() if err != nil { - log.Logger.Fatal().Err(err) + simLog.Fatal().Err(err) } smapRollup, err := proc.ProcSMapsRollup() if err != nil { - log.Logger.Fatal().Err(err) + simLog.Fatal().Err(err) } // Anonymous pages of process that are mapped on RAM. Includes heap area. @@ -212,7 +257,7 @@ func bToMb(b uint64) uint64 { return b / 1024 / 1024 } -func (sim *Simulator) start(ctx context.Context, conf *SimulatorConfig) uint32 { +func (sim *Simulator) start(ctx context.Context, conf *SimulatorConfig) { // TODO: call this func from another test, and after some time send a sigkill // measure memory and cpu resources @@ -227,11 +272,7 @@ func (sim *Simulator) start(ctx context.Context, conf *SimulatorConfig) uint32 { go http.ListenAndServe(":8181", nil) - <-sim.simDone // wait for simulation to stop to print metrics - log.Logger.Info().Msg("simulation finished") - totalReqProcessed := sim.getDataStore().(*MockDataStore).ReqCount.Load() - log.Logger.Info().Uint32("totalReq", totalReqProcessed).Str("tReq", ToText(totalReqProcessed)).Msg("totalReqCount") - return totalReqProcessed + <-sim.simDone // wait for simulation to stop generating traffic to return metrics } type Simulator struct { @@ -247,32 +288,21 @@ type Simulator struct { services map[string]*FakeService simDone chan struct{} -} -type SimulatorConfig struct { - // number of processes - // pod and services - // k8s IPs must match with tcp and l7 events produced - // tcp and l7 events rates - // http, http2, grpc, postgres, rabbitmq calls - // outbound calls - - kubeEventsBufferSize int - ebpfEventsBufferSize int - ebpfProcEventsBufferSize int - tlsAttachQueueBufferSize int + conf *SimulatorConfig } -func CreateSimulator(conf *SimulatorConfig) *Simulator { +func CreateSimulator(ctx context.Context, conf *SimulatorConfig) *Simulator { return &Simulator{ - kubeEvents: make(chan interface{}, conf.kubeEventsBufferSize), - ebpfEvents: make(chan interface{}, conf.ebpfEventsBufferSize), - ebpfProcEvents: make(chan interface{}, conf.ebpfProcEventsBufferSize), - tlsAttachQueue: make(chan uint32, conf.tlsAttachQueueBufferSize), - mockDs: &MockDataStore{}, + kubeEvents: make(chan interface{}, conf.KubeEventsBufferSize), + ebpfEvents: make(chan interface{}, conf.EbpfEventsBufferSize), + ebpfProcEvents: make(chan interface{}, conf.EbpfProcEventsBufferSize), + tlsAttachQueue: make(chan uint32, conf.TlsAttachQueueBufferSize), + mockDs: NewMockDataStore(ctx, conf), pods: map[string]*FakePod{}, services: map[string]*FakeService{}, simDone: make(chan 
struct{}), + conf: conf, } } @@ -314,7 +344,7 @@ func (s *Simulator) Setup() { // Create Kubernetes Workloads // K8sResourceMessage - for i := 0; i < podCount; i++ { + for i := 0; i < s.conf.PodCount; i++ { // TODO: namespace podName := fake.Name() podIP := fake.IP(fake.WithIPv4()) @@ -340,7 +370,7 @@ func (s *Simulator) Setup() { // create services // then create traffic between pods and services - for i := 0; i < serviceCount; i++ { + for i := 0; i < s.conf.ServiceCount; i++ { // TODO: namespace svcName := fake.Name() svcIP := fake.IP(fake.WithIPv4()) @@ -402,7 +432,7 @@ func (sim *Simulator) Simulate(ctx context.Context) { svcKeys = append(svcKeys, n) } - ec := edgeCount + ec := sim.conf.EdgeCount // retryLimit changed to 1 on aggregator // processL7 exiting, stop retrying... // retry blocks workers @@ -446,14 +476,16 @@ func (sim *Simulator) Simulate(ctx context.Context) { pod: pod, fd: fd, svc: svc, - rate: rate.NewLimiter(rate.Limit(edgeRate), edgeRate), // 1000 events per second + rate: rate.NewLimiter(rate.Limit(sim.conf.EdgeRate), sim.conf.EdgeRate), // 1000 events per second protocol: l7_req.L7_PROTOCOL_HTTP, }) wg.Done() }(wg) } + simLog.Warn().Msg("waiting for traffic to stop") wg.Wait() + simLog.Warn().Msg("closing simDone chan") close(sim.simDone) } @@ -485,18 +517,15 @@ type Traffic struct { } func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) { - // connStartTx := pod.OpenConnections[fd] - - httpPayload := `GET /user HTTP1.1 - ` - + httpPayload := `GET /user HTTP1.1` payload := [1024]uint8{} for i, b := range []uint8(httpPayload) { payload[i] = b } - log.Logger.Warn().Any("payload", payload) + simLog.Warn().Any("payload", payload) + blockingLogged := false for { // time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) select { @@ -504,7 +533,15 @@ func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) { return default: if t.rate.Allow() { - sim.ebpfEvents <- &l7_req.L7Event{ + // In ebpf.Program's Consume methods, in order to prevent drops on + // ebpf maps, we send collected data using new goroutines + // otherwise in case of blocking on internal ebpfEvents channel + // ebpf map are likely to drop events + // go func() { + // TODO:! 
when a new goroutine spawned for each event stack rocketed + + select { + case sim.ebpfEvents <- &l7_req.L7Event{ Fd: t.fd, Pid: t.pod.Pid, Status: 200, @@ -522,7 +559,15 @@ func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) { Tid: 0, Seq: 0, EventReadTime: 0, + }: + default: + if !blockingLogged { + simLog.Warn().Msg("block on ebpfEvents chan") + blockingLogged = true + } } + // }() + } } } @@ -549,55 +594,44 @@ func (s *Simulator) getDataStore() datastore.DataStore { return s.mockDs } -type MockDataStore struct { - // TODO: mimic backend speed and timeouts - ReqCount atomic.Uint32 -} - -func (m *MockDataStore) PersistPod(pod datastore.Pod, eventType string) error { - log.Logger.Debug().Str("pod", pod.Name).Msg("PersistPod") - return nil -} - -func (m *MockDataStore) PersistService(service datastore.Service, eventType string) error { - log.Logger.Debug().Str("service", service.Name).Msg("PersistService") - return nil -} - -func (m *MockDataStore) PersistReplicaSet(rs datastore.ReplicaSet, eventType string) error { - log.Logger.Info().Str("replicaset", rs.Name).Msg("PersistReplicaSet") - return nil -} +func NewMockDataStore(ctx context.Context, conf *SimulatorConfig) *MockDataStore { + mockBackendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + simLog.Debug().Str("path", r.URL.Path).Msg("") + // time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond) + fmt.Fprintf(w, "success") + })) -func (m *MockDataStore) PersistDeployment(d datastore.Deployment, eventType string) error { - log.Logger.Info().Str("deployment", d.Name).Msg("PersistDeployment") - return nil -} + backendDs := datastore.NewBackendDS(ctx, config.BackendDSConfig{ + Host: mockBackendServer.URL, + MetricsExport: false, + MetricsExportInterval: 10, + ReqBufferSize: conf.DsReqBufferSize, + }) + return &MockDataStore{ + BackendDS: backendDs, + BackendServer: mockBackendServer, + ReadyToBeSendReq: atomic.Uint32{}, + SendToBackendQueueReq: atomic.Uint32{}, + } -func (m *MockDataStore) PersistEndpoints(e datastore.Endpoints, eventType string) error { - log.Logger.Info().Str("endpoints", e.Name).Msg("PersistEndpoints") - return nil } -func (m *MockDataStore) PersistContainer(c datastore.Container, eventType string) error { - log.Logger.Info().Str("container", c.Name).Msg("PersistContainer") - return nil -} +type MockDataStore struct { + // Wrapper for BackendDS + // mock backend endpoints with httptest.Server + *datastore.BackendDS + BackendServer *httptest.Server -func (m *MockDataStore) PersistDaemonSet(ds datastore.DaemonSet, eventType string) error { - log.Logger.Info().Str("daemonset", ds.Name).Msg("PersistDaemonSet") - return nil + // difference between these two metrics can indicate + // small buffer on backendDS or slow responding backend + ReadyToBeSendReq atomic.Uint32 + SendToBackendQueueReq atomic.Uint32 } func (m *MockDataStore) PersistRequest(request *datastore.Request) error { - // TODO: mimic latency - m.ReqCount.Add(1) - return nil -} - -func (m *MockDataStore) PersistTraceEvent(trace *l7_req.TraceEvent) error { - // TODO: mimic latency - log.Logger.Info().Msg("PersistTraceEvent") + m.ReadyToBeSendReq.Add(1) + // m.BackendDS.PersistRequest(request) // depends on dsReqBufferSize, batchSize, batchInterval, backend latency + m.SendToBackendQueueReq.Add(1) return nil } diff --git a/testconfig/config1.json b/testconfig/config1.json new file mode 100644 index 0000000..a49579d --- /dev/null +++ b/testconfig/config1.json @@ -0,0 +1,15 @@ +{ + "testDuration" : 30, 
+ "memProfInterval" : 5, + "podCount": 100, + "serviceCount" : 50, + "edgeCount" : 20, + "edgeRate" : 10000, + "kubeEventsBufferSize" : 1000, + "ebpfEventsBufferSize": 200000, + "ebpfProcEventsBufferSize" : 100, + "tlsAttachQueueBufferSize" : 10, + "dsReqBufferSize" : 150000, + "mockBackendMinLatency": 5, + "mockBackendMaxLatency": 20 +} \ No newline at end of file From d6a70874e955bf8e8be8cb995b786682dd1ed3c4 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Thu, 1 Feb 2024 08:22:14 +0000 Subject: [PATCH 06/16] update backendDS config --- config/db.go | 5 +++-- datastore/backend.go | 5 ++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/config/db.go b/config/db.go index bcd4a83..3e5fe39 100644 --- a/config/db.go +++ b/config/db.go @@ -8,9 +8,10 @@ type PostgresConfig struct { DBName string } -type BackendConfig struct { +type BackendDSConfig struct { Host string - Port string MetricsExport bool MetricsExportInterval int // in seconds + + ReqBufferSize int } diff --git a/datastore/backend.go b/datastore/backend.go index 0b00592..ed793be 100644 --- a/datastore/backend.go +++ b/datastore/backend.go @@ -186,7 +186,7 @@ func (ll LeveledLogger) Warn(msg string, keysAndValues ...interface{}) { ll.l.Warn().Fields(keysAndValues).Msg(msg) } -func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *BackendDS { +func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS { ctx, _ := context.WithCancel(parentCtx) rand.Seed(time.Now().UnixNano()) @@ -270,12 +270,11 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend ds := &BackendDS{ ctx: ctx, host: conf.Host, - port: conf.Port, c: client, batchSize: bs, reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}), traceInfoPool: newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}), - reqChanBuffer: make(chan *ReqInfo, 40000), + reqChanBuffer: make(chan *ReqInfo, conf.ReqBufferSize), podEventChan: make(chan interface{}, 100), svcEventChan: make(chan interface{}, 100), rsEventChan: make(chan interface{}, 100), From e81fbd76c4f488d092d32456073f661126b2dd2e Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 6 Feb 2024 11:38:04 +0300 Subject: [PATCH 07/16] update go version to 1.20 --- go.mod | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 86b383c..ccf7bdb 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ddosify/alaz -go 1.18 +go 1.20 require ( github.com/alecthomas/kingpin/v2 v2.3.2 @@ -19,7 +19,6 @@ require ( k8s.io/api v0.27.2 k8s.io/apimachinery v0.27.2 k8s.io/client-go v0.27.2 - ) require github.com/pmezard/go-difflib v1.0.0 // indirect From 38095902a49d5bed3db3a0286b47bbb76ed8ee89 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 6 Feb 2024 11:38:48 +0300 Subject: [PATCH 08/16] capture mem and goroutine profile --- main_benchmark_test.go | 43 +++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/main_benchmark_test.go b/main_benchmark_test.go index 6d300ae..8dbea90 100644 --- a/main_benchmark_test.go +++ b/main_benchmark_test.go @@ -10,6 +10,7 @@ import ( "net/http/httptest" "os" "runtime/metrics" + "runtime/pprof" "sync" "sync/atomic" @@ -77,22 +78,6 @@ func readSimulationConfig(path string) (*SimulatorConfig, error) { var simLog zerolog.Logger func TestSimulation(t *testing.T) { - // TODO: read simulation config from a file - - // TODO: this code gets mem profile at exit - // 
we need to get it periodically with top output too - // memProfFile, err := os.Create("memprof.out") - // if err != nil { - // simLog.Fatal().Err(err).Msg("could not create memory profile") - // } - // defer memProfFile.Close() // error handling omitted for example - // defer func() { - // pprof.Lookup("allocs").WriteTo(memProfFile, 0) - // // if you want to check live heap objects: - // // runtime.GC() // get up-to-date statistics - // // pprof.Lookup("heap").WriteTo(memProfFile, 0) - // }() - simLog = zerolog.New(os.Stdout).With().Timestamp().Logger() conf, err := readSimulationConfig("testconfig/config1.json") @@ -108,13 +93,15 @@ func TestSimulation(t *testing.T) { go func(ctx context.Context) { t := time.NewTicker(time.Duration(conf.MemProfInterval) * time.Second) + i := 0 for range t.C { select { case <-ctx.Done(): return default: - PrintMemUsage() + CaptureMemUsage(i) } + i++ } }(ctx) @@ -198,11 +185,28 @@ var memMetrics = []metrics.Sample{ // Memory that has been reserved for heap objects but unused. (/memory/classes/heap/unused:bytes) // LazyFree pages that are returned to OS with madvise syscall but not yet reclaimed by OS. -func PrintMemUsage() { - +func CaptureMemUsage(order int) { // Memory statistics are recorded after a GC run. // Trigger GC to have latest state of heap. // runtime.GC() // triggered each time PrintMemUsage called, preventing us observing the normal GC behaviour. + + go func() { + memProfFile, err := os.Create(fmt.Sprintf("memprof_%d.prof", order)) + if err != nil { + simLog.Fatal().Err(err).Msg("could not create memory profile") + } + defer memProfFile.Close() // error handling omitted for example + pprof.WriteHeapProfile(memProfFile) + // pprof.Lookup("allocs").WriteTo(memProfFile, 0) + // pprof.Lookup("heap").WriteTo(memProfFile, 0) + + goroutineProfFile, err := os.Create(fmt.Sprintf("goroutineprof_%d.prof", order)) + if err != nil { + simLog.Fatal().Err(err).Msg("could not create goroutine profile") + } + defer goroutineProfFile.Close() // error handling omitted for example + pprof.Lookup("goroutine").WriteTo(goroutineProfFile, 0) + }() metrics.Read(memMetrics) HeapInUse := bToMb(memMetrics[1].Value.Uint64()) @@ -211,6 +215,7 @@ func PrintMemUsage() { Stack := bToMb(memMetrics[10].Value.Uint64()) LiveGoroutines := memMetrics[11].Value.Uint64() + fmt.Printf("Stat%d : ", order) fmt.Printf("Total bytes allocated: %v", bToMb(memMetrics[0].Value.Uint64())) fmt.Printf("\tIn-use bytes: %v", HeapInUse) // fmt.Printf("\tAutomatic gc cycles: %v", (memMetrics[2].Value.Uint64())) From 1bb4b3f3446da8482fecf46dca76b42791ce7bd8 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 6 Feb 2024 11:39:12 +0300 Subject: [PATCH 09/16] add test mode flag --- datastore/backend.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datastore/backend.go b/datastore/backend.go index ed793be..8152cf9 100644 --- a/datastore/backend.go +++ b/datastore/backend.go @@ -44,6 +44,12 @@ var kernelVersion string var cloudProvider CloudProvider func init() { + + TestMode := os.Getenv("TEST_MODE") + if TestMode == "true" { + return + } + MonitoringID = os.Getenv("MONITORING_ID") if MonitoringID == "" { log.Logger.Fatal().Msg("MONITORING_ID is not set") From c23fe6d07abcca2909fc9d058e6e1ff5274b9204 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 6 Feb 2024 14:35:05 +0300 Subject: [PATCH 10/16] separate tcp and l7 workers --- aggregator/data.go | 36 ++++++++++++++++++++++++++++-------- ebpf/collector.go | 10 +++++++++- main.go | 2 +- 3 files changed, 38 insertions(+), 10 
deletions(-) diff --git a/aggregator/data.go b/aggregator/data.go index b7e001e..7008e3b 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -44,8 +44,9 @@ type Aggregator struct { // listen to events from different sources k8sChan <-chan interface{} - ebpfChan <-chan interface{} + ebpfChan chan interface{} ebpfProcChan <-chan interface{} + ebpfTcpChan <-chan interface{} tlsAttachSignalChan chan uint32 ec *ebpf.EbpfCollector @@ -127,8 +128,8 @@ type ClusterInfo struct { var ( // default exponential backoff (*2) // when retryLimit is increased, we are blocking the events that we wait it to be processed more - retryInterval = 400 * time.Millisecond - retryLimit = 5 + retryInterval = 50 * time.Millisecond + retryLimit = 2 // 400 + 800 + 1600 + 3200 + 6400 = 12400 ms defaultExpiration = 5 * time.Minute @@ -158,6 +159,7 @@ func containsSQLKeywords(input string) bool { func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, events chan interface{}, procEvents chan interface{}, + tcpEvents chan interface{}, tlsAttachSignalChan chan uint32, ds datastore.DataStore) *Aggregator { ctx, _ := context.WithCancel(parentCtx) @@ -172,6 +174,7 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, k8sChan: k8sChan, ebpfChan: events, ebpfProcChan: procEvents, + ebpfTcpChan: tcpEvents, clusterInfo: clusterInfo, ds: ds, tlsAttachSignalChan: tlsAttachSignalChan, @@ -267,12 +270,13 @@ func (a *Aggregator) Run() { } for i := 0; i < numWorker; i++ { go a.processEbpf(a.ctx) - go a.processEbpfProc(a.ctx) + go a.processEbpfTcp(a.ctx) } // TODO: pod number may be ideal for i := 0; i < 2*cpuCount; i++ { go a.processHttp2Frames() + go a.processEbpfProc(a.ctx) } } @@ -304,7 +308,6 @@ func (a *Aggregator) processEbpfProc(ctx context.Context) { for data := range a.ebpfProcChan { select { case <-ctx.Done(): - log.Logger.Info().Msg("processEbpf exiting...") return default: bpfEvent, ok := data.(ebpf.BpfEvent) @@ -325,11 +328,10 @@ func (a *Aggregator) processEbpfProc(ctx context.Context) { } } -func (a *Aggregator) processEbpf(ctx context.Context) { - for data := range a.ebpfChan { +func (a *Aggregator) processEbpfTcp(ctx context.Context) { + for data := range a.ebpfTcpChan { select { case <-ctx.Done(): - log.Logger.Info().Msg("processEbpf exiting...") return default: bpfEvent, ok := data.(ebpf.BpfEvent) @@ -341,6 +343,24 @@ func (a *Aggregator) processEbpf(ctx context.Context) { case tcp_state.TCP_CONNECT_EVENT: d := data.(*tcp_state.TcpConnectEvent) // copy data's value a.processTcpConnect(d) + } + } + + } +} + +func (a *Aggregator) processEbpf(ctx context.Context) { + for data := range a.ebpfChan { + select { + case <-ctx.Done(): + return + default: + bpfEvent, ok := data.(ebpf.BpfEvent) + if !ok { + log.Logger.Error().Interface("ebpfData", data).Msg("error casting ebpf event") + continue + } + switch bpfEvent.Type() { case l7_req.L7_EVENT: d := data.(*l7_req.L7Event) // copy data's value a.processL7(ctx, d) diff --git a/ebpf/collector.go b/ebpf/collector.go index 7264157..627cbd6 100644 --- a/ebpf/collector.go +++ b/ebpf/collector.go @@ -39,6 +39,8 @@ type EbpfCollector struct { done chan struct{} ebpfEvents chan interface{} ebpfProcEvents chan interface{} + ebpfTcpEvents chan interface{} + tlsAttachQueue chan uint32 bpfPrograms map[string]Program @@ -71,6 +73,7 @@ func NewEbpfCollector(parentCtx context.Context) *EbpfCollector { done: make(chan struct{}), ebpfEvents: make(chan interface{}, 100000), // interface is 16 bytes, 16 * 100000 = 8 Megabytes ebpfProcEvents: 
make(chan interface{}, 100), + ebpfTcpEvents: make(chan interface{}, 1000), tlsPidMap: make(map[uint32]struct{}), sslWriteUprobes: make(map[uint32]link.Link), sslReadEnterUprobes: make(map[uint32]link.Link), @@ -95,6 +98,10 @@ func (e *EbpfCollector) EbpfProcEvents() chan interface{} { return e.ebpfProcEvents } +func (e *EbpfCollector) EbpfTcpEvents() chan interface{} { + return e.ebpfTcpEvents +} + func (e *EbpfCollector) TlsAttachQueue() chan uint32 { return e.tlsAttachQueue } @@ -114,7 +121,7 @@ func (e *EbpfCollector) Init() { } func (e *EbpfCollector) ListenEvents() { - go e.bpfPrograms["tcp_state_prog"].Consume(e.ctx, e.ebpfEvents) + go e.bpfPrograms["tcp_state_prog"].Consume(e.ctx, e.ebpfTcpEvents) go e.bpfPrograms["l7_prog"].Consume(e.ctx, e.ebpfEvents) go e.bpfPrograms["proc_prog"].Consume(e.ctx, e.ebpfProcEvents) @@ -130,6 +137,7 @@ func (e *EbpfCollector) close() { close(e.ebpfEvents) close(e.ebpfProcEvents) + close(e.ebpfTcpEvents) e.probesMu.Lock() defer e.probesMu.Unlock() diff --git a/main.go b/main.go index 16ba471..5e97ab0 100644 --- a/main.go +++ b/main.go @@ -66,7 +66,7 @@ func main() { ec.Init() go ec.ListenEvents() - a := aggregator.NewAggregator(ctx, kubeEvents, ec.EbpfEvents(), ec.EbpfProcEvents(), ec.TlsAttachQueue(), dsBackend) + a := aggregator.NewAggregator(ctx, kubeEvents, ec.EbpfEvents(), ec.EbpfProcEvents(), ec.EbpfTcpEvents(), ec.TlsAttachQueue(), dsBackend) a.Run() } From 300f05112ca8ef350831bd87691b5fcfcefad3a0 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 6 Feb 2024 16:18:17 +0300 Subject: [PATCH 11/16] shard pid map to reduce lock contention --- aggregator/data.go | 167 +++++++++++++++++++++++++++++------------- aggregator/persist.go | 24 +++--- 2 files changed, 130 insertions(+), 61 deletions(-) diff --git a/aggregator/data.go b/aggregator/data.go index 7008e3b..31a80bd 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -105,13 +105,24 @@ type SocketMap struct { } type ClusterInfo struct { - mu sync.RWMutex + k8smu sync.RWMutex PodIPToPodUid map[string]types.UID `json:"podIPToPodUid"` ServiceIPToServiceUid map[string]types.UID `json:"serviceIPToServiceUid"` // Pid -> SocketMap // pid -> fd -> {saddr, sport, daddr, dport} - PidToSocketMap map[uint32]*SocketMap `json:"pidToSocketMap"` + + // shard pidToSocketMap by pid to reduce lock contention + mu0 sync.RWMutex + mu1 sync.RWMutex + mu2 sync.RWMutex + mu3 sync.RWMutex + mu4 sync.RWMutex + PidToSocketMap0 map[uint32]*SocketMap `json:"pidToSocketMap0"` // pid ending with 0-1 + PidToSocketMap1 map[uint32]*SocketMap `json:"pidToSocketMap1"` // pid ending with 2-3 + PidToSocketMap2 map[uint32]*SocketMap `json:"pidToSocketMap2"` // pid ending with 4-5 + PidToSocketMap3 map[uint32]*SocketMap `json:"pidToSocketMap3"` // pid ending with 6-7 + PidToSocketMap4 map[uint32]*SocketMap `json:"pidToSocketMap4"` // pid ending with 8-9 } // If we have information from the container runtimes @@ -166,7 +177,11 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, clusterInfo := &ClusterInfo{ PodIPToPodUid: map[string]types.UID{}, ServiceIPToServiceUid: map[string]types.UID{}, - PidToSocketMap: make(map[uint32]*SocketMap, 0), + PidToSocketMap0: make(map[uint32]*SocketMap), + PidToSocketMap1: make(map[uint32]*SocketMap), + PidToSocketMap2: make(map[uint32]*SocketMap), + PidToSocketMap3: make(map[uint32]*SocketMap), + PidToSocketMap4: make(map[uint32]*SocketMap), } a := &Aggregator{ @@ -239,10 +254,7 @@ func (a *Aggregator) Run() { if err != nil { // pid does not exist 
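// A minimal sketch (not part of the patch): the sharding introduced in ClusterInfo
// above keys on the last decimal digit of the pid, so two digits map to each of the
// five shard mutexes and workers handling different pids rarely contend on the same
// lock. The helper below is an illustrative, equivalent way to express that mapping;
// pidShard and shardFor are assumed names, not identifiers from this repository, and
// SocketMap and sync refer to the surrounding aggregator code.
type pidShard struct {
	mu sync.RWMutex
	m  map[uint32]*SocketMap
}

func shardFor(shards *[5]pidShard, pid uint32) *pidShard {
	// 0-1 -> shard 0, 2-3 -> shard 1, 4-5 -> shard 2, 6-7 -> shard 3, 8-9 -> shard 4
	return &shards[(pid%10)/2]
}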
delete(a.liveProcesses, pid) - - a.clusterInfo.mu.Lock() - delete(a.clusterInfo.PidToSocketMap, pid) - a.clusterInfo.mu.Unlock() + a.removeFromClusterInfo(pid) a.h2ParserMu.Lock() for key, parser := range a.h2Parsers { @@ -402,9 +414,7 @@ func (a *Aggregator) processExit(pid uint32) { delete(a.liveProcesses, pid) a.liveProcessesMu.Unlock() - a.clusterInfo.mu.Lock() - delete(a.clusterInfo.PidToSocketMap, pid) - a.clusterInfo.mu.Unlock() + a.removeFromClusterInfo(pid) a.h2ParserMu.Lock() pid_s := fmt.Sprint(pid) @@ -439,30 +449,26 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) { var sockMap *SocketMap var ok bool - a.clusterInfo.mu.RLock() // lock for reading - sockMap, ok = a.clusterInfo.PidToSocketMap[d.Pid] - a.clusterInfo.mu.RUnlock() // unlock for reading + mu, pidToSocketMap := a.getShard(d.Pid) + mu.Lock() + sockMap, ok = pidToSocketMap[d.Pid] if !ok { sockMap = &SocketMap{ M: make(map[uint64]*SocketLine), mu: sync.RWMutex{}, } - a.clusterInfo.mu.Lock() // lock for writing - a.clusterInfo.PidToSocketMap[d.Pid] = sockMap - a.clusterInfo.mu.Unlock() // unlock for writing + pidToSocketMap[d.Pid] = sockMap } + mu.Unlock() // unlock for writing var skLine *SocketLine - sockMap.mu.RLock() // lock for reading + sockMap.mu.Lock() // lock for reading skLine, ok = sockMap.M[d.Fd] - sockMap.mu.RUnlock() // unlock for reading if !ok { skLine = NewSocketLine(d.Pid, d.Fd) - sockMap.mu.Lock() // lock for writing sockMap.M[d.Fd] = skLine - sockMap.mu.Unlock() // unlock for writing } skLine.AddValue( @@ -476,35 +482,36 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) { Dport: d.DPort, }, ) + sockMap.mu.Unlock() // unlock for writing + } else if d.Type_ == tcp_state.EVENT_TCP_CLOSED { var sockMap *SocketMap var ok bool - a.clusterInfo.mu.RLock() // lock for reading - sockMap, ok = a.clusterInfo.PidToSocketMap[d.Pid] - a.clusterInfo.mu.RUnlock() // unlock for reading - + mu, pidToSocketMap := a.getShard(d.Pid) + mu.Lock() + sockMap, ok = pidToSocketMap[d.Pid] if !ok { sockMap = &SocketMap{ M: make(map[uint64]*SocketLine), mu: sync.RWMutex{}, } - a.clusterInfo.mu.Lock() // lock for writing - a.clusterInfo.PidToSocketMap[d.Pid] = sockMap - a.clusterInfo.mu.Unlock() // unlock for writing + pidToSocketMap[d.Pid] = sockMap + mu.Unlock() // unlock for writing return } + mu.Unlock() var skLine *SocketLine - sockMap.mu.RLock() // lock for reading + sockMap.mu.Lock() // lock for reading skLine, ok = sockMap.M[d.Fd] - sockMap.mu.RUnlock() // unlock for reading - if !ok { + sockMap.mu.Unlock() // unlock for reading return } + sockMap.mu.Unlock() // unlock for reading // If connection is established before, add the close event skLine.AddValue( @@ -829,9 +836,9 @@ func (a *Aggregator) processHttp2Frames() { func (a *Aggregator) setFromTo(skInfo *SockInfo, d *l7_req.L7Event, reqDto *datastore.Request, hostHeader string) error { // find pod info - a.clusterInfo.mu.RLock() // lock for reading + a.clusterInfo.k8smu.RLock() // lock for reading podUid, ok := a.clusterInfo.PodIPToPodUid[skInfo.Saddr] - a.clusterInfo.mu.RUnlock() // unlock for reading + a.clusterInfo.k8smu.RUnlock() // unlock for reading if !ok { return fmt.Errorf("error finding pod with sockets saddr") } @@ -842,17 +849,17 @@ func (a *Aggregator) setFromTo(skInfo *SockInfo, d *l7_req.L7Event, reqDto *data reqDto.ToPort = skInfo.Dport // find service info - a.clusterInfo.mu.RLock() // lock for reading + a.clusterInfo.k8smu.RLock() // lock for reading svcUid, ok := 
a.clusterInfo.ServiceIPToServiceUid[skInfo.Daddr] - a.clusterInfo.mu.RUnlock() // unlock for reading + a.clusterInfo.k8smu.RUnlock() // unlock for reading if ok { reqDto.ToUID = string(svcUid) reqDto.ToType = "service" } else { - a.clusterInfo.mu.RLock() // lock for reading + a.clusterInfo.k8smu.RLock() // lock for reading podUid, ok := a.clusterInfo.PodIPToPodUid[skInfo.Daddr] - a.clusterInfo.mu.RUnlock() // unlock for reading + a.clusterInfo.k8smu.RUnlock() // unlock for reading if ok { reqDto.ToUID = string(podUid) @@ -1000,19 +1007,17 @@ func getHostnameFromIP(ipAddr string) (string, error) { } func (a *Aggregator) fetchSkLine(sockMap *SocketMap, pid uint32, fd uint64) *SocketLine { - sockMap.mu.RLock() // lock for reading + sockMap.mu.Lock() // lock for reading skLine, ok := sockMap.M[fd] - sockMap.mu.RUnlock() // unlock for reading if !ok { log.Logger.Debug().Uint32("pid", pid).Uint64("fd", fd).Msg("error finding skLine, go look for it") // start new socket line, find already established connections skLine = NewSocketLine(pid, fd) skLine.GetAlreadyExistingSockets() // find already established connections - sockMap.mu.Lock() // lock for writing sockMap.M[fd] = skLine - sockMap.mu.Unlock() // unlock for writing } + sockMap.mu.Unlock() // unlock for writing return skLine } @@ -1049,25 +1054,57 @@ func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_ return skInfo } +func (a *Aggregator) getShard(pid uint32) (*sync.RWMutex, map[uint32]*SocketMap) { + lastDigit := pid % 10 + var mu *sync.RWMutex + var pidToSocketMap map[uint32]*SocketMap + switch lastDigit { + case 0, 1: + mu = &a.clusterInfo.mu0 + pidToSocketMap = a.clusterInfo.PidToSocketMap0 + case 2, 3: + mu = &a.clusterInfo.mu1 + pidToSocketMap = a.clusterInfo.PidToSocketMap1 + case 4, 5: + mu = &a.clusterInfo.mu2 + pidToSocketMap = a.clusterInfo.PidToSocketMap2 + case 6, 7: + mu = &a.clusterInfo.mu3 + pidToSocketMap = a.clusterInfo.PidToSocketMap3 + case 8, 9: + mu = &a.clusterInfo.mu4 + pidToSocketMap = a.clusterInfo.PidToSocketMap4 + } + + return mu, pidToSocketMap +} + +func (a *Aggregator) removeFromClusterInfo(pid uint32) { + mu, pidToSocketMap := a.getShard(pid) + mu.Lock() + delete(pidToSocketMap, pid) + mu.Unlock() +} + func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap { var sockMap *SocketMap var ok bool - a.clusterInfo.mu.RLock() // lock for reading - sockMap, ok = a.clusterInfo.PidToSocketMap[pid] - a.clusterInfo.mu.RUnlock() // unlock for reading + mu, pidToSocketMap := a.getShard(pid) // create shard if not exists + mu.Lock() // lock for reading + sockMap, ok = pidToSocketMap[pid] if !ok { // initialize socket map sockMap = &SocketMap{ M: make(map[uint64]*SocketLine), mu: sync.RWMutex{}, } - a.clusterInfo.mu.Lock() // lock for writing - a.clusterInfo.PidToSocketMap[pid] = sockMap - a.clusterInfo.mu.Unlock() // unlock for writing + pidToSocketMap[pid] = sockMap go a.signalTlsAttachment(pid) } + mu.Unlock() // unlock for writing + return sockMap } @@ -1124,12 +1161,44 @@ func (a *Aggregator) clearSocketLines(ctx context.Context) { }() for range ticker.C { - a.clusterInfo.mu.RLock() - for _, socketMap := range a.clusterInfo.PidToSocketMap { + a.clusterInfo.mu0.RLock() + for _, socketMap := range a.clusterInfo.PidToSocketMap0 { + for _, socketLine := range socketMap.M { + skLineCh <- socketLine + } + } + a.clusterInfo.mu0.RUnlock() + + a.clusterInfo.mu1.RLock() + for _, socketMap := range a.clusterInfo.PidToSocketMap1 { + for _, socketLine := range socketMap.M { + skLineCh <- socketLine + 
} + } + a.clusterInfo.mu1.RUnlock() + + a.clusterInfo.mu2.RLock() + for _, socketMap := range a.clusterInfo.PidToSocketMap2 { + for _, socketLine := range socketMap.M { + skLineCh <- socketLine + } + } + a.clusterInfo.mu2.RUnlock() + + a.clusterInfo.mu3.RLock() + for _, socketMap := range a.clusterInfo.PidToSocketMap3 { + for _, socketLine := range socketMap.M { + skLineCh <- socketLine + } + } + a.clusterInfo.mu3.RUnlock() + + a.clusterInfo.mu4.RLock() + for _, socketMap := range a.clusterInfo.PidToSocketMap4 { for _, socketLine := range socketMap.M { skLineCh <- socketLine } } - a.clusterInfo.mu.RUnlock() + a.clusterInfo.mu4.RUnlock() } } diff --git a/aggregator/persist.go b/aggregator/persist.go index 06803e9..0bca01a 100644 --- a/aggregator/persist.go +++ b/aggregator/persist.go @@ -54,19 +54,19 @@ func (a *Aggregator) processPod(d k8s.K8sResourceMessage) { switch d.EventType { case k8s.ADD: - a.clusterInfo.mu.Lock() + a.clusterInfo.k8smu.Lock() a.clusterInfo.PodIPToPodUid[pod.Status.PodIP] = pod.UID - a.clusterInfo.mu.Unlock() + a.clusterInfo.k8smu.Unlock() go a.persistPod(dtoPod, ADD) case k8s.UPDATE: - a.clusterInfo.mu.Lock() + a.clusterInfo.k8smu.Lock() a.clusterInfo.PodIPToPodUid[pod.Status.PodIP] = pod.UID - a.clusterInfo.mu.Unlock() + a.clusterInfo.k8smu.Unlock() go a.persistPod(dtoPod, UPDATE) case k8s.DELETE: - a.clusterInfo.mu.Lock() + a.clusterInfo.k8smu.Lock() delete(a.clusterInfo.PodIPToPodUid, pod.Status.PodIP) - a.clusterInfo.mu.Unlock() + a.clusterInfo.k8smu.Unlock() go a.persistPod(dtoPod, DELETE) } } @@ -110,19 +110,19 @@ func (a *Aggregator) processSvc(d k8s.K8sResourceMessage) { switch d.EventType { case k8s.ADD: - a.clusterInfo.mu.Lock() + a.clusterInfo.k8smu.Lock() a.clusterInfo.ServiceIPToServiceUid[service.Spec.ClusterIP] = service.UID - a.clusterInfo.mu.Unlock() + a.clusterInfo.k8smu.Unlock() go a.persistSvc(dtoSvc, ADD) case k8s.UPDATE: - a.clusterInfo.mu.Lock() + a.clusterInfo.k8smu.Lock() a.clusterInfo.ServiceIPToServiceUid[service.Spec.ClusterIP] = service.UID - a.clusterInfo.mu.Unlock() + a.clusterInfo.k8smu.Unlock() go a.persistSvc(dtoSvc, UPDATE) case k8s.DELETE: - a.clusterInfo.mu.Lock() + a.clusterInfo.k8smu.Lock() delete(a.clusterInfo.ServiceIPToServiceUid, service.Spec.ClusterIP) - a.clusterInfo.mu.Unlock() + a.clusterInfo.k8smu.Unlock() go a.persistSvc(dtoSvc, DELETE) } } From 885b53bf4f35749df96da580bec22d43484cc624 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 6 Feb 2024 17:06:35 +0300 Subject: [PATCH 12/16] delete unmatched sockets --- aggregator/sock_num_line.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aggregator/sock_num_line.go b/aggregator/sock_num_line.go index 18d6794..078c04d 100644 --- a/aggregator/sock_num_line.go +++ b/aggregator/sock_num_line.go @@ -40,7 +40,6 @@ func NewSocketLine(pid uint32, fd uint64) *SocketLine { fd: fd, Values: make([]TimestampedSocket, 0), } - go skLine.DeleteUnused() return skLine } @@ -107,6 +106,9 @@ func (nl *SocketLine) DeleteUnused() { } if lastMatchedReqTime == 0 { + // in case of tracking only tcp sockets without any requests matching them, socketLine will consume memory over time + // we need to delete all values in this case + nl.Values = make([]TimestampedSocket, 0) return } From 579730407931a9727d99fc6d2ccba34ae5233068 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 6 Feb 2024 17:08:13 +0300 Subject: [PATCH 13/16] get cpu time for test --- main_benchmark_test.go | 98 +++++++++++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 34 
deletions(-) diff --git a/main_benchmark_test.go b/main_benchmark_test.go index 8dbea90..fa8aaff 100644 --- a/main_benchmark_test.go +++ b/main_benchmark_test.go @@ -53,6 +53,7 @@ type SimulatorConfig struct { KubeEventsBufferSize int `json:"kubeEventsBufferSize"` EbpfEventsBufferSize int `json:"ebpfEventsBufferSize"` EbpfProcEventsBufferSize int `json:"ebpfProcEventsBufferSize"` + EbpfTcpEventsBufferSize int `json:"ebpfTcpEventsBufferSize"` TlsAttachQueueBufferSize int `json:"tlsAttachQueueBufferSize"` DsReqBufferSize int `json:"dsReqBufferSize"` } @@ -111,7 +112,21 @@ func TestSimulation(t *testing.T) { simLog.Info().Msg("context canceled") }() - sim.start(ctx, conf) + aggregatorCtx, aggregatorCancel := context.WithCancel(context.Background()) + sim.start(ctx, aggregatorCtx, conf) + + wait_start := time.Now() + var wait_end time.Time + for { + // all events are consumed + if len(sim.ebpfEvents) == 0 && len(sim.ebpfTcpEvents) == 0 { + wait_end = time.Now() + break + } + } + simLog.Info().Str("wait time", wait_end.Sub(wait_start).String()).Msg("waited for all events to be consumed") + + aggregatorCancel() totalReqReadyToBeSent := sim.getDataStore().(*MockDataStore).ReadyToBeSendReq.Load() putIntoBackendQueue := sim.getDataStore().(*MockDataStore).SendToBackendQueueReq.Load() @@ -223,16 +238,29 @@ func CaptureMemUsage(order int) { // fmt.Printf("\tGOGC percent: %v", (memMetrics[4].Value.Uint64())) // fmt.Printf("\tGOMEMLIMIT: %v\n", bToMb(memMetrics[5].Value.Uint64())) fmt.Printf("\tHeapFree: %v", HeapFree) - fmt.Printf("\tHeapReleased: %v", bToMb(memMetrics[7].Value.Uint64())) - fmt.Printf("\tHeapUnused: %v", HeapUnused) + // fmt.Printf("\tHeapReleased: %v", bToMb(memMetrics[7].Value.Uint64())) + // fmt.Printf("\tHeapUnused: %v", HeapUnused) // fmt.Printf("\tTotal: %v", bToMb(memMetrics[9].Value.Uint64())) - fmt.Printf("\tStack: %v", Stack) + // fmt.Printf("\tStack: %v", Stack) fmt.Printf("\tLiveGoroutines: %v", LiveGoroutines) proc, err := procfs.Self() if err != nil { simLog.Fatal().Err(err) } + + stat, err := proc.Stat() + if err != nil { + simLog.Fatal().Err(err) + } + cpuTime := stat.CPUTime() + fmt.Printf("\tCpuTime: %v", cpuTime) + // (utime + stime)/clocktick + // utime and stime measured in clock ticks + + // unix timestamp of the process in seconds + stat.StartTime() + smapRollup, err := proc.ProcSMapsRollup() if err != nil { simLog.Fatal().Err(err) @@ -262,17 +290,14 @@ func bToMb(b uint64) uint64 { return b / 1024 / 1024 } -func (sim *Simulator) start(ctx context.Context, conf *SimulatorConfig) { - // TODO: call this func from another test, and after some time send a sigkill - // measure memory and cpu resources - +func (sim *Simulator) start(simCtx context.Context, ctx context.Context, conf *SimulatorConfig) { sim.Setup() // debug.SetGCPercent(80) - go sim.Simulate(ctx) + go sim.Simulate(simCtx) a := aggregator.NewAggregator(ctx, sim.getKubeEvents(), sim.getEbpfEvents(), - sim.getEbpfProcEvents(), sim.getTlsAttachQueue(), sim.getDataStore()) + sim.getEbpfProcEvents(), sim.getEbpfTcpEvents(), sim.getTlsAttachQueue(), sim.getDataStore()) a.Run() go http.ListenAndServe(":8181", nil) @@ -282,9 +307,11 @@ func (sim *Simulator) start(ctx context.Context, conf *SimulatorConfig) { type Simulator struct { kubeEvents chan interface{} // will be sent k8s events - // mockCollector ? 
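// A minimal sketch (not part of the patch): the CPUTime sample captured above is
// cumulative user+system seconds; dividing the delta between two samples by the
// wall-clock interval gives an approximate utilisation figure for the test process.
// cpuUtilisation is an illustrative helper, not a function in this repository; it
// reuses the same procfs.Self / Stat / CPUTime calls shown in CaptureMemUsage.
func cpuUtilisation(interval time.Duration) (float64, error) {
	p, err := procfs.Self()
	if err != nil {
		return 0, err
	}
	before, err := p.Stat()
	if err != nil {
		return 0, err
	}
	time.Sleep(interval)
	after, err := p.Stat()
	if err != nil {
		return 0, err
	}
	// CPUTime already converts (utime + stime) clock ticks into seconds.
	return (after.CPUTime() - before.CPUTime()) / interval.Seconds(), nil
}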
+ ebpfEvents chan interface{} ebpfProcEvents chan interface{} + ebpfTcpEvents chan interface{} + tlsAttachQueue chan uint32 mockDs datastore.DataStore @@ -302,6 +329,7 @@ func CreateSimulator(ctx context.Context, conf *SimulatorConfig) *Simulator { kubeEvents: make(chan interface{}, conf.KubeEventsBufferSize), ebpfEvents: make(chan interface{}, conf.EbpfEventsBufferSize), ebpfProcEvents: make(chan interface{}, conf.EbpfProcEventsBufferSize), + ebpfTcpEvents: make(chan interface{}, conf.EbpfTcpEventsBufferSize), tlsAttachQueue: make(chan uint32, conf.TlsAttachQueueBufferSize), mockDs: NewMockDataStore(ctx, conf), pods: map[string]*FakePod{}, @@ -327,6 +355,10 @@ func (s *Simulator) getTlsAttachQueue() chan uint32 { return s.tlsAttachQueue } +func (s *Simulator) getEbpfTcpEvents() chan interface{} { + return s.ebpfTcpEvents +} + type FakePod struct { Name string IP string @@ -446,8 +478,6 @@ func (sim *Simulator) Simulate(ctx context.Context) { ec-- // select one pod and service - // TODO: these randoms conflict ???? - pod := sim.pods[podKeys[rand.Intn(len(podKeys))]] svc := sim.services[svcKeys[rand.Intn(len(svcKeys))]] @@ -472,20 +502,19 @@ func (sim *Simulator) Simulate(ctx context.Context) { PodName: pod.Name, SvcName: svc.Name, } - sim.constructSockets([]*ConnectionConfig{cc}) wg.Add(1) // simulate traffic - go func(wg *sync.WaitGroup) { - sim.httpTraffic(ctx, &Traffic{ - pod: pod, - fd: fd, - svc: svc, - rate: rate.NewLimiter(rate.Limit(sim.conf.EdgeRate), sim.conf.EdgeRate), // 1000 events per second - protocol: l7_req.L7_PROTOCOL_HTTP, - }) + go func(wg *sync.WaitGroup, t *Traffic) { + sim.httpTraffic(ctx, t) wg.Done() - }(wg) + }(wg, &Traffic{ + pod: pod, + fd: fd, + svc: svc, + rate: rate.NewLimiter(rate.Limit(sim.conf.EdgeRate), sim.conf.EdgeRate), // 1000 events per second + protocol: l7_req.L7_PROTOCOL_HTTP, + }) } simLog.Warn().Msg("waiting for traffic to stop") @@ -530,7 +559,7 @@ func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) { simLog.Warn().Any("payload", payload) - blockingLogged := false + // blockingLogged := false for { // time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) select { @@ -545,8 +574,7 @@ func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) { // go func() { // TODO:! 
when a new goroutine spawned for each event stack rocketed - select { - case sim.ebpfEvents <- &l7_req.L7Event{ + sim.ebpfEvents <- &l7_req.L7Event{ Fd: t.fd, Pid: t.pod.Pid, Status: 200, @@ -564,14 +592,16 @@ func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) { Tid: 0, Seq: 0, EventReadTime: 0, - }: - default: - if !blockingLogged { - simLog.Warn().Msg("block on ebpfEvents chan") - blockingLogged = true - } } - // }() + // select { + // case + // // default: + // // if !blockingLogged { + // // simLog.Warn().Msg("block on ebpfEvents chan") + // // blockingLogged = true + // // } + // // } + // // }() } } @@ -583,7 +613,7 @@ func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) { // {pid,fd} duo is used to socketLine struct // socketInfo corresponding to requests timestamp is retrieved func (sim *Simulator) tcpEstablish(srcPid uint32, fd uint64, saddr string, daddr string, tx uint64) { - sim.ebpfEvents <- &tcp_state.TcpConnectEvent{ + sim.ebpfTcpEvents <- &tcp_state.TcpConnectEvent{ Fd: fd, Timestamp: tx, Type_: tcp_state.EVENT_TCP_ESTABLISHED, From 5430278178d77b620506f8287faa80ae41aa7fa3 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 6 Feb 2024 17:47:34 +0300 Subject: [PATCH 14/16] test config file --- testconfig/config1.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/testconfig/config1.json b/testconfig/config1.json index a49579d..ccae66b 100644 --- a/testconfig/config1.json +++ b/testconfig/config1.json @@ -1,5 +1,5 @@ { - "testDuration" : 30, + "testDuration" : 15, "memProfInterval" : 5, "podCount": 100, "serviceCount" : 50, @@ -9,6 +9,7 @@ "ebpfEventsBufferSize": 200000, "ebpfProcEventsBufferSize" : 100, "tlsAttachQueueBufferSize" : 10, + "ebpfTcpEventsBufferSize" : 1000, "dsReqBufferSize" : 150000, "mockBackendMinLatency": 5, "mockBackendMaxLatency": 20 From a017e989aacf9ab8358b54da5c11761ff791d3a4 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Wed, 7 Feb 2024 12:16:39 +0300 Subject: [PATCH 15/16] update number of workers --- aggregator/data.go | 7 ++++--- main_benchmark_test.go | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/aggregator/data.go b/aggregator/data.go index 31a80bd..eeeb685 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -276,10 +276,11 @@ func (a *Aggregator) Run() { go a.processk8s() cpuCount := runtime.NumCPU() - numWorker := 10 * cpuCount - if numWorker < 100 { - numWorker = 100 // min number + numWorker := 5 * cpuCount + if numWorker < 50 { + numWorker = 50 // min number } + for i := 0; i < numWorker; i++ { go a.processEbpf(a.ctx) go a.processEbpfTcp(a.ctx) diff --git a/main_benchmark_test.go b/main_benchmark_test.go index fa8aaff..b841639 100644 --- a/main_benchmark_test.go +++ b/main_benchmark_test.go @@ -34,6 +34,8 @@ import ( "github.com/stretchr/testify/assert" ) +// CGO_ENABLED=0 TEST_MODE=true DISABLE_LOGS=true go test + type SimulatorConfig struct { // number of processes // pod and services From e25da4c99d9c0cd222d6d9185f3607d12da12360 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Wed, 7 Feb 2024 12:30:26 +0300 Subject: [PATCH 16/16] populate liveProcesses sync --- aggregator/data.go | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/aggregator/data.go b/aggregator/data.go index eeeb685..7cae982 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -206,32 +206,30 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, } func (a *Aggregator) Run() { - go func() { 
- // get all alive processes, populate liveProcesses - cmd := exec.Command("ps", "-e", "-o", "pid=") - output, err := cmd.Output() - if err != nil { - log.Logger.Fatal().Err(err).Msg("error getting all alive processes") - } + // get all alive processes, populate liveProcesses + cmd := exec.Command("ps", "-e", "-o", "pid=") + output, err := cmd.Output() + if err != nil { + log.Logger.Fatal().Err(err).Msg("error getting all alive processes") + } - lines := strings.Split(string(output), "\n") - for _, line := range lines { - line = strings.TrimSpace(line) - if line != "" { - fields := strings.Fields(line) - if len(fields) > 0 { - pid := fields[0] - pidInt, err := strconv.Atoi(pid) - if err != nil { - log.Logger.Error().Err(err).Msgf("error converting pid to int %s", pid) - continue - } - a.liveProcesses[uint32(pidInt)] = struct{}{} + lines := strings.Split(string(output), "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if line != "" { + fields := strings.Fields(line) + if len(fields) > 0 { + pid := fields[0] + pidInt, err := strconv.Atoi(pid) + if err != nil { + log.Logger.Error().Err(err).Msgf("error converting pid to int %s", pid) + continue } + a.liveProcesses[uint32(pidInt)] = struct{}{} } } + } - }() go func() { // every 2 minutes, check alive processes, and clear the ones left behind // since we process events concurrently, some short-lived processes exit event can come before exec events
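// A minimal sketch (not part of the patch): the "which pids are alive" set that Run
// builds above by shelling out to `ps -e -o pid=` could also be collected by listing
// /proc directly, since every numeric directory name under /proc is a live pid.
// livePids is an illustrative alternative, not a function in this repository.
func livePids() (map[uint32]struct{}, error) {
	entries, err := os.ReadDir("/proc")
	if err != nil {
		return nil, err
	}
	pids := make(map[uint32]struct{}, len(entries))
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		// non-numeric entries such as /proc/meminfo are skipped by the parse error
		if pid, err := strconv.ParseUint(e.Name(), 10, 32); err == nil {
			pids[uint32(pid)] = struct{}{}
		}
	}
	return pids, nil
}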