Skip to content

Commit

Permalink
Revert "Merge pull request #6499 from The-K-R-O-K/UlyanaAndrukhiv/649…
Browse files Browse the repository at this point in the history
…7-refactor-executionNodesForBlockID"

This reverts commit 1a793e8.
  • Loading branch information
illia-malachyn committed Feb 7, 2025
1 parent 409565d commit 83dbe69
Show file tree
Hide file tree
Showing 19 changed files with 638 additions and 807 deletions.
151 changes: 67 additions & 84 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ type FlowAccessNodeBuilder struct {
stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend

ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider
TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
}

func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
Expand Down Expand Up @@ -1436,18 +1435,14 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedFinalizedBlockHeight storage.ConsumerProgress
var processedTxErrorMessagesBlockHeight storage.ConsumerProgress
var lastFullBlockHeight *counters.PersistentStrictMonotonicCounter

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
}

ingestionDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(ingestionDependable)
versionControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(versionControlDependable)
stopControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(stopControlDependable)
var lastFullBlockHeight *counters.PersistentStrictMonotonicCounter

builder.
BuildConsensusFollower().
Expand Down Expand Up @@ -1627,14 +1622,14 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
rootBlockHeight, err := node.State.Params().FinalizedRoot()
rootBlock, err := node.State.Params().FinalizedRoot()
if err != nil {
return fmt.Errorf("could not get root block height: %w", err)
}

lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressLastFullBlockHeight),
rootBlockHeight.Height,
rootBlock.Height,
)
if err != nil {
return fmt.Errorf("failed to initialize monotonic consumer progress: %w", err)
Expand All @@ -1644,11 +1639,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
}).
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
if builder.storeTxResultErrorMessages {
builder.Storage.TransactionResultErrorMessages = bstorage.NewTransactionResultErrorMessages(
node.Metrics.Cache,
node.DB,
bstorage.DefaultCacheSize,
)
builder.Storage.TransactionResultErrorMessages = bstorage.NewTransactionResultErrorMessages(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
}

return nil
Expand Down Expand Up @@ -1706,56 +1697,39 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("transaction result query mode 'compare' is not supported")
}

preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

builder.ExecNodeIdentitiesProvider = commonrpc.NewExecutionNodeIdentitiesProvider(
node.Logger,
node.State,
node.Storage.Receipts,
preferredENIdentifiers,
fixedENIdentifiers,
)

builder.nodeBackend, err = backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
RetryEnabled: builder.retryEnabled,
MaxHeightRange: backendConfig.MaxHeightRange,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
TxResultCacheSize: builder.TxResultCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
EventQueryMode: eventQueryMode,
EventsIndex: builder.EventsIndex,
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
LastFullBlockHeight: lastFullBlockHeight,
ExecNodeIdentitiesProvider: builder.ExecNodeIdentitiesProvider,
back, err := backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
RetryEnabled: builder.retryEnabled,
MaxHeightRange: backendConfig.MaxHeightRange,
PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs,
FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
TxResultCacheSize: builder.TxResultCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
EventQueryMode: eventQueryMode,
EventsIndex: builder.EventsIndex,
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
}
builder.nodeBackend = back

engineBuilder, err := rpc.NewBuilder(
node.Logger,
Expand Down Expand Up @@ -1804,12 +1778,25 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("could not create requester engine: %w", err)
}

preferredENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

if builder.storeTxResultErrorMessages {
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
node.Logger,
node.State,
builder.nodeBackend,
node.Storage.Receipts,
node.Storage.TransactionResultErrorMessages,
builder.ExecNodeIdentitiesProvider,
preferredENIdentifiers,
fixedENIdentifiers,
)
}

Expand Down Expand Up @@ -1905,25 +1892,23 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
})
}

if builder.pingEnabled {
builder.Component("ping engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
ping, err := pingeng.New(
node.Logger,
node.IdentityProvider,
node.IDTranslator,
node.Me,
builder.PingMetrics,
builder.pingEnabled,
builder.nodeInfoFile,
node.PingService,
)
if err != nil {
return nil, fmt.Errorf("could not create ping engine: %w", err)
}
builder.Component("ping engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
ping, err := pingeng.New(
node.Logger,
node.IdentityProvider,
node.IDTranslator,
node.Me,
builder.PingMetrics,
builder.pingEnabled,
builder.nodeInfoFile,
node.PingService,
)
if err != nil {
return nil, fmt.Errorf("could not create ping engine: %w", err)
}

return ping, nil
})
}
return ping, nil
})

return builder.FlowNodeBuilder.Build()
}
Expand Down Expand Up @@ -2018,11 +2003,9 @@ func (builder *FlowAccessNodeBuilder) enqueuePublicNetworkInit() {
// Returns:
// - The libp2p node instance for the public network.
// - Any error encountered during initialization. Any error should be considered fatal.
func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(
networkKey crypto.PrivateKey,
bindAddress string,
networkMetrics module.LibP2PMetrics,
) (p2p.LibP2PNode, error) {
func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.PrivateKey, bindAddress string, networkMetrics module.LibP2PMetrics) (p2p.LibP2PNode,
error,
) {
connManager, err := connection.NewConnManager(builder.Logger, networkMetrics, &builder.FlowConfig.NetworkConfig.ConnectionManager)
if err != nil {
return nil, fmt.Errorf("could not create connection manager: %w", err)
Expand Down
55 changes: 18 additions & 37 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (

"github.com/onflow/flow/protobuf/go/flow/access"

"github.com/onflow/flow-go/crypto"

"github.com/onflow/flow-go/admin/commands"
stateSyncCommands "github.com/onflow/flow-go/admin/commands/state_synchronization"
"github.com/onflow/flow-go/cmd"
Expand All @@ -39,6 +37,7 @@ import (
hotstuffvalidator "github.com/onflow/flow-go/consensus/hotstuff/validator"
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/apiproxy"
"github.com/onflow/flow-go/engine/access/rest"
Expand All @@ -50,7 +49,6 @@ import (
"github.com/onflow/flow-go/engine/access/state_stream"
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/common/follower"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/execution/computation/query"
"github.com/onflow/flow-go/engine/protocol"
Expand Down Expand Up @@ -1562,41 +1560,24 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
),
}

preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
node.Logger,
node.State,
node.Storage.Receipts,
preferredENIdentifiers,
fixedENIdentifiers,
)

accessBackend, err := backend.New(backend.Params{
State: node.State,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
ChainID: node.RootChainID,
AccessMetrics: accessMetrics,
ConnFactory: connFactory,
RetryEnabled: false,
MaxHeightRange: backendConfig.MaxHeightRange,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
State: node.State,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
ChainID: node.RootChainID,
AccessMetrics: accessMetrics,
ConnFactory: connFactory,
RetryEnabled: false,
MaxHeightRange: backendConfig.MaxHeightRange,
PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs,
FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down
Loading

0 comments on commit 83dbe69

Please sign in to comment.