Skip to content

Commit

Permalink
Merge pull request #6202 from The-K-R-O-K/AndriiSlisarchuk/5790-add-s…
Browse files Browse the repository at this point in the history
…top-control

[Access] Stop Control feature for AN
  • Loading branch information
peterargue authored Sep 12, 2024
2 parents 9653906 + 2b4d7d0 commit df5769f
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 10 deletions.
32 changes: 32 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/onflow/flow-go/engine/access/subscription"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
"github.com/onflow/flow-go/engine/common/stop"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/common/version"
"github.com/onflow/flow-go/engine/execution/computation/query"
Expand Down Expand Up @@ -173,6 +174,7 @@ type AccessNodeConfig struct {
programCacheSize uint
checkPayerBalance bool
versionControlEnabled bool
stopControlEnabled bool
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -276,6 +278,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
programCacheSize: 0,
checkPayerBalance: false,
versionControlEnabled: true,
stopControlEnabled: false,
}
}

Expand Down Expand Up @@ -325,6 +328,7 @@ type FlowAccessNodeBuilder struct {
ExecutionDatastoreManager edstorage.DatastoreManager
ExecutionDataTracker tracker.Storage
VersionControl *version.VersionControl
StopControl *stop.StopControl

// The sync engine participants provider is the libp2p peer store for the access node
// which is not available until after the network has started.
Expand Down Expand Up @@ -994,6 +998,10 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil, err
}

if builder.stopControlEnabled {
builder.StopControl.RegisterHeightRecorder(builder.ExecutionIndexer)
}

return builder.ExecutionIndexer, nil
}, builder.IndexerDependencies)
}
Expand Down Expand Up @@ -1260,6 +1268,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"version-control-enabled",
defaultConfig.versionControlEnabled,
"whether to enable the version control feature. Default value is true")
flags.BoolVar(&builder.stopControlEnabled,
"stop-control-enabled",
defaultConfig.stopControlEnabled,
"whether to enable the stop control feature. Default value is false")
// ExecutionDataRequester config
flags.BoolVar(&builder.executionDataSyncEnabled,
"execution-data-sync-enabled",
Expand Down Expand Up @@ -1590,6 +1602,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.IndexerDependencies.Add(ingestionDependable)
versionControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(versionControlDependable)
stopControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(stopControlDependable)
var lastFullBlockHeight *counters.PersistentStrictMonotonicCounter

builder.
Expand Down Expand Up @@ -1824,6 +1838,24 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

return versionControl, nil
}).
Component("stop control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.stopControlEnabled {
noop := &module.NoopReadyDoneAware{}
stopControlDependable.Init(noop)
return noop, nil
}

stopControl := stop.NewStopControl(
builder.Logger,
)

builder.VersionControl.AddVersionUpdatesConsumer(stopControl.OnVersionUpdate)

builder.StopControl = stopControl
stopControlDependable.Init(builder.StopControl)

return stopControl, nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down
33 changes: 33 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/stop"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/common/version"
"github.com/onflow/flow-go/engine/execution/computation/query"
Expand Down Expand Up @@ -164,6 +165,7 @@ type ObserverServiceConfig struct {
executionDataPruningInterval time.Duration
localServiceAPIEnabled bool
versionControlEnabled bool
stopControlEnabled bool
executionDataDir string
executionDataStartHeight uint64
executionDataConfig edrequester.ExecutionDataConfig
Expand Down Expand Up @@ -239,6 +241,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
executionDataPruningInterval: pruner.DefaultPruningInterval,
localServiceAPIEnabled: false,
versionControlEnabled: true,
stopControlEnabled: false,
executionDataDir: filepath.Join(homedir, ".flow", "execution_data"),
executionDataStartHeight: 0,
executionDataConfig: edrequester.ExecutionDataConfig{
Expand Down Expand Up @@ -280,6 +283,7 @@ type ObserverServiceBuilder struct {
TxResultsIndex *index.TransactionResultsIndex
IndexerDependencies *cmd.DependencyList
VersionControl *version.VersionControl
StopControl *stop.StopControl

ExecutionDataDownloader execution_data.Downloader
ExecutionDataRequester state_synchronization.ExecutionDataRequester
Expand Down Expand Up @@ -681,6 +685,10 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"version-control-enabled",
defaultConfig.versionControlEnabled,
"whether to enable the version control feature. Default value is true")
flags.BoolVar(&builder.stopControlEnabled,
"stop-control-enabled",
defaultConfig.stopControlEnabled,
"whether to enable the stop control feature. Default value is false")
flags.BoolVar(&builder.localServiceAPIEnabled, "local-service-api-enabled", defaultConfig.localServiceAPIEnabled, "whether to use local indexed data for api queries")
flags.StringVar(&builder.registersDBPath, "execution-state-dir", defaultConfig.registersDBPath, "directory to use for execution-state database")
flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file")
Expand Down Expand Up @@ -1523,6 +1531,10 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return nil, err
}

if builder.stopControlEnabled {
builder.StopControl.RegisterHeightRecorder(builder.ExecutionIndexer)
}

return builder.ExecutionIndexer, nil
}, builder.IndexerDependencies)
}
Expand Down Expand Up @@ -1826,6 +1838,8 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {

versionControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(versionControlDependable)
stopControlDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(stopControlDependable)

builder.Component("version control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.versionControlEnabled {
Expand Down Expand Up @@ -1859,6 +1873,25 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {

return versionControl, nil
})
builder.Component("stop control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.stopControlEnabled {
noop := &module.NoopReadyDoneAware{}
stopControlDependable.Init(noop)
return noop, nil
}

stopControl := stop.NewStopControl(
builder.Logger,
)

builder.VersionControl.AddVersionUpdatesConsumer(stopControl.OnVersionUpdate)

builder.StopControl = stopControl
stopControlDependable.Init(builder.StopControl)

return stopControl, nil
})

