Skip to content

Commit

Permalink
record: add WAL read ahead usage, new chunk format, and major version
Browse files Browse the repository at this point in the history
Add usages of WAL readAheadForCorruption to record.Next() and
record.Read(). Write a new chunk format to include sync offsets
for reading ahead. Add a new format major version for the new
chunk format.
  • Loading branch information
EdwardX29 committed Feb 18, 2025
1 parent 9a86078 commit 4fa7ac2
Show file tree
Hide file tree
Showing 19 changed files with 1,764 additions and 100 deletions.
14 changes: 12 additions & 2 deletions format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ const (
// block.
FormatColumnarBlocks

// FormatWALSyncChunks is a format major version enabling the writing of
// WAL sync chunks. These new chunks are used to disambiguate between corruption
// and logical EOF during WAL replay. This is implemented by adding a new
// chunk wire format that encodes an additional "Synced Offset" field which acts
// as a commitment that the WAL should have been synced up until the offset.
FormatWALSyncChunks

// -- Add new versions here --

// FormatNewest is the most recent format major version.
Expand Down Expand Up @@ -235,7 +242,7 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix,
FormatFlushableIngestExcises:
return sstable.TableFormatPebblev4
case FormatColumnarBlocks:
case FormatColumnarBlocks, FormatWALSyncChunks:
return sstable.TableFormatPebblev5
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
Expand All @@ -248,7 +255,7 @@ func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat {
switch v {
case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted,
FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix,
FormatFlushableIngestExcises, FormatColumnarBlocks:
FormatFlushableIngestExcises, FormatColumnarBlocks, FormatWALSyncChunks:
return sstable.TableFormatPebblev1
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
Expand Down Expand Up @@ -291,6 +298,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
FormatColumnarBlocks: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatColumnarBlocks)
},
FormatWALSyncChunks: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatWALSyncChunks)
},
}

const formatVersionMarkerName = `format-version`
Expand Down
8 changes: 6 additions & 2 deletions format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) {
require.Equal(t, FormatVirtualSSTables, FormatMajorVersion(16))
require.Equal(t, FormatSyntheticPrefixSuffix, FormatMajorVersion(17))
require.Equal(t, FormatFlushableIngestExcises, FormatMajorVersion(18))
require.Equal(t, FormatColumnarBlocks, FormatMajorVersion(19))

// When we add a new version, we should add a check for the new version in
// addition to updating these expected values.
require.Equal(t, FormatNewest, FormatMajorVersion(19))
require.Equal(t, internalFormatNewest, FormatMajorVersion(19))
require.Equal(t, FormatNewest, FormatMajorVersion(20))
require.Equal(t, internalFormatNewest, FormatMajorVersion(20))
}

func TestFormatMajorVersion_MigrationDefined(t *testing.T) {
Expand Down Expand Up @@ -60,6 +61,8 @@ func TestRatchetFormat(t *testing.T) {
require.Equal(t, FormatFlushableIngestExcises, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatColumnarBlocks))
require.Equal(t, FormatColumnarBlocks, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatWALSyncChunks))
require.Equal(t, FormatWALSyncChunks, d.FormatMajorVersion())

require.NoError(t, d.Close())

Expand Down Expand Up @@ -217,6 +220,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
FormatSyntheticPrefixSuffix: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
FormatFlushableIngestExcises: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
FormatColumnarBlocks: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev5},
FormatWALSyncChunks: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev5},
}

