@@ -39,8 +39,7 @@ type IngestorAPI struct {
39
39
Cfg * config.KubehoundConfig
40
40
providers * providers.ProvidersFactoryConfig
41
41
42
- mu * sync.RWMutex // mutex to sync write to the runIDs map
43
- runIDs map [string ]bool // runIDs map to monitor and avoid concurrency processing on the same runID
42
+ runIDs sync.Map // runIDs map to monitor and avoid concurrency processing on the same runID
44
43
}
45
44
46
45
var (
@@ -51,16 +50,12 @@ var (
51
50
52
51
func NewIngestorAPI (cfg * config.KubehoundConfig , puller puller.DataPuller , notifier notifier.Notifier ,
53
52
p * providers.ProvidersFactoryConfig ) * IngestorAPI {
54
- var mu sync.RWMutex
55
- var runIDs = make (map [string ]bool )
56
-
57
53
return & IngestorAPI {
58
54
notifier : notifier ,
59
55
puller : puller ,
60
56
Cfg : cfg ,
61
57
providers : p ,
62
- mu : & mu ,
63
- runIDs : runIDs ,
58
+ runIDs : sync.Map {},
64
59
}
65
60
}
66
61
@@ -306,24 +301,20 @@ func (g *IngestorAPI) Notify(ctx context.Context, clusterName string, runID stri
306
301
// Using a map to monitor all runIDs being processed,
307
302
// Using a mutex to write/read data to the runIDs map
308
303
func (g * IngestorAPI ) lockRunID (runID string ) error {
309
- g .mu .Lock ()
310
- defer g .mu .Unlock ()
311
- entry , ok := g .runIDs [runID ]
304
+ _ , ok := g .runIDs .Load (runID )
312
305
313
306
// If a runID is being processed, dropping the request
314
- if ok && entry {
307
+ if ok {
315
308
return fmt .Errorf ("%w [runID:%s]" , ErrCurrentlyIngesting , runID )
316
309
}
317
310
318
311
// Locking the current runID
319
- g .runIDs [ runID ] = true
312
+ g .runIDs . Store ( runID , true )
320
313
321
314
return nil
322
315
}
323
316
324
317
// Delocking the runID
325
318
func (g * IngestorAPI ) unlockRunID (runID string ) {
326
- g .mu .Lock ()
327
- g .runIDs [runID ] = false
328
- g .mu .Unlock ()
319
+ g .runIDs .Delete (runID )
329
320
}
0 commit comments