Skip to content

Commit 8ba4c0f

Browse files
authored
Merge pull request #40 from ddosify/develop
memory efficiency
2 parents 5dd54aa + ece114c commit 8ba4c0f

File tree

5 files changed

+233
-96
lines changed

5 files changed

+233
-96
lines changed

aggregator/data.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,30 @@ func init() {
102102
reverseDnsCache = cache.New(defaultExpiration, purgeTime)
103103
}
104104

105+
func clearSocketLines(ctx context.Context, pidToSocketMap map[uint32]*SocketMap) {
106+
ticker := time.NewTicker(1 * time.Minute)
107+
skLineCh := make(chan *SocketLine, 1000)
108+
109+
go func() {
110+
// spawn N goroutines to clear socket map
111+
for i := 0; i < 10; i++ {
112+
go func() {
113+
for skLine := range skLineCh {
114+
skLine.DeleteUnused()
115+
}
116+
}()
117+
}
118+
}()
119+
120+
for range ticker.C {
121+
for _, socketMap := range pidToSocketMap {
122+
for _, socketLine := range socketMap.M {
123+
skLineCh <- socketLine
124+
}
125+
}
126+
}
127+
}
128+
105129
func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, ec *ebpf.EbpfCollector, ds datastore.DataStore) *Aggregator {
106130
ctx, _ := context.WithCancel(parentCtx)
107131
clusterInfo := &ClusterInfo{
@@ -110,6 +134,8 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, ec *eb
110134
PidToSocketMap: make(map[uint32]*SocketMap, 0),
111135
}
112136

137+
go clearSocketLines(ctx, clusterInfo.PidToSocketMap)
138+
113139
return &Aggregator{
114140
ctx: ctx,
115141
k8sChan: k8sChan,
@@ -122,7 +148,11 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, ec *eb
122148

123149
func (a *Aggregator) Run() {
124150
go a.processk8s()
125-
go a.processEbpf(a.ctx)
151+
152+
numWorker := 10
153+
for i := 0; i < numWorker; i++ {
154+
go a.processEbpf(a.ctx)
155+
}
126156
}
127157

128158
func (a *Aggregator) processk8s() {

aggregator/sock_num_line.go

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,8 @@ func NewSocketLine(pid uint32, fd uint64) *SocketLine {
4848
func (nl *SocketLine) AddValue(timestamp uint64, sockInfo *SockInfo) {
4949
nl.mu.Lock()
5050
defer nl.mu.Unlock()
51-
nl.Values = append(nl.Values, TimestampedSocket{Timestamp: timestamp, SockInfo: sockInfo})
52-
sort.Slice(nl.Values, func(i, j int) bool {
53-
return nl.Values[i].Timestamp < nl.Values[j].Timestamp
54-
})
51+
52+
nl.Values = insertIntoSortedSlice(nl.Values, TimestampedSocket{Timestamp: timestamp, SockInfo: sockInfo})
5553
}
5654

5755
func (nl *SocketLine) GetValue(timestamp uint64) (*SockInfo, error) {
@@ -92,47 +90,42 @@ func (nl *SocketLine) GetValue(timestamp uint64) (*SockInfo, error) {
9290

9391
func (nl *SocketLine) DeleteUnused() {
9492
// Delete socket lines that are not in use
95-
ticker := time.NewTicker(1 * time.Minute)
96-
97-
for range ticker.C {
98-
func() {
99-
nl.mu.Lock()
100-
defer nl.mu.Unlock()
93+
nl.mu.Lock()
94+
defer nl.mu.Unlock()
10195

102-
if len(nl.Values) == 0 {
103-
return
104-
}
96+
if len(nl.Values) == 0 {
97+
return
98+
}
10599

106-
var lastMatchedReqTime uint64 = 0
100+
var lastMatchedReqTime uint64 = 0
107101

108-
// traverse the slice backwards
109-
for i := len(nl.Values) - 1; i >= 0; i-- {
110-
if nl.Values[i].LastMatch != 0 && nl.Values[i].LastMatch > lastMatchedReqTime {
111-
lastMatchedReqTime = nl.Values[i].LastMatch
112-
}
113-
}
102+
// traverse the slice backwards
103+
for i := len(nl.Values) - 1; i >= 0; i-- {
104+
if nl.Values[i].LastMatch != 0 && nl.Values[i].LastMatch > lastMatchedReqTime {
105+
lastMatchedReqTime = nl.Values[i].LastMatch
106+
}
107+
}
114108

115-
if lastMatchedReqTime == 0 {
116-
return
117-
}
109+
if lastMatchedReqTime == 0 {
110+
return
111+
}
118112

119-
// assumedInterval is inversely proportional to the number of requests being discarded
120-
assumedInterval := uint64(5 * time.Minute)
113+
// assumedInterval is inversely proportional to the number of requests being discarded
114+
assumedInterval := uint64(5 * time.Minute)
121115

122-
// delete all values that
123-
// closed and its LastMatch + assumedInterval < lastMatchedReqTime
124-
for i := len(nl.Values) - 1; i >= 1; i-- {
125-
if nl.Values[i].SockInfo == nil &&
126-
nl.Values[i-1].SockInfo != nil &&
127-
nl.Values[i-1].LastMatch+assumedInterval < lastMatchedReqTime {
116+
// delete all values that
117+
// closed and its LastMatch + assumedInterval < lastMatchedReqTime
118+
for i := len(nl.Values) - 1; i >= 1; i-- {
119+
if nl.Values[i].SockInfo == nil &&
120+
nl.Values[i-1].SockInfo != nil &&
121+
nl.Values[i-1].LastMatch+assumedInterval < lastMatchedReqTime {
128122

129-
// delete these two values
130-
nl.Values = append(nl.Values[:i-1], nl.Values[i+1:]...)
131-
i-- // we deleted two values, so we need to decrement i by 2
132-
}
133-
}
134-
}()
123+
// delete these two values
124+
nl.Values = append(nl.Values[:i-1], nl.Values[i+1:]...)
125+
i-- // we deleted two values, so we need to decrement i by 2
126+
}
135127
}
128+
136129
}
137130

138131
func (nl *SocketLine) GetAlreadyExistingSockets() {
@@ -305,3 +298,16 @@ const (
305298
stateEstablished = "01"
306299
stateListen = "0A"
307300
)
301+
302+
func insertIntoSortedSlice(sortedSlice []TimestampedSocket, newItem TimestampedSocket) []TimestampedSocket {
303+
idx := sort.Search(len(sortedSlice), func(i int) bool {
304+
return sortedSlice[i].Timestamp >= newItem.Timestamp
305+
})
306+
307+
// Insert the new item at the correct position.
308+
sortedSlice = append(sortedSlice, TimestampedSocket{})
309+
copy(sortedSlice[idx+1:], sortedSlice[idx:])
310+
sortedSlice[idx] = newItem
311+
312+
return sortedSlice
313+
}

datastore/backend.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend
130130
shouldRetry = true
131131
log.Logger.Warn().Msgf("will retry, error: %v", err)
132132
} else {
133+
defer resp.Body.Close()
133134
if resp.StatusCode == http.StatusBadRequest ||
134135
resp.StatusCode == http.StatusTooManyRequests ||
135136
resp.StatusCode >= http.StatusInternalServerError {
@@ -254,13 +255,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend
254255
return
255256
}
256257

257-
body, err := io.ReadAll(resp.Body)
258-
if err != nil {
259-
log.Logger.Error().Msgf("error reading inner metrics response body: %v", err)
260-
return
261-
}
262-
263-
req, err = http.NewRequest(http.MethodPost, fmt.Sprintf("%s/alaz/metrics/scrape/?instance=%s&monitoring_id=%s", ds.host, NodeID, MonitoringID), bytes.NewReader(body))
258+
req, err = http.NewRequest(http.MethodPost, fmt.Sprintf("%s/alaz/metrics/scrape/?instance=%s&monitoring_id=%s", ds.host, NodeID, MonitoringID), resp.Body)
264259
if err != nil {
265260
log.Logger.Error().Msgf("error creating metrics request: %v", err)
266261
return

datastore/datastore.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datastore
22

3+
import "github.com/ddosify/alaz/log"
4+
35
type DataStore interface {
46
PersistPod(pod Pod, eventType string) error
57
PersistService(service Service, eventType string) error
@@ -11,3 +13,46 @@ type DataStore interface {
1113

1214
PersistRequest(request *Request) error
1315
}
16+
17+
type MockDataStore struct {
18+
}
19+
20+
func (m *MockDataStore) PersistPod(pod Pod, eventType string) error {
21+
log.Logger.Debug().Str("pod", pod.Name).Msg("PersistPod")
22+
return nil
23+
}
24+
25+
func (m *MockDataStore) PersistService(service Service, eventType string) error {
26+
log.Logger.Debug().Str("service", service.Name).Msg("PersistService")
27+
return nil
28+
}
29+
30+
func (m *MockDataStore) PersistReplicaSet(rs ReplicaSet, eventType string) error {
31+
log.Logger.Debug().Str("replicaset", rs.Name).Msg("PersistReplicaSet")
32+
return nil
33+
}
34+
35+
func (m *MockDataStore) PersistDeployment(d Deployment, eventType string) error {
36+
log.Logger.Debug().Str("deployment", d.Name).Msg("PersistDeployment")
37+
return nil
38+
}
39+
40+
func (m *MockDataStore) PersistEndpoints(e Endpoints, eventType string) error {
41+
log.Logger.Debug().Str("endpoints", e.Name).Msg("PersistEndpoints")
42+
return nil
43+
}
44+
45+
func (m *MockDataStore) PersistContainer(c Container, eventType string) error {
46+
log.Logger.Debug().Str("container", c.Name).Msg("PersistContainer")
47+
return nil
48+
}
49+
50+
func (m *MockDataStore) PersistDaemonSet(ds DaemonSet, eventType string) error {
51+
log.Logger.Debug().Str("daemonset", ds.Name).Msg("PersistDaemonSet")
52+
return nil
53+
}
54+
55+
func (m *MockDataStore) PersistRequest(request *Request) error {
56+
log.Logger.Debug().Bool("isTls", request.Tls).Str("path", request.Path).Msg("PersistRequest")
57+
return nil
58+
}

0 commit comments

Comments
 (0)