Skip to content

Commit

Permalink
internal/manifest: rename FileMetadata to TableMetadata
Browse files Browse the repository at this point in the history
The manifest Version will need to track the lifecycle of blob files in addition
to the existing sstable files. Rename FileMetadata to TableMetadata to
disambiguate.
  • Loading branch information
jbowens committed Feb 12, 2025
1 parent da2f9c7 commit 2f4d773
Show file tree
Hide file tree
Showing 61 changed files with 636 additions and 629 deletions.
12 changes: 6 additions & 6 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type CheckpointSpan struct {
// excludeFromCheckpoint returns true if an SST file should be excluded from the
// checkpoint because it does not overlap with the spans of interest
// (opt.restrictToSpans).
func excludeFromCheckpoint(f *fileMetadata, opt *checkpointOptions, cmp Compare) bool {
func excludeFromCheckpoint(f *tableMetadata, opt *checkpointOptions, cmp Compare) bool {
if len(opt.restrictToSpans) == 0 {
// Option not set; don't exclude anything.
return false
Expand Down Expand Up @@ -262,7 +262,7 @@ func (d *DB) Checkpoint(
}
}

var excludedTables map[deletedFileEntry]*fileMetadata
var excludedTables map[deletedFileEntry]*tableMetadata
var remoteFiles []base.DiskFileNum
// Set of FileBacking.DiskFileNum which will be required by virtual sstables
// in the checkpoint.
Expand All @@ -273,7 +273,7 @@ func (d *DB) Checkpoint(
for f := iter.First(); f != nil; f = iter.Next() {
if excludeFromCheckpoint(f, opt, d.cmp) {
if excludedTables == nil {
excludedTables = make(map[deletedFileEntry]*fileMetadata)
excludedTables = make(map[deletedFileEntry]*tableMetadata)
}
excludedTables[deletedFileEntry{
Level: l,
Expand Down Expand Up @@ -427,7 +427,7 @@ func (d *DB) writeCheckpointManifest(
destDir vfs.File,
manifestFileNum base.DiskFileNum,
manifestSize int64,
excludedFiles map[deletedFileEntry]*fileMetadata,
excludedTables map[deletedFileEntry]*tableMetadata,
removeBackingTables []base.DiskFileNum,
) error {
// Copy the MANIFEST, and create a pointer to it. We copy rather
Expand Down Expand Up @@ -477,10 +477,10 @@ func (d *DB) writeCheckpointManifest(
}
}

if len(excludedFiles) > 0 {
if len(excludedTables) > 0 {
// Write out an additional VersionEdit that deletes the excluded SST files.
ve := versionEdit{
DeletedTables: excludedFiles,
DeletedTables: excludedTables,
RemovedBackingTables: removeBackingTables,
}

Expand Down
55 changes: 29 additions & 26 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ type compaction struct {
func (c *compaction) inputLargestSeqNumAbsolute() base.SeqNum {
var seqNum base.SeqNum
for _, cl := range c.inputs {
cl.files.Each(func(m *manifest.FileMetadata) {
cl.files.Each(func(m *manifest.TableMetadata) {
seqNum = max(seqNum, m.LargestSeqNumAbsolute)
})
}
Expand Down Expand Up @@ -850,7 +850,7 @@ func (c *compaction) newInputIters(
}
}
if hasRangeKeys {
newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
rangeKeyIter, err := newRangeKeyIter(ctx, file, iterOptions)
if err != nil {
return nil, err
Expand Down Expand Up @@ -949,8 +949,11 @@ func (c *compaction) newRangeDelIter(
l manifest.Layer,
) (*noCloseIter, error) {
opts.layer = l
iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
iiopts, iterRangeDeletions)
iterSet, err := newIters(context.Background(), f.TableMetadata, &opts,
internalIterOpts{
compaction: true,
readEnv: block.ReadEnv{BufferPool: &c.bufferPool},
}, iterRangeDeletions)
if err != nil {
return nil, err
} else if iterSet.rangeDeletion == nil {
Expand Down Expand Up @@ -1255,7 +1258,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
var ingestSplitFiles []ingestSplitFile
ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)

updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newTableEntry) {
updateLevelMetricsOnExcise := func(m *tableMetadata, level int, added []newTableEntry) {
levelMetrics := c.metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
Expand All @@ -1274,7 +1277,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {

if suggestSplit || ingestFlushable.exciseSpan.Valid() {
// We could add deleted files to ve.
ve.DeletedTables = make(map[manifest.DeletedTableEntry]*manifest.FileMetadata)
ve.DeletedTables = make(map[manifest.DeletedTableEntry]*manifest.TableMetadata)
}

ctx := context.Background()
Expand All @@ -1289,13 +1292,13 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
}
replacedFiles := make(map[base.FileNum][]newTableEntry)
for _, file := range ingestFlushable.files {
var fileToSplit *fileMetadata
var fileToSplit *tableMetadata
var level int

// This file fits perfectly within the excise span, so we can slot it at L6.
if ingestFlushable.exciseSpan.Valid() &&
ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) &&
ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) {
ingestFlushable.exciseSpan.Contains(d.cmp, file.TableMetadata.Smallest) &&
ingestFlushable.exciseSpan.Contains(d.cmp, file.TableMetadata.Largest) {
level = 6
} else {
// TODO(radu): this can perform I/O; we should not do this while holding DB.mu.
Expand All @@ -1304,18 +1307,18 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
return nil, err
}
level, fileToSplit, err = ingestTargetLevel(
ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, file.FileMetadata, suggestSplit,
ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, file.TableMetadata, suggestSplit,
)
if err != nil {
return nil, err
}
}

// Add the current flushableIngest file to the version.
ve.NewTables = append(ve.NewTables, newTableEntry{Level: level, Meta: file.FileMetadata})
ve.NewTables = append(ve.NewTables, newTableEntry{Level: level, Meta: file.TableMetadata})
if fileToSplit != nil {
ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
ingestFile: file.FileMetadata,
ingestFile: file.TableMetadata,
splitFile: fileToSplit,
level: level,
})
Expand Down Expand Up @@ -1997,7 +2000,7 @@ type deleteCompactionHint struct {
// be deleted.
tombstoneLevel int
// The file containing the range tombstone(s) that created the hint.
tombstoneFile *fileMetadata
tombstoneFile *tableMetadata
// The smallest and largest sequence numbers of the abutting tombstones
// merged to form this hint. All of a tables' keys must be less than the
// tombstone smallest sequence number to be deleted. All of a tables'
Expand Down Expand Up @@ -2036,7 +2039,7 @@ func (h deleteCompactionHint) String() string {
}

func (h *deleteCompactionHint) canDeleteOrExcise(
cmp Compare, m *fileMetadata, snapshots compact.Snapshots, exciseEnabled bool,
cmp Compare, m *tableMetadata, snapshots compact.Snapshots, exciseEnabled bool,
) deletionHintOverlap {
// The file can only be deleted if all of its keys are older than the
// earliest tombstone aggregated into the hint. Note that we use
Expand Down Expand Up @@ -2112,8 +2115,8 @@ func checkDeleteCompactionHints(
snapshots compact.Snapshots,
exciseEnabled bool,
) (levels []compactionLevel, resolved, unresolved []deleteCompactionHint) {
var files map[*fileMetadata]bool
var byLevel [numLevels][]*fileMetadata
var files map[*tableMetadata]bool
var byLevel [numLevels][]*tableMetadata

// Delete-only compactions can be quadratic (O(mn)) in terms of runtime
// where m = number of files in the delete-only compaction and n = number
Expand Down Expand Up @@ -2178,7 +2181,7 @@ func checkDeleteCompactionHints(
// equal to the number of files it deletes. First, determine how many files are
// affected by this hint.
filesDeletedByCurrentHint := 0
var filesDeletedByLevel [7][]*fileMetadata
var filesDeletedByLevel [7][]*tableMetadata
for l := h.tombstoneLevel + 1; l < numLevels; l++ {
overlaps := v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end))
iter := overlaps.Iter()
Expand Down Expand Up @@ -2222,7 +2225,7 @@ func checkDeleteCompactionHints(
if files == nil {
// Construct files lazily, assuming most calls will not
// produce delete-only compactions.
files = make(map[*fileMetadata]bool)
files = make(map[*tableMetadata]bool)
}
files[m] = true
}
Expand Down Expand Up @@ -2444,7 +2447,7 @@ func (d *DB) runCopyCompaction(
return nil, compact.Stats{}, ErrCancelledCompaction
}
ve = &versionEdit{
DeletedTables: map[deletedFileEntry]*fileMetadata{
DeletedTables: map[deletedFileEntry]*tableMetadata{
{Level: c.startLevel.level, FileNum: inputMeta.FileNum}: inputMeta,
},
}
Expand All @@ -2470,7 +2473,7 @@ func (d *DB) runCopyCompaction(
// To ease up cleanup of the local file and tracking of refs, we create
// a new FileNum. This has the potential of making the block cache less
// effective, however.
newMeta := &fileMetadata{
newMeta := &tableMetadata{
Size: inputMeta.Size,
CreationTime: inputMeta.CreationTime,
SmallestSeqNum: inputMeta.SmallestSeqNum,
Expand Down Expand Up @@ -2621,7 +2624,7 @@ func (d *DB) runCopyCompaction(
// of tables deleted or excised.
func (d *DB) applyHintOnFile(
h deleteCompactionHint,
f *fileMetadata,
f *tableMetadata,
level int,
levelMetrics *LevelMetrics,
ve *versionEdit,
Expand Down Expand Up @@ -2787,7 +2790,7 @@ func (d *DB) runDeleteOnlyCompaction(
c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
fragments := fragmentDeleteCompactionHints(d.cmp, c.deletionHints)
ve = &versionEdit{
DeletedTables: map[deletedFileEntry]*fileMetadata{},
DeletedTables: map[deletedFileEntry]*tableMetadata{},
}
for _, cl := range c.inputs {
levelMetrics := &LevelMetrics{}
Expand Down Expand Up @@ -2841,7 +2844,7 @@ func (d *DB) runMoveCompaction(
},
}
ve = &versionEdit{
DeletedTables: map[deletedFileEntry]*fileMetadata{
DeletedTables: map[deletedFileEntry]*tableMetadata{
{Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
},
NewTables: []newTableEntry{
Expand Down Expand Up @@ -3060,7 +3063,7 @@ func (d *DB) compactAndWrite(
// tables in compact.Result.
func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error) {
ve := &versionEdit{
DeletedTables: map[deletedFileEntry]*fileMetadata{},
DeletedTables: map[deletedFileEntry]*tableMetadata{},
}
for _, cl := range c.inputs {
iter := cl.files.Iter()
Expand Down Expand Up @@ -3100,7 +3103,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
for i := range result.Tables {
t := &result.Tables[i]

fileMeta := &fileMetadata{
fileMeta := &tableMetadata{
FileNum: base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum),
CreationTime: t.CreationTime.Unix(),
Size: t.WriterMeta.Size,
Expand Down Expand Up @@ -3252,7 +3255,7 @@ func (d *DB) newCompactionOutput(
func validateVersionEdit(
ve *versionEdit, vk base.ValidateKey, format base.FormatKey, logger Logger,
) {
validateKey := func(f *manifest.FileMetadata, key []byte) {
validateKey := func(f *manifest.TableMetadata, key []byte) {
if err := vk.Validate(key); err != nil {
logger.Fatalf("pebble: version edit validation failed (key=%s file=%s): %v", format(key), f, err)
}
Expand Down
Loading

0 comments on commit 2f4d773

Please sign in to comment.