internal/manifest: rename uses of 'file' to refer to sstable
With the introduction of blob files, the manifest and version edits will need
to account for the addition and removal of blob files. The term 'file' when
referring to an 'sstable' is vague and potentially confusing.
jbowens committed Feb 12, 2025
1 parent 419f239 commit da2f9c7
Showing 21 changed files with 261 additions and 261 deletions.
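
To make the scope of the rename concrete, here is a minimal sketch of the version-edit types before and after. This is an illustration inferred from the hunks below, assuming simplified definitions; the real types live in internal/manifest and carry more fields, and FileNum is abbreviated to a plain integer here.

package manifest

// FileMetadata stands in for the full sstable metadata struct.
type FileMetadata struct{ /* bounds, sizes, reference counts, ... */ }

// Before this commit, 'File' implicitly meant 'sstable':
//
//   type NewFileEntry struct {
//       Level int
//       Meta  *FileMetadata
//   }
//   type DeletedFileEntry struct {
//       Level   int
//       FileNum uint64
//   }
//   type VersionEdit struct {
//       NewFiles     []NewFileEntry
//       DeletedFiles map[DeletedFileEntry]*FileMetadata
//   }

// After this commit, 'Table' names leave room for blob files to be
// added and removed by future version edits.
type NewTableEntry struct {
    Level int
    Meta  *FileMetadata
}

type DeletedTableEntry struct {
    Level   int
    FileNum uint64 // base.FileNum in Pebble; a plain integer here
}

type VersionEdit struct {
    NewTables     []NewTableEntry
    DeletedTables map[DeletedTableEntry]*FileMetadata
}

In the diffs below, removed lines are prefixed with `-` and added lines with `+`; unprefixed lines are unchanged context.
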
12 changes: 6 additions & 6 deletions checkpoint.go
@@ -262,7 +262,7 @@ func (d *DB) Checkpoint(
}
}

- var excludedFiles map[deletedFileEntry]*fileMetadata
+ var excludedTables map[deletedFileEntry]*fileMetadata
var remoteFiles []base.DiskFileNum
// Set of FileBacking.DiskFileNum which will be required by virtual sstables
// in the checkpoint.
@@ -272,10 +272,10 @@ func (d *DB) Checkpoint(
iter := current.Levels[l].Iter()
for f := iter.First(); f != nil; f = iter.Next() {
if excludeFromCheckpoint(f, opt, d.cmp) {
- if excludedFiles == nil {
- excludedFiles = make(map[deletedFileEntry]*fileMetadata)
+ if excludedTables == nil {
+ excludedTables = make(map[deletedFileEntry]*fileMetadata)
}
- excludedFiles[deletedFileEntry{
+ excludedTables[deletedFileEntry{
Level: l,
FileNum: f.FileNum,
}] = f
@@ -325,7 +325,7 @@ func (d *DB) Checkpoint(

ckErr = d.writeCheckpointManifest(
fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
- excludedFiles, removeBackingTables,
+ excludedTables, removeBackingTables,
)
if ckErr != nil {
return ckErr
@@ -480,7 +480,7 @@ func (d *DB) writeCheckpointManifest(
if len(excludedFiles) > 0 {
// Write out an additional VersionEdit that deletes the excluded SST files.
ve := versionEdit{
- DeletedFiles: excludedFiles,
+ DeletedTables: excludedFiles,
RemovedBackingTables: removeBackingTables,
}

96 changes: 48 additions & 48 deletions compaction.go
@@ -657,9 +657,9 @@ func (c *compaction) hasExtraLevelData() bool {
// this compaction have revisions of the same user key present in both sstables,
// when it shouldn't (eg. when splitting flushes).
func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
- if n := len(ve.NewFiles); n > 1 {
- meta := ve.NewFiles[n-1].Meta
- prevMeta := ve.NewFiles[n-2].Meta
+ if n := len(ve.NewTables); n > 1 {
+ meta := ve.NewTables[n-1].Meta
+ prevMeta := ve.NewTables[n-2].Meta
if !prevMeta.Largest.IsExclusiveSentinel() &&
c.cmp(prevMeta.Largest.UserKey, meta.Smallest.UserKey) >= 0 {
return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
@@ -1255,7 +1255,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
var ingestSplitFiles []ingestSplitFile
ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)

- updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
+ updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newTableEntry) {
levelMetrics := c.metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
@@ -1274,7 +1274,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {

if suggestSplit || ingestFlushable.exciseSpan.Valid() {
// We could add deleted files to ve.
- ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
+ ve.DeletedTables = make(map[manifest.DeletedTableEntry]*manifest.FileMetadata)
}

ctx := context.Background()
@@ -1287,7 +1287,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
},
v: c.version,
}
- replacedFiles := make(map[base.FileNum][]newFileEntry)
+ replacedFiles := make(map[base.FileNum][]newTableEntry)
for _, file := range ingestFlushable.files {
var fileToSplit *fileMetadata
var level int
@@ -1312,7 +1312,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
}

// Add the current flushableIngest file to the version.
- ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
+ ve.NewTables = append(ve.NewTables, newTableEntry{Level: level, Meta: file.FileMetadata})
if fileToSplit != nil {
ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
ingestFile: file.FileMetadata,
@@ -1340,7 +1340,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
return nil, err
}

- if _, ok := ve.DeletedFiles[deletedFileEntry{
+ if _, ok := ve.DeletedTables[deletedFileEntry{
Level: l,
FileNum: m.FileNum,
}]; !ok {
@@ -1500,16 +1500,16 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}
if err == nil {
validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
- for i := range ve.NewFiles {
- e := &ve.NewFiles[i]
+ for i := range ve.NewTables {
+ e := &ve.NewTables[i]
info.Output = append(info.Output, e.Meta.TableInfo())
// Ingested tables are not necessarily flushed to L0. Record the level of
// each ingested file explicitly.
if ingest {
info.IngestLevels = append(info.IngestLevels, e.Level)
}
}
- if len(ve.NewFiles) == 0 {
+ if len(ve.NewTables) == 0 {
info.Err = errEmptyTable
}

@@ -1548,15 +1548,15 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}
}

- if len(ve.DeletedFiles) > 0 {
+ if len(ve.DeletedTables) > 0 {
// Iterate through all other compactions, and check if their inputs have
// been replaced due to an ingest-time split or excise. In that case,
// cancel the compaction.
for c2 := range d.mu.compact.inProgress {
for i := range c2.inputs {
iter := c2.inputs[i].files.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
- if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
+ if _, ok := ve.DeletedTables[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
c2.cancel.Store(true)
break
}
@@ -1593,7 +1593,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
flushed = d.mu.mem.queue[:n]
d.mu.mem.queue = d.mu.mem.queue[n:]
d.updateReadStateLocked(d.opts.DebugCheck)
- d.updateTableStatsLocked(ve.NewFiles)
+ d.updateTableStatsLocked(ve.NewTables)
if ingest {
d.mu.versions.metrics.Flush.AsIngestCount++
for _, l := range c.metrics {
@@ -2293,22 +2293,22 @@ func (d *DB) compact(c *compaction, errChannel chan error) {
//
// d.mu must be held when calling this method.
func (d *DB) cleanupVersionEdit(ve *versionEdit) {
- obsoleteFiles := make([]*fileBacking, 0, len(ve.NewFiles))
+ obsoleteFiles := make([]*fileBacking, 0, len(ve.NewTables))
deletedFiles := make(map[base.FileNum]struct{})
- for key := range ve.DeletedFiles {
+ for key := range ve.DeletedTables {
deletedFiles[key.FileNum] = struct{}{}
}
- for i := range ve.NewFiles {
- if ve.NewFiles[i].Meta.Virtual {
+ for i := range ve.NewTables {
+ if ve.NewTables[i].Meta.Virtual {
// We handle backing files separately.
continue
}
- if _, ok := deletedFiles[ve.NewFiles[i].Meta.FileNum]; ok {
+ if _, ok := deletedFiles[ve.NewTables[i].Meta.FileNum]; ok {
// This file is being moved in this ve to a different level.
// Don't mark it as obsolete.
continue
}
- obsoleteFiles = append(obsoleteFiles, ve.NewFiles[i].Meta.PhysicalMeta().FileBacking)
+ obsoleteFiles = append(obsoleteFiles, ve.NewTables[i].Meta.PhysicalMeta().FileBacking)
}
for i := range ve.CreatedBackingTables {
if ve.CreatedBackingTables[i].IsUnused() {
@@ -2387,8 +2387,8 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
info.Done = true
info.Err = err
if err == nil {
- for i := range ve.NewFiles {
- e := &ve.NewFiles[i]
+ for i := range ve.NewTables {
+ e := &ve.NewTables[i]
info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
}
d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
@@ -2415,7 +2415,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
// table list.
if err == nil {
d.updateReadStateLocked(d.opts.DebugCheck)
- d.updateTableStatsLocked(ve.NewFiles)
+ d.updateTableStatsLocked(ve.NewTables)
}
d.deleteObsoleteFiles(jobID)

@@ -2444,7 +2444,7 @@ func (d *DB) runCopyCompaction(
return nil, compact.Stats{}, ErrCancelledCompaction
}
ve = &versionEdit{
- DeletedFiles: map[deletedFileEntry]*fileMetadata{
+ DeletedTables: map[deletedFileEntry]*fileMetadata{
{Level: c.startLevel.level, FileNum: inputMeta.FileNum}: inputMeta,
},
}
@@ -2592,7 +2592,7 @@ func (d *DB) runCopyCompaction(
}
deleteOnExit = true
}
- ve.NewFiles = []newFileEntry{{
+ ve.NewTables = []newTableEntry{{
Level: c.outputLevel.level,
Meta: newMeta,
}}
@@ -2626,15 +2626,15 @@ func (d *DB) applyHintOnFile(
levelMetrics *LevelMetrics,
ve *versionEdit,
hintOverlap deletionHintOverlap,
- ) (newFiles []manifest.NewFileEntry, err error) {
+ ) (newFiles []manifest.NewTableEntry, err error) {
if hintOverlap == hintDoesNotApply {
return nil, nil
}

// The hint overlaps with at least part of the file.
if hintOverlap == hintDeletesFile {
// The hint deletes the entirety of this file.
- ve.DeletedFiles[deletedFileEntry{
+ ve.DeletedTables[deletedFileEntry{
Level: level,
FileNum: f.FileNum,
}] = f
@@ -2652,7 +2652,7 @@ func (d *DB) applyHintOnFile(
if err != nil {
return nil, errors.Wrap(err, "error when running excise for delete-only compaction")
}
- if _, ok := ve.DeletedFiles[deletedFileEntry{
+ if _, ok := ve.DeletedTables[deletedFileEntry{
Level: level,
FileNum: f.FileNum,
}]; !ok {
@@ -2712,7 +2712,7 @@ func (d *DB) runDeleteOnlyCompactionForLevel(
if err != nil {
return err
}
- if _, ok := ve.DeletedFiles[manifest.DeletedFileEntry{Level: cl.level, FileNum: curFile.FileNum}]; ok {
+ if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{Level: cl.level, FileNum: curFile.FileNum}]; ok {
curFile = nil
}
if len(newFiles) > 0 {
@@ -2727,7 +2727,7 @@ func (d *DB) runDeleteOnlyCompactionForLevel(
break
}
}
- if _, ok := ve.DeletedFiles[deletedFileEntry{
+ if _, ok := ve.DeletedTables[deletedFileEntry{
Level: cl.level,
FileNum: f.FileNum,
}]; !ok {
@@ -2787,7 +2787,7 @@ func (d *DB) runDeleteOnlyCompaction(
c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
fragments := fragmentDeleteCompactionHints(d.cmp, c.deletionHints)
ve = &versionEdit{
- DeletedFiles: map[deletedFileEntry]*fileMetadata{},
+ DeletedTables: map[deletedFileEntry]*fileMetadata{},
}
for _, cl := range c.inputs {
levelMetrics := &LevelMetrics{}
@@ -2797,18 +2797,18 @@ func (d *DB) runDeleteOnlyCompaction(
c.metrics[cl.level] = levelMetrics
}
// Remove any files that were added and deleted in the same versionEdit.
- ve.NewFiles = slices.DeleteFunc(ve.NewFiles, func(e manifest.NewFileEntry) bool {
- deletedFileEntry := manifest.DeletedFileEntry{Level: e.Level, FileNum: e.Meta.FileNum}
- if _, deleted := ve.DeletedFiles[deletedFileEntry]; deleted {
- delete(ve.DeletedFiles, deletedFileEntry)
+ ve.NewTables = slices.DeleteFunc(ve.NewTables, func(e manifest.NewTableEntry) bool {
+ deletedFileEntry := manifest.DeletedTableEntry{Level: e.Level, FileNum: e.Meta.FileNum}
+ if _, deleted := ve.DeletedTables[deletedFileEntry]; deleted {
+ delete(ve.DeletedTables, deletedFileEntry)
return true
}
return false
})
// Remove any entries from CreatedBackingTables that are not used in any
// NewFiles.
usedBackingFiles := make(map[base.DiskFileNum]struct{})
- for _, e := range ve.NewFiles {
+ for _, e := range ve.NewTables {
if e.Meta.Virtual {
usedBackingFiles[e.Meta.FileBacking.DiskFileNum] = struct{}{}
}
@@ -2841,10 +2841,10 @@ func (d *DB) runMoveCompaction(
},
}
ve = &versionEdit{
- DeletedFiles: map[deletedFileEntry]*fileMetadata{
+ DeletedTables: map[deletedFileEntry]*fileMetadata{
{Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
},
- NewFiles: []newFileEntry{
+ NewTables: []newTableEntry{
{Level: c.outputLevel.level, Meta: meta},
},
}
@@ -3060,12 +3060,12 @@ func (d *DB) compactAndWrite(
// tables in compact.Result.
func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error) {
ve := &versionEdit{
- DeletedFiles: map[deletedFileEntry]*fileMetadata{},
+ DeletedTables: map[deletedFileEntry]*fileMetadata{},
}
for _, cl := range c.inputs {
iter := cl.files.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
- ve.DeletedFiles[deletedFileEntry{
+ ve.DeletedTables[deletedFileEntry{
Level: cl.level,
FileNum: f.FileNum,
}] = f
@@ -3096,7 +3096,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
}

inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()
- ve.NewFiles = make([]newFileEntry, len(result.Tables))
+ ve.NewTables = make([]newTableEntry, len(result.Tables))
for i := range result.Tables {
t := &result.Tables[i]

@@ -3134,7 +3134,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
fileMeta.ExtendRangeKeyBounds(c.cmp, t.WriterMeta.SmallestRangeKey, t.WriterMeta.LargestRangeKey)
}

- ve.NewFiles[i] = newFileEntry{
+ ve.NewTables[i] = newTableEntry{
Level: c.outputLevel.level,
Meta: fileMeta,
}
@@ -3154,11 +3154,11 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
}

// Sanity check that the tables are ordered and don't overlap.
- for i := 1; i < len(ve.NewFiles); i++ {
- if ve.NewFiles[i-1].Meta.UserKeyBounds().End.IsUpperBoundFor(c.cmp, ve.NewFiles[i].Meta.Smallest.UserKey) {
+ for i := 1; i < len(ve.NewTables); i++ {
+ if ve.NewTables[i-1].Meta.UserKeyBounds().End.IsUpperBoundFor(c.cmp, ve.NewTables[i].Meta.Smallest.UserKey) {
return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s",
- ve.NewFiles[i-1].Meta.DebugString(c.formatKey, true),
- ve.NewFiles[i].Meta.DebugString(c.formatKey, true),
+ ve.NewTables[i-1].Meta.DebugString(c.formatKey, true),
+ ve.NewTables[i].Meta.DebugString(c.formatKey, true),
)
}
}
@@ -3259,11 +3259,11 @@ func validateVersionEdit(
}

// Validate both new and deleted files.
- for _, f := range ve.NewFiles {
+ for _, f := range ve.NewTables {
validateKey(f.Meta, f.Meta.Smallest.UserKey)
validateKey(f.Meta, f.Meta.Largest.UserKey)
}
- for _, m := range ve.DeletedFiles {
+ for _, m := range ve.DeletedTables {
validateKey(m, m.Smallest.UserKey)
validateKey(m, m.Largest.UserKey)
}
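
Note that several hunks above still key the renamed ve.DeletedTables map with the lower-case deletedFileEntry type. A plausible reading, assuming the pebble package's usual pattern of aliasing manifest types (the alias declarations are in files not shown in this view), is that the old alias name now points at the renamed manifest type:

package pebble

import "github.com/cockroachdb/pebble/internal/manifest"

// Assumed aliases, inferred from the mixed spellings in the hunks above;
// the actual declarations are not part of the diff shown here.
type fileMetadata = manifest.FileMetadata
type newTableEntry = manifest.NewTableEntry
type deletedFileEntry = manifest.DeletedTableEntry // old alias name, renamed target type

Under that assumption, code that still writes ve.DeletedTables[deletedFileEntry{...}] compiles unchanged, because the alias and manifest.DeletedTableEntry denote the same type.
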
[Diffs for the remaining 19 changed files are not shown.]
