Skip to content

Commit

Permalink
db: avoid mutex-protected I/O in Ingest and NewEventuallyFileOnlySnap…
Browse files Browse the repository at this point in the history
…shot

Creation of an eventually file-only snapshot requires inspecting the flushable
queue to look for overlap with the key ranges of the new snapshot. This is
performed while holding the DB mutex to avoid racing with other goroutines that
may mutate the flushable queue. Previously, if ingested sstables were queued as
flushable ingests, checking for overlap could perform read I/O while holding
the database mutex.

Similarly, ingestion requires determining which ingested sstables overlap any
flushables. This is used for correctness (to ensure we sequence the sstable
higher in the LSM than any overlapping flushables) and for ingest statistics
that we return to the caller. Previously, if ingested sstables were already in
the queue as flushable ingests, checking for overlap could perform read I/O
while holding both the database mutex and the commit pipeline mutex.

This commit refactors these overlap checks to use a new computePossibleOverlaps
method of the flushable interface. This allows the `ingestedFlushable`
implementation to avoid I/O and determine overlap using simple file boundary
comparisons. This required some gymnastics to support the IngestAndExcise case
that requires knowledge of which files overlapped flushables and must detect
overlaps with both ingested sstables and an excise span.
  • Loading branch information
jbowens committed Jan 9, 2024
1 parent eeac388 commit 065169e
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 145 deletions.
7 changes: 7 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2005,6 +2005,13 @@ func (b *flushableBatch) readyForFlush() bool {
return true
}

// computePossibleOverlaps is part of the flushable interface.
func (b *flushableBatch) computePossibleOverlaps(
fn func(bounded) shouldContinue, bounded ...bounded,
) {
computePossibleOverlapsGenericImpl[*flushableBatch](b, b.cmp, fn, bounded)
}

// Note: flushableBatchIter mirrors the implementation of batchIter. Keep the
// two in sync.
type flushableBatchIter struct {
Expand Down
59 changes: 25 additions & 34 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,19 +1578,12 @@ func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFile
if err := d.closed.Load(); err != nil {
panic(err)
}

internalKeyRanges := make([]internalKeyRange, len(keyRanges))
for i := range keyRanges {
if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
panic("pebble: key ranges for eventually-file-only-snapshot not in order")
}
internalKeyRanges[i] = internalKeyRange{
smallest: base.MakeInternalKey(keyRanges[i].Start, InternalKeySeqNumMax, InternalKeyKindMax),
largest: base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, keyRanges[i].End),
}
}

return d.makeEventuallyFileOnlySnapshot(keyRanges, internalKeyRanges)
return d.makeEventuallyFileOnlySnapshot(keyRanges)
}

