Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: avoid mutex-protected I/O in Ingest and NewEventuallyFileOnlySnapshot #3197

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2005,6 +2005,13 @@ func (b *flushableBatch) readyForFlush() bool {
return true
}

// computePossibleOverlaps is part of the flushable interface.
func (b *flushableBatch) computePossibleOverlaps(
fn func(bounded) shouldContinue, bounded ...bounded,
) {
computePossibleOverlapsGenericImpl[*flushableBatch](b, b.cmp, fn, bounded)
}

// Note: flushableBatchIter mirrors the implementation of batchIter. Keep the
// two in sync.
type flushableBatchIter struct {
Expand Down
59 changes: 25 additions & 34 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,19 +1578,12 @@ func (d *DB) NewEventuallyFileOnlySnapshot(keyRanges []KeyRange) *EventuallyFile
if err := d.closed.Load(); err != nil {
panic(err)
}

internalKeyRanges := make([]internalKeyRange, len(keyRanges))
for i := range keyRanges {
if i > 0 && d.cmp(keyRanges[i-1].End, keyRanges[i].Start) > 0 {
panic("pebble: key ranges for eventually-file-only-snapshot not in order")
}
internalKeyRanges[i] = internalKeyRange{
smallest: base.MakeInternalKey(keyRanges[i].Start, InternalKeySeqNumMax, InternalKeyKindMax),
largest: base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, keyRanges[i].End),
}
}

return d.makeEventuallyFileOnlySnapshot(keyRanges, internalKeyRanges)
return d.makeEventuallyFileOnlySnapshot(keyRanges)
}

// Close closes the DB.
Expand Down Expand Up @@ -1748,25 +1741,17 @@ func (d *DB) Compact(start, end []byte, parallelize bool) error {
return errors.Errorf("Compact start %s is not less than end %s",
d.opts.Comparer.FormatKey(start), d.opts.Comparer.FormatKey(end))
}
iStart := base.MakeInternalKey(start, InternalKeySeqNumMax, InternalKeyKindMax)
iEnd := base.MakeInternalKey(end, 0, 0)
m := (&fileMetadata{}).ExtendPointKeyBounds(d.cmp, iStart, iEnd)
meta := []*fileMetadata{m}

d.mu.Lock()
maxLevelWithFiles := 1
cur := d.mu.versions.currentVersion()
for level := 0; level < numLevels; level++ {
overlaps := cur.Overlaps(level, d.cmp, start, end, iEnd.IsExclusiveSentinel())
overlaps := cur.Overlaps(level, d.cmp, start, end, false)
if !overlaps.Empty() {
maxLevelWithFiles = level + 1
}
}

