@@ -22,6 +22,7 @@ import (
22
22
"strconv"
23
23
"strings"
24
24
"sync"
25
+ "sync/atomic"
25
26
"syscall"
26
27
27
28
"golang.org/x/time/rate"
@@ -79,7 +80,7 @@ type Aggregator struct {
79
80
rateLimitMu sync.RWMutex
80
81
81
82
// Used to find the correct mutex for the pid, some pids can share the same mutex
82
- muIndex int
83
+ muIndex atomic. Uint64
83
84
muArray []* sync.RWMutex
84
85
}
85
86
@@ -185,7 +186,7 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
185
186
liveProcesses : make (map [uint32 ]struct {}),
186
187
rateLimiters : make (map [uint32 ]* rate.Limiter ),
187
188
pgStmts : make (map [string ]string ),
188
- muIndex : 0 ,
189
+ muIndex : atomic. Uint64 {} ,
189
190
muArray : nil ,
190
191
}
191
192
@@ -227,11 +228,10 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
227
228
a .muArray = make ([]* sync.RWMutex , countMuArray )
228
229
229
230
// set distinct mutex for every live process
230
- a .muIndex = 0
231
231
for pid := range a .liveProcesses {
232
- a .muArray [a .muIndex ] = & sync.RWMutex {}
233
- sockMaps [pid ].mu = a .muArray [a .muIndex ]
234
- a .muIndex ++
232
+ a .muArray [a .muIndex . Load () ] = & sync.RWMutex {}
233
+ sockMaps [pid ].mu = a .muArray [a .muIndex . Load () ]
234
+ a .muIndex . Add ( 1 )
235
235
a .getAlreadyExistingSockets (pid )
236
236
}
237
237
@@ -456,9 +456,16 @@ func (a *Aggregator) processExec(d *proc.ProcEvent) {
456
456
a .liveProcesses [d .Pid ] = struct {}{}
457
457
458
458
// create lock on demand
459
- a .muArray [a .muIndex % len (a .muArray )] = & sync.RWMutex {}
460
- a .muIndex ++
461
- a .clusterInfo .SocketMaps [d .Pid ].mu = a .muArray [a .muIndex % len (a .muArray )]
459
+ a .muArray [(a .muIndex .Load ())% uint64 (len (a .muArray ))] = & sync.RWMutex {}
460
+ a .muIndex .Add (1 )
461
+
462
+ // if duplicate exec event comes, underlying mutex will be changed
463
+ // if first assigned mutex is locked and another exec event comes, mutex will be changed
464
+ // and unlock of unlocked mutex now is a possibility
465
+ // to avoid this case, if a socket map already has a mutex, don't change it
466
+ if a .clusterInfo .SocketMaps [d .Pid ].mu == nil {
467
+ a .clusterInfo .SocketMaps [d .Pid ].mu = a .muArray [(a .muIndex .Load ())% uint64 (len (a .muArray ))]
468
+ }
462
469
}
463
470
464
471
func (a * Aggregator ) processExit (pid uint32 ) {
@@ -1325,7 +1332,7 @@ func (a *Aggregator) parseSqlCommand(d *l7_req.L7Event) (string, error) {
1325
1332
a .pgStmtsMu .RLock ()
1326
1333
query , ok := a .pgStmts [a .getPgStmtKey (d .Pid , d .Fd , stmtName )]
1327
1334
a .pgStmtsMu .RUnlock ()
1328
- if ! ok { // we don't have the query for the prepared statement
1335
+ if ! ok || query == "" { // we don't have the query for the prepared statement
1329
1336
// Execute (name of prepared statement) [(parameter)]
1330
1337
return fmt .Sprintf ("EXECUTE %s *values*" , stmtName ), nil
1331
1338
}
0 commit comments