// Close closes the DB.
Expand Down Expand Up @@ -1748,25 +1741,17 @@ func (d *DB) Compact(start, end []byte, parallelize bool) error {
return errors.Errorf("Compact start %s is not less than end %s",
d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
}
iStart := base.MakeInternalKey(start, InternalKeySeqNumMax, InternalKeyKindMax)
iEnd := base.MakeInternalKey(end, 0, 0)
m := (&fileMetadata{}).ExtendPointKeyBounds(d.cmp, iStart, iEnd)
meta := []*fileMetadata{m}

d.mu.Lock()
maxLevelWithFiles := 1
cur := d.mu.versions.currentVersion()
for level := 0; level < numLevels; level++ {
overlaps := cur.Overlaps(level, d.cmp, start, end, iEnd.IsExclusiveSentinel())
overlaps := cur.Overlaps(level, d.cmp, start, end, false)
if !overlaps.Empty() {
maxLevelWithFiles = level + 1
}
}

keyRanges := make([]internalKeyRange, len(meta))
for i := range meta {
keyRanges[i] = internalKeyRange{smallest: m.Smallest, largest: m.Largest}
}
// Determine if any memtable overlaps with the compaction range. We wait for
// any such overlap to flush (initiating a flush if necessary).
mem, err := func() (*flushableEntry, error) {
Expand All @@ -1776,25 +1761,31 @@ func (d *DB) Compact(start, end []byte, parallelize bool) error {
// overlaps.
for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
mem := d.mu.mem.queue[i]
if ingestMemtableOverlaps(d.cmp, mem, keyRanges) {
var err error
var anyOverlaps bool
mem.computePossibleOverlaps(func(b bounded) shouldContinue {
anyOverlaps = true
return stopIteration
}, KeyRange{Start: start, End: end})
if !anyOverlaps {
continue
}
var err error
if mem.flushable == d.mu.mem.mutable {
// We have to hold both commitPipeline.mu and DB.mu when calling
// makeRoomForWrite(). Lock order requirements elsewhere force us to
// unlock DB.mu in order to grab commitPipeline.mu first.
d.mu.Unlock()
d.commit.mu.Lock()
d.mu.Lock()
defer d.commit.mu.Unlock()
if mem.flushable == d.mu.mem.mutable {
// We have to hold both commitPipeline.mu and DB.mu when calling
// makeRoomForWrite(). Lock order requirements elsewhere force us to
// unlock DB.mu in order to grab commitPipeline.mu first.
d.mu.Unlock()
d.commit.mu.Lock()
d.mu.Lock()
defer d.commit.mu.Unlock()
if mem.flushable == d.mu.mem.mutable {
// Only flush if the active memtable is unchanged.
err = d.makeRoomForWrite(nil)
}
// Only flush if the active memtable is unchanged.
err = d.makeRoomForWrite(nil)
}
mem.flushForced = true
d.maybeScheduleFlush()
return mem, err
}
mem.flushForced = true
d.maybeScheduleFlush()
return mem, err
}
return nil, nil
}()
Expand All @@ -1811,7 +1802,7 @@ func (d *DB) Compact(start, end []byte, parallelize bool) error {
for level := 0; level < maxLevelWithFiles; {
for {
if err := d.manualCompact(
iStart.UserKey, iEnd.UserKey, level, parallelize); err != nil {
start, end, level, parallelize); err != nil {
if errors.Is(err, ErrCancelledCompaction) {
continue
}
Expand Down
93 changes: 93 additions & 0 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pebble
import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

Expand All @@ -30,6 +31,35 @@ type flushable interface {
// memTable.readyForFlush for one implementation which needs to check whether
// there are any outstanding write references.
readyForFlush() bool
// computePossibleOverlaps determines whether the flushable's keys overlap
// with the bounds of any of the provided bounded items. If an item overlaps
// or might overlap but it's not possible to determine overlap cheaply,
// computePossibleOverlaps invokes the provided function with the object
// that might overlap. computePossibleOverlaps must not perform any I/O and
// implementations should invoke the provided function for items that would
// require I/O to determine overlap.
computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
}

type shouldContinue bool

const (
continueIteration shouldContinue = true
stopIteration
)

type bounded interface {
// InternalKeyBounds returns a start key and an end key. Both bounds are
// inclusive.
InternalKeyBounds() (InternalKey, InternalKey)
}

func sliceAsBounded[B bounded](s []B) []bounded {
ret := make([]bounded, len(s))
for i := 0; i < len(s); i++ {
ret[i] = s[i]
}
return ret
}

// flushableEntry wraps a flushable and adds additional metadata and
Expand Down Expand Up @@ -252,3 +282,66 @@ func (s *ingestedFlushable) readyForFlush() bool {
// determine where to place the files in the lsm.
return true
}

// computePossibleOverlaps is part of the flushable interface.
func (s *ingestedFlushable) computePossibleOverlaps(
fn func(bounded) shouldContinue, bounded ...bounded,
) {
for i := range bounded {
smallest, largest := bounded[i].InternalKeyBounds()
for j := 0; j < len(s.files); j++ {
if sstableKeyCompare(s.comparer.Compare, s.files[j].Largest, smallest) >= 0 {
// This file's largest key is larger than smallest. Either the
// file overlaps the bounds, or it lies strictly after the
// bounds. Either way we can stop iterating since the files are
// sorted. But first, determine if there's overlap and call fn
// if necessary.
if sstableKeyCompare(s.comparer.Compare, s.files[j].Smallest, largest) <= 0 {
// The file overlaps in key boundaries. The file doesn't necessarily
// contain any keys within the key range, but we would need to
// perform I/O to know for sure. The flushable interface dictates
// that we're not permitted to perform I/O here, so err towards
// assuming overlap.
if fn(bounded[i]) == stopIteration {
return
}
}
break
}
}
}
}

// computePossibleOverlapsGenericImpl is an implemention of the flushable
// interface's computePossibleOverlaps function for flushable implementations
// with only in-memory state that do not have special requirements and should
// read through the ordinary flushable iterators.
//
// This function must only be used with implementations that are infallible (eg,
// memtable iterators) and will panic if an error is encountered.
func computePossibleOverlapsGenericImpl[F flushable](
f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
) {
iter := f.newIter(nil)
rangeDelIter := f.newRangeDelIter(nil)
rkeyIter := f.newRangeKeyIter(nil)
for _, b := range bounded {
s, l := b.InternalKeyBounds()
kr := internalKeyRange{s, l}
if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, cmp) {
if fn(b) == stopIteration {
break
}
}
}

for _, c := range [3]io.Closer{iter, rangeDelIter, rkeyIter} {
if c != nil {
if err := c.Close(); err != nil {
// This implementation must be used in circumstances where
// reading through the iterator is infallible.
panic(err)
}
}
}
}
Loading

0 comments on commit 065169e

Please sign in to comment.