keyRanges := make([]internalKeyRange, len(meta))
for i := range meta {
keyRanges[i] = internalKeyRange{smallest: m.Smallest, largest: m.Largest}
}
// Determine if any memtable overlaps with the compaction range. We wait for
// any such overlap to flush (initiating a flush if necessary).
mem, err := func() (*flushableEntry, error) {
Expand All @@ -1776,25 +1761,31 @@ func (d *DB) Compact(start, end []byte, parallelize bool) error {
// overlaps.
for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
mem := d.mu.mem.queue[i]
if ingestMemtableOverlaps(d.cmp, mem, keyRanges) {
var err error
var anyOverlaps bool
mem.computePossibleOverlaps(func(b bounded) shouldContinue {
anyOverlaps = true
return stopIteration
}, KeyRange{Start: start, End: end})
if !anyOverlaps {
continue
}
var err error
if mem.flushable == d.mu.mem.mutable {
// We have to hold both commitPipeline.mu and DB.mu when calling
// makeRoomForWrite(). Lock order requirements elsewhere force us to
// unlock DB.mu in order to grab commitPipeline.mu first.
d.mu.Unlock()
d.commit.mu.Lock()
d.mu.Lock()
defer d.commit.mu.Unlock()
if mem.flushable == d.mu.mem.mutable {
// We have to hold both commitPipeline.mu and DB.mu when calling
// makeRoomForWrite(). Lock order requirements elsewhere force us to
// unlock DB.mu in order to grab commitPipeline.mu first.
d.mu.Unlock()
d.commit.mu.Lock()
d.mu.Lock()
defer d.commit.mu.Unlock()
if mem.flushable == d.mu.mem.mutable {
// Only flush if the active memtable is unchanged.
err = d.makeRoomForWrite(nil)
}
// Only flush if the active memtable is unchanged.
err = d.makeRoomForWrite(nil)
}
mem.flushForced = true
d.maybeScheduleFlush()
return mem, err
}
mem.flushForced = true
d.maybeScheduleFlush()
return mem, err
}
return nil, nil
}()
Expand All @@ -1811,7 +1802,7 @@ func (d *DB) Compact(start, end []byte, parallelize bool) error {
for level := 0; level < maxLevelWithFiles; {
for {
if err := d.manualCompact(
iStart.UserKey, iEnd.UserKey, level, parallelize); err != nil {
start, end, level, parallelize); err != nil {
if errors.Is(err, ErrCancelledCompaction) {
continue
}
Expand Down
93 changes: 93 additions & 0 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pebble
import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

Expand All @@ -30,6 +31,35 @@ type flushable interface {
// memTable.readyForFlush for one implementation which needs to check whether
// there are any outstanding write references.
readyForFlush() bool
// computePossibleOverlaps determines whether the flushable's keys overlap
// with the bounds of any of the provided bounded items. If an item overlaps
// or might overlap but it's not possible to determine overlap cheaply,
// computePossibleOverlaps invokes the provided function with the object
// that might overlap. computePossibleOverlaps must not perform any I/O and
// implementations should invoke the provided function for items that would
// require I/O to determine overlap.
computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
}

type shouldContinue bool

const (
continueIteration shouldContinue = true
stopIteration = false
)

type bounded interface {
// InternalKeyBounds returns a start key and an end key. Both bounds are
// inclusive.
InternalKeyBounds() (InternalKey, InternalKey)
}

func sliceAsBounded[B bounded](s []B) []bounded {
ret := make([]bounded, len(s))
for i := 0; i < len(s); i++ {
ret[i] = s[i]
}
return ret
}

// flushableEntry wraps a flushable and adds additional metadata and
Expand Down Expand Up @@ -252,3 +282,66 @@ func (s *ingestedFlushable) readyForFlush() bool {
// determine where to place the files in the lsm.
return true
}

// computePossibleOverlaps is part of the flushable interface.
func (s *ingestedFlushable) computePossibleOverlaps(
fn func(bounded) shouldContinue, bounded ...bounded,
) {
for i := range bounded {
smallest, largest := bounded[i].InternalKeyBounds()
for j := 0; j < len(s.files); j++ {
if sstableKeyCompare(s.comparer.Compare, s.files[j].Largest, smallest) >= 0 {
// This file's largest key is larger than smallest. Either the
// file overlaps the bounds, or it lies strictly after the
// bounds. Either way we can stop iterating since the files are
// sorted. But first, determine if there's overlap and call fn
// if necessary.
if sstableKeyCompare(s.comparer.Compare, s.files[j].Smallest, largest) <= 0 {
// The file overlaps in key boundaries. The file doesn't necessarily
// contain any keys within the key range, but we would need to
// perform I/O to know for sure. The flushable interface dictates
// that we're not permitted to perform I/O here, so err towards
// assuming overlap.
if !fn(bounded[i]) {
return
}
}
break
}
}
}
}

// computePossibleOverlapsGenericImpl is an implemention of the flushable
// interface's computePossibleOverlaps function for flushable implementations
// with only in-memory state that do not have special requirements and should
// read through the ordinary flushable iterators.
//
// This function must only be used with implementations that are infallible (eg,
// memtable iterators) and will panic if an error is encountered.
func computePossibleOverlapsGenericImpl[F flushable](
f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
) {
iter := f.newIter(nil)
rangeDelIter := f.newRangeDelIter(nil)
rkeyIter := f.newRangeKeyIter(nil)
for _, b := range bounded {
s, l := b.InternalKeyBounds()
kr := internalKeyRange{s, l}
if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, cmp) {
if !fn(b) {
break
}
}
}

for _, c := range [3]io.Closer{iter, rangeDelIter, rkeyIter} {
if c != nil {
if err := c.Close(); err != nil {
// This implementation must be used in circumstances where
// reading through the iterator is infallible.
panic(err)
}
}
}
}
Loading
Loading