builder.Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
accessMetrics := builder.AccessMetrics
config := builder.rpcConf
Expand Down
9 changes: 3 additions & 6 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,8 +686,7 @@ func (suite *Suite) TestGetSealedTransaction() {
// create the ingest engine
processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)

ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections,
transactions, results, receipts, collectionExecutedMetric, processedHeight, lastFullBlockHeight)
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections, transactions, results, receipts, collectionExecutedMetric, processedHeight, lastFullBlockHeight)
require.NoError(suite.T(), err)

// 1. Assume that follower engine updated the block storage and the protocol state. The block is reported as sealed
Expand Down Expand Up @@ -848,8 +847,7 @@ func (suite *Suite) TestGetTransactionResult() {
require.NoError(suite.T(), err)

// create the ingest engine
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections,
transactions, results, receipts, collectionExecutedMetric, processedHeight, lastFullBlockHeight)
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections, transactions, results, receipts, collectionExecutedMetric, processedHeight, lastFullBlockHeight)
require.NoError(suite.T(), err)

background, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1078,8 +1076,7 @@ func (suite *Suite) TestExecuteScript() {
require.NoError(suite.T(), err)

// create the ingest engine
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections,
transactions, results, receipts, collectionExecutedMetric, processedHeight, lastFullBlockHeight)
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections, transactions, results, receipts, collectionExecutedMetric, processedHeight, lastFullBlockHeight)
require.NoError(suite.T(), err)

// create another block as a predecessor of the block created earlier
Expand Down
3 changes: 1 addition & 2 deletions engine/access/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine {
)
require.NoError(s.T(), err)

eng, err := New(s.log, s.net, s.proto.state, s.me, s.request, s.blocks, s.headers, s.collections,
s.transactions, s.results, s.receipts, s.collectionExecutedMetric, processedHeight, s.lastFullBlockHeight)
eng, err := New(s.log, s.net, s.proto.state, s.me, s.request, s.blocks, s.headers, s.collections, s.transactions, s.results, s.receipts, s.collectionExecutedMetric, processedHeight, s.lastFullBlockHeight)
require.NoError(s.T(), err)

eng.ComponentManager.Start(ctx)
Expand Down
157 changes: 157 additions & 0 deletions engine/common/stop/stop_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package stop

import (
"fmt"

"github.com/coreos/go-semver/semver"
"github.com/rs/zerolog"
"go.uber.org/atomic"

"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/irrecoverable"
)

// VersionMetadata describes a pending version boundary: the block height at which
// the currently running node version stops being compatible, and the version that
// is expected from that height onwards.
type VersionMetadata struct {
	// incompatibleBlockHeight is the height of the first block that is incompatible
	// with the current node version.
	incompatibleBlockHeight uint64
	// updatedVersion is the string form of the node version that is expected in order
	// to continue working with blocks at and above incompatibleBlockHeight.
	updatedVersion string
}

// StopControl is responsible for managing the stopping behavior of the node
// when an incompatible block height is encountered.
//
// It receives version boundaries through OnVersionUpdate and processed block heights
// through the consumer installed by RegisterHeightRecorder. A single worker
// (processEvents) compares the two and signals an irrecoverable error once the last
// compatible block has been processed.
type StopControl struct {
	component.Component
	// cm is the component manager that runs the processEvents worker; it is also
	// exposed through the embedded Component.
	cm *component.ComponentManager

	log zerolog.Logger

	// versionData holds the most recent version boundary, or nil when there is no
	// pending boundary (none received yet, or the last one was removed).
	versionData *atomic.Pointer[VersionMetadata]

	// Notifier for new processed block height.
	// The channel is unbuffered: senders block until the worker receives the value
	// or doneProcessingEvents is closed.
	processedHeightChannel chan uint64
	// Signal channel to notify when processing is done.
	// It is closed by processEvents when the worker exits.
	doneProcessingEvents chan struct{}

	// Stores latest processed block height
	lastProcessedHeight counters.StrictMonotonousCounter
}

