Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get address pair from ebpf combined with event #160

Merged
merged 3 commits into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 97 additions & 115 deletions aggregator/data.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
package aggregator

// aggregate data from different sources
// 1. k8s
// 2. containerd (TODO)
// 3. ebpf
// 4. cgroup (TODO)
// 5. docker (TODO)

import (
"bytes"
"context"
Expand Down Expand Up @@ -324,6 +317,7 @@ func (a *Aggregator) processEbpf(ctx context.Context) {
case l7_req.L7_EVENT:
d := data.(*l7_req.L7Event) // copy data's value
ctxPid := context.WithValue(a.ctx, log.LOG_CONTEXT, fmt.Sprint(d.Pid))
go a.signalTlsAttachment(d.Pid)
a.processL7(ctxPid, d)
case l7_req.TRACE_EVENT:
d := data.(*l7_req.TraceEvent)
Expand Down Expand Up @@ -460,7 +454,9 @@ func (a *Aggregator) processTcpConnect(ctx context.Context, d *tcp_state.TcpConn
}

var skLine *SocketLine
sockMap.mu.RLock()
skLine, ok = sockMap.M[d.Fd]
sockMap.mu.RUnlock()
if !ok {
return
}
Expand Down Expand Up @@ -571,19 +567,16 @@ func (a *Aggregator) processHttp2Frames() {
return
}

skInfo, err := a.findRelatedSocket(a.ctx, d)
if skInfo == nil || err != nil {
return
}
addrPair := extractAddressPair(d)

req.Latency = d.WriteTimeNs - req.Latency
req.StartTime = int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6) // nano to milli
req.Completed = true
req.FromIP = skInfo.Saddr
req.ToIP = skInfo.Daddr
req.FromIP = addrPair.Saddr
req.ToIP = addrPair.Daddr
req.Tls = d.Tls
req.FromPort = skInfo.Sport
req.ToPort = skInfo.Dport
req.FromPort = addrPair.Sport
req.ToPort = addrPair.Dport
req.FailReason = ""
if req.Protocol == "" {
req.Protocol = "HTTP2"
Expand All @@ -596,7 +589,7 @@ func (a *Aggregator) processHttp2Frames() {
}

// toUID is set to :authority header in client frame
err = a.setFromTo(skInfo, d, req, req.ToUID)
err := a.setFromToV2(addrPair, d, req, req.ToUID)
if err != nil {
return
}
Expand Down Expand Up @@ -814,6 +807,52 @@ func (a *Aggregator) getSvcWithIP(addr string) (types.UID, bool) {
svcUid, ok := a.clusterInfo.ServiceIPToServiceUid[addr]
a.clusterInfo.k8smu.RUnlock() // unlock for reading
return svcUid, ok

}

func (a *Aggregator) setFromToV2(addrPair *AddressPair, d *l7_req.L7Event, event datastore.DirectionalEvent, hostHeader string) error {
// find pod info
podUid, ok := a.getPodWithIP(addrPair.Saddr)
if !ok {
return fmt.Errorf("error finding pod with sockets saddr")
}

event.SetFromUID(string(podUid))
event.SetFromType(POD)
event.SetFromPort(addrPair.Sport)
event.SetToPort(addrPair.Dport)

// find service info
svcUid, ok := a.getSvcWithIP(addrPair.Daddr)
if ok {
event.SetToUID(string(svcUid))
event.SetToType(SVC)
} else {
podUid, ok := a.getPodWithIP(addrPair.Daddr)

if ok {
event.SetToUID(string(podUid))
event.SetToType(POD)
} else {
// 3rd party url
if hostHeader != "" {
event.SetToUID(hostHeader)
event.SetToType(OUTBOUND)
} else {
remoteDnsHost, err := getHostnameFromIP(addrPair.Daddr)
if err == nil {
// dns lookup successful
event.SetToUID(remoteDnsHost)
event.SetToType(OUTBOUND)
} else {
event.SetToUID(addrPair.Daddr)
event.SetToType(OUTBOUND)
}
}
}
}

return nil
}

func (a *Aggregator) setFromTo(skInfo *SockInfo, d *l7_req.L7Event, event datastore.DirectionalEvent, hostHeader string) error {
Expand Down Expand Up @@ -978,39 +1017,20 @@ func (a *Aggregator) processKafkaEvent(ctx context.Context, d *l7_req.L7Event) {
return
}

skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil || err != nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}

log.Logger.Debug().
Ctx(ctx).
Err(err).
Uint32("pid", d.Pid).
Uint64("fd", d.Fd).
Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).
Any("payload", string(d.Payload[:d.PayloadSize])).
Msg("discarding kafka event, socket not found")
addrPair := extractAddressPair(d)

return
}
for _, msg := range kafkaMessages {
event := &datastore.KafkaEvent{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
FromIP: addrPair.Saddr,
FromType: "",
FromUID: "",
FromPort: 0,
ToIP: skInfo.Daddr,
FromPort: addrPair.Sport,
ToIP: addrPair.Daddr,
ToType: "",
ToUID: "",
ToPort: 0,
ToPort: addrPair.Dport,
Tls: d.Tls,
Topic: msg.TopicName,
Partition: uint32(msg.Partition),
Expand All @@ -1021,7 +1041,7 @@ func (a *Aggregator) processKafkaEvent(ctx context.Context, d *l7_req.L7Event) {
Seq: d.Seq,
}

err := a.setFromTo(skInfo, d, event, "")
err := a.setFromToV2(addrPair, d, event, "")
if err != nil {
return
}
Expand All @@ -1042,26 +1062,13 @@ func (a *Aggregator) processKafkaEvent(ctx context.Context, d *l7_req.L7Event) {
}

func (a *Aggregator) processAmqpEvent(ctx context.Context, d *l7_req.L7Event) {
skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil || err != nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}
log.Logger.Debug().Uint32("pid", d.Pid).Err(err).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).Any("payload", string(d.Payload[:d.PayloadSize])).
Msg("discarding amqp event, socket not found")
return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
Expand All @@ -1073,7 +1080,7 @@ func (a *Aggregator) processAmqpEvent(ctx context.Context, d *l7_req.L7Event) {
Seq: d.Seq,
}

err = a.setFromTo(skInfo, d, reqDto, "")
err := a.setFromToV2(addrPair, d, reqDto, "")
if err != nil {
return
}
Expand All @@ -1095,28 +1102,13 @@ func (a *Aggregator) processAmqpEvent(ctx context.Context, d *l7_req.L7Event) {
func (a *Aggregator) processRedisEvent(ctx context.Context, d *l7_req.L7Event) {
query := string(d.Payload[0:d.PayloadSize])

skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil || err != nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}
log.Logger.Debug().
Ctx(ctx).
Err(err).
Uint32("pid", d.Pid).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).Any("payload", string(d.Payload[:d.PayloadSize])).Msg("discarding redis event, socket not found")
return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
Expand All @@ -1128,7 +1120,7 @@ func (a *Aggregator) processRedisEvent(ctx context.Context, d *l7_req.L7Event) {
Seq: d.Seq,
}

err = a.setFromTo(skInfo, d, reqDto, "")
err := a.setFromToV2(addrPair, d, reqDto, "")
if err != nil {
return
}
Expand Down Expand Up @@ -1202,28 +1194,13 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) {
_, path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
}

skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil || err != nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}
log.Logger.Debug().Uint32("pid", d.Pid).
Err(err).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).
Any("payload", string(d.Payload[:d.PayloadSize])).
Msg("discarding http event, socket not found")
return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
Expand All @@ -1235,7 +1212,7 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) {
Seq: d.Seq,
}

err = a.setFromTo(skInfo, d, reqDto, reqHostHeader)
err := a.setFromToV2(addrPair, d, reqDto, reqHostHeader)
if err != nil {
return
}
Expand All @@ -1262,28 +1239,13 @@ func (a *Aggregator) processPostgresEvent(ctx context.Context, d *l7_req.L7Event
return
}

skInfo, err := a.findRelatedSocket(ctx, d)
if skInfo == nil {
// requeue event if this is its first time
if !d.PutBack {
d.PutBack = true
a.ebpfChan <- d
return
}

log.Logger.Debug().Uint32("pid", d.Pid).
Err(err).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).Any("payload", string(d.Payload[:d.PayloadSize])).Msg("discarding postgres event, socket not found")

return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
Expand All @@ -1295,7 +1257,7 @@ func (a *Aggregator) processPostgresEvent(ctx context.Context, d *l7_req.L7Event
Seq: d.Seq,
}

err = a.setFromTo(skInfo, d, reqDto, "")
err = a.setFromToV2(addrPair, d, reqDto, "")
if err != nil {
return
}
Expand Down Expand Up @@ -1580,3 +1542,23 @@ func convertKernelTimeToUserspaceTime(writeTime uint64) uint64 {
func convertUserTimeToKernelTime(now uint64) uint64 {
return l7_req.FirstKernelTime - (l7_req.FirstUserspaceTime - now)
}

// IntToIPv4 converts IP address of version 4 from integer to net.IP
// representation.
func IntToIPv4(ipaddr uint32) net.IP {
ip := make(net.IP, net.IPv4len)

// Proceed conversion
binary.BigEndian.PutUint32(ip, ipaddr)

return ip
}

func extractAddressPair(d *l7_req.L7Event) *AddressPair {
return &AddressPair{
Saddr: IntToIPv4(d.Saddr).String(),
Sport: d.Sport,
Daddr: IntToIPv4(d.Daddr).String(),
Dport: d.Dport,
}
}
1 change: 0 additions & 1 deletion aggregator/sock_num_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ func (nl *SocketLine) DeleteUnused() {
i-- // we deleted two values, so we need to decrement i by 2
}
}

}

type sock struct {
Expand Down
7 changes: 7 additions & 0 deletions aggregator/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"github.com/ddosify/alaz/log"
)

type AddressPair struct {
Saddr string `json:"saddr"`
Sport uint16 `json:"sport"`
Daddr string `json:"daddr"`
Dport uint16 `json:"dport"`
}

// We need to keep track of the following
// in order to build find relationships between
// connections and pods/services
Expand Down
Loading