// Valid versions.
Expand Down
32 changes: 23 additions & 9 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
QueueSemChan: d.commit.logSyncQSem,
Logger: opts.Logger,
EventListener: walEventListenerAdaptor{l: opts.EventListener},
WriteWALSyncOffsets: FormatMajorVersion(d.mu.formatVers.vers.Load()) >= FormatWALSyncChunks,
}
if opts.WALFailover != nil {
walOpts.Secondary = opts.WALFailover.Secondary
Expand Down Expand Up @@ -930,18 +931,31 @@ func (d *DB) replayWAL(
}
if err != nil {
// It is common to encounter a zeroed or invalid chunk due to WAL
// preallocation and WAL recycling. We need to distinguish these
// errors from EOF in order to recognize that the record was
// truncated and to avoid replaying subsequent WALs, but want
// to otherwise treat them like EOF.
if err == io.EOF {
// preallocation and WAL recycling. However zeroed or invalid chunks
// can also be a consequence of corruption / disk rot. When the log
// reader encounters one of these cases, it attempts to disambiguate
// by reading ahead looking for a future record. If a future chunk
// indicates the chunk at the original offset should've been valid, it
// surfaces record.ErrInvalidChunk or record.ErrZeroedChunk. These
// errors are always indicative of corruption and data loss.
//
// Otherwise, the reader surfaces io.ErrUnexpectedEOF indicating that
// the WAL terminated uncleanly and ambiguously. If the WAL is the
// most recent logical WAL, the caller passes in (strictWALTail=false),
// indicating we should tolerate the unclean ending. If the WAL is an
// older WAL, the caller passes in (strictWALTail=true), indicating that
// the WAL should have been closed cleanly, and we should interpret
// the `io.ErrUnexpectedEOF` as corruption and stop recovery.
if errors.Is(err, io.EOF) {
break
} else if record.IsInvalidRecord(err) {
if !strictWALTail {
break
}
} else if errors.Is(err, io.ErrUnexpectedEOF) && !strictWALTail {
break
} else if errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
// If a read-ahead returns one of these errors, the error should be marked as corruption.
// Other I/O related errors should not be marked as corruption and are simply returned.
err = errors.Mark(err, ErrCorruption)
}

return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
}

Expand Down
2 changes: 1 addition & 1 deletion open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestNewDBFilenames(t *testing.T) {
"LOCK",
"MANIFEST-000001",
"OPTIONS-000003",
"marker.format-version.000006.019",
"marker.format-version.000007.020",
"marker.manifest.000001.MANIFEST-000001",
},
}
Expand Down
97 changes: 96 additions & 1 deletion record/log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package record

