Skip to content

Commit

Permalink
sstable: always use target buffer for CompressAndChecksum
Browse files Browse the repository at this point in the history
Currently the block returned by `CompressAndChecksum` can alias the
original data buffer if we do not compress the data (either because
compression is disabled, or because the data was not compressible
enough). In most cases, we then write out the resulting data, and the
write can mangle the buffer. As a result, most callers check whether
the block is uncompressed and, if so, make a copy.

This change moves the copy into `CompressAndChecksum`; we always use
the dst buffer, even if we don't compress. This simplifies the callers
and makes things less fragile.
  • Loading branch information
RaduBerinde committed Jan 30, 2025
1 parent 310fac7 commit e02e7e0
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 82 deletions.
59 changes: 19 additions & 40 deletions sstable/block/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,6 @@ func (b PhysicalBlock) Clone() PhysicalBlock {
return PhysicalBlock{data: data, trailer: b.trailer}
}

// CloneUsingBuf makes a copy of the block data, using the given slice if it has
// enough capacity.
func (b PhysicalBlock) CloneUsingBuf(buf []byte) (_ PhysicalBlock, newBuf []byte) {
newBuf = append(buf[:0], b.data...)
return PhysicalBlock{data: newBuf, trailer: b.trailer}, newBuf
}

// IsCompressed returns true if the block is compressed.
func (b *PhysicalBlock) IsCompressed() bool {
return CompressionIndicator(b.trailer[0]) != NoCompressionIndicator
}

// WriteTo writes the block (including its trailer) to the provided Writable. If
// err == nil, n is the number of bytes successfully written to the Writable.
//
Expand All @@ -247,35 +235,37 @@ func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
}

// CompressAndChecksum compresses and checksums the provided block, returning
// the compressed block and its trailer. The dst argument is used for the
// compressed payload if it's sufficiently large. If it's not, a new buffer is
// allocated and *dst is updated to point to it.
// the compressed block and its trailer. The result is appended to the dst
// argument.
//
// If the compressed block is not sufficiently smaller than the original block,
// the compressed payload is discarded and the original, uncompressed block is
// used to avoid unnecessary decompression overhead at read time.
// the compressed payload is discarded and the original, uncompressed block data
// is used to avoid unnecessary decompression overhead at read time.
func CompressAndChecksum(
dst *[]byte, block []byte, compression Compression, checksummer *Checksummer,
dst *[]byte, blockData []byte, compression Compression, checksummer *Checksummer,
) PhysicalBlock {
buf := (*dst)[:0]
// Compress the buffer, discarding the result if the improvement isn't at
// least 12.5%.
algo := NoCompressionIndicator
if compression != NoCompression {
var compressed []byte
algo, compressed = compress(compression, block, *dst)
if algo != NoCompressionIndicator && cap(compressed) > cap(*dst) {
*dst = compressed[:cap(compressed)]
}
if len(compressed) < len(block)-len(block)/8 {
block = compressed
} else {
algo, buf = compress(compression, blockData, buf)
if len(buf) >= len(blockData)-len(blockData)/8 {
algo = NoCompressionIndicator
}
}
if algo == NoCompressionIndicator {
// We don't want to use the given blockData buffer directly: typically the
// result will be written to disk and that can mangle the buffer, leading to
// fragile code.
buf = append(buf[:0], blockData...)
}

*dst = buf

// Calculate the checksum.
pb := PhysicalBlock{data: block}
checksum := checksummer.Checksum(block, byte(algo))
pb := PhysicalBlock{data: buf}
checksum := checksummer.Checksum(buf, byte(algo))
pb.trailer = MakeTrailer(byte(algo), checksum)
return pb
}
Expand Down Expand Up @@ -375,19 +365,8 @@ func (b *Buffer) CompressAndChecksum() (PhysicalBlock, *BufHandle) {
// Grab a buffer to use as the destination for compression.
compressedBuf := compressedBuffers.Get()
pb := CompressAndChecksum(&compressedBuf.b, b.h.b, b.compression, &b.checksummer)
if pb.IsCompressed() {
// Compression was fruitful, and pb's data points into compressedBuf. We
// can reuse b.Buffer because we've copied the compressed data.
b.h.b = b.h.b[:0]
return pb, compressedBuf
}
// Compression was not fruitful, and pb's data points into b.h. The
// compressedBuf we retrieved from the pool isn't needed, but our b.h is.
// Use the compressedBuf as the new b.h.
pbHandle := b.h
b.h = compressedBuf
b.h.b = b.h.b[:0]
return pb, pbHandle
return pb, compressedBuf
}

// SetCompression changes the compression algorithm used by CompressAndChecksum.
Expand Down
16 changes: 0 additions & 16 deletions sstable/colblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,14 +667,6 @@ func (w *RawColumnWriter) enqueueDataBlock(
w.opts.Compression,
&cb.blockBuf.checksummer,
)
if !cb.physical.IsCompressed() {
// If the block isn't compressed, cb.physical's underlying data points
// directly into a buffer owned by w.dataBlock. Clone it before passing
// it to the write queue to be asynchronously written to disk.
// TODO(jackson): Should we try to avoid this clone by tracking the
// lifetime of the DataBlockWriters?
cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
}
return w.enqueuePhysicalBlock(cb, separator)
}

