
Commit 1c40178

Merge pull request #89 from ddosify/develop
Refactor/speed up
2 parents: 80596e7 + 9968000

File tree: aggregator/data.go, ebpf/collector.go, main.go

3 files changed: +75 −143 lines


aggregator/data.go

Lines changed: 71 additions & 140 deletions
@@ -12,6 +12,7 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
+	"io"
 	"net"
 	"os"
 	"os/exec"
@@ -47,13 +48,11 @@ type Aggregator struct {

 	// listen to events from different sources
 	k8sChan             <-chan interface{}
-	ebpfChan            chan interface{}
+	ebpfChan            <-chan interface{}
 	ebpfProcChan        <-chan interface{}
 	ebpfTcpChan         <-chan interface{}
 	tlsAttachSignalChan chan uint32

-	ec *ebpf.EbpfCollector
-
 	// store the service map
 	clusterInfo *ClusterInfo

@@ -118,18 +117,7 @@ type ClusterInfo struct {

 	// Pid -> SocketMap
 	// pid -> fd -> {saddr, sport, daddr, dport}
-
-	// shard pidToSocketMap by pid to reduce lock contention
-	mu0 sync.RWMutex
-	mu1 sync.RWMutex
-	mu2 sync.RWMutex
-	mu3 sync.RWMutex
-	mu4 sync.RWMutex
-	PidToSocketMap0 map[uint32]*SocketMap `json:"pidToSocketMap0"` // pid ending with 0-1
-	PidToSocketMap1 map[uint32]*SocketMap `json:"pidToSocketMap1"` // pid ending with 2-3
-	PidToSocketMap2 map[uint32]*SocketMap `json:"pidToSocketMap2"` // pid ending with 4-5
-	PidToSocketMap3 map[uint32]*SocketMap `json:"pidToSocketMap3"` // pid ending with 6-7
-	PidToSocketMap4 map[uint32]*SocketMap `json:"pidToSocketMap4"` // pid ending with 8-9
+	SocketMaps []*SocketMap // index symbolizes pid
 }

 // If we have information from the container runtimes
@@ -154,10 +142,7 @@ var (
 	purgeTime = 10 * time.Minute
 )

-var usePgDs bool = false
-var useBackendDs bool = true // default to true
 var reverseDnsCache *cache.Cache
-
 var re *regexp.Regexp

 func init() {
@@ -179,13 +164,24 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
 	clusterInfo := &ClusterInfo{
 		PodIPToPodUid:         map[string]types.UID{},
 		ServiceIPToServiceUid: map[string]types.UID{},
-		PidToSocketMap0:       make(map[uint32]*SocketMap),
-		PidToSocketMap1:       make(map[uint32]*SocketMap),
-		PidToSocketMap2:       make(map[uint32]*SocketMap),
-		PidToSocketMap3:       make(map[uint32]*SocketMap),
-		PidToSocketMap4:       make(map[uint32]*SocketMap),
 	}

+	maxPid, err := getPidMax()
+	if err != nil {
+		log.Logger.Fatal().Err(err).Msg("error getting max pid")
+	}
+	sockMaps := make([]*SocketMap, maxPid+1) // index=pid
+
+	// initialize sockMaps
+	for i := range sockMaps {
+		sockMaps[i] = &SocketMap{
+			M:  nil, // initialized on demand later
+			mu: sync.RWMutex{},
+		}
+	}
+
+	clusterInfo.SocketMaps = sockMaps
+
 	a := &Aggregator{
 		ctx:     ctx,
 		k8sChan: k8sChan,
@@ -289,6 +285,7 @@ func (a *Aggregator) Run() {
 	}()
 	go a.processk8s()

+	// TODO: determine the number of workers with benchmarking
 	cpuCount := runtime.NumCPU()
 	numWorker := 5 * cpuCount
 	if numWorker < 50 {
@@ -300,7 +297,6 @@
 		go a.processEbpfTcp(a.ctx)
 	}

-	// TODO: pod number may be ideal
 	for i := 0; i < 2*cpuCount; i++ {
 		go a.processHttp2Frames()
 		go a.processEbpfProc(a.ctx)
@@ -472,23 +468,15 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) {
 	var sockMap *SocketMap
 	var ok bool

-	mu, pidToSocketMap := a.getShard(d.Pid)
-	mu.Lock()
-	sockMap, ok = pidToSocketMap[d.Pid]
-	if !ok {
-		sockMap = &SocketMap{
-			M:  make(map[uint64]*SocketLine),
-			mu: sync.RWMutex{},
-		}
-		pidToSocketMap[d.Pid] = sockMap
-	}
-	mu.Unlock() // unlock for writing
-
+	sockMap = a.clusterInfo.SocketMaps[d.Pid]
 	var skLine *SocketLine

 	sockMap.mu.Lock() // lock for reading
-	skLine, ok = sockMap.M[d.Fd]
+	if sockMap.M == nil {
+		sockMap.M = make(map[uint64]*SocketLine)
+	}

+	skLine, ok = sockMap.M[d.Fd]
 	if !ok {
 		skLine = NewSocketLine(d.Pid, d.Fd)
 		sockMap.M[d.Fd] = skLine
@@ -512,24 +500,14 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) {
 	var sockMap *SocketMap
 	var ok bool

-	mu, pidToSocketMap := a.getShard(d.Pid)
-	mu.Lock()
-	sockMap, ok = pidToSocketMap[d.Pid]
-	if !ok {
-		sockMap = &SocketMap{
-			M:  make(map[uint64]*SocketLine),
-			mu: sync.RWMutex{},
-		}
-
-		pidToSocketMap[d.Pid] = sockMap
-		mu.Unlock() // unlock for writing
-		return
-	}
-	mu.Unlock()
+	sockMap = a.clusterInfo.SocketMaps[d.Pid]

 	var skLine *SocketLine

 	sockMap.mu.Lock() // lock for reading
+	if sockMap.M == nil {
+		sockMap.M = make(map[uint64]*SocketLine)
+	}
 	skLine, ok = sockMap.M[d.Fd]
 	if !ok {
 		sockMap.mu.Unlock() // unlock for reading
@@ -1068,17 +1046,9 @@ func (a *Aggregator) fetchSkLine(sockMap *SocketMap, pid uint32, fd uint64) *Soc
 // add it to the socket map
 func (a *Aggregator) getAlreadyExistingSockets(pid uint32) {
 	// no need for locking because this is called firstmost and no other goroutine is running
-	_, pidToSocketMap := a.getShard(pid)
-	sockMap, ok := pidToSocketMap[pid]
-	if !ok {
-		sockMap = &SocketMap{
-			M:  make(map[uint64]*SocketLine),
-			mu: sync.RWMutex{},
-		}
-		pidToSocketMap[pid] = sockMap
-	}

 	socks := map[string]sock{}
+	sockMap := a.fetchSocketMap(pid)

 	// Get the sockets for the process.
 	var err error
@@ -1140,7 +1110,12 @@ func (a *Aggregator) getAlreadyExistingSockets(pid uint32) {
 		skLine := NewSocketLine(pid, fd.Fd)
 		skLine.AddValue(0, sockInfo)

+		sockMap.mu.Lock()
+		if sockMap.M == nil {
+			sockMap.M = make(map[uint64]*SocketLine)
+		}
 		sockMap.M[fd.Fd] = skLine
+		sockMap.mu.Unlock()
 	}
 }

@@ -1178,56 +1153,20 @@ func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_
 	return skInfo
 }

-func (a *Aggregator) getShard(pid uint32) (*sync.RWMutex, map[uint32]*SocketMap) {
-	lastDigit := pid % 10
-	var mu *sync.RWMutex
-	var pidToSocketMap map[uint32]*SocketMap
-	switch lastDigit {
-	case 0, 1:
-		mu = &a.clusterInfo.mu0
-		pidToSocketMap = a.clusterInfo.PidToSocketMap0
-	case 2, 3:
-		mu = &a.clusterInfo.mu1
-		pidToSocketMap = a.clusterInfo.PidToSocketMap1
-	case 4, 5:
-		mu = &a.clusterInfo.mu2
-		pidToSocketMap = a.clusterInfo.PidToSocketMap2
-	case 6, 7:
-		mu = &a.clusterInfo.mu3
-		pidToSocketMap = a.clusterInfo.PidToSocketMap3
-	case 8, 9:
-		mu = &a.clusterInfo.mu4
-		pidToSocketMap = a.clusterInfo.PidToSocketMap4
-	}
-
-	return mu, pidToSocketMap
-}
-
 func (a *Aggregator) removeFromClusterInfo(pid uint32) {
-	mu, pidToSocketMap := a.getShard(pid)
-	mu.Lock()
-	delete(pidToSocketMap, pid)
-	mu.Unlock()
+	sockMap := a.clusterInfo.SocketMaps[pid]
+	sockMap.mu.Lock()
+	sockMap.M = nil
+	sockMap.mu.Unlock()
 }

 func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap {
-	var sockMap *SocketMap
-	var ok bool
-
-	mu, pidToSocketMap := a.getShard(pid) // create shard if not exists
-	mu.Lock()                             // lock for reading
-	sockMap, ok = pidToSocketMap[pid]
-	if !ok {
-		// initialize socket map
-		sockMap = &SocketMap{
-			M:  make(map[uint64]*SocketLine),
-			mu: sync.RWMutex{},
-		}
-		pidToSocketMap[pid] = sockMap
-
-		go a.signalTlsAttachment(pid)
+	sockMap := a.clusterInfo.SocketMaps[pid]
+	sockMap.mu.Lock()
+	if sockMap.M == nil {
+		sockMap.M = make(map[uint64]*SocketLine)
 	}
-	mu.Unlock() // unlock for writing
+	sockMap.mu.Unlock()

 	return sockMap
 }
@@ -1408,44 +1347,36 @@ func (a *Aggregator) clearSocketLines(ctx context.Context) {
 	}()

 	for range ticker.C {
-		a.clusterInfo.mu0.RLock()
-		for _, socketMap := range a.clusterInfo.PidToSocketMap0 {
-			for _, socketLine := range socketMap.M {
-				skLineCh <- socketLine
-			}
-		}
-		a.clusterInfo.mu0.RUnlock()
-
-		a.clusterInfo.mu1.RLock()
-		for _, socketMap := range a.clusterInfo.PidToSocketMap1 {
-			for _, socketLine := range socketMap.M {
-				skLineCh <- socketLine
-			}
-		}
-		a.clusterInfo.mu1.RUnlock()
-
-		a.clusterInfo.mu2.RLock()
-		for _, socketMap := range a.clusterInfo.PidToSocketMap2 {
-			for _, socketLine := range socketMap.M {
-				skLineCh <- socketLine
+		for _, sockMap := range a.clusterInfo.SocketMaps {
+			sockMap.mu.Lock()
+			if sockMap.M != nil {
+				for _, skLine := range sockMap.M {
+					skLineCh <- skLine
+				}
 			}
+			sockMap.mu.Unlock()
 		}
-		a.clusterInfo.mu2.RUnlock()
+	}
+}

-		a.clusterInfo.mu3.RLock()
-		for _, socketMap := range a.clusterInfo.PidToSocketMap3 {
-			for _, socketLine := range socketMap.M {
-				skLineCh <- socketLine
-			}
-		}
-		a.clusterInfo.mu3.RUnlock()
+func getPidMax() (int, error) {
+	// Read the contents of the file
+	f, err := os.Open("/proc/sys/kernel/pid_max")
+	if err != nil {
+		fmt.Println("Error opening file:", err)
+		return 0, err
+	}
+	content, err := io.ReadAll(f)
+	if err != nil {
+		fmt.Println("Error reading file:", err)
+		return 0, err
+	}

-		a.clusterInfo.mu4.RLock()
-		for _, socketMap := range a.clusterInfo.PidToSocketMap4 {
-			for _, socketLine := range socketMap.M {
-				skLineCh <- socketLine
-			}
-		}
-		a.clusterInfo.mu4.RUnlock()
+	// Convert the content to an integer
+	pidMax, err := strconv.Atoi(string(content[:len(content)-1])) // trim newline
+	if err != nil {
+		fmt.Println("Error converting to integer:", err)
+		return 0, err
 	}
+	return pidMax, nil
 }
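Note on the refactor above: the five pid-sharded maps in ClusterInfo, each guarded by its own RWMutex, are replaced by a single SocketMaps slice sized from /proc/sys/kernel/pid_max, so a pid indexes its SocketMap directly and only that entry's lock is taken; the inner fd map is now allocated lazily, and removeFromClusterInfo simply sets it back to nil. A minimal, self-contained sketch of the same pattern follows — the entry type, the readPidMax helper, and the fallback value are illustrative stand-ins, not code from this repository:

package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"sync"
)

// entry mirrors the idea of SocketMap: a per-pid lock plus a lazily allocated map.
type entry struct {
	mu sync.RWMutex
	m  map[uint64]string // fd -> value; stays nil until first use
}

// readPidMax reads /proc/sys/kernel/pid_max, as the new getPidMax helper does.
func readPidMax() (int, error) {
	b, err := os.ReadFile("/proc/sys/kernel/pid_max")
	if err != nil {
		return 0, err
	}
	return strconv.Atoi(strings.TrimSpace(string(b)))
}

func main() {
	maxPid, err := readPidMax()
	if err != nil {
		maxPid = 4194304 // illustrative fallback; the real code treats this error as fatal
	}

	table := make([]*entry, maxPid+1) // index == pid, so no shared lock is needed
	for i := range table {
		table[i] = &entry{}
	}

	pid, fd := uint32(1234), uint64(7)
	e := table[pid] // direct index: no hashing, no shard mutex
	e.mu.Lock()
	if e.m == nil { // allocate on demand, like SocketMap.M
		e.m = make(map[uint64]string)
	}
	e.m[fd] = "10.0.0.1:34567 -> 10.0.0.2:80"
	e.mu.Unlock()

	fmt.Println(table[pid].m[fd])
}

The trade-off is a fixed up-front allocation of one pointer and one small struct per possible pid (pid_max is typically 32768 and at most 4194304 on 64-bit kernels), in exchange for removing shard-lock contention and map hashing from every lookup.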

ebpf/collector.go

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ func NewEbpfCollector(parentCtx context.Context) *EbpfCollector {
 		ctx:            ctx,
 		done:           make(chan struct{}),
 		ebpfEvents:     make(chan interface{}, 100000), // interface is 16 bytes, 16 * 100000 = 8 Megabytes
-		ebpfProcEvents: make(chan interface{}, 100),
+		ebpfProcEvents: make(chan interface{}, 2000),
 		ebpfTcpEvents:  make(chan interface{}, 1000),
 		tlsPidMap:      make(map[uint32]struct{}),
 		sslWriteUprobes: make(map[uint32]link.Link),
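The only change here is the ebpfProcEvents buffer growing from 100 to 2000 entries. Following the arithmetic in the comment above (an interface value is 16 bytes), that is roughly 16 × 2000 = 32 KB of buffer, so proc events can burst twenty times further before the sender blocks. The snippet below only illustrates why capacity matters when the producer must not stall; the drop-on-full policy is a generic pattern for the sketch, not necessarily what the collector does:

package main

import "fmt"

func main() {
	// A buffered channel lets a fast producer run ahead of a slow consumer
	// by up to cap(ch) events before a send would block (or be dropped).
	events := make(chan interface{}, 2000) // was 100: bursts overflowed much sooner

	dropped := 0
	for i := 0; i < 2500; i++ {
		select {
		case events <- i: // queued without blocking
		default: // buffer full: drop instead of stalling the producer
			dropped++
		}
	}
	fmt.Printf("queued=%d dropped=%d\n", len(events), dropped)
}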

main.go

Lines changed: 3 additions & 2 deletions
@@ -72,11 +72,12 @@ func main() {
 	var ec *ebpf.EbpfCollector
 	if ebpfEnabled {
 		ec = ebpf.NewEbpfCollector(ctx)
-		ec.Init()
-		go ec.ListenEvents()

 		a := aggregator.NewAggregator(ctx, kubeEvents, ec.EbpfEvents(), ec.EbpfProcEvents(), ec.EbpfTcpEvents(), ec.TlsAttachQueue(), dsBackend)
 		a.Run()
+
+		ec.Init()
+		go ec.ListenEvents()
 	}

 	go http.ListenAndServe(":8181", nil)
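The main.go change only reorders startup: the aggregator, whose Run() starts the goroutines that consume ec.EbpfEvents(), ec.EbpfProcEvents() and ec.EbpfTcpEvents(), is now running before ec.Init() and ec.ListenEvents() begin producing, so early events meet live consumers instead of piling up in the channel buffers. A stripped-down sketch of this consumer-before-producer ordering follows; Collector and Aggregator here are stand-ins for illustration, not the project's real types:

package main

import (
	"fmt"
	"time"
)

type Collector struct{ events chan interface{} }

func NewCollector() *Collector {
	return &Collector{events: make(chan interface{}, 100)}
}

// ListenEvents stands in for the eBPF reader: it produces as soon as it starts.
func (c *Collector) ListenEvents() {
	for i := 0; i < 5; i++ {
		c.events <- fmt.Sprintf("event-%d", i)
	}
}

type Aggregator struct{ in <-chan interface{} }

// Run starts the consumer goroutine, mirroring a.Run() being called first.
func (a *Aggregator) Run() {
	go func() {
		for ev := range a.in {
			fmt.Println("processed", ev)
		}
	}()
}

func main() {
	c := NewCollector()

	// Consumers first: the channel is drained from the moment production starts.
	a := &Aggregator{in: c.events}
	a.Run()

	// Producer second, matching main.go calling ec.Init()/ListenEvents() after a.Run().
	go c.ListenEvents()

	time.Sleep(100 * time.Millisecond) // give the demo time to drain
}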