import (
"bytes"
"context"
"encoding/binary"
"io"
Expand Down Expand Up @@ -49,6 +50,19 @@ const (
SyncConcurrency = 1 << syncConcurrencyBits
)

const (

// noOpByte is the byte value written to help record.Reader distinguish between
// the recyclable format and the sync format when there are between 11 and 18
// bytes remaining in a block.
noOpByte = byte(255)

// noOpSequenceLength is the number of noOpByte bytes (4) written into the
// otherwise zeroed header to mark the special case of having between 11 and
// 18 bytes remaining in a block.
noOpSequenceLength = 4
)

type syncSlot struct {
wg *sync.WaitGroup
err *error
Expand Down Expand Up @@ -473,6 +487,17 @@ type LogWriter struct {
pendingSyncsBackingIndex pendingSyncsWithHighestSyncIndex

pendingSyncForSyncQueueBacking pendingSyncForSyncQueue

// syncedOffset is the offset in the log that is durably synced after a
// flush. This member is used to write the WAL Sync chunk format's "Offset"
// field in the header.
syncedOffset atomic.Uint64

// emitFragment is set at runtime depending on which FormatMajorVersion
// is used. It writes the WAL sync chunk format if the FormatMajorVersion
// is greater than or equal to FormatWALSyncChunks; otherwise it writes
// the recyclable chunk format.
emitFragment func(n int, p []byte) (remainingP []byte)
}

// LogWriterConfig is a struct used for configuring new LogWriters
Expand All @@ -497,6 +522,9 @@ type LogWriterConfig struct {
// package) precede the lower layer locks (in the record package). These
// callbacks are serialized since they are invoked from the flushLoop.
ExternalSyncQueueCallback ExternalSyncQueueCallback

// WriteWALSyncOffsets determines whether the WAL sync chunk format is written.
WriteWALSyncOffsets bool
}

// ExternalSyncQueueCallback is to be run when a PendingSync has been
Expand Down Expand Up @@ -537,6 +565,13 @@ func NewLogWriter(
return time.AfterFunc(d, f)
},
}

if logWriterConfig.WriteWALSyncOffsets {
r.emitFragment = r.emitFragmentSyncOffsets
} else {
r.emitFragment = r.emitFragmentRecyclable
}

m := &LogWriterMetrics{}
if logWriterConfig.ExternalSyncQueueCallback != nil {
r.pendingSyncsBackingIndex.init(logWriterConfig.ExternalSyncQueueCallback)
Expand Down Expand Up @@ -584,6 +619,11 @@ func (w *LogWriter) flushLoop(context.Context) {
f.Unlock()
}()

// writtenOffset is the amount of data that has been written
// but not necessarily synced. This is used to update logWriter's
// syncedOffset after a sync.
var writtenOffset uint64 = 0

// The flush loop performs flushing of full and partial data blocks to the
// underlying writer (LogWriter.w), syncing of the writer, and notification
// to sync requests that they have completed.
Expand Down Expand Up @@ -694,9 +734,11 @@ func (w *LogWriter) flushLoop(context.Context) {
f.Lock()
continue
}
writtenOffset += uint64(len(data))
synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, snap)
f.Lock()
if synced && f.fsyncLatency != nil {
w.syncedOffset.Store(writtenOffset)
f.fsyncLatency.Observe(float64(syncLatency))
}
f.err = err
Expand Down Expand Up @@ -954,6 +996,11 @@ func (w *LogWriter) Size() int64 {
return w.blockNum*blockSize + int64(w.block.written.Load())
}

// emitEOFTrailer writes a special recyclable chunk header to signal EOF.
// Unlike emitFragment, which has separate implementations for the recyclable
// and WAL sync chunk formats, this function always writes the recyclable
// chunk header: there is no reason to add 8 additional bytes to the EOF
// trailer for the SyncedOffset, as it will be zeroed out anyway.
func (w *LogWriter) emitEOFTrailer() {
// Write a recyclable chunk header with a different log number. Readers
// will treat the header as EOF when the log number does not match.
Expand All @@ -966,7 +1013,7 @@ func (w *LogWriter) emitEOFTrailer() {
b.written.Store(i + int32(recyclableHeaderSize))
}

func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
func (w *LogWriter) emitFragmentRecyclable(n int, p []byte) (remainingP []byte) {
b := w.block
i := b.written.Load()
first := n == 0
Expand Down Expand Up @@ -1003,6 +1050,54 @@ func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
return p[r:]
}

// emitFragmentSyncOffsets appends one chunk in the WAL sync chunk format to
// the current block and returns the portion of p that did not fit.
//
// n == 0 marks the first fragment of a record. The chunk header occupies
// walSyncHeaderSize bytes starting at the block's current write position:
//
//	[0,4)   CRC over the bytes from the chunk encoding through the payload
//	[4,6)   payload length
//	[6]     chunk encoding (full, first, middle or last)
//	[7,11)  log number
//	[11,19) synced offset, read from w.syncedOffset
//
// If the block cannot hold another sync-format header after this chunk, its
// tail is zeroed and the block is queued for flushing.
func (w *LogWriter) emitFragmentSyncOffsets(n int, p []byte) (remainingP []byte) {
	blk := w.block
	start := blk.written.Load()
	payloadStart := start + walSyncHeaderSize
	isFirst := n == 0
	// This is the final fragment when everything left in p fits after the header.
	isLast := blockSize-start-walSyncHeaderSize >= int32(len(p))

	switch {
	case isFirst && isLast:
		blk.buf[start+6] = walSyncFullChunkEncoding
	case isLast:
		blk.buf[start+6] = walSyncLastChunkEncoding
	case isFirst:
		blk.buf[start+6] = walSyncFirstChunkEncoding
	default:
		blk.buf[start+6] = walSyncMiddleChunkEncoding
	}

	binary.LittleEndian.PutUint32(blk.buf[start+7:start+11], w.logNum)
	binary.LittleEndian.PutUint64(blk.buf[start+11:start+19], w.syncedOffset.Load())

	copied := copy(blk.buf[payloadStart:], p)
	end := payloadStart + int32(copied)
	// The checksum can only be computed once the rest of the header and the
	// payload are in place, since it covers both.
	checksum := crc.New(blk.buf[start+6:end]).Value()
	binary.LittleEndian.PutUint32(blk.buf[start+0:start+4], checksum)
	binary.LittleEndian.PutUint16(blk.buf[start+4:start+6], uint16(copied))
	blk.written.Store(end)

	if tail := blockSize - end; tail < walSyncHeaderSize {
		// There is no room for another fragment in the block, so fill the
		// remaining bytes with zeros and queue the block for flushing.
		clear(blk.buf[end:])

		// When at least 11 bytes (recyclableHeaderSize) but fewer than 19
		// bytes (walSyncHeaderSize) remain, the zeroed tail is large enough to
		// be read as a recyclable header. Mark this case by writing the no-op
		// sequence at offset 7 of what would have been the header, so the
		// reader can interpret the zeroed header correctly.
		if tail >= recyclableHeaderSize {
			copy(blk.buf[end+7:], bytes.Repeat([]byte{noOpByte}, noOpSequenceLength))
		}
		w.queueBlock()
	}
	return p[copied:]
}

// Metrics must typically be called after Close, since the callee will no
// longer modify the returned LogWriterMetrics. It is also current if there is
// nothing left to flush in the flush loop, but that is an implementation
Expand Down
Loading

0 comments on commit 4fa7ac2

Please sign in to comment.