Skip to content

Commit c3da6bf

Browse files
authored
Buffer the output of gzip.Writer to avoid stalling (google#923)
Use a bufio.Writer to buffer gzipped output while we are reading from the other end of an io.Pipe to allow gzip to keep compressing its input. A 64K buffer was chosen for its humor value. The default size of bufio.Writer was too small when testing against a local registry. Increasing beyond 64K didn't seem to have any noticeable effect. It might make sense to make this smaller, but I don't see a reason to worry about it ATM.
1 parent 76199f1 commit c3da6bf

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

pkg/v1/internal/gzip/zip.go

+22-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package gzip
1616

1717
import (
18+
"bufio"
1819
"bytes"
1920
"compress/gzip"
2021
"io"
@@ -38,11 +39,19 @@ func ReadCloser(r io.ReadCloser) io.ReadCloser {
3839
func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
3940
pr, pw := io.Pipe()
4041

42+
// For highly compressible layers, gzip.Writer will output a very small
43+
// number of bytes per Write(). This is normally fine, but when pushing
44+
// to a registry, we want to ensure that we're taking full advantage of
45+
// the available bandwidth instead of sending tons of tiny writes over
46+
// the wire.
47+
// 64K ought to be small enough for anybody. NOTE(review): 2<<16 is 128 KiB, not 64 KiB — use 1<<16 if 64K is the intent.
48+
bw := bufio.NewWriterSize(pw, 2<<16)
49+
4150
// Returns err so we can pw.CloseWithError(err)
4251
go func() error {
4352
// TODO(go1.14): Just defer {pw,gw,r}.Close like you'd expect.
4453
// Context: https://golang.org/issue/24283
45-
gw, err := gzip.NewWriterLevel(pw, level)
54+
gw, err := gzip.NewWriterLevel(bw, level)
4655
if err != nil {
4756
return pw.CloseWithError(err)
4857
}
@@ -52,9 +61,20 @@ func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
5261
defer gw.Close()
5362
return pw.CloseWithError(err)
5463
}
64+
65+
// Close gzip writer to Flush it and write gzip trailers.
66+
if err := gw.Close(); err != nil {
67+
return pw.CloseWithError(err)
68+
}
69+
70+
// Flush bufio writer to ensure we write out everything.
71+
if err := bw.Flush(); err != nil {
72+
return pw.CloseWithError(err)
73+
}
74+
75+
// We don't really care if these fail.
5576
defer pw.Close()
5677
defer r.Close()
57-
defer gw.Close()
5878

5979
return nil
6080
}()

pkg/v1/stream/layer.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package stream
1616

1717
import (
18+
"bufio"
1819
"compress/gzip"
1920
"crypto/sha256"
2021
"encoding/hex"
@@ -130,6 +131,7 @@ type compressedReader struct {
130131

131132
h, zh hash.Hash // collects digests of compressed and uncompressed stream.
132133
pr io.Reader
134+
bw *bufio.Writer
133135
count *countWriter
134136

135137
l *Layer // stream.Layer to update upon Close.
@@ -144,14 +146,22 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
144146
// capture compressed digest, and a countWriter to capture compressed
145147
// size.
146148
pr, pw := io.Pipe()
147-
zw, err := gzip.NewWriterLevel(io.MultiWriter(pw, zh, count), l.compression)
149+
150+
// Write compressed bytes to be read by the pipe.Reader, hashed by zh, and counted by count.
151+
mw := io.MultiWriter(pw, zh, count)
152+
153+
// Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
154+
// 64K ought to be small enough for anybody. NOTE(review): 2<<16 is 128 KiB, not 64 KiB — use 1<<16 if 64K is the intent.
155+
bw := bufio.NewWriterSize(mw, 2<<16)
156+
zw, err := gzip.NewWriterLevel(bw, l.compression)
148157
if err != nil {
149158
return nil, err
150159
}
151160

152161
cr := &compressedReader{
153162
closer: newMultiCloser(zw, l.blob),
154163
pr: pr,
164+
bw: bw,
155165
h: h,
156166
zh: zh,
157167
count: count,
@@ -183,6 +193,11 @@ func (cr *compressedReader) Close() error {
183193
return err
184194
}
185195

196+
// Flush the buffer.
197+
if err := cr.bw.Flush(); err != nil {
198+
return err
199+
}
200+
186201
diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.h.Sum(nil)))
187202
if err != nil {
188203
return err

0 commit comments

Comments
 (0)