Skip to content
This repository was archived by the owner on Oct 13, 2023. It is now read-only.

Commit 158808d

Browse files
authored
Upstream-commit: 788f2883d285fc997d25cd27235c5b17cf3c0947
Component: engine
2 parents bc88d1b + e34fcd7 commit 158808d

File tree

2 files changed

+109
-5
lines changed

2 files changed

+109
-5
lines changed

components/engine/daemon/logger/jsonfilelog/read.go

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,10 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro
6161
}
6262

6363
type decoder struct {
64-
rdr io.Reader
65-
dec *json.Decoder
66-
jl *jsonlog.JSONLog
64+
rdr io.Reader
65+
dec *json.Decoder
66+
jl *jsonlog.JSONLog
67+
maxRetry int
6768
}
6869

6970
func (d *decoder) Reset(rdr io.Reader) {
@@ -87,7 +88,11 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
8788
if d.jl == nil {
8889
d.jl = &jsonlog.JSONLog{}
8990
}
90-
for retries := 0; retries < maxJSONDecodeRetry; retries++ {
91+
if d.maxRetry == 0 {
92+
// We aren't using maxJSONDecodeRetry directly so we can give a custom value for testing.
93+
d.maxRetry = maxJSONDecodeRetry
94+
}
95+
for retries := 0; retries < d.maxRetry; retries++ {
9196
msg, err = decodeLogLine(d.dec, d.jl)
9297
if err == nil || err == io.EOF {
9398
break
@@ -105,14 +110,54 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
105110
// If the json logger writes a partial json log entry to the disk
106111
// while at the same time the decoder tries to decode it, the race condition happens.
107112
if err == io.ErrUnexpectedEOF {
108-
d.rdr = io.MultiReader(d.dec.Buffered(), d.rdr)
113+
d.rdr = combineReaders(d.dec.Buffered(), d.rdr)
109114
d.dec = json.NewDecoder(d.rdr)
110115
continue
111116
}
112117
}
113118
return msg, err
114119
}
115120

121+
func combineReaders(pre, rdr io.Reader) io.Reader {
122+
return &combinedReader{pre: pre, rdr: rdr}
123+
}
124+
125+
// combinedReader is a reader which is like `io.MultiReader` where except it does not cache a full EOF.
126+
// Once `io.MultiReader` returns EOF, it is always EOF.
127+
//
128+
// For this usecase we have an underlying reader which is a file which may reach EOF but have more data written to it later.
129+
// As such, io.MultiReader does not work for us.
130+
type combinedReader struct {
131+
pre io.Reader
132+
rdr io.Reader
133+
}
134+
135+
func (r *combinedReader) Read(p []byte) (int, error) {
136+
var read int
137+
if r.pre != nil {
138+
n, err := r.pre.Read(p)
139+
if err != nil {
140+
if err != io.EOF {
141+
return n, err
142+
}
143+
r.pre = nil
144+
}
145+
read = n
146+
}
147+
148+
if read < len(p) {
149+
n, err := r.rdr.Read(p[read:])
150+
if n > 0 {
151+
read += n
152+
}
153+
if err != nil {
154+
return read, err
155+
}
156+
}
157+
158+
return read, nil
159+
}
160+
116161
// decodeFunc is used to create a decoder for the log file reader
117162
func decodeFunc(rdr io.Reader) loggerutils.Decoder {
118163
return &decoder{

components/engine/daemon/logger/jsonfilelog/read_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo
22

33
import (
44
"bytes"
5+
"encoding/json"
56
"io"
67
"testing"
78
"time"
@@ -93,3 +94,61 @@ func TestEncodeDecode(t *testing.T) {
9394
_, err = dec.Decode()
9495
assert.Assert(t, err == io.EOF)
9596
}
97+
98+
func TestUnexpectedEOF(t *testing.T) {
99+
buf := bytes.NewBuffer(nil)
100+
msg1 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello1")}
101+
msg2 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello2")}
102+
103+
err := marshalMessage(msg1, json.RawMessage{}, buf)
104+
assert.NilError(t, err)
105+
err = marshalMessage(msg2, json.RawMessage{}, buf)
106+
assert.NilError(t, err)
107+
108+
r := &readerWithErr{
109+
err: io.EOF,
110+
after: buf.Len() / 4,
111+
r: buf,
112+
}
113+
dec := &decoder{rdr: r, maxRetry: 1}
114+
115+
_, err = dec.Decode()
116+
assert.Error(t, err, io.ErrUnexpectedEOF.Error())
117+
// again just to check
118+
_, err = dec.Decode()
119+
assert.Error(t, err, io.ErrUnexpectedEOF.Error())
120+
121+
// reset the error
122+
// from here all reads should succeed until we get EOF on the underlying reader
123+
r.err = nil
124+
125+
msg, err := dec.Decode()
126+
assert.NilError(t, err)
127+
assert.Equal(t, string(msg1.Line)+"\n", string(msg.Line))
128+
129+
msg, err = dec.Decode()
130+
assert.NilError(t, err)
131+
assert.Equal(t, string(msg2.Line)+"\n", string(msg.Line))
132+
133+
_, err = dec.Decode()
134+
assert.Error(t, err, io.EOF.Error())
135+
}
136+
137+
type readerWithErr struct {
138+
err error
139+
after int
140+
r io.Reader
141+
read int
142+
}
143+
144+
func (r *readerWithErr) Read(p []byte) (int, error) {
145+
if r.err != nil && r.read > r.after {
146+
return 0, r.err
147+
}
148+
149+
n, err := r.r.Read(p[:1])
150+
if n > 0 {
151+
r.read += n
152+
}
153+
return n, err
154+
}

0 commit comments

Comments
 (0)