// NewStopControl builds a StopControl whose single worker (processEvents) is
// managed by a component manager. The worker only runs once the returned
// component is started by the caller.
//
// Parameters:
//   - log: base logger; a "component=stop_control" field is attached to it.
//
// Returns:
//   - The newly created StopControl, with no version boundary set and a last
//     processed height counter initialised to 0.
func NewStopControl(
	log zerolog.Logger,
) *StopControl {
	sc := new(StopControl)

	sc.log = log.With().Str("component", "stop_control").Logger()
	sc.versionData = atomic.NewPointer[VersionMetadata](nil)
	sc.lastProcessedHeight = counters.NewMonotonousCounter(0)

	// Both channels are unbuffered: heights are handed over synchronously to the
	// worker, and doneProcessingEvents is only ever closed, never sent on.
	sc.processedHeightChannel = make(chan uint64)
	sc.doneProcessingEvents = make(chan struct{})

	// The worker is a method value bound to sc, so sc must exist before the
	// component manager is assembled.
	manager := component.NewComponentManagerBuilder().
		AddWorker(sc.processEvents).
		Build()
	sc.cm = manager
	sc.Component = manager

	return sc
}

// OnVersionUpdate is the callback invoked when a version update is reported.
//
// A non-nil version replaces the stored version boundary with the given height
// and version. A nil version clears the stored boundary, which is how a removed
// (deleted) version update is communicated.
//
// Parameters:
//   - height: The block height that is incompatible with the current node version.
//   - version: The semantic version expected from that height on, or nil when the
//     previously announced update was removed.
func (sc *StopControl) OnVersionUpdate(height uint64, version *semver.Version) {
	// A notification without a version means the version update was deleted:
	// drop whatever boundary was stored before.
	if version == nil {
		sc.versionData.Store(nil)
		return
	}

	expected := version.String()

	sc.log.Info().
		Uint64("height", height).
		Str("semver", expected).
		Msg("Received version update")

	// Publish the new boundary as a fresh value so readers always observe a
	// consistent (height, version) pair.
	updated := &VersionMetadata{
		incompatibleBlockHeight: height,
		updatedVersion:          expected,
	}
	sc.versionData.Store(updated)
}

// onProcessedBlock is called after a new processed block height has been recorded
// in lastProcessedHeight.
//
// If a version boundary is set and the last compatible block (the block directly
// below the incompatible height) has been processed, it signals an irrecoverable
// error through ctx, which is intended to stop the node. It does nothing when no
// version boundary is currently stored.
//
// Parameters:
//   - ctx: The context used to signal an irrecoverable error.
func (sc *StopControl) onProcessedBlock(ctx irrecoverable.SignalerContext) {
	versionData := sc.versionData.Load()
	if versionData == nil {
		return
	}

	newHeight := sc.lastProcessedHeight.Value()
	incompatibleHeight := versionData.incompatibleBlockHeight

	// The heights are unsigned, so `incompatibleHeight-1` wraps around to MaxUint64
	// when the boundary is 0, which would make the comparison always false and
	// silently disable the stop. A boundary of 0 means every block is incompatible,
	// so it is handled explicitly before subtracting.
	if incompatibleHeight == 0 || newHeight >= incompatibleHeight-1 {
		ctx.Throw(fmt.Errorf("processed block at height %d is incompatible with the current node version, please upgrade to version %s starting from block height %d",
			newHeight, versionData.updatedVersion, incompatibleHeight))
	}
}

// updateProcessedHeight hands a newly processed block height over to the
// processEvents worker.
//
// The call blocks until either the worker receives the height or the worker has
// exited (doneProcessingEvents is closed), in which case the height is dropped.
//
// Parameters:
//   - height: The height of the latest processed block.
func (sc *StopControl) updateProcessedHeight(height uint64) {
	select {
	case <-sc.doneProcessingEvents:
		// The worker is no longer running; nobody will read the height, so give up
		// instead of blocking the caller.
	case sc.processedHeightChannel <- height:
		// The worker accepted the height.
	}
}

// RegisterHeightRecorder subscribes the StopControl to processed-height updates
// of the given recorder by installing updateProcessedHeight as its height
// updates consumer.
//
// Parameters:
//   - recorder: The execution data height recorder to register.
func (sc *StopControl) RegisterHeightRecorder(recorder execution_data.ProcessedHeightRecorder) {
	onHeight := sc.updateProcessedHeight
	recorder.SetHeightUpdatesConsumer(onHeight)
}

// processEvents is the worker loop of the StopControl. It receives processed block
// heights, records them, and checks each newly recorded height against the current
// version boundary.
//
// The loop ends when ctx is done or when the height channel is closed. On exit,
// doneProcessingEvents is closed so that pending and future senders in
// updateProcessedHeight do not block.
//
// Parameters:
//   - ctx: The context used to handle irrecoverable errors.
//   - ready: A function to signal that the component is ready to start processing events.
func (sc *StopControl) processEvents(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	ready()

	// Unblock senders once this worker stops receiving.
	defer close(sc.doneProcessingEvents)

	for {
		select {
		case processed, open := <-sc.processedHeightChannel:
			if !open {
				return
			}
			// Only react when the counter accepted the height; presumably Set
			// rejects values that do not advance the counter (verify against
			// counters.StrictMonotonousCounter).
			if !sc.lastProcessedHeight.Set(processed) {
				continue
			}
			sc.onProcessedBlock(ctx)
		case <-ctx.Done():
			return
		}
	}
}
Loading

0 comments on commit df5769f

Please sign in to comment.