Commit 3748221

sstable/block: add Reader type
Move block-reading logic into a new block.Reader type, in preparation for reusing it within blob file readers. Informs #112.
1 parent 8d3363b commit 3748221

18 files changed: +380 −298 lines

db_test.go

Lines changed: 2 additions & 1 deletion
@@ -30,6 +30,7 @@ import (
 	"github.com/cockroachdb/pebble/internal/testutils"
 	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
 	"github.com/cockroachdb/pebble/sstable"
+	"github.com/cockroachdb/pebble/sstable/block"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/cockroachdb/pebble/vfs/errorfs"
 	"github.com/cockroachdb/pebble/wal"
@@ -1417,7 +1418,7 @@ func (t *testTracer) IsTracingEnabled(ctx context.Context) bool {
 }
 
 func TestTracing(t *testing.T) {
-	defer sstable.DeterministicReadBlockDurationForTesting()()
+	defer block.DeterministicReadBlockDurationForTesting()()
 
 	var tracer testTracer
 	buf := &tracer.buf

metrics_test.go

Lines changed: 1 addition & 2 deletions
@@ -21,7 +21,6 @@ import (
 	"github.com/cockroachdb/pebble/internal/manual"
 	"github.com/cockroachdb/pebble/internal/testkeys"
 	"github.com/cockroachdb/pebble/objstorage/remote"
-	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/sstable/block"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/cockroachdb/pebble/vfs/errorfs"
@@ -119,7 +118,7 @@ func TestMetrics(t *testing.T) {
 	if runtime.GOARCH == "386" {
 		t.Skip("skipped on 32-bit due to slightly varied output")
 	}
-	defer sstable.DeterministicReadBlockDurationForTesting()()
+	defer block.DeterministicReadBlockDurationForTesting()()
 
 	var d *DB
 	var iters map[string]*Iterator

sstable/block/block.go

Lines changed: 311 additions & 0 deletions
@@ -5,13 +5,23 @@
 package block
 
 import (
+	"context"
 	"encoding/binary"
+	"path/filepath"
+	"runtime"
 	"time"
 
 	"github.com/cespare/xxhash/v2"
+	"github.com/cockroachdb/crlib/crtime"
+	"github.com/cockroachdb/crlib/fifo"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
+	"github.com/cockroachdb/pebble/internal/cache"
 	"github.com/cockroachdb/pebble/internal/crc"
+	"github.com/cockroachdb/pebble/internal/invariants"
+	"github.com/cockroachdb/pebble/internal/sstableinternal"
+	"github.com/cockroachdb/pebble/objstorage"
+	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
 )
 
 // Handle is the file offset and length of a block.
@@ -145,6 +155,26 @@ func (c *Checksummer) Checksum(block []byte, blockType byte) (checksum uint32) {
 	return checksum
 }
 
+// ValidateChecksum validates the checksum of a block.
+func ValidateChecksum(checksumType ChecksumType, b []byte, bh Handle) error {
+	expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
+	var computedChecksum uint32
+	switch checksumType {
+	case ChecksumTypeCRC32c:
+		computedChecksum = crc.New(b[:bh.Length+1]).Value()
+	case ChecksumTypeXXHash64:
+		computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
+	default:
+		return errors.Errorf("unsupported checksum type: %d", checksumType)
+	}
+	if expectedChecksum != computedChecksum {
+		return base.CorruptionErrorf("block %d/%d: %s checksum mismatch %x != %x",
+			errors.Safe(bh.Offset), errors.Safe(bh.Length), checksumType,
+			expectedChecksum, computedChecksum)
+	}
+	return nil
+}
+
 // Metadata is an in-memory buffer that stores metadata for a block. It is
 // allocated together with the buffer storing the block and is initialized once
 // when the block is read from disk.
@@ -321,3 +351,284 @@ func (env *ReadEnv) BlockRead(blockLength uint64, readDuration time.Duration) {
 		env.IterStats.Accumulate(blockLength, 0, readDuration)
 	}
 }
+
+// A Reader reads blocks from a single file, handling caching, checksum
+// validation and decompression.
+type Reader struct {
+	readable      objstorage.Readable
+	cacheOpts     sstableinternal.CacheOptions
+	loadBlockSema *fifo.Semaphore
+	logger        base.LoggerAndTracer
+	checksumType  ChecksumType
+}
+
+// Init initializes the Reader to read blocks from the provided Readable.
+func (r *Reader) Init(
+	readable objstorage.Readable,
+	cacheOpts sstableinternal.CacheOptions,
+	sema *fifo.Semaphore,
+	logger base.LoggerAndTracer,
+	checksumType ChecksumType,
+) {
+	r.readable = readable
+	r.cacheOpts = cacheOpts
+	r.loadBlockSema = sema
+	r.logger = logger
+	r.checksumType = checksumType
+	if r.cacheOpts.Cache == nil {
+		r.cacheOpts.Cache = cache.New(0)
+	} else {
+		r.cacheOpts.Cache.Ref()
+	}
+	if r.cacheOpts.CacheID == 0 {
+		r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
+	}
+}
+
+// FileNum returns the file number of the file being read.
+func (r *Reader) FileNum() base.DiskFileNum {
+	return r.cacheOpts.FileNum
+}
+
+// ChecksumType returns the checksum type used by the reader.
+func (r *Reader) ChecksumType() ChecksumType {
+	return r.checksumType
+}
+
+// Read reads the block referenced by the provided handle. The readHandle is
+// optional.
+func (r *Reader) Read(
+	ctx context.Context,
+	env ReadEnv,
+	readHandle objstorage.ReadHandle,
+	bh Handle,
+	initBlockMetadataFn func(*Metadata, []byte) error,
+) (handle BufferHandle, _ error) {
+	var cv *cache.Value
+	var crh cache.ReadHandle
+	hit := true
+	if env.BufferPool == nil {
+		var errorDuration time.Duration
+		var err error
+		cv, crh, errorDuration, hit, err = r.cacheOpts.Cache.GetWithReadHandle(
+			ctx, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
+		if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) {
+			r.logger.Eventf(
+				ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String())
+		}
+		// TODO(sumeer): consider tracing when waited longer than some duration
+		// for turn to do the read.
+		if err != nil {
+			return BufferHandle{}, err
+		}
+	} else {
+		// The compaction path uses env.BufferPool, and does not coordinate read
+		// using a cache.ReadHandle. This is ok since only a single compaction is
+		// reading a block.
+		cv = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
+		if cv != nil {
+			hit = true
+		}
+	}
+	// INVARIANT: hit => cv != nil
+	if cv != nil {
+		if hit {
+			// Cache hit.
+			if readHandle != nil {
+				readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+TrailerLen))
+			}
+			env.BlockServedFromCache(bh.Length)
+		}
+		if invariants.Enabled && crh.Valid() {
+			panic("cache.ReadHandle must not be valid")
+		}
+		return CacheBufferHandle(cv), nil
+	}
+
+	// Need to read. First acquire loadBlockSema, if needed.
+	if sema := r.loadBlockSema; sema != nil {
+		if err := sema.Acquire(ctx, 1); err != nil {
+			// An error here can only come from the context.
+			return BufferHandle{}, err
+		}
+		defer sema.Release(1)
+	}
+	value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn)
+	if err != nil {
+		if crh.Valid() {
+			crh.SetReadError(err)
+		}
+		return BufferHandle{}, err
+	}
+	h := value.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
+	return h, nil
+}
+
+// TODO(sumeer): should the threshold be configurable.
+const slowReadTracingThreshold = 5 * time.Millisecond
+
+// doRead is a helper for Read that does the read, checksum check,
+// decompression, and returns either a Value or an error.
+func (r *Reader) doRead(
+	ctx context.Context,
+	env ReadEnv,
+	readHandle objstorage.ReadHandle,
+	bh Handle,
+	initBlockMetadataFn func(*Metadata, []byte) error,
+) (Value, error) {
+	compressed := Alloc(int(bh.Length+TrailerLen), env.BufferPool)
+	readStopwatch := makeStopwatch()
+	var err error
+	if readHandle != nil {
+		err = readHandle.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
+	} else {
+		err = r.readable.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
+	}
+	readDuration := readStopwatch.stop()
+	// Call IsTracingEnabled to avoid the allocations of boxing integers into an
+	// interface{}, unless necessary.
+	if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
+		_, file1, line1, _ := runtime.Caller(1)
+		_, file2, line2, _ := runtime.Caller(2)
+		r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
+			int(bh.Length+TrailerLen), readDuration.String(),
+			r.cacheOpts.FileNum,
+			filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
+			filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
+	}
+	if err != nil {
+		compressed.Release()
+		return Value{}, err
+	}
+	env.BlockRead(bh.Length, readDuration)
+	if err = ValidateChecksum(r.checksumType, compressed.BlockData(), bh); err != nil {
+		compressed.Release()
+		err = errors.Wrapf(err, "pebble/table: table %s", r.cacheOpts.FileNum)
+		return Value{}, err
+	}
+	typ := CompressionIndicator(compressed.BlockData()[bh.Length])
+	compressed.Truncate(int(bh.Length))
+	var decompressed Value
+	if typ == NoCompressionIndicator {
+		decompressed = compressed
+	} else {
+		// Decode the length of the decompressed value.
+		decodedLen, prefixLen, err := DecompressedLen(typ, compressed.BlockData())
+		if err != nil {
+			compressed.Release()
+			return Value{}, err
+		}
+		decompressed = Alloc(decodedLen, env.BufferPool)
+		err = DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData())
+		compressed.Release()
+		if err != nil {
+			decompressed.Release()
+			return Value{}, err
+		}
+	}
+	if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil {
+		decompressed.Release()
+		return Value{}, err
+	}
+	return decompressed, nil
+}
+
+// Readable returns the underlying objstorage.Readable.
+//
+// Users should avoid accessing the underlying Readable if it can be avoided.
+func (r *Reader) Readable() objstorage.Readable {
+	return r.readable
+}
+
+// GetFromCache retrieves the block from the cache, if it is present.
+//
+// Users should prefer using Read, which handles reading from object storage on
+// a cache miss.
+func (r *Reader) GetFromCache(bh Handle) *cache.Value {
+	return r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
+}
+
+// UsePreallocatedReadHandle returns a ReadHandle that reads from the reader and
+// uses the provided preallocated read handle to back the read handle, avoiding
+// an unnecessary allocation.
+func (r *Reader) UsePreallocatedReadHandle(
+	readBeforeSize objstorage.ReadBeforeSize, rh *objstorageprovider.PreallocatedReadHandle,
+) objstorage.ReadHandle {
+	return objstorageprovider.UsePreallocatedReadHandle(r.readable, readBeforeSize, rh)
+}
+
+// Close releases resources associated with the Reader.
+func (r *Reader) Close() error {
+	r.cacheOpts.Cache.Unref()
+	var err error
+	if r.readable != nil {
+		err = r.readable.Close()
+		r.readable = nil
+	}
+	return err
+}
+
+// ReadRaw reads len(buf) bytes from the provided Readable at the given offset
+// into buf. It's used to read the footer of a table.
+func ReadRaw(
+	ctx context.Context,
+	f objstorage.Readable,
+	readHandle objstorage.ReadHandle,
+	logger base.LoggerAndTracer,
+	fileNum base.DiskFileNum,
+	buf []byte,
+	off int64,
+) ([]byte, error) {
+	size := f.Size()
+	if size < int64(len(buf)) {
+		return nil, base.CorruptionErrorf("pebble/table: invalid table %s (file size is too small)", errors.Safe(fileNum))
+	}
+
+	readStopwatch := makeStopwatch()
+	var err error
+	if readHandle != nil {
+		err = readHandle.ReadAt(ctx, buf, off)
+	} else {
+		err = f.ReadAt(ctx, buf, off)
+	}
+	readDuration := readStopwatch.stop()
+	// Call IsTracingEnabled to avoid the allocations of boxing integers into an
+	// interface{}, unless necessary.
+	if readDuration >= slowReadTracingThreshold && logger.IsTracingEnabled(ctx) {
+		logger.Eventf(ctx, "reading footer of %d bytes took %s",
+			len(buf), readDuration.String())
+	}
+	if err != nil {
+		return nil, errors.Wrap(err, "pebble/table: invalid table (could not read footer)")
+	}
+	return buf, nil
+}
+
+// DeterministicReadBlockDurationForTesting is for tests that want a
+// deterministic value of the time to read a block (that is not in the cache).
+// The return value is a function that must be called before the test exits.
+func DeterministicReadBlockDurationForTesting() func() {
+	drbdForTesting := deterministicReadBlockDurationForTesting
+	deterministicReadBlockDurationForTesting = true
+	return func() {
+		deterministicReadBlockDurationForTesting = drbdForTesting
+	}
+}
+
+var deterministicReadBlockDurationForTesting = false
+
+type deterministicStopwatchForTesting struct {
+	startTime crtime.Mono
+}
+
+func makeStopwatch() deterministicStopwatchForTesting {
+	return deterministicStopwatchForTesting{startTime: crtime.NowMono()}
+}
+
+func (w deterministicStopwatchForTesting) stop() time.Duration {
+	dur := w.startTime.Elapsed()
+	if deterministicReadBlockDurationForTesting {
+		dur = slowReadTracingThreshold
+	}
+	return dur
+}

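For orientation, here is a minimal sketch of how a caller might wire up the new block.Reader end to end. It is illustrative only and not part of this commit: the helper name readOneBlock, its hypothetical placement in a package that can import pebble internals, the choice of ChecksumTypeCRC32c, the nil load semaphore and ReadHandle, and the no-op metadata initializer are all assumptions; only the Reader API itself (Init, Read, Close) comes from the diff above.

// Illustrative sketch (not part of this commit): a hypothetical helper that
// exercises the new block.Reader API.
package sstable // hypothetical placement; illustrative only

import (
	"context"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/sstableinternal"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/sstable/block"
)

func readOneBlock(
	ctx context.Context,
	readable objstorage.Readable,
	cacheOpts sstableinternal.CacheOptions,
	logger base.LoggerAndTracer,
	bh block.Handle,
) error {
	var r block.Reader
	// Init refs (or creates) the block cache and records the checksum type
	// that will be used to validate every block read through this Reader.
	r.Init(readable, cacheOpts, nil /* loadBlockSema */, logger, block.ChecksumTypeCRC32c)
	// Close unrefs the cache and closes the underlying Readable.
	defer func() { _ = r.Close() }()

	// Read serves the block from the cache when possible and otherwise reads,
	// checksums and decompresses it from the Readable. The ReadHandle argument
	// is optional, and the metadata initializer here is a no-op.
	h, err := r.Read(ctx, block.NoReadEnv, nil /* readHandle */, bh,
		func(*block.Metadata, []byte) error { return nil })
	if err != nil {
		return err
	}
	// A real caller would consume the block's contents before releasing the
	// buffer handle back to the cache.
	h.Release()
	return nil
}

Because Read consults the block cache before touching the Readable, repeated reads of the same handle are served from memory, and Close mirrors the ownership the diff establishes: it unrefs the cache and closes the underlying Readable.
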
sstable/colblk_writer.go

Lines changed: 1 addition & 1 deletion
@@ -1059,7 +1059,7 @@ func (w *RawColumnWriter) rewriteSuffixes(
 	// Copy over the filter block if it exists.
 	if w.filterBlock != nil {
 		if filterBlockBH, ok := l.FilterByName(w.filterBlock.metaName()); ok {
-			filterBlock, _, err := readBlockBuf(sstBytes, filterBlockBH, r.checksumType, nil)
+			filterBlock, _, err := readBlockBuf(sstBytes, filterBlockBH, r.blockReader.ChecksumType(), nil)
			if err != nil {
				return errors.Wrap(err, "reading filter")
			}

sstable/copier.go

Lines changed: 3 additions & 3 deletions
@@ -85,8 +85,8 @@ func CopySpan(
 	var preallocRH objstorageprovider.PreallocatedReadHandle
 	// ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
 	// and lower-level index blocks with one read.
-	rh := objstorageprovider.UsePreallocatedReadHandle(
-		r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
+	rh := r.blockReader.UsePreallocatedReadHandle(
+		objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
 	defer rh.Close()
 	rh.SetupForCompaction()
 	indexH, err := r.readTopLevelIndexBlock(ctx, block.NoReadEnv, rh)
@@ -131,7 +131,7 @@ func CopySpan(
 	var blocksNotInCache []indexEntry
 
 	for i := range blocks {
-		cv := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, blocks[i].bh.Offset)
+		cv := r.blockReader.GetFromCache(blocks[i].bh.Handle)
 		if cv == nil {
 			// Cache miss. Add this block to the list of blocks that are not in cache.
 			blocksNotInCache = blocks[i-len(blocksNotInCache) : i+1]
