Skip to content

Commit

Permalink
db: use BufferPool on external iterators
Browse files Browse the repository at this point in the history
Use a block.BufferPool to hold on to allocated block buffers during iteration
of external sstables. During external iteration, blocks read aren't inserted
into a shared block cache. Previously this meant that every block read
allocated a new buffer for the block. Using a small buffer pool allows block
reads to reuse buffers over the life of the external iterator, reducing CPU
time spent requesting memory from the memory allocator. Additionally, plumbing a
buffer pool means the block reader won't attempt to store the block in the
zero-sized, ephemeral block cache used by the external iterator.

```
goos: darwin
goarch: arm64
pkg: github.com/cockroachdb/pebble
cpu: Apple M1 Pro
                                                          │   old.txt   │               new.txt               │
                                                          │   sec/op    │   sec/op     vs base                │
ExternalIter_NonOverlapping_Scan/keys=100/files=1-10        34.05µ ± 1%   27.64µ ± 1%  -18.81% (p=0.000 n=10)
ExternalIter_NonOverlapping_Scan/keys=100/files=10-10       121.3µ ± 1%   112.8µ ± 1%   -6.96% (p=0.000 n=10)
ExternalIter_NonOverlapping_Scan/keys=100/files=100-10      1.045m ± 2%   1.016m ± 2%   -2.77% (p=0.000 n=10)
ExternalIter_NonOverlapping_Scan/keys=10000/files=1-10      2.413m ± 1%   1.754m ± 1%  -27.32% (p=0.000 n=10)
ExternalIter_NonOverlapping_Scan/keys=10000/files=10-10     2.756m ± 4%   2.172m ± 8%  -21.18% (p=0.000 n=10)
ExternalIter_NonOverlapping_Scan/keys=10000/files=100-10    4.068m ± 2%   3.671m ± 1%   -9.76% (p=0.000 n=10)
ExternalIter_NonOverlapping_Scan/keys=100000/files=1-10     24.42m ± 1%   17.72m ± 2%  -27.46% (p=0.000 n=10)
ExternalIter_NonOverlapping_Scan/keys=100000/files=10-10    27.39m ± 1%   21.11m ± 2%  -22.92% (p=0.000 n=10)
ExternalIter_NonOverlapping_Scan/keys=100000/files=100-10   32.01m ± 1%   27.68m ± 2%  -13.50% (p=0.000 n=10)
geomean                                                     2.385m        1.976m       -17.16%
```
  • Loading branch information
jbowens committed Feb 1, 2025
1 parent 01687f6 commit 9173e12
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 39 deletions.
74 changes: 49 additions & 25 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ func NewExternalIterWithContext(
}
}

