Skip to content

Commit 07bda59

Browse files
committed
fix deadlock collecting large logs
Signed-off-by: Nicolas De Loof <[email protected]>
1 parent 894ab41 commit 07bda59

File tree

3 files changed, with 100 additions (+100) and 60 deletions (-60)

pkg/compose/printer.go

+70 -60
Original file line number | Diff line number | Diff line change
@@ -32,18 +32,19 @@ type logPrinter interface {
3232
}
3333

3434
type printer struct {
35-
sync.Mutex
3635
queue chan api.ContainerEvent
3736
consumer api.LogConsumer
38-
stopped bool
37+
stopCh chan struct{} // stopCh is a signal channel for producers to stop sending events to the queue
38+
stop sync.Once
3939
}
4040

4141
// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
4242
func newLogPrinter(consumer api.LogConsumer) logPrinter {
43-
queue := make(chan api.ContainerEvent)
4443
printer := printer{
4544
consumer: consumer,
46-
queue: queue,
45+
queue: make(chan api.ContainerEvent),
46+
stopCh: make(chan struct{}),
47+
stop: sync.Once{},
4748
}
4849
return &printer
4950
}
@@ -54,24 +55,27 @@ func (p *printer) Cancel() {
5455
}
5556

5657
func (p *printer) Stop() {
57-
p.Lock()
58-
defer p.Unlock()
59-
if !p.stopped {
60-
// only close if this is the first call to stop
61-
p.stopped = true
62-
close(p.queue)
63-
}
58+
p.stop.Do(func() {
59+
close(p.stopCh)
60+
for {
61+
select {
62+
case <-p.queue:
63+
// purge the queue to free producers goroutines
64+
// p.queue will be garbage collected
65+
default:
66+
return
67+
}
68+
}
69+
})
6470
}
6571

6672
func (p *printer) HandleEvent(event api.ContainerEvent) {
67-
p.Lock()
68-
defer p.Unlock()
69-
if p.stopped {
70-
// prevent deadlocking, if the printer is done, there's no reader for
71-
// queue, so this write could block indefinitely
73+
select {
74+
case <-p.stopCh:
7275
return
76+
default:
77+
p.queue <- event
7378
}
74-
p.queue <- event
7579
}
7680

7781
//nolint:gocyclo
@@ -80,58 +84,64 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error
8084
aborting bool
8185
exitCode int
8286
)
87+
defer p.Stop()
88+
8389
containers := map[string]struct{}{}
84-
for event := range p.queue {
85-
container, id := event.Container, event.ID
86-
switch event.Type {
87-
case api.UserCancel:
88-
aborting = true
89-
case api.ContainerEventAttach:
90-
if _, ok := containers[id]; ok {
91-
continue
92-
}
93-
containers[id] = struct{}{}
94-
p.consumer.Register(container)
95-
case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated:
96-
if !event.Restarting {
97-
delete(containers, id)
98-
}
99-
if !aborting {
100-
p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
101-
if event.Type == api.ContainerEventRecreated {
102-
p.consumer.Status(container, "has been recreated")
90+
for {
91+
select {
92+
case <-p.stopCh:
93+
return exitCode, nil
94+
case event := <-p.queue:
95+
container, id := event.Container, event.ID
96+
switch event.Type {
97+
case api.UserCancel:
98+
aborting = true
99+
case api.ContainerEventAttach:
100+
if _, ok := containers[id]; ok {
101+
continue
102+
}
103+
containers[id] = struct{}{}
104+
p.consumer.Register(container)
105+
case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated:
106+
if !event.Restarting {
107+
delete(containers, id)
103108
}
104-
}
105-
if cascadeStop {
106109
if !aborting {
107-
aborting = true
108-
err := stopFn()
109-
if err != nil {
110-
return 0, err
110+
p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
111+
if event.Type == api.ContainerEventRecreated {
112+
p.consumer.Status(container, "has been recreated")
111113
}
112114
}
113-
if event.Type == api.ContainerEventExit {
114-
if exitCodeFrom == "" {
115-
exitCodeFrom = event.Service
115+
if cascadeStop {
116+
if !aborting {
117+
aborting = true
118+
err := stopFn()
119+
if err != nil {
120+
return 0, err
121+
}
116122
}
117-
if exitCodeFrom == event.Service {
118-
exitCode = event.ExitCode
123+
if event.Type == api.ContainerEventExit {
124+
if exitCodeFrom == "" {
125+
exitCodeFrom = event.Service
126+
}
127+
if exitCodeFrom == event.Service {
128+
exitCode = event.ExitCode
129+
}
119130
}
120131
}
121-
}
122-
if len(containers) == 0 {
123-
// Last container terminated, done
124-
return exitCode, nil
125-
}
126-
case api.ContainerEventLog:
127-
if !aborting {
128-
p.consumer.Log(container, event.Line)
129-
}
130-
case api.ContainerEventErr:
131-
if !aborting {
132-
p.consumer.Err(container, event.Line)
132+
if len(containers) == 0 {
133+
// Last container terminated, done
134+
return exitCode, nil
135+
}
136+
case api.ContainerEventLog:
137+
if !aborting {
138+
p.consumer.Log(container, event.Line)
139+
}
140+
case api.ContainerEventErr:
141+
if !aborting {
142+
p.consumer.Err(container, event.Line)
143+
}
133144
}
134145
}
135146
}
136-
return exitCode, nil
137147
}

pkg/e2e/fixtures/logs-test/cat.yaml

+6
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,6 @@
1+
services:
2+
test:
3+
image: alpine
4+
command: cat /text_file.txt
5+
volumes:
6+
- ${FILE}:/text_file.txt

pkg/e2e/logs_test.go

+24
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,10 @@
1717
package e2e
1818

1919
import (
20+
"fmt"
21+
"io"
22+
"os"
23+
"path/filepath"
2024
"strings"
2125
"testing"
2226
"time"
@@ -96,6 +100,26 @@ func TestLocalComposeLogsFollow(t *testing.T) {
96100
poll.WaitOn(t, expectOutput(res, "ping-2 "), poll.WithDelay(100*time.Millisecond), poll.WithTimeout(20*time.Second))
97101
}
98102

103+
func TestLocalComposeLargeLogs(t *testing.T) {
104+
const projectName = "compose-e2e-large_logs"
105+
file := filepath.Join(t.TempDir(), "large.txt")
106+
c := NewCLI(t, WithEnv("FILE="+file))
107+
t.Cleanup(func() {
108+
c.RunDockerComposeCmd(t, "--project-name", projectName, "down")
109+
})
110+
111+
f, err := os.Create(file)
112+
assert.NilError(t, err)
113+
for i := 0; i < 300_000; i++ {
114+
_, err := io.WriteString(f, fmt.Sprintf("This is line %d in a laaaarge text file\n", i))
115+
assert.NilError(t, err)
116+
}
117+
assert.NilError(t, f.Close())
118+
119+
res := c.RunDockerComposeCmd(t, "-f", "./fixtures/logs-test/cat.yaml", "--project-name", projectName, "up", "--abort-on-container-exit")
120+
res.Assert(t, icmd.Expected{Out: "test-1 exited with code 0"})
121+
}
122+
99123
func expectOutput(res *icmd.Result, expected string) func(t poll.LogT) poll.Result {
100124
return func(t poll.LogT) poll.Result {
101125
if strings.Contains(res.Stdout(), expected) {

0 commit comments

Comments
 (0)