Skip to content

Commit 3e66b72

Browse files
authored
feat(v2): metastore snapshot compression (#4052)
1 parent cd85f97 commit 3e66b72

File tree

5 files changed

+120
-28
lines changed

5 files changed

+120
-28
lines changed

pkg/experiment/metastore/fsm/boltdb.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const (
1919
boltDBCompactedName = "metastore_compacted.boltdb"
2020
boltDBInitialMmapSize = 1 << 30
2121

22-
boltDBCompactionMaxTxnSize = 64 << 10
22+
boltDBCompactionMaxTxnSize = 1 << 20
2323
)
2424

2525
type boltdb struct {
@@ -127,7 +127,7 @@ func (db *boltdb) restore(snapshot io.Reader) error {
127127
}
128128
}
129129
restored.shutdown()
130-
err = os.RemoveAll(restored.path)
130+
_ = os.RemoveAll(restored.path)
131131
}
132132
if err != nil {
133133
return fmt.Errorf("failed to restore snapshot: %w", err)

pkg/experiment/metastore/fsm/fsm.go

+40-8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package fsm
33
import (
44
"context"
55
"encoding/binary"
6+
"flag"
67
"fmt"
78
"io"
89
"strconv"
@@ -36,9 +37,22 @@ type StateRestorer interface {
3637
Restore(*bbolt.Tx) error
3738
}
3839

40+
// Config holds the settings of the metastore FSM.
type Config struct {
	// SnapshotCompression names the algorithm used to compress FSM snapshots.
	SnapshotCompression string `yaml:"snapshot_compression"`
	// DataDir is where the FSM BoltDB data is located.
	// It does not have to be a persistent volume.
	DataDir string `yaml:"data_dir"`
}

// RegisterFlagsWithPrefix registers the FSM flags on f, prepending prefix to
// every flag name.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	const (
		compressionFlag = "snapshot-compression"
		dataDirFlag     = "data-dir"
	)
	f.StringVar(&cfg.DataDir, prefix+dataDirFlag, "./data-metastore/data", "Directory to store the data.")
	f.StringVar(&cfg.SnapshotCompression, prefix+compressionFlag, "zstd", "Compression algorithm to use for snapshots. Supported compressions: zstd.")
}
51+
3952
// FSM implements the raft.FSM interface.
4053
type FSM struct {
4154
logger log.Logger
55+
config Config
4256
metrics *metrics
4357

4458
mu sync.RWMutex
@@ -54,13 +68,14 @@ type FSM struct {
5468

5569
type handler func(tx *bbolt.Tx, cmd *raft.Log, raw []byte) (proto.Message, error)
5670

57-
func New(logger log.Logger, reg prometheus.Registerer, dir string) (*FSM, error) {
71+
func New(logger log.Logger, reg prometheus.Registerer, config Config) (*FSM, error) {
5872
fsm := FSM{
5973
logger: logger,
74+
config: config,
6075
metrics: newMetrics(reg),
6176
handlers: make(map[RaftLogEntryType]handler),
6277
}
63-
db := newDB(logger, fsm.metrics, dir)
78+
db := newDB(logger, fsm.metrics, config.DataDir)
6479
if err := db.open(false); err != nil {
6580
return nil, err
6681
}
@@ -133,29 +148,42 @@ func (fsm *FSM) restore() error {
133148
// Restore restores the FSM state from a snapshot.
134149
func (fsm *FSM) Restore(snapshot io.ReadCloser) (err error) {
135150
start := time.Now()
136-
_ = level.Info(fsm.logger).Log("msg", "restoring snapshot")
151+
level.Info(fsm.logger).Log("msg", "restoring snapshot")
137152
defer func() {
138153
_ = snapshot.Close()
139154
fsm.db.metrics.fsmRestoreSnapshotDuration.Observe(time.Since(start).Seconds())
140155
}()
156+
157+
level.Info(fsm.logger).Log("msg", "restoring snapshot")
158+
var r *snapshotReader
159+
if r, err = newSnapshotReader(snapshot); err != nil {
160+
level.Error(fsm.logger).Log("msg", "failed to create snapshot reader", "err", err)
161+
return err
162+
}
163+
// The wrapper never returns errors on Close.
164+
defer r.Close()
165+
141166
// Block all new transactions until we restore the snapshot.
142167
// TODO(kolesnikovae): set not-serving service status to not
143168
// block incoming requests.
144169
fsm.mu.Lock()
145170
defer fsm.mu.Unlock()
146171
fsm.txns.Wait()
147-
if err = fsm.db.restore(snapshot); err != nil {
148-
return fmt.Errorf("failed to restore database from snapshot: %w", err)
172+
if err = fsm.db.restore(r); err != nil {
173+
level.Error(fsm.logger).Log("msg", "failed to restore database from snapshot", "err", err)
174+
return err
149175
}
150176
// First we need to initialize the state: each restorer is called
151177
// synchronously and has exclusive access to the database.
152178
if err = fsm.init(); err != nil {
153-
return fmt.Errorf("failed to init state at restore: %w", err)
179+
level.Error(fsm.logger).Log("msg", "failed to init state at restore", "err", err)
180+
return err
154181
}
155182
// Then we restore the state: each restorer is given its own
156183
// transaction and run concurrently with others.
157184
if err = fsm.restore(); err != nil {
158-
return fmt.Errorf("failed to restore state from snapshot: %w", err)
185+
level.Error(fsm.logger).Log("msg", "failed to restore state from snapshot", "err", err)
186+
return err
159187
}
160188
return nil
161189
}
@@ -267,7 +295,11 @@ func (fsm *FSM) Read(fn func(*bbolt.Tx)) error {
267295
func (fsm *FSM) Snapshot() (raft.FSMSnapshot, error) {
268296
// Snapshot should only capture a pointer to the state, and any
269297
// expensive IO should happen as part of FSMSnapshot.Persist.
270-
s := snapshot{logger: fsm.logger, metrics: fsm.metrics}
298+
s := snapshotWriter{
299+
logger: fsm.logger,
300+
metrics: fsm.metrics,
301+
compression: fsm.config.SnapshotCompression,
302+
}
271303
tx, err := fsm.db.boltdb.Begin(false)
272304
if err != nil {
273305
return nil, fmt.Errorf("failed to open a transaction for snapshot: %w", err)

pkg/experiment/metastore/fsm/snapshot.go

+74-12
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,29 @@
11
package fsm
22

33
import (
4+
"bufio"
5+
"bytes"
46
"context"
7+
"io"
58
"runtime/pprof"
69
"time"
710

811
"github.com/go-kit/log"
912
"github.com/go-kit/log/level"
1013
"github.com/hashicorp/raft"
14+
"github.com/klauspost/compress/zstd"
1115
"go.etcd.io/bbolt"
1216
)
1317

14-
type snapshot struct {
15-
logger log.Logger
16-
tx *bbolt.Tx
17-
metrics *metrics
18+
type snapshotWriter struct {
19+
logger log.Logger
20+
tx *bbolt.Tx
21+
metrics *metrics
22+
compression string
23+
written int64
1824
}
1925

20-
func (s *snapshot) Persist(sink raft.SnapshotSink) (err error) {
26+
func (s *snapshotWriter) Persist(sink raft.SnapshotSink) (err error) {
2127
ctx := context.Background()
2228
pprof.SetGoroutineLabels(pprof.WithLabels(ctx, pprof.Labels("metastore_op", "persist")))
2329
defer pprof.SetGoroutineLabels(ctx)
@@ -39,19 +45,75 @@ func (s *snapshot) Persist(sink raft.SnapshotSink) (err error) {
3945
}
4046
}()
4147

42-
level.Info(s.logger).Log("msg", "persisting snapshot")
43-
var n int64
44-
n, err = s.tx.WriteTo(sink)
45-
s.metrics.boltDBPersistSnapshotSize.Set(float64(n))
46-
if err != nil {
48+
var dst io.Writer
49+
// Needed to measure the actual snapshot size written to the sink.
50+
w := &writeCounter{Writer: sink}
51+
if s.compression == "zstd" {
52+
// We do not want to bog down the CPU with compression: we are fine with
53+
// slowing down the snapshotting process, as it potentially may affect the
54+
// WAL writes if they are located on the same disk.
55+
var enc *zstd.Encoder
56+
if enc, err = zstd.NewWriter(w, zstd.WithEncoderConcurrency(1)); err != nil {
57+
level.Error(s.logger).Log("msg", "failed to create zstd encoder", "err", err)
58+
return err
59+
}
60+
dst = enc
61+
defer func() {
62+
if err = enc.Close(); err != nil {
63+
level.Error(s.logger).Log("msg", "zstd compression failed", "err", err)
64+
}
65+
}()
66+
}
67+
68+
level.Info(s.logger).Log("msg", "persisting snapshot", "compression", s.compression)
69+
if _, err = s.tx.WriteTo(dst); err != nil {
4770
level.Error(s.logger).Log("msg", "failed to write snapshot", "err", err)
71+
return err
4872
}
49-
return err
73+
74+
s.metrics.boltDBPersistSnapshotSize.Set(float64(w.written))
75+
return nil
5076
}
5177

52-
func (s *snapshot) Release() {
78+
// writeCounter wraps an io.Writer and keeps a running total of the bytes
// the wrapped writer reported as written.
type writeCounter struct {
	io.Writer
	written int64
}

// Write forwards p to the wrapped writer and adds the number of bytes it
// accepted to the total, whether or not the write returned an error.
func (w *writeCounter) Write(p []byte) (n int, err error) {
	n, err = w.Writer.Write(p)
	w.written += int64(n)
	return n, err
}
88+
89+
func (s *snapshotWriter) Release() {
5390
if s.tx != nil {
5491
// This is an in-memory rollback, no error expected.
5592
_ = s.tx.Rollback()
5693
}
5794
}
95+
96+
type snapshotReader struct {
97+
io.ReadCloser
98+
}
99+
100+
var zstdMagic = []byte{0x28, 0xB5, 0x2F, 0xFD}
101+
102+
func newSnapshotReader(snapshot io.ReadCloser) (*snapshotReader, error) {
103+
b := bufio.NewReader(snapshot)
104+
magic, err := b.Peek(4)
105+
if err != nil {
106+
return nil, err
107+
}
108+
109+
s := snapshotReader{ReadCloser: io.NopCloser(b)}
110+
if bytes.Equal(magic, zstdMagic) {
111+
var dec *zstd.Decoder
112+
if dec, err = zstd.NewReader(b); err != nil {
113+
return nil, err
114+
}
115+
s.ReadCloser = dec.IOReadCloser()
116+
}
117+
118+
return &s, nil
119+
}

pkg/experiment/metastore/metastore.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ import (
3333
type Config struct {
3434
Address string `yaml:"address"`
3535
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the metastore."`
36-
DataDir string `yaml:"data_dir"`
3736
MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"`
3837
Raft raft.Config `yaml:"raft"`
38+
FSM fsm.Config `yaml:",inline" category:"advanced"`
3939
Index index.Config `yaml:",inline" category:"advanced"`
4040
DLQRecovery dlq.RecoveryConfig `yaml:",inline" category:"advanced"`
4141
Compactor compactor.Config `yaml:",inline" category:"advanced"`
@@ -45,10 +45,10 @@ type Config struct {
4545
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4646
const prefix = "metastore."
4747
f.StringVar(&cfg.Address, prefix+"address", "localhost:9095", "")
48-
f.StringVar(&cfg.DataDir, prefix+"data-dir", "./data-metastore/data", "")
4948
f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 15*time.Second, "Minimum duration to wait after the internal readiness checks have passed but before succeeding the readiness endpoint. This is used to slowdown deployment controllers (eg. Kubernetes) after an instance is ready and before they proceed with a rolling update, to give the rest of the cluster instances enough time to receive some (DNS?) updates.")
5049
cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+"grpc-client-config", f)
5150
cfg.Raft.RegisterFlagsWithPrefix(prefix+"raft.", f)
51+
cfg.FSM.RegisterFlagsWithPrefix(prefix, f)
5252
cfg.Compactor.RegisterFlagsWithPrefix(prefix, f)
5353
cfg.Scheduler.RegisterFlagsWithPrefix(prefix, f)
5454
cfg.Index.RegisterFlagsWithPrefix(prefix, f)
@@ -117,9 +117,7 @@ func New(
117117
}
118118

119119
var err error
120-
121-
m.fsm, err = fsm.New(m.logger, m.reg, m.config.DataDir)
122-
if err != nil {
120+
if m.fsm, err = fsm.New(m.logger, m.reg, m.config.FSM); err != nil {
123121
return nil, fmt.Errorf("failed to initialize store: %w", err)
124122
}
125123

pkg/experiment/metastore/test/create.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func NewMetastoreSet(t *testing.T, cfg *metastore.Config, n int, bucket objstore
5353
icfg := *cfg
5454
icfg.MinReadyDuration = 0
5555
icfg.Address = grpcAddresses[i]
56-
icfg.DataDir = t.TempDir()
56+
icfg.FSM.DataDir = t.TempDir()
5757
icfg.Raft.ServerID = raftIds[i]
5858
icfg.Raft.Dir = t.TempDir()
5959
icfg.Raft.AdvertiseAddress = raftAddresses[i]

0 commit comments

Comments
 (0)