Skip to content

Commit

Permalink
Merge pull request #6949 from onflow/leo/pebble-storage-db
Browse files Browse the repository at this point in the history
[Storage Refactor] Init pebble DB in scaffold.
  • Loading branch information
zhangchiqing authored Feb 12, 2025
2 parents f00769a + 8299048 commit c895ec9
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 42 deletions.
10 changes: 9 additions & 1 deletion cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/onflow/crypto"
Expand Down Expand Up @@ -151,6 +152,9 @@ type BaseConfig struct {
DynamicStartupEpoch string
DynamicStartupSleepInterval time.Duration
datadir string
pebbleDir string
badgerDB *badger.DB
pebbleDB *pebble.DB
secretsdir string
secretsDBEnabled bool
InsecureSecretsDB bool
Expand All @@ -164,7 +168,6 @@ type BaseConfig struct {
MetricsEnabled bool
guaranteesCacheSize uint
receiptsCacheSize uint
db *badger.DB
HeroCacheMetricsEnable bool
SyncCoreConfig chainsync.Config
CodecFactory func() network.Codec
Expand Down Expand Up @@ -198,6 +201,7 @@ type NodeConfig struct {
MetricsRegisterer prometheus.Registerer
Metrics Metrics
DB *badger.DB
PebbleDB *pebble.DB
SecretsDB *badger.DB
Storage Storage
ProtocolEvents *events.Distributor
Expand Down Expand Up @@ -253,6 +257,7 @@ type StateExcerptAtBoot struct {

func DefaultBaseConfig() *BaseConfig {
datadir := "/data/protocol"
pebbleDir := "/data/protocol-pebble"

// NOTE: if the codec used in the network component is ever changed any code relying on
// the message format specific to the codec must be updated. i.e: the AuthorizedSenderValidator.
Expand All @@ -269,6 +274,9 @@ func DefaultBaseConfig() *BaseConfig {
ObserverMode: false,
BootstrapDir: "bootstrap",
datadir: datadir,
pebbleDir: pebbleDir,
badgerDB: nil,
pebbleDB: nil,
secretsdir: NotSet,
secretsDBEnabled: true,
level: "info",
Expand Down
104 changes: 88 additions & 16 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

gcemd "cloud.google.com/go/compute/metadata"
"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
"github.com/hashicorp/go-multierror"
dht "github.com/libp2p/go-libp2p-kad-dht"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/libp2p/go-libp2p/core/routing"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/pflag"
"golang.org/x/time/rate"
"google.golang.org/api/option"
Expand All @@ -31,6 +33,7 @@ import (
"github.com/onflow/flow-go/admin/commands/common"
storageCommands "github.com/onflow/flow-go/admin/commands/storage"
"github.com/onflow/flow-go/cmd/build"
"github.com/onflow/flow-go/cmd/scaffold"
"github.com/onflow/flow-go/config"
"github.com/onflow/flow-go/consensus/hotstuff/persister"
"github.com/onflow/flow-go/fvm/initialize"
Expand Down Expand Up @@ -164,6 +167,7 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
fnb.flags.StringVar(&fnb.BaseConfig.BindAddr, "bind", defaultConfig.BindAddr, "address to bind on")
fnb.flags.StringVarP(&fnb.BaseConfig.BootstrapDir, "bootstrapdir", "b", defaultConfig.BootstrapDir, "path to the bootstrap directory")
fnb.flags.StringVarP(&fnb.BaseConfig.datadir, "datadir", "d", defaultConfig.datadir, "directory to store the public database (protocol state)")
fnb.flags.StringVar(&fnb.BaseConfig.pebbleDir, "pebble-dir", defaultConfig.pebbleDir, "directory to store the public pebble database (protocol state)")
fnb.flags.StringVar(&fnb.BaseConfig.secretsdir, "secretsdir", defaultConfig.secretsdir, "directory to store private database (secrets)")
fnb.flags.StringVarP(&fnb.BaseConfig.level, "loglevel", "l", defaultConfig.level, "level for logging output")
fnb.flags.Uint32Var(&fnb.BaseConfig.debugLogLimit, "debug-log-limit", defaultConfig.debugLogLimit, "max number of debug/trace log events per second")
Expand Down Expand Up @@ -1057,14 +1061,22 @@ func (fnb *FlowNodeBuilder) initProfiler() error {
return nil
}

func (fnb *FlowNodeBuilder) initDB() error {

// if a db has been passed in, use that instead of creating one
if fnb.BaseConfig.db != nil {
fnb.DB = fnb.BaseConfig.db
func (fnb *FlowNodeBuilder) initBadgerDB() error {
// if the badger DB is already set, use it.
// the badger DB might be set by the follower engine
if fnb.BaseConfig.badgerDB != nil {
fnb.DB = fnb.BaseConfig.badgerDB
return nil
}

// if the badger DB is not set, then the datadir must be provided to initialize
// the badger DB
// since we've set an default directory for the badger DB, this check
// is not necessary, but rather a sanity check
if fnb.BaseConfig.datadir == NotSet {
return fmt.Errorf("missing required flag '--datadir'")
}

// Pre-create DB path (Badger creates only one-level dirs)
err := os.MkdirAll(fnb.BaseConfig.datadir, 0700)
if err != nil {
Expand Down Expand Up @@ -1111,6 +1123,24 @@ func (fnb *FlowNodeBuilder) initDB() error {
return nil
}

func (fnb *FlowNodeBuilder) initPebbleDB() error {
// if the pebble DB is already set, use it
// the pebble DB might be set by the follower engine
if fnb.BaseConfig.pebbleDB != nil {
fnb.PebbleDB = fnb.BaseConfig.pebbleDB
return nil
}

db, closer, err := scaffold.InitPebbleDB(fnb.BaseConfig.pebbleDir)
if err != nil {
return err
}

fnb.PebbleDB = db
fnb.ShutdownFunc(closer.Close)
return nil
}

func (fnb *FlowNodeBuilder) initSecretsDB() error {

// if the secrets DB is disabled (only applicable for Consensus Follower,
Expand Down Expand Up @@ -1888,11 +1918,55 @@ func WithBindAddress(bindAddress string) Option {
}
}

// WithDataDir set the data directory for the badger database
// It will be ignored if WithBadgerDB is used
func WithDataDir(dataDir string) Option {
return func(config *BaseConfig) {
if config.db == nil {
config.datadir = dataDir
if config.badgerDB != nil {
log.Warn().Msgf("ignoring data directory %s as badger database is already set", dataDir)
return
}

config.datadir = dataDir
}
}

// WithBadgerDB sets the badger database instance
// If used, then WithDataDir method will be ignored
func WithBadgerDB(db *badger.DB) Option {
return func(config *BaseConfig) {
if config.datadir != "" && config.datadir != NotSet {
log.Warn().Msgf("ignoring data directory is already set for badger %v", config.datadir)
config.datadir = ""
}

config.badgerDB = db
}
}

// WithPebbleDir set the data directory for the pebble database
// It will be ignored if WithPebbleDB is used
func WithPebbleDir(dataDir string) Option {
return func(config *BaseConfig) {
if config.pebbleDB != nil {
log.Warn().Msgf("ignoring data directory %s as pebble database is already set", dataDir)
return
}

config.pebbleDir = dataDir
}
}

// WithPebbleDB sets the pebble database instance
// If used, then WithPebbleDir method will be ignored
func WithPebbleDB(db *pebble.DB) Option {
return func(config *BaseConfig) {
if config.pebbleDir != "" && config.pebbleDir != NotSet {
log.Warn().Msgf("ignoring data directory is already set for pebble %v", config.pebbleDir)
config.pebbleDir = ""
}

config.pebbleDB = db
}
}

Expand Down Expand Up @@ -1926,14 +2000,6 @@ func WithLogLevel(level string) Option {
}
}

// WithDB takes precedence over WithDataDir and datadir will be set to empty if DB is set using this option
func WithDB(db *badger.DB) Option {
return func(config *BaseConfig) {
config.db = db
config.datadir = ""
}
}

// FlowNode creates a new Flow node builder with the given name.
func FlowNode(role string, opts ...Option) *FlowNodeBuilder {
config := DefaultBaseConfig()
Expand Down Expand Up @@ -2039,7 +2105,13 @@ func (fnb *FlowNodeBuilder) onStart() error {
return err
}

if err := fnb.initDB(); err != nil {
// we always initialize both badger and pebble databases
// even if we only use one of them, this simplify the code and checks
if err := fnb.initBadgerDB(); err != nil {
return err
}

if err := fnb.initPebbleDB(); err != nil {
return err
}

Expand Down
34 changes: 34 additions & 0 deletions cmd/scaffold/pebble_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package scaffold

import (
"fmt"
"io"
"os"

"github.com/cockroachdb/pebble"

pebblestorage "github.com/onflow/flow-go/storage/pebble"
)

func InitPebbleDB(dir string) (*pebble.DB, io.Closer, error) {
// if the pebble DB is not set, we skip initialization
// the pebble DB must be provided to initialize
// since we've set an default directory for the pebble DB, this check
// is not necessary, but rather a sanity check
if dir == "not set" {
return nil, nil, fmt.Errorf("missing required flag '--pebble-dir'")
}

// Pre-create DB path
err := os.MkdirAll(dir, 0700)
if err != nil {
return nil, nil, fmt.Errorf("could not create pebble db (path: %s): %w", dir, err)
}

db, err := pebblestorage.OpenDefaultPebbleDB(dir)
if err != nil {
return nil, nil, fmt.Errorf("could not open newly created pebble db (path: %s): %w", dir, err)
}

return db, db, nil
}
25 changes: 25 additions & 0 deletions cmd/scaffold/pebble_db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package scaffold_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/cmd/scaffold"
"github.com/onflow/flow-go/utils/unittest"
)

func TestInitPebbleDB(t *testing.T) {
unittest.RunWithTempDir(t, func(dir string) {
_, closer, err := scaffold.InitPebbleDB(dir)
require.NoError(t, err)
require.NoError(t, closer.Close())
})
}

func TestInitPebbleDBDirNotSet(t *testing.T) {
_, _, err := scaffold.InitPebbleDB(cmd.NotSet)
require.Error(t, err)
require.Contains(t, err.Error(), "missing required flag")
}
44 changes: 21 additions & 23 deletions follower/consensus_follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync"

"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
"github.com/onflow/crypto"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -36,8 +37,8 @@ type Config struct {
networkPrivKey crypto.PrivateKey // the network private key of this node
bootstrapNodes []BootstrapNodeInfo // the bootstrap nodes to use
bindAddr string // address to bind on
db *badger.DB // the badger DB storage to use for the protocol state
dataDir string // directory to store the protocol state (if the badger storage is not provided)
badgerDB *badger.DB // badger instance
pebbleDB *pebble.DB // pebble instance
bootstrapDir string // path to the bootstrap directory
logLevel string // log level
exposeMetrics bool // whether to expose metrics
Expand All @@ -47,34 +48,28 @@ type Config struct {

type Option func(c *Config)

// WithDataDir sets the underlying directory to be used to store the database
// If a database is supplied, then data directory will be set to empty string
func WithDataDir(dataDir string) Option {
// currently used by rosetta
func WithDB(db *badger.DB) Option {
return func(cf *Config) {
if cf.db == nil {
cf.dataDir = dataDir
}
cf.badgerDB = db
}
}

func WithBootstrapDir(bootstrapDir string) Option {
func WithPebbleDB(db *pebble.DB) Option {
return func(cf *Config) {
cf.bootstrapDir = bootstrapDir
cf.pebbleDB = db
}
}

func WithLogLevel(level string) Option {
func WithBootstrapDir(bootstrapDir string) Option {
return func(cf *Config) {
cf.logLevel = level
cf.bootstrapDir = bootstrapDir
}
}

// WithDB sets the underlying database that will be used to store the chain state
// WithDB takes precedence over WithDataDir and datadir will be set to empty if DB is set using this option
func WithDB(db *badger.DB) Option {
func WithLogLevel(level string) Option {
return func(cf *Config) {
cf.db = db
cf.dataDir = ""
cf.logLevel = level
}
}

Expand Down Expand Up @@ -133,18 +128,18 @@ func getBaseOptions(config *Config) []cmd.Option {
if config.bootstrapDir != "" {
options = append(options, cmd.WithBootstrapDir(config.bootstrapDir))
}
if config.dataDir != "" {
options = append(options, cmd.WithDataDir(config.dataDir))
}
if config.bindAddr != "" {
options = append(options, cmd.WithBindAddress(config.bindAddr))
}
if config.badgerDB != nil {
options = append(options, cmd.WithBadgerDB(config.badgerDB))
}
if config.pebbleDB != nil {
options = append(options, cmd.WithPebbleDB(config.pebbleDB))
}
if config.logLevel != "" {
options = append(options, cmd.WithLogLevel(config.logLevel))
}
if config.db != nil {
options = append(options, cmd.WithDB(config.db))
}
if config.exposeMetrics {
options = append(options, cmd.WithMetricsEnabled(config.exposeMetrics))
}
Expand Down Expand Up @@ -187,6 +182,8 @@ func NewConsensusFollower(
networkPrivKey: networkPrivKey,
bootstrapNodes: bootstapIdentities,
bindAddr: bindAddr,
badgerDB: nil,
pebbleDB: nil,
logLevel: "info",
exposeMetrics: false,
}
Expand All @@ -206,6 +203,7 @@ func NewConsensusFollower(
anb.FollowerDistributor.AddOnBlockFinalizedConsumer(cf.onBlockFinalized)
cf.NodeConfig = anb.NodeConfig

// Build will initialize the database
cf.Component, err = anb.Build()
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit c895ec9

Please sign in to comment.