ro := o.MakeReaderOptions()
var readers [][]*sstable.Reader
for _, levelFiles := range files {
subReaders, err := openExternalTables(ctx, levelFiles, o.MakeReaderOptions())
subReaders, err := openExternalTables(ctx, levelFiles, ro)
readers = append(readers, subReaders)
if err != nil {
// Close all the opened readers.
Expand All @@ -75,9 +76,9 @@ func NewExternalIterWithContext(
prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
boundsBuf: buf.boundsBuf,
batch: nil,
// Add the readers to the Iterator so that Close closes them, and
// SetOptions can re-construct iterators from them.
externalReaders: readers,
// Add the external iter state to the Iterator so that Close closes it,
// and SetOptions can re-construct iterators using its state.
externalIter: &externalIterState{readers: readers},
newIters: func(context.Context, *manifest.FileMetadata, *IterOptions,
internalIterOpts, iterKinds) (iterSet, error) {
// NB: External iterators are currently constructed without any
Expand All @@ -90,6 +91,8 @@ func NewExternalIterWithContext(
},
seqNum: base.SeqNumMax,
}
dbi.externalIter.bufferPool.Init(2)

if iterOpts != nil {
dbi.opts = *iterOpts
dbi.processBounds(iterOpts.LowerBound, iterOpts.UpperBound)
Expand All @@ -101,6 +104,25 @@ func NewExternalIterWithContext(
return dbi, nil
}

// externalIterState encapsulates state that is specific to external iterators.
// An external *pebble.Iterator maintains a pointer to the externalIterState and
// calls Close when the Iterator is Closed, providing an opportunity for the
// external iterator to release resources particular to external iterators.
type externalIterState struct {
// bufferPool is handed to block reads through block.ReadEnv.BufferPool and
// is released by Close.
bufferPool block.BufferPool
// readers holds the opened sstable readers, one inner slice per level of
// input files. Close closes every reader.
readers [][]*sstable.Reader
}

// Close closes every sstable reader held by the external iterator state and
// then releases the buffer pool. A failure to close one reader does not stop
// the remaining readers from being closed; the individual errors are folded
// together with firstError and the result is returned.
func (e *externalIterState) Close() (err error) {
	for i := range e.readers {
		level := e.readers[i]
		for j := range level {
			closeErr := level[j].Close()
			err = firstError(err, closeErr)
		}
	}
	// The pool is released only after all readers have been closed.
	e.bufferPool.Release()
	return err
}

func validateExternalIterOpts(iterOpts *IterOptions) error {
switch {
case iterOpts.PointKeyFilters != nil:
Expand All @@ -115,7 +137,9 @@ func validateExternalIterOpts(iterOpts *IterOptions) error {
return nil
}

func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterator, error) {
func createExternalPointIter(
ctx context.Context, it *Iterator, readEnv block.ReadEnv,
) (topLevelIterator, error) {
// TODO(jackson): In some instances we could generate fewer levels by using
// L0Sublevels code to organize nonoverlapping files into the same level.
// This would allow us to use levelIters and keep a smaller set of data and
Expand All @@ -129,20 +153,15 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
}
mlevels := it.alloc.mlevels[:0]

// TODO(jackson): External iterators never provide categorized iterator
// stats today because they exist outside the context of a *DB. If the
// sstables being read are on the physical filesystem, we may still want to
// thread a CategoryStatsCollector through so that we collect their stats.

if len(it.externalReaders) > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
if len(it.externalIter.readers) > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, len(it.externalIter.readers))
}
// We set a synthetic sequence number, with lower levels having higher numbers.
seqNum := 0
for _, readers := range it.externalReaders {
for _, readers := range it.externalIter.readers {
seqNum += len(readers)
}
for _, readers := range it.externalReaders {
for _, readers := range it.externalIter.readers {
for _, r := range readers {
var (
rangeDelIter keyspan.FragmentIterator
Expand All @@ -158,15 +177,13 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
seqNum--
pointIter, err = r.NewPointIter(
ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
sstable.NeverUseFilterBlock,
block.ReadEnv{Stats: &it.stats.InternalStats, IterStats: nil},
sstable.MakeTrivialReaderProvider(r))
sstable.NeverUseFilterBlock, readEnv, sstable.MakeTrivialReaderProvider(r))
if err != nil {
return nil, err
}
rangeDelIter, err = r.NewRawRangeDelIter(ctx, sstable.FragmentIterTransforms{
SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum),
}, block.ReadEnv{Stats: &it.stats.InternalStats, IterStats: nil})
}, readEnv)
if err != nil {
return nil, err
}
Expand All @@ -186,7 +203,16 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
}

func finishInitializingExternal(ctx context.Context, it *Iterator) error {
pointIter, err := createExternalPointIter(ctx, it)
readEnv := block.ReadEnv{
Stats: &it.stats.InternalStats,
// TODO(jackson): External iterators never provide categorized iterator
// stats today because they exist outside the context of a *DB. If the
// sstables being read are on the physical filesystem, we may still want to
// thread a CategoryStatsCollector through so that we collect their stats.
IterStats: nil,
BufferPool: &it.externalIter.bufferPool,
}
pointIter, err := createExternalPointIter(ctx, it, readEnv)
if err != nil {
return err
}
Expand All @@ -208,14 +234,14 @@ func finishInitializingExternal(ctx context.Context, it *Iterator) error {
// this optimization.
// We set a synthetic sequence number, with lower levels having higher numbers.
seqNum := 0
for _, readers := range it.externalReaders {
for _, readers := range it.externalIter.readers {
seqNum += len(readers)
}
for _, readers := range it.externalReaders {
for _, readers := range it.externalIter.readers {
for _, r := range readers {
transforms := sstable.FragmentIterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
seqNum--
if rki, err := r.NewRawRangeKeyIter(ctx, transforms, block.ReadEnv{Stats: &it.stats.InternalStats, IterStats: nil}); err != nil {
if rki, err := r.NewRawRangeKeyIter(ctx, transforms, readEnv); err != nil {
return err
} else if rki != nil {
rangeKeyIters = append(rangeKeyIters, rki)
Expand Down Expand Up @@ -251,9 +277,7 @@ func finishInitializingExternal(ctx context.Context, it *Iterator) error {
}

func openExternalTables(
ctx context.Context,
files []sstable.ReadableFile,
readerOpts sstable.ReaderOptions,
ctx context.Context, files []sstable.ReadableFile, readerOpts sstable.ReaderOptions,
) (readers []*sstable.Reader, err error) {
readers = make([]*sstable.Reader, 0, len(files))
for i := range files {
Expand Down
8 changes: 5 additions & 3 deletions external_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pebble
import (
"bytes"
"fmt"
"math/rand/v2"
"testing"

"github.com/cockroachdb/datadriven"
Expand All @@ -15,7 +16,6 @@ import (
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

func TestExternalIterator(t *testing.T) {
Expand Down Expand Up @@ -83,7 +83,7 @@ func BenchmarkExternalIter_NonOverlapping_Scan(b *testing.B) {
b.Run(fmt.Sprintf("keys=%d", keyCount), func(b *testing.B) {
for _, fileCount := range []int{1, 10, 100} {
b.Run(fmt.Sprintf("files=%d", fileCount), func(b *testing.B) {
prng := rand.New(rand.NewSource(0))
prng := rand.New(rand.NewPCG(0, 0))

var fs vfs.FS = vfs.NewMem()
filenames := make([]string, fileCount)
Expand All @@ -97,7 +97,9 @@ func BenchmarkExternalIter_NonOverlapping_Scan(b *testing.B) {
for j := 0; j < keyCount/fileCount; j++ {
key := testkeys.Key(ks, int64(len(keys)))
keys = append(keys, key)
_, err = prng.Read(valBuf[:])
for i := range valBuf {
valBuf[i] = byte(prng.Uint32())
}
require.NoError(b, err)
require.NoError(b, w.Set(key, valBuf[:]))
}
Expand Down
16 changes: 5 additions & 11 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangekeystack"
"github.com/cockroachdb/pebble/internal/treeprinter"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -239,8 +238,7 @@ type Iterator struct {
prefixOrFullSeekKey []byte
readSampling readSampling
stats IteratorStats
externalReaders [][]*sstable.Reader

externalIter *externalIterState
// Following fields used when constructing an iterator stack, eg, in Clone
// and SetOptions or when re-fragmenting a batch's range keys/range dels.
// Non-nil if this Iterator includes a Batch.
Expand Down Expand Up @@ -2387,11 +2385,8 @@ func (i *Iterator) Close() error {
if i.version != nil {
i.version.Unref()
}

for _, readers := range i.externalReaders {
for _, r := range readers {
err = firstError(err, r.Close())
}
if i.externalIter != nil {
err = firstError(err, i.externalIter.Close())
}

// Close the closer for the current value if one was open.
Expand All @@ -2401,7 +2396,6 @@ func (i *Iterator) Close() error {
}

if i.rangeKey != nil {

i.rangeKey.rangeKeyBuffers.PrepareForReuse()
*i.rangeKey = iteratorRangeKeyState{
rangeKeyBuffers: i.rangeKey.rangeKeyBuffers,
Expand Down Expand Up @@ -2581,7 +2575,7 @@ func (i *Iterator) processBounds(lower, upper []byte) {
//
// If only lower and upper bounds need to be modified, prefer SetBounds.
func (i *Iterator) SetOptions(o *IterOptions) {
if i.externalReaders != nil {
if i.externalIter != nil {
if err := validateExternalIterOpts(o); err != nil {
panic(err)
}
Expand Down Expand Up @@ -2753,7 +2747,7 @@ func (i *Iterator) SetOptions(o *IterOptions) {

// Iterators created through NewExternalIter have a different iterator
// initialization process.
if i.externalReaders != nil {
if i.externalIter != nil {
finishInitializingExternal(i.ctx, i)
return
}
Expand Down

0 comments on commit 9173e12

Please sign in to comment.