Skip to content

Commit ce9b648

Browse files
committed
sstable/block: fix Buffer growth bug
Fix a bug whereby Buffer.AppendValue would fail to sufficiently grow the capacity of its backing byte slice. Previously, AppendValue used cap(buf) when calculating the argument to slices.Grow, but slices.Grow expects its argument to be in terms of the slice's current length, not capacity. Add a focused randomized test that catches the bug. Also, add invariant assertions around a BufHandle's existence in or out of a pool. Fix #4247.
1 parent ccf7577 commit ce9b648

File tree

2 files changed

+66
-6
lines changed

2 files changed

+66
-6
lines changed

sstable/block/compression.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ func (b *Buffer) Append(v []byte) int {
338338
for size < newLen {
339339
size *= 2
340340
}
341-
b.h.b = slices.Grow(b.h.b, size-cap(b.h.b))
341+
b.h.b = slices.Grow(b.h.b, size-len(b.h.b))
342342
}
343343
b.h.b = b.h.b[:newLen]
344344
if n := copy(b.h.b[off:], v); n != len(v) {
@@ -411,10 +411,10 @@ func (h *BufHandle) Release() {
411411
panic(errors.AssertionFailedf("pool has no maximum size"))
412412
}
413413
// Note we avoid releasing buffers that are larger than the configured
414-
// maximum to the pool. This avoids holding on to occassional large buffers
415-
// necesary for, for example, single large values.
414+
// maximum to the pool. This avoids holding on to occasional large buffers
415+
// necessary for, for example, singlular large values.
416416
if h.b != nil && len(h.b) < h.pool.Max {
417-
if invariants.Enabled {
417+
if invariants.Sometimes(50) {
418418
// Set the bytes to a random value. Cap the number of bytes being
419419
// randomized to prevent test timeouts.
420420
l := min(cap(h.b), 1000)
@@ -423,7 +423,7 @@ func (h *BufHandle) Release() {
423423
h.b[j] = byte(rand.Uint32())
424424
}
425425
}
426-
h.pool.pool.Put(h)
426+
h.pool.Put(h)
427427
}
428428
}
429429

@@ -446,12 +446,29 @@ type bufferSyncPool struct {
446446
pool sync.Pool
447447
}
448448

449+
// Put returns a buffer to the pool. While the buffer is in the pool, its pool
450+
// member is zeroed. This is used to validate invariants around double use of a
451+
// buffer.
452+
func (p *bufferSyncPool) Put(bh *BufHandle) {
453+
if bh.pool != p {
454+
panic(errors.AssertionFailedf("buffer has pool %v; trying to return it to pool %v", bh.pool, p))
455+
}
456+
bh.pool = nil
457+
p.pool.Put(bh)
458+
}
459+
449460
// Get retrieves a new buf from the pool, or allocates one of the configured
450461
// default size if the pool is empty.
451462
func (p *bufferSyncPool) Get() *BufHandle {
452463
v := p.pool.Get()
453464
if v != nil {
454-
return v.(*BufHandle)
465+
bh := v.(*BufHandle)
466+
if bh.pool != nil {
467+
panic(errors.AssertionFailedf("buffer has a pool; was it inserted into a pool twice?"))
468+
}
469+
// Set the pool so we know where to return the buffer to.
470+
bh.pool = p
471+
return bh
455472
}
456473
if invariants.Enabled && p.Default == 0 {
457474
// Guard against accidentally forgetting to initialize a buffer sync pool.

sstable/block/compression_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package block
66

77
import (
88
"encoding/binary"
9+
"fmt"
910
"math/rand/v2"
1011
"testing"
1112
"time"
@@ -92,3 +93,45 @@ func decompress(algo CompressionIndicator, b []byte) (*cache.Value, error) {
9293
}
9394
return decoded, nil
9495
}
96+
97+
func TestBufferRandomized(t *testing.T) {
98+
seed := uint64(time.Now().UnixNano())
99+
t.Logf("seed %d", seed)
100+
rng := rand.New(rand.NewPCG(0, seed))
101+
102+
var b Buffer
103+
b.Init(SnappyCompression, ChecksumTypeCRC32c)
104+
defer b.Release()
105+
vbuf := make([]byte, 0, 1<<10) // 1 KiB
106+
107+
for i := 0; i < 25; i++ {
108+
t.Run(fmt.Sprintf("iteration %d", i), func(t *testing.T) {
109+
// Randomly release and reinitialize the buffer.
110+
if rng.IntN(5) == 1 {
111+
b.Release()
112+
b.Init(SnappyCompression, ChecksumTypeCRC32c)
113+
}
114+
115+
aggregateSizeOfKVs := rng.IntN(4<<20-(1<<10)) + 1<<10 // [1 KiB, 4 MiB)
116+
size := 0
117+
for b.Size() < aggregateSizeOfKVs {
118+
vlen := rng.IntN(aggregateSizeOfKVs-b.Size()) + 1
119+
if cap(vbuf) < vlen {
120+
vbuf = make([]byte, vlen)
121+
} else {
122+
vbuf = vbuf[:vlen]
123+
}
124+
for i := range vbuf {
125+
vbuf[i] = byte(rng.Uint32())
126+
}
127+
b.Append(vbuf)
128+
size += vlen
129+
require.Equal(t, size, b.Size())
130+
s := b.Get()
131+
require.Equal(t, vbuf, s[len(s)-len(vbuf):])
132+
}
133+
_, bh := b.CompressAndChecksum()
134+
bh.Release()
135+
})
136+
}
137+
}

0 commit comments

Comments
 (0)