Expand Down Expand Up @@ -1164,14 +1156,6 @@ func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProper
w.opts.Compression,
&cb.blockBuf.checksummer,
)
if !cb.physical.IsCompressed() {
// If the block isn't compressed, cb.physical's underlying data points
// directly into a buffer owned by w.dataBlock. Clone it before passing
// it to the write queue to be asynchronously written to disk.
// TODO(jackson): Should we try to avoid this clone by tracking the
// lifetime of the DataBlockWriters?
cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
}
if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
return err
}
Expand Down
23 changes: 2 additions & 21 deletions sstable/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/binfmt"
"github.com/cockroachdb/pebble/internal/bytealloc"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/sstableinternal"
"github.com/cockroachdb/pebble/internal/treeprinter"
"github.com/cockroachdb/pebble/objstorage"
Expand Down Expand Up @@ -675,8 +674,7 @@ func (w *layoutWriter) Abort() {
}

// WriteDataBlock constructs a trailer for the provided data block and writes
// the block and trailer to the writer. It returns the block's handle. It can
// mangle b.
// the block and trailer to the writer. It returns the block's handle.
func (w *layoutWriter) WriteDataBlock(b []byte, buf *blockBuf) (block.Handle, error) {
return w.writeBlock(b, w.compression, buf)
}
Expand All @@ -692,8 +690,6 @@ func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (blo
// second-level) and writes the block and trailer to the writer. It remembers
// the last-written index block's handle and adds it to the file's meta index
// when the writer is finished.
//
// WriteIndexBlock can mangle b.
func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
h, err := w.writeBlock(b, w.compression, &w.buf)
if err == nil {
Expand All @@ -716,17 +712,13 @@ func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err er
// WritePropertiesBlock constructs a trailer for the provided properties block
// and writes the block and trailer to the writer. It automatically adds the
// properties block to the file's meta index when the writer is finished.
//
// WritePropertiesBlock can mangle b.
func (w *layoutWriter) WritePropertiesBlock(b []byte) (block.Handle, error) {
return w.writeNamedBlock(b, metaPropertiesName)
}

// WriteRangeKeyBlock constructs a trailer for the provided range key block and
// writes the block and trailer to the writer. It automatically adds the range
// key block to the file's meta index when the writer is finished.
//
// WriteRangeKeyBlock can mangle the block data.
func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
return w.writeNamedBlock(b, metaRangeKeyName)
}
Expand All @@ -735,13 +727,10 @@ func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
// block and writes the block and trailer to the writer. It automatically adds
// the range deletion block to the file's meta index when the writer is
// finished.
//
// WriteRangeDeletionBlock can mangle the block data.
func (w *layoutWriter) WriteRangeDeletionBlock(b []byte) (block.Handle, error) {
return w.writeNamedBlock(b, metaRangeDelV2Name)
}

// writeNamedBlock can mangle the block data.
func (w *layoutWriter) writeNamedBlock(b []byte, name string) (bh block.Handle, err error) {
bh, err = w.writeBlock(b, block.NoCompression, &w.buf)
if err == nil {
Expand Down Expand Up @@ -770,20 +759,12 @@ func (w *layoutWriter) WriteValueIndexBlock(
return h, nil
}

// writeBlock checksums, compresses, and writes out a block. It can mangle b.
// writeBlock checksums, compresses, and writes out a block.
func (w *layoutWriter) writeBlock(
b []byte, compression block.Compression, buf *blockBuf,
) (block.Handle, error) {
pb := block.CompressAndChecksum(&buf.dataBuf, b, compression, &buf.checksummer)
h, err := w.writePrecompressedBlock(pb)
// This method is allowed to mangle b, but that only happens when the block
// data is not compressible. Mangle it anyway in invariant builds to catch
// callers that don't handle this.
if invariants.Enabled && invariants.Sometimes(1) {
for i := range b {
b[i] = 0xFF
}
}
return h, err
}

Expand Down
6 changes: 1 addition & 5 deletions sstable/rowblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,13 +1968,9 @@ func (w *RawRowWriter) addDataBlock(b, sep []byte, bhp block.HandleWithPropertie
w.layout.compression,
&blockBuf.checksummer,
)
if !pb.IsCompressed() {
// If the block isn't compressed, pb's underlying data points
// directly into b. Clone it before writing it, as writing can mangle the buffer.
pb, blockBuf.dataBuf = pb.CloneUsingBuf(blockBuf.dataBuf)
}

// layout.WriteDataBlock keeps layout.offset up-to-date for us.
// Note that this can mangle the pb data.
bh, err := w.layout.writePrecompressedBlock(pb)
if err != nil {
return err
Expand Down

0 comments on commit e02e7e0

Please sign in to comment.