Skip to content

Commit 49d809a

Browse files
committed
parallel process events in spawn
1 parent 9f57de9 commit 49d809a

File tree

1 file changed

+24
-6
lines changed

1 file changed

+24
-6
lines changed

pipeline/processor.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package pipeline
22

33
import (
4+
"sync"
5+
46
"github.com/ozontech/file.d/logger"
57
insaneJSON "github.com/vitkovskii/insane-json"
68
"go.uber.org/atomic"
@@ -397,6 +399,9 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
397399
parent.SetChildParentKind()
398400
nextActionIdx := parent.action + 1
399401

402+
wg := &sync.WaitGroup{}
403+
results := make(chan *Event)
404+
400405
for _, node := range nodes {
401406
// we can't reuse parent event (using insaneJSON.Root{Node: child}
402407
// because of nil decoder
@@ -409,12 +414,25 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
409414
child.SetChildKind()
410415
child.action = nextActionIdx
411416

412-
ok, _ := p.doActions(child)
413-
if ok {
414-
child.stage = eventStageOutput
415-
p.output.Out(child)
416-
}
417-
child.Root.ReleaseBufMem()
417+
wg.Add(1)
418+
go func(child *Event) {
419+
defer wg.Done()
420+
ok, _ := p.doActions(child)
421+
if ok {
422+
results <- child
423+
}
424+
}(child)
425+
}
426+
427+
go func() {
428+
wg.Wait()
429+
close(results)
430+
}()
431+
432+
for child := range results {
433+
child.stage = eventStageOutput
434+
p.output.Out(child)
435+
child.Root.ReleaseMem()
418436
}
419437

420438
if p.busyActionsTotal == 0 {

0 commit comments

Comments
 (0)