Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport tx error messages to v0.37.12 #7025

Open
wants to merge 2 commits into
base: v0.37.12-tx-error-messages
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 76 additions & 12 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/ingestion"
"github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/access/rest"
"github.com/onflow/flow-go/engine/access/rest/routes"
Expand All @@ -55,6 +56,7 @@ import (
"github.com/onflow/flow-go/engine/access/subscription"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/common/version"
"github.com/onflow/flow-go/engine/execution/computation/query"
Expand Down Expand Up @@ -161,7 +163,6 @@ type AccessNodeConfig struct {
executionDataConfig edrequester.ExecutionDataConfig
PublicNetworkConfig PublicNetworkConfig
TxResultCacheSize uint
TxErrorMessagesCacheSize uint
executionDataIndexingEnabled bool
registersDBPath string
checkpointFile string
Expand All @@ -173,6 +174,9 @@ type AccessNodeConfig struct {
programCacheSize uint
checkPayerBalance bool
versionControlEnabled bool
storeTxResultErrorMessages bool
stopControlEnabled bool
registerDBPruneThreshold uint64
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -244,7 +248,6 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
apiRatelimits: nil,
apiBurstlimits: nil,
TxResultCacheSize: 0,
TxErrorMessagesCacheSize: 1000,
PublicNetworkConfig: PublicNetworkConfig{
BindAddress: cmd.NotSet,
Metrics: metrics.NewNoopCollector(),
Expand Down Expand Up @@ -276,6 +279,9 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
programCacheSize: 0,
checkPayerBalance: false,
versionControlEnabled: true,
storeTxResultErrorMessages: false,
stopControlEnabled: false,
registerDBPruneThreshold: pruner.DefaultThreshold,
}
}

Expand Down Expand Up @@ -343,6 +349,9 @@ type FlowAccessNodeBuilder struct {
stateStreamGrpcServer *grpcserver.GrpcServer

stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend

TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
}

func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
Expand Down Expand Up @@ -1229,7 +1238,6 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.BoolVar(&builder.retryEnabled, "retry-enabled", defaultConfig.retryEnabled, "whether to enable the retry mechanism at the access node level")
flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics")
flags.UintVar(&builder.TxResultCacheSize, "transaction-result-cache-size", defaultConfig.TxResultCacheSize, "transaction result cache size.(Disabled by default i.e 0)")
flags.UintVar(&builder.TxErrorMessagesCacheSize, "transaction-error-messages-cache-size", defaultConfig.TxErrorMessagesCacheSize, "transaction error messages cache size.(By default 1000)")
flags.StringVarP(&builder.nodeInfoFile,
"node-info-file",
"",
Expand Down Expand Up @@ -1359,7 +1367,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"tx-result-query-mode",
defaultConfig.rpcConf.BackendConfig.TxResultQueryMode,
"mode to use when querying transaction results. one of [local-only, execution-nodes-only(default), failover]")

flags.BoolVar(&builder.storeTxResultErrorMessages,
"store-tx-result-error-messages",
defaultConfig.storeTxResultErrorMessages,
"whether to enable storing transaction error messages into the db")
// Script Execution
flags.StringVar(&builder.rpcConf.BackendConfig.ScriptExecutionMode,
"script-execution-mode",
Expand Down Expand Up @@ -1464,9 +1475,6 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
return errors.New("circuit-breaker-restore-timeout must be greater than 0")
}
}
if builder.TxErrorMessagesCacheSize == 0 {
return errors.New("transaction-error-messages-cache-size must be greater than 0")
}

if builder.checkPayerBalance && !builder.executionDataIndexingEnabled {
return errors.New("execution-data-indexing-enabled must be set if check-payer-balance is enabled")
Expand Down Expand Up @@ -1579,7 +1587,8 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedBlockHeight storage.ConsumerProgress
var processedFinalizedBlockHeight storage.ConsumerProgress
var processedTxErrorMessagesBlockHeight storage.ConsumerProgress

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
Expand Down Expand Up @@ -1768,8 +1777,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.TxResultsIndex = index.NewTransactionResultsIndex(builder.Reporter, builder.Storage.LightTransactionResults)
return nil
}).
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
processedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error {
processedFinalizedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
Expand All @@ -1786,6 +1795,13 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

return nil
}).
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
if builder.storeTxResultErrorMessages {
builder.Storage.TransactionResultErrorMessages = bstorage.NewTransactionResultErrorMessages(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
}

return nil
}).
Component("version control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.versionControlEnabled {
noop := &module.NoopReadyDoneAware{}
Expand Down Expand Up @@ -1893,6 +1909,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
Expand All @@ -1904,7 +1921,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
TxResultCacheSize: builder.TxResultCacheSize,
TxErrorMessagesCacheSize: builder.TxErrorMessagesCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
CheckPayerBalance: builder.checkPayerBalance,
Expand Down Expand Up @@ -1980,6 +1996,28 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("could not create requester engine: %w", err)
}

preferredENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

if builder.storeTxResultErrorMessages {
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
node.Logger,
node.State,
builder.nodeBackend,
node.Storage.Receipts,
node.Storage.TransactionResultErrorMessages,
preferredENIdentifiers,
fixedENIdentifiers,
)
}

builder.IngestEng, err = ingestion.New(
node.Logger,
node.EngineRegistry,
Expand All @@ -1993,8 +2031,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.Storage.Results,
node.Storage.Receipts,
builder.collectionExecutedMetric,
processedBlockHeight,
processedFinalizedBlockHeight,
lastFullBlockHeight,
builder.TxResultErrorMessagesCore,
)
if err != nil {
return nil, err
Expand All @@ -2012,6 +2051,31 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return builder.RequestEng, nil
})

if builder.storeTxResultErrorMessages {
builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
processedTxErrorMessagesBlockHeight = bstorage.NewConsumerProgress(
builder.DB,
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
)
return nil
})
builder.Component("transaction result error messages engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
engine, err := tx_error_messages.New(
node.Logger,
node.State,
node.Storage.Headers,
processedTxErrorMessagesBlockHeight,
builder.TxResultErrorMessagesCore,
)
if err != nil {
return nil, err
}
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)

return engine, nil
})
}

if builder.supportsObserver {
builder.Component("public sync request handler", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
syncRequestHandler, err := synceng.NewRequestHandlerEngine(
Expand Down
Loading