diff --git a/metamorphic/ops.go b/metamorphic/ops.go index a9bfb1ff6f..eae83df670 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -898,19 +898,14 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { } db := t.getDB(o.dbID) if b.Empty() { - // Ingestion will be a no-op. But still do a DeleteRange and RangeKeyDelete - // as the generator/key manager expect this to succeed regardless. - // - // TODO(bilal): Take out this case when we support standalone Excises with - // no ingestions. - err = firstError(err, db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts)) - err = firstError(err, db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts)) - h.Recordf("%s // %v", o, err) + h.Recordf("%s // %v", o, o.simulateExcise(db, t)) return } + if o.sstContainsExciseTombstone { // Add a rangedel and rangekeydel to the batch. This ensures it'll end up - // inside the sstable. + // inside the sstable. Note that all entries in the sstable will have the + // same sequence number, so the ordering within the batch doesn't matter. err = firstError(err, b.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts)) err = firstError(err, b.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts)) } @@ -919,38 +914,39 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { h.Recordf("Build(%s) // %v", o.batchID, err2) return } - err = firstError(err, err2) err = firstError(err, b.Close()) if writerMeta.Properties.NumEntries == 0 && writerMeta.Properties.NumRangeKeys() == 0 { - // No-op. - h.Recordf("%s // %v", o, err) + h.Recordf("%s // %v", o, o.simulateExcise(db, t)) return } - if !t.testOpts.useExcise { - // Do a rangedel and rangekeydel before the ingestion. This mimics the - // behaviour of an excise. - err = firstError(err, db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts)) - err = firstError(err, db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts)) - } if t.testOpts.useExcise { err = firstError(err, t.withRetries(func() error { - _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{ + _, err := db.IngestAndExcise([]string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{ Start: o.exciseStart, End: o.exciseEnd, }, o.sstContainsExciseTombstone) return err })) } else { + err = firstError(err, o.simulateExcise(db, t)) err = firstError(err, t.withRetries(func() error { - return t.getDB(o.dbID).Ingest([]string{path}) + return db.Ingest([]string{path}) })) } h.Recordf("%s // %v", o, err) } +func (o *ingestAndExciseOp) simulateExcise(db *pebble.DB, t *Test) error { + // Simulate the excise using a DeleteRange and RangeKeyDelete. + return errors.CombineErrors( + db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts), + db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts), + ) +} + func (o *ingestAndExciseOp) receiver() objID { return o.dbID } func (o *ingestAndExciseOp) syncObjs() objIDSlice { // Ingest should not be concurrent with mutating the batches that will be