Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage Refactor] Refactor saving execution results #6906

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
0c5345f
refactor events
zhangchiqing Jan 13, 2025
f62e8e7
add events
zhangchiqing Jan 13, 2025
0276508
transaction results and events
zhangchiqing Jan 13, 2025
ae289b8
refactor receipts results transactions results
zhangchiqing Jan 14, 2025
2be0689
refactor commits operation
zhangchiqing Jan 14, 2025
7fee0e6
refactor commits
zhangchiqing Jan 14, 2025
80b08bf
refactor state
zhangchiqing Jan 15, 2025
9c88816
refactor executed
zhangchiqing Jan 16, 2025
706a76c
fix roll back
zhangchiqing Jan 17, 2025
56ddcaf
fix execution state
zhangchiqing Jan 17, 2025
ed6e97d
fix tests
zhangchiqing Jan 17, 2025
bb99b85
fix tests and mocks
zhangchiqing Jan 18, 2025
2fdc19b
fix tests
zhangchiqing Jan 18, 2025
23bcaf8
fix transaction_results
zhangchiqing Jan 18, 2025
c5c9c9b
add receipts to be used by payloads
zhangchiqing Jan 18, 2025
e020c45
update badger.All
zhangchiqing Jan 18, 2025
5c19365
move execution receipts
zhangchiqing Jan 18, 2025
a3aebb1
fix bootstrap
zhangchiqing Jan 21, 2025
e20d127
fix execution bootstrapper
zhangchiqing Jan 21, 2025
f6fa66b
fix access tests
zhangchiqing Jan 21, 2025
bd2f97c
implement execution storage layer
zhangchiqing Jan 21, 2025
f567af3
refactor results storage
zhangchiqing Jan 21, 2025
7bb9546
fix results
zhangchiqing Jan 21, 2025
9251392
fix transaction results tests
zhangchiqing Jan 22, 2025
ab10668
fix badger.All
zhangchiqing Jan 22, 2025
1ec4e69
fix tests
zhangchiqing Jan 22, 2025
eb22e23
update execution state
zhangchiqing Jan 22, 2025
1a29adc
fix rollback tests
zhangchiqing Jan 22, 2025
1186df0
fix access tests
zhangchiqing Jan 22, 2025
4ab489d
fix testutil nodes
zhangchiqing Jan 22, 2025
652756a
fix execution state extract
zhangchiqing Jan 22, 2025
185cb28
fix read blocks
zhangchiqing Jan 22, 2025
750bc76
fix util
zhangchiqing Jan 22, 2025
a43a21b
fix lint
zhangchiqing Jan 22, 2025
80b7d33
fix execution builder
zhangchiqing Jan 22, 2025
178ec55
fix access_node_builder
zhangchiqing Jan 22, 2025
9c19a09
fix observer_builder
zhangchiqing Jan 22, 2025
2672842
remove unused
zhangchiqing Jan 22, 2025
839c717
fix lint
zhangchiqing Jan 22, 2025
9297c73
refactor badgerimpl.ToDB
zhangchiqing Feb 12, 2025
0fbe3a2
fix events
zhangchiqing Feb 12, 2025
e9fe482
fix indexer core
zhangchiqing Feb 12, 2025
b7462c1
fix mocks
zhangchiqing Feb 13, 2025
dfc9b00
fix bootstrapping
zhangchiqing Feb 13, 2025
2532b07
fix GetLastExecutedBlock
zhangchiqing Feb 14, 2025
598a21a
fix tests
zhangchiqing Feb 14, 2025
8eebce7
remove unused myreceipt.Store method, making save execution result
zhangchiqing Feb 21, 2025
930aacf
fix mocks
zhangchiqing Feb 21, 2025
d96a8ce
remove concurrent test
zhangchiqing Feb 21, 2025
dedf906
reuse store methods in receipts and results
zhangchiqing Feb 21, 2025
a323c20
remove test case
zhangchiqing Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ type FlowAccessNodeBuilder struct {
VersionControl *version.VersionControl
StopControl *stop.StopControl

// storage
events storage.Events
lightTransactionResults storage.LightTransactionResults
transactionResultErrorMessages storage.TransactionResultErrorMessages
Comment on lines +350 to +352
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access node syncs events, transaction results from EN with indexer engine, I'm refactoring them altogether along with the execution node's saving results process.


// The sync engine participants provider is the libp2p peer store for the access node
// which is not available until after the network has started.
// Hence, a factory function that needs to be called just before creating the sync engine
Expand Down Expand Up @@ -872,7 +877,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProtocolDB is a storage abstraction, so that we can change in a single place to decide using badger or pebble.

return nil
}).
DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
Expand Down Expand Up @@ -961,13 +966,13 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
indexerCore, err := indexer.New(
builder.Logger,
metrics.NewExecutionStateIndexerCollector(),
builder.DB,
builder.ProtocolDB,
builder.Storage.RegisterIndex,
builder.Storage.Headers,
builder.Storage.Events,
builder.events,
builder.Storage.Collections,
builder.Storage.Transactions,
builder.Storage.LightTransactionResults,
builder.lightTransactionResults,
builder.RootChainID.Chain(),
indexerDerivedChainData,
builder.collectionExecutedMetric,
Expand Down Expand Up @@ -1834,19 +1839,19 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB)
return nil
}).
Module("reporter", func(node *cmd.NodeConfig) error {
builder.Reporter = index.NewReporter()
return nil
}).
Module("events index", func(node *cmd.NodeConfig) error {
builder.EventsIndex = index.NewEventsIndex(builder.Reporter, builder.Storage.Events)
builder.EventsIndex = index.NewEventsIndex(builder.Reporter, builder.events)
return nil
}).
Module("transaction result index", func(node *cmd.NodeConfig) error {
builder.TxResultsIndex = index.NewTransactionResultsIndex(builder.Reporter, builder.Storage.LightTransactionResults)
builder.TxResultsIndex = index.NewTransactionResultsIndex(builder.Reporter, builder.lightTransactionResults)
return nil
}).
Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error {
Expand All @@ -1870,7 +1875,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
}).
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
if builder.storeTxResultErrorMessages {
builder.Storage.TransactionResultErrorMessages = bstorage.NewTransactionResultErrorMessages(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
}

return nil
Expand Down Expand Up @@ -2030,7 +2035,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
TxResultErrorMessages: builder.transactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
Expand Down Expand Up @@ -2116,7 +2121,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
node.Logger,
builder.nodeBackend,
node.Storage.TransactionResultErrorMessages,
builder.transactionResultErrorMessages,
builder.ExecNodeIdentitiesProvider,
)
}
Expand Down
81 changes: 52 additions & 29 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ import (
"github.com/onflow/flow-go/state/protocol/blocktimer"
storageerr "github.com/onflow/flow-go/storage"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/procedure"
"github.com/onflow/flow-go/storage/operation"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
storagepebble "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
Expand Down Expand Up @@ -128,17 +128,22 @@ type ExecutionNode struct {

ingestionUnit *engine.Unit

collector *metrics.ExecutionCollector
executionState state.ExecutionState
followerState protocol.FollowerState
committee hotstuff.DynamicCommittee
ledgerStorage *ledger.Ledger
registerStore *storehouse.RegisterStore
events *storage.Events
serviceEvents *storage.ServiceEvents
txResults *storage.TransactionResults
results *storage.ExecutionResults
myReceipts *storage.MyExecutionReceipts
collector *metrics.ExecutionCollector
executionState state.ExecutionState
followerState protocol.FollowerState
committee hotstuff.DynamicCommittee
ledgerStorage *ledger.Ledger
registerStore *storehouse.RegisterStore

// storage
events storageerr.Events
serviceEvents storageerr.ServiceEvents
txResults storageerr.TransactionResults
results storageerr.ExecutionResults
receipts storageerr.ExecutionReceipts
myReceipts storageerr.MyExecutionReceipts
commits storageerr.Commits

providerEngine exeprovider.ProviderEngine
checkerEng *checker.Engine
syncCore *chainsync.Core
Expand Down Expand Up @@ -202,7 +207,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
Module("system specs", exeNode.LoadSystemSpecs).
Module("execution metrics", exeNode.LoadExecutionMetrics).
Module("sync core", exeNode.LoadSyncCore).
Module("execution receipts storage", exeNode.LoadExecutionReceiptsStorage).
Module("execution storage", exeNode.LoadExecutionStorage).
Module("follower distributor", exeNode.LoadFollowerDistributor).
Module("authorization checking function", exeNode.LoadAuthorizationCheckingFunction).
Module("execution data datastore", exeNode.LoadExecutionDataDatastore).
Expand Down Expand Up @@ -279,9 +284,9 @@ func (exeNode *ExecutionNode) LoadExecutionMetrics(node *NodeConfig) error {
// report the highest executed block height as soon as possible
// this is guaranteed to exist because LoadBootstrapper has inserted
// the root block as executed block
var height uint64
var blockID flow.Identifier
err := node.DB.View(procedure.GetLastExecutedBlock(&height, &blockID))
reader := node.ProtocolDB.Reader()
err := operation.RetrieveExecutedBlock(reader, &blockID)
if err != nil {
// database has not been bootstrapped yet
if errors.Is(err, storageerr.ErrNotFound) {
Expand All @@ -290,7 +295,12 @@ func (exeNode *ExecutionNode) LoadExecutionMetrics(node *NodeConfig) error {
return fmt.Errorf("could not get highest executed block: %w", err)
}

exeNode.collector.ExecutionLastExecutedBlockHeight(height)
executed, err := node.Storage.Headers.ByBlockID(blockID)
if err != nil {
return fmt.Errorf("could not get header by id: %v: %w", blockID, err)
}

exeNode.collector.ExecutionLastExecutedBlockHeight(executed.Height)
return nil
}

Expand All @@ -300,11 +310,19 @@ func (exeNode *ExecutionNode) LoadSyncCore(node *NodeConfig) error {
return err
}

func (exeNode *ExecutionNode) LoadExecutionReceiptsStorage(
func (exeNode *ExecutionNode) LoadExecutionStorage(
node *NodeConfig,
) error {
exeNode.results = storage.NewExecutionResults(node.Metrics.Cache, node.DB)
exeNode.myReceipts = storage.NewMyExecutionReceipts(node.Metrics.Cache, node.DB, node.Storage.Receipts.(*storage.ExecutionReceipts))
db := node.ProtocolDB
exeNode.commits = store.NewCommits(node.Metrics.Cache, db)
exeNode.results = store.NewExecutionResults(node.Metrics.Cache, db)
exeNode.receipts = store.NewExecutionReceipts(node.Metrics.Cache, db, exeNode.results, storage.DefaultCacheSize)
exeNode.myReceipts = store.NewMyExecutionReceipts(node.Metrics.Cache, db, exeNode.receipts)

// Needed for gRPC server, make sure to assign to main scoped vars
exeNode.events = store.NewEvents(node.Metrics.Cache, db)
exeNode.serviceEvents = store.NewServiceEvents(node.Metrics.Cache, db)
exeNode.txResults = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)
return nil
}

Expand Down Expand Up @@ -438,7 +456,7 @@ func (exeNode *ExecutionNode) LoadGCPBlockDataUploader(
retryableUploader := uploader.NewBadgerRetryableUploaderWrapper(
asyncUploader,
node.Storage.Blocks,
node.Storage.Commits,
exeNode.commits,
node.Storage.Collections,
exeNode.events,
exeNode.results,
Expand Down Expand Up @@ -743,14 +761,18 @@ func (exeNode *ExecutionNode) LoadExecutionState(
chunkDataPacks := store.NewChunkDataPacks(node.Metrics.Cache,
pebbleimpl.ToDB(chunkDataPackDB), node.Storage.Collections, exeNode.exeConf.chunkDataPackCacheSize)

// Needed for gRPC server, make sure to assign to main scoped vars
exeNode.events = storage.NewEvents(node.Metrics.Cache, node.DB)
exeNode.serviceEvents = storage.NewServiceEvents(node.Metrics.Cache, node.DB)
exeNode.txResults = storage.NewTransactionResults(node.Metrics.Cache, node.DB, exeNode.exeConf.transactionResultsCacheSize)
getLatestFinalized := func() (uint64, error) {
final, err := node.State.Final().Head()
if err != nil {
return 0, err
}

return final.Height, nil
}

exeNode.executionState = state.NewExecutionState(
exeNode.ledgerStorage,
node.Storage.Commits,
exeNode.commits,
node.Storage.Blocks,
node.Storage.Headers,
node.Storage.Collections,
Expand All @@ -760,7 +782,8 @@ func (exeNode *ExecutionNode) LoadExecutionState(
exeNode.events,
exeNode.serviceEvents,
exeNode.txResults,
node.DB,
node.ProtocolDB,
getLatestFinalized,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the execution state to implement getting highest finalized and executed block.

node.Tracer,
exeNode.registerStore,
exeNode.exeConf.enableStorehouse,
Expand Down Expand Up @@ -1316,7 +1339,7 @@ func (exeNode *ExecutionNode) LoadGrpcServer(
exeNode.events,
exeNode.results,
exeNode.txResults,
node.Storage.Commits,
exeNode.commits,
exeNode.metricsProvider,
node.RootChainID,
signature.NewBlockSignerDecoder(exeNode.committee),
Expand All @@ -1330,7 +1353,7 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error {
// check if the execution database already exists
bootstrapper := bootstrap.NewBootstrapper(node.Logger)

commit, bootstrapped, err := bootstrapper.IsBootstrapped(node.DB)
commit, bootstrapped, err := bootstrapper.IsBootstrapped(node.ProtocolDB)
if err != nil {
return fmt.Errorf("could not query database to know whether database has been bootstrapped: %w", err)
}
Expand All @@ -1355,7 +1378,7 @@ func (exeNode *ExecutionNode) LoadBootstrapper(node *NodeConfig) error {
return fmt.Errorf("could not load bootstrap state from checkpoint file: %w", err)
}

err = bootstrapper.BootstrapExecutionDatabase(node.DB, node.RootSeal)
err = bootstrapper.BootstrapExecutionDatabase(node.ProtocolDB, node.RootSeal)
if err != nil {
return fmt.Errorf("could not bootstrap execution database: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/events"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/utils/grpcutils"
)
Expand Down Expand Up @@ -202,6 +203,7 @@ type NodeConfig struct {
Metrics Metrics
DB *badger.DB
PebbleDB *pebble.DB
ProtocolDB storage.DB
SecretsDB *badger.DB
Storage Storage
ProtocolEvents *events.Distributor
Expand Down
18 changes: 11 additions & 7 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ type ObserverServiceBuilder struct {
EventsIndex *index.EventsIndex
ScriptExecutor *backend.ScriptExecutor

// storage
events storage.Events
lightTransactionResults storage.LightTransactionResults

// available until after the network has started. Hence, a factory function that needs to be called just before
// creating the sync engine
SyncEngineParticipantsProviderFactory func() module.IdentifierProvider
Expand Down Expand Up @@ -1326,7 +1330,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
indexedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
return nil
}).DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// Note: using a DependableComponent here to ensure that the indexer does not block
Expand Down Expand Up @@ -1415,13 +1419,13 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
indexerCore, err := indexer.New(
builder.Logger,
metrics.NewExecutionStateIndexerCollector(),
builder.DB,
builder.ProtocolDB,
builder.Storage.RegisterIndex,
builder.Storage.Headers,
builder.Storage.Events,
builder.events,
builder.Storage.Collections,
builder.Storage.Transactions,
builder.Storage.LightTransactionResults,
builder.lightTransactionResults,
builder.RootChainID.Chain(),
indexerDerivedChainData,
collectionExecutedMetric,
Expand Down Expand Up @@ -1769,19 +1773,19 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
return nil
})
builder.Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB)
return nil
})
builder.Module("reporter", func(node *cmd.NodeConfig) error {
builder.Reporter = index.NewReporter()
return nil
})
builder.Module("events index", func(node *cmd.NodeConfig) error {
builder.EventsIndex = index.NewEventsIndex(builder.Reporter, builder.Storage.Events)
builder.EventsIndex = index.NewEventsIndex(builder.Reporter, builder.events)
return nil
})
builder.Module("transaction result index", func(node *cmd.NodeConfig) error {
builder.TxResultsIndex = index.NewTransactionResultsIndex(builder.Reporter, builder.Storage.LightTransactionResults)
builder.TxResultsIndex = index.NewTransactionResultsIndex(builder.Reporter, builder.lightTransactionResults)
return nil
})
builder.Module("script executor", func(node *cmd.NodeConfig) error {
Expand Down
13 changes: 8 additions & 5 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import (
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
sutil "github.com/onflow/flow-go/storage/util"
"github.com/onflow/flow-go/utils/logging"
)
Expand Down Expand Up @@ -1108,9 +1109,12 @@ func (fnb *FlowNodeBuilder) initBadgerDB() error {
return fmt.Errorf("could not open public db: %w", err)
}
fnb.DB = publicDB
// set badger db as protocol db
// TODO: making it dynamic to switch between badger and pebble
fnb.ProtocolDB = badgerimpl.ToDB(publicDB)

fnb.ShutdownFunc(func() error {
if err := fnb.DB.Close(); err != nil {
if err := publicDB.Close(); err != nil {
return fmt.Errorf("error closing protocol database: %w", err)
}
return nil
Expand Down Expand Up @@ -1222,7 +1226,6 @@ func (fnb *FlowNodeBuilder) initStorage() error {
collections := bstorage.NewCollections(fnb.DB, transactions)
setups := bstorage.NewEpochSetups(fnb.Metrics.Cache, fnb.DB)
epochCommits := bstorage.NewEpochCommits(fnb.Metrics.Cache, fnb.DB)
commits := bstorage.NewCommits(fnb.Metrics.Cache, fnb.DB)
protocolState := bstorage.NewEpochProtocolStateEntries(fnb.Metrics.Cache, setups, epochCommits, fnb.DB,
bstorage.DefaultEpochProtocolStateCacheSize, bstorage.DefaultProtocolStateIndexCacheSize)
protocolKVStores := bstorage.NewProtocolKVStore(fnb.Metrics.Cache, fnb.DB,
Expand All @@ -1232,8 +1235,6 @@ func (fnb *FlowNodeBuilder) initStorage() error {
fnb.Storage = Storage{
Headers: headers,
Guarantees: guarantees,
Receipts: receipts,
Results: results,
Seals: seals,
Index: index,
Payloads: payloads,
Expand All @@ -1246,7 +1247,9 @@ func (fnb *FlowNodeBuilder) initStorage() error {
VersionBeacons: versionBeacons,
EpochProtocolStateEntries: protocolState,
ProtocolKVStore: protocolKVStores,
Commits: commits,

Results: results,
Receipts: receipts,
}

return nil
Expand Down
Loading
Loading