diff --git a/pkg/search/proxy/store/util.go b/pkg/search/proxy/store/util.go index 45e9fe281beb..9a5ce6833222 100644 --- a/pkg/search/proxy/store/util.go +++ b/pkg/search/proxy/store/util.go @@ -193,9 +193,21 @@ func (w *watchMux) AddSource(watcher watch.Interface, decorator func(watch.Event // Start run the watcher func (w *watchMux) Start() { - for _, source := range w.sources { - go w.startWatchSource(source.watcher, source.decorator) + wg := sync.WaitGroup{} + for i := range w.sources { + source := w.sources[i] + wg.Add(1) + go func() { + defer wg.Done() + w.startWatchSource(source.watcher, source.decorator) + }() } + + go func() { + // close result chan after all goroutines exit, avoiding data race. + defer close(w.result) + wg.Wait() + }() } // ResultChan implements watch.Interface @@ -218,7 +230,6 @@ func (w *watchMux) Stop() { case <-w.done: default: close(w.done) - close(w.result) } } @@ -244,19 +255,8 @@ func (w *watchMux) startWatchSource(source watch.Interface, decorator func(watch select { case <-w.done: return - default: + case w.result <- copyEvent: } - - func() { - w.lock.RLock() - defer w.lock.RUnlock() - select { - case <-w.done: - return - default: - w.result <- copyEvent - } - }() } }