From f1b7b37fb83937d5fad90d7b6b52f4a38823da9e Mon Sep 17 00:00:00 2001 From: Matthis Date: Sat, 20 Jul 2024 07:58:26 +0200 Subject: [PATCH] feat: custom analysis paralelism (#1203) --- pkg/analysis/analysis.go | 50 ++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/pkg/analysis/analysis.go b/pkg/analysis/analysis.go index d8c9b59d40..80b160862e 100644 --- a/pkg/analysis/analysis.go +++ b/pkg/analysis/analysis.go @@ -164,23 +164,39 @@ func (a *Analysis) RunCustomAnalysis() { var customAnalyzers []custom.CustomAnalyzer if err := viper.UnmarshalKey("custom_analyzers", &customAnalyzers); err != nil { a.Errors = append(a.Errors, err.Error()) + return } + semaphore := make(chan struct{}, a.MaxConcurrency) + var wg sync.WaitGroup + var mutex sync.Mutex for _, cAnalyzer := range customAnalyzers { + wg.Add(1) + semaphore <- struct{}{} + go func(analyzer custom.CustomAnalyzer, wg *sync.WaitGroup, semaphore chan struct{}) { + defer wg.Done() + canClient, err := custom.NewClient(cAnalyzer.Connection) + if err != nil { + mutex.Lock() + a.Errors = append(a.Errors, fmt.Sprintf("Client creation error for %s analyzer", cAnalyzer.Name)) + mutex.Unlock() + return + } - canClient, err := custom.NewClient(cAnalyzer.Connection) - if err != nil { - a.Errors = append(a.Errors, fmt.Sprintf("Client creation error for %s analyzer", cAnalyzer.Name)) - continue - } - - result, err := canClient.Run() - if err != nil { - a.Errors = append(a.Errors, fmt.Sprintf("[%s] %s", cAnalyzer.Name, err)) - } else { - a.Results = append(a.Results, result) - } + result, err := canClient.Run() + if err != nil { + mutex.Lock() + a.Errors = append(a.Errors, fmt.Sprintf("[%s] %s", cAnalyzer.Name, err)) + mutex.Unlock() + } else { + mutex.Lock() + a.Results = append(a.Results, result) + mutex.Unlock() + } + <-semaphore + }(cAnalyzer, &wg, semaphore) } + wg.Wait() } func (a *Analysis) RunAnalysis() { @@ -209,10 +225,10 @@ func (a *Analysis) RunAnalysis() { } semaphore := make(chan struct{}, a.MaxConcurrency) + var wg sync.WaitGroup + var mutex sync.Mutex // if there are no filters selected and no active_filters then run coreAnalyzer if len(a.Filters) == 0 && len(activeFilters) == 0 { - var wg sync.WaitGroup - var mutex sync.Mutex for _, analyzer := range coreAnalyzerMap { wg.Add(1) semaphore <- struct{}{} @@ -234,11 +250,8 @@ func (a *Analysis) RunAnalysis() { wg.Wait() return } - semaphore = make(chan struct{}, a.MaxConcurrency) // if the filters flag is specified if len(a.Filters) != 0 { - var wg sync.WaitGroup - var mutex sync.Mutex for _, filter := range a.Filters { if analyzer, ok := analyzerMap[filter]; ok { semaphore <- struct{}{} @@ -264,9 +277,6 @@ func (a *Analysis) RunAnalysis() { return } - var wg sync.WaitGroup - var mutex sync.Mutex - semaphore = make(chan struct{}, a.MaxConcurrency) // use active_filters for _, filter := range activeFilters { if analyzer, ok := analyzerMap[filter]; ok {