diff --git a/access/handler.go b/access/handler.go index bcf401a2884..7b4cd950cf6 100644 --- a/access/handler.go +++ b/access/handler.go @@ -1427,7 +1427,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses( sub := h.api.SendAndSubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion()) - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonicCounter(0) return subscription.HandleRPCSubscription(sub, func(txResults []*TransactionResult) error { for i := range txResults { index := messageIndex.Value() diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index b6f3c860d5c..b4b6397739c 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -293,7 +293,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { versionControlEnabled: true, storeTxResultErrorMessages: false, stopControlEnabled: false, - registerDBPruneThreshold: pruner.DefaultThreshold, + registerDBPruneThreshold: 0, } } diff --git a/cmd/bootstrap/transit/cmd/pull.go b/cmd/bootstrap/transit/cmd/pull.go index a3a0c35eae7..cd5360fef83 100644 --- a/cmd/bootstrap/transit/cmd/pull.go +++ b/cmd/bootstrap/transit/cmd/pull.go @@ -97,7 +97,7 @@ func pull(cmd *cobra.Command, args []string) { fullOutpath := filepath.Join(flagBootDir, "public-root-information", filepath.Base(file.Name)) fmd5 := utils.CalcMd5(fullOutpath) // only skip files that have an MD5 hash - if file.MD5 != nil && bytes.Equal(fmd5, file.MD5) { + if len(file.MD5) > 0 && bytes.Equal(fmd5, file.MD5) { log.Info().Str("source", file.Name).Str("dest", fullOutpath).Msgf("skipping existing file from transit servers") return } diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go index 7da905963ea..034d3f0720b 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -463,6 +463,9 @@ func main() { seals, getSealingConfigs, ) + if err != nil { + return nil, fmt.Errorf("could not initialize sealing engine: %w", err) + } // subscribe for finalization events from hotstuff followerDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock) diff --git a/consensus/hotstuff/README.md b/consensus/hotstuff/README.md index c2ede0b97c9..42ab9ce0a5e 100644 --- a/consensus/hotstuff/README.md +++ b/consensus/hotstuff/README.md @@ -173,7 +173,7 @@ The event handler is designed to be executed single-threaded. To separate general graph-theoretical concepts from the concrete blockchain application, `LevelledForest` refers to blocks as graph `vertices` and to a block's view number as `level`. 
* `Validator` validates the HotStuff-relevant aspects of - - QC: total weight of all signers is more than 2/3 of committee weight, validity of signatures, view number is strictly monotonously increasing; + - QC: total weight of all signers is more than 2/3 of committee weight, validity of signatures, view number is strictly monotonically increasing; - TC: total weight of all signers is more than 2/3 of committee weight, validity of signatures, proof for entering view; - block proposal: from designated primary for the block's respective view, contains proposer's vote for its own block, QC in block is valid, a valid TC for the previous view is included if and only if the QC is not for the previous view; diff --git a/consensus/hotstuff/integration/instance_test.go b/consensus/hotstuff/integration/instance_test.go index 49b8e6e68bd..323e43c7640 100644 --- a/consensus/hotstuff/integration/instance_test.go +++ b/consensus/hotstuff/integration/instance_test.go @@ -451,7 +451,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance { // mock signature aggregator which doesn't perform any crypto operations and just tracks total weight aggregator := &mocks.TimeoutSignatureAggregator{} totalWeight := atomic.NewUint64(0) - newestView := counters.NewMonotonousCounter(0) + newestView := counters.NewMonotonicCounter(0) aggregator.On("View").Return(view).Maybe() aggregator.On("TotalWeight").Return(func() uint64 { return totalWeight.Load() diff --git a/consensus/hotstuff/pacemaker/view_tracker.go b/consensus/hotstuff/pacemaker/view_tracker.go index b52822d0d5a..58e6c5ec63e 100644 --- a/consensus/hotstuff/pacemaker/view_tracker.go +++ b/consensus/hotstuff/pacemaker/view_tracker.go @@ -118,15 +118,15 @@ func (vt *viewTracker) ProcessTC(tc *flow.TimeoutCertificate) (uint64, error) { } // updateLivenessData updates the current view, qc, tc. We want to avoid unnecessary data-base -// writes, which we enforce by requiring that the view number is STRICTLY monotonously increasing. +// writes, which we enforce by requiring that the view number is STRICTLY monotonically increasing. // Otherwise, an exception is returned. No errors are expected, any error should be treated as exception. func (vt *viewTracker) updateLivenessData(newView uint64, qc *flow.QuorumCertificate, tc *flow.TimeoutCertificate) error { if newView <= vt.livenessData.CurrentView { // This should never happen: in the current implementation, it is trivially apparent that // newView is _always_ larger than currentView. This check is to protect the code from // future modifications that violate the necessary condition for - // STRICTLY monotonously increasing view numbers. - return fmt.Errorf("cannot move from view %d to %d: currentView must be strictly monotonously increasing", + // STRICTLY monotonically increasing view numbers. + return fmt.Errorf("cannot move from view %d to %d: currentView must be strictly monotonically increasing", vt.livenessData.CurrentView, newView) } diff --git a/consensus/hotstuff/safetyrules/safety_rules_test.go b/consensus/hotstuff/safetyrules/safety_rules_test.go index 8034c76793b..c4a928ffba1 100644 --- a/consensus/hotstuff/safetyrules/safety_rules_test.go +++ b/consensus/hotstuff/safetyrules/safety_rules_test.go @@ -196,7 +196,7 @@ func (s *SafetyRulesTestSuite) TestProduceVote_UpdateLockedOneChainView() { // TestProduceVote_InvalidCurrentView tests that no vote is created if `curView` has invalid values. 
// In particular, `SafetyRules` requires that: // - the block's view matches `curView` -// - that values for `curView` are monotonously increasing +// - that values for `curView` are monotonically increasing // // Failing any of these conditions is a symptom of an internal bug; hence `SafetyRules` should // _not_ return a `NoVoteError`. @@ -208,7 +208,7 @@ func (s *SafetyRulesTestSuite) TestProduceVote_InvalidCurrentView() { require.Error(s.T(), err) require.False(s.T(), model.IsNoVoteError(err)) }) - s.Run("view-not-monotonously-increasing", func() { + s.Run("view-not-monotonically-increasing", func() { // create block with view < HighestAcknowledgedView proposal := helper.MakeSignedProposal(helper.WithProposal(helper.MakeProposal( helper.WithBlock( @@ -450,7 +450,7 @@ func (s *SafetyRulesTestSuite) TestProduceVote_VotingOnInvalidProposals() { // TestProduceVote_VoteEquivocation tests scenario when we try to vote twice in same view. We require that replica // follows next rules: // - replica votes once per view -// - replica votes in monotonously increasing views +// - replica votes in monotonically increasing views // // Voting twice per round on equivocating proposals is considered a byzantine behavior. // Expect a `model.NoVoteError` sentinel in such scenario. diff --git a/consensus/hotstuff/timeoutaggregator/timeout_aggregator.go b/consensus/hotstuff/timeoutaggregator/timeout_aggregator.go index f4125d96080..571f6ca7f13 100644 --- a/consensus/hotstuff/timeoutaggregator/timeout_aggregator.go +++ b/consensus/hotstuff/timeoutaggregator/timeout_aggregator.go @@ -35,7 +35,7 @@ type TimeoutAggregator struct { log zerolog.Logger hotstuffMetrics module.HotstuffMetrics engineMetrics module.EngineMetrics - lowestRetainedView counters.StrictMonotonousCounter // lowest view, for which we still process timeouts + lowestRetainedView counters.StrictMonotonicCounter // lowest view, for which we still process timeouts collectors hotstuff.TimeoutCollectors queuedTimeoutsNotifier engine.Notifier enteringViewNotifier engine.Notifier @@ -64,7 +64,7 @@ func NewTimeoutAggregator(log zerolog.Logger, log: log.With().Str("component", "hotstuff.timeout_aggregator").Logger(), hotstuffMetrics: hotstuffMetrics, engineMetrics: engineMetrics, - lowestRetainedView: counters.NewMonotonousCounter(lowestRetainedView), + lowestRetainedView: counters.NewMonotonicCounter(lowestRetainedView), collectors: collectors, queuedTimeoutsNotifier: engine.NewNotifier(), enteringViewNotifier: engine.NewNotifier(), diff --git a/consensus/hotstuff/timeoutcollector/timeout_collector.go b/consensus/hotstuff/timeoutcollector/timeout_collector.go index 8b68aadb5cd..21ee45c7492 100644 --- a/consensus/hotstuff/timeoutcollector/timeout_collector.go +++ b/consensus/hotstuff/timeoutcollector/timeout_collector.go @@ -20,8 +20,8 @@ type TimeoutCollector struct { timeoutsCache *TimeoutObjectsCache // cache for tracking double timeout and timeout equivocation notifier hotstuff.TimeoutAggregationConsumer processor hotstuff.TimeoutProcessor - newestReportedQC counters.StrictMonotonousCounter // view of newest QC that was reported - newestReportedTC counters.StrictMonotonousCounter // view of newest TC that was reported + newestReportedQC counters.StrictMonotonicCounter // view of newest QC that was reported + newestReportedTC counters.StrictMonotonicCounter // view of newest TC that was reported } var _ hotstuff.TimeoutCollector = (*TimeoutCollector)(nil) @@ -40,8 +40,8 @@ func NewTimeoutCollector(log zerolog.Logger, notifier: notifier, timeoutsCache: 
NewTimeoutObjectsCache(view), processor: processor, - newestReportedQC: counters.NewMonotonousCounter(0), - newestReportedTC: counters.NewMonotonousCounter(0), + newestReportedQC: counters.NewMonotonicCounter(0), + newestReportedTC: counters.NewMonotonicCounter(0), } } @@ -96,7 +96,7 @@ func (c *TimeoutCollector) processTimeout(timeout *model.TimeoutObject) error { // * Over larger time scales, the emitted events are for statistically increasing views. // * However, on short time scales there are _no_ monotonicity guarantees w.r.t. the views. // Explanation: - // While only QCs with strict monotonously increasing views pass the + // While only QCs with strict monotonically increasing views pass the // `if c.newestReportedQC.Set(timeout.NewestQC.View)` statement, we emit the notification in a separate // step. Therefore, emitting the notifications is subject to races, where on very short time-scales // the notifications can be out of order. diff --git a/consensus/hotstuff/voteaggregator/vote_aggregator.go b/consensus/hotstuff/voteaggregator/vote_aggregator.go index efb2e476bfc..eefc7adc433 100644 --- a/consensus/hotstuff/voteaggregator/vote_aggregator.go +++ b/consensus/hotstuff/voteaggregator/vote_aggregator.go @@ -38,11 +38,11 @@ type VoteAggregator struct { hotstuffMetrics module.HotstuffMetrics engineMetrics module.EngineMetrics notifier hotstuff.VoteAggregationViolationConsumer - lowestRetainedView counters.StrictMonotonousCounter // lowest view, for which we still process votes + lowestRetainedView counters.StrictMonotonicCounter // lowest view, for which we still process votes collectors hotstuff.VoteCollectors queuedMessagesNotifier engine.Notifier finalizationEventsNotifier engine.Notifier - finalizedView counters.StrictMonotonousCounter // cache the last finalized view to queue up the pruning work, and unblock the caller who's delivering the finalization event. + finalizedView counters.StrictMonotonicCounter // cache the last finalized view to queue up the pruning work, and unblock the caller who's delivering the finalization event. queuedVotes *fifoqueue.FifoQueue queuedBlocks *fifoqueue.FifoQueue } @@ -79,8 +79,8 @@ func NewVoteAggregator( hotstuffMetrics: hotstuffMetrics, engineMetrics: engineMetrics, notifier: notifier, - lowestRetainedView: counters.NewMonotonousCounter(lowestRetainedView), - finalizedView: counters.NewMonotonousCounter(lowestRetainedView), + lowestRetainedView: counters.NewMonotonicCounter(lowestRetainedView), + finalizedView: counters.NewMonotonicCounter(lowestRetainedView), collectors: collectors, queuedVotes: queuedVotes, queuedBlocks: queuedBlocks, diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go index b9e28697ee8..1eba8922b61 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -100,7 +100,7 @@ func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, ar // No errors are expected during normal operations. func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesResponse *backend.AccountStatusesResponse) error { blocksSinceLastMessage := uint64(0) - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonicCounter(0) return func(accountStatusesResponse *backend.AccountStatusesResponse) error { // check if there are any events in the response. 
if not, do not send a message unless the last diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 8546b5faf13..0a7f319b87b 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -86,7 +86,7 @@ func (p *EventsDataProvider) Run() error { // No errors are expected during normal operations. func (p *EventsDataProvider) handleResponse() func(eventsResponse *backend.EventsResponse) error { blocksSinceLastMessage := uint64(0) - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonicCounter(0) return func(eventsResponse *backend.EventsResponse) error { // check if there are any events in the response. if not, do not send a message unless the last diff --git a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go index d870ba512c0..a3edf73ad9e 100644 --- a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go @@ -90,7 +90,7 @@ func (p *SendAndGetTransactionStatusesDataProvider) createSubscription( // // No errors are expected during normal operations. func (p *SendAndGetTransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonicCounter(0) return func(txResults []*access.TransactionResult) error { diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index bacbe18da4d..4a7210b961c 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -101,7 +101,7 @@ func (p *TransactionStatusesDataProvider) createSubscription( // // No errors are expected during normal operations. func (p *TransactionStatusesDataProvider) handleResponse() func(txResults []*access.TransactionResult) error { - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonicCounter(0) return func(txResults []*access.TransactionResult) error { diff --git a/engine/access/state_stream/backend/handler.go b/engine/access/state_stream/backend/handler.go index 927ff347af4..164306a08c4 100644 --- a/engine/access/state_stream/backend/handler.go +++ b/engine/access/state_stream/backend/handler.go @@ -360,7 +360,7 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea } blocksSinceLastMessage := uint64(0) - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonicCounter(0) return func(resp *EventsResponse) error { // check if there are any events in the response. if not, do not send a message unless the last @@ -480,7 +480,7 @@ func (h *Handler) handleAccountStatusesResponse( } blocksSinceLastMessage := uint64(0) - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonicCounter(0) return func(resp *AccountStatusesResponse) error { // check if there are any events in the response. 
if not, do not send a message unless the last diff --git a/engine/access/subscription/block_tracker.go b/engine/access/subscription/block_tracker.go index 51be3726fbd..85bdb74391e 100644 --- a/engine/access/subscription/block_tracker.go +++ b/engine/access/subscription/block_tracker.go @@ -35,9 +35,9 @@ type BlockTrackerImpl struct { broadcaster *engine.Broadcaster // finalizedHighestHeight contains the highest consecutive block height for which we have received a new notification. - finalizedHighestHeight counters.StrictMonotonousCounter + finalizedHighestHeight counters.StrictMonotonicCounter // sealedHighestHeight contains the highest consecutive block height for which we have received a new notification. - sealedHighestHeight counters.StrictMonotonousCounter + sealedHighestHeight counters.StrictMonotonicCounter } // NewBlockTracker creates a new BlockTrackerImpl instance. @@ -70,8 +70,8 @@ func NewBlockTracker( return &BlockTrackerImpl{ BaseTracker: NewBaseTrackerImpl(rootHeight, state, headers), state: state, - finalizedHighestHeight: counters.NewMonotonousCounter(lastFinalized.Height), - sealedHighestHeight: counters.NewMonotonousCounter(lastSealed.Height), + finalizedHighestHeight: counters.NewMonotonicCounter(lastFinalized.Height), + sealedHighestHeight: counters.NewMonotonicCounter(lastSealed.Height), broadcaster: broadcaster, }, nil } diff --git a/engine/access/subscription/execution_data_tracker.go b/engine/access/subscription/execution_data_tracker.go index db20dfd3d0e..b50fd1e2087 100644 --- a/engine/access/subscription/execution_data_tracker.go +++ b/engine/access/subscription/execution_data_tracker.go @@ -64,7 +64,7 @@ type ExecutionDataTrackerImpl struct { useIndex bool // highestHeight contains the highest consecutive block height that we have consecutive execution data for - highestHeight counters.StrictMonotonousCounter + highestHeight counters.StrictMonotonicCounter } // NewExecutionDataTracker creates a new ExecutionDataTrackerImpl instance. 
@@ -96,7 +96,7 @@ func NewExecutionDataTracker( log: log, headers: headers, broadcaster: broadcaster, - highestHeight: counters.NewMonotonousCounter(highestAvailableFinalizedHeight), + highestHeight: counters.NewMonotonicCounter(highestAvailableFinalizedHeight), indexReporter: indexReporter, useIndex: useIndex, } diff --git a/engine/collection/compliance/core.go b/engine/collection/compliance/core.go index dc5432d2925..521dde93453 100644 --- a/engine/collection/compliance/core.go +++ b/engine/collection/compliance/core.go @@ -43,8 +43,8 @@ type Core struct { headers storage.Headers state clusterkv.MutableState // track latest finalized view/height - used to efficiently drop outdated or too-far-ahead blocks - finalizedView counters.StrictMonotonousCounter - finalizedHeight counters.StrictMonotonousCounter + finalizedView counters.StrictMonotonicCounter + finalizedHeight counters.StrictMonotonicCounter pending module.PendingClusterBlockBuffer // pending block cache sync module.BlockRequester hotstuff module.HotStuff diff --git a/engine/common/follower/cache/cache.go b/engine/common/follower/cache/cache.go index 9d60ec379f6..56576e235d9 100644 --- a/engine/common/follower/cache/cache.go +++ b/engine/common/follower/cache/cache.go @@ -50,7 +50,7 @@ type Cache struct { byParent map[flow.Identifier]BlocksByID // lookup of blocks by their parentID, for finding a block's known children notifier hotstuff.ProposalViolationConsumer // equivocations will be reported using this notifier - lowestView counters.StrictMonotonousCounter // lowest view that the cache accepts blocks for + lowestView counters.StrictMonotonicCounter // lowest view that the cache accepts blocks for } // Peek performs lookup of cached block by blockID. diff --git a/engine/common/follower/compliance_engine.go b/engine/common/follower/compliance_engine.go index 8374955d27b..1e1a26f6452 100644 --- a/engine/common/follower/compliance_engine.go +++ b/engine/common/follower/compliance_engine.go @@ -193,7 +193,7 @@ func (e *ComplianceEngine) OnSyncedBlocks(blocks flow.Slashable[[]*messages.Bloc // OnFinalizedBlock informs the compliance layer about finalization of a new block. It does not block // and asynchronously executes the internal pruning logic. We accept inputs out of order, and only act -// on inputs with strictly monotonously increasing views. +// on inputs with strictly monotonically increasing views. // // Implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer` // CAUTION: the input to this callback is treated as trusted; precautions should be taken that messages diff --git a/engine/common/stop/stop_control.go b/engine/common/stop/stop_control.go index 81d5eea52d2..2655a22928f 100644 --- a/engine/common/stop/stop_control.go +++ b/engine/common/stop/stop_control.go @@ -36,7 +36,7 @@ type StopControl struct { doneProcessingEvents chan struct{} // Stores latest processed block height - lastProcessedHeight counters.StrictMonotonousCounter + lastProcessedHeight counters.StrictMonotonicCounter } // NewStopControl creates a new StopControl instance. @@ -53,7 +53,7 @@ func NewStopControl( log: log.With(). Str("component", "stop_control"). 
Logger(), - lastProcessedHeight: counters.NewMonotonousCounter(0), + lastProcessedHeight: counters.NewMonotonicCounter(0), versionData: atomic.NewPointer[VersionMetadata](nil), processedHeightChannel: make(chan uint64), doneProcessingEvents: make(chan struct{}), diff --git a/engine/common/version/version_control.go b/engine/common/version/version_control.go index e26ab92ab3f..138948c44a1 100644 --- a/engine/common/version/version_control.go +++ b/engine/common/version/version_control.go @@ -42,6 +42,7 @@ var defaultCompatibilityOverrides = map[string]struct{}{ "0.37.18": {}, "0.37.22": {}, "0.37.26": {}, + "0.38.0": {}, } // VersionControl manages the version control system for the node. @@ -66,7 +67,7 @@ type VersionControl struct { // Notifier for new finalized block height finalizedHeightNotifier engine.Notifier - finalizedHeight counters.StrictMonotonousCounter + finalizedHeight counters.StrictMonotonicCounter // lastProcessedHeight the last handled block height lastProcessedHeight *atomic.Uint64 @@ -107,7 +108,7 @@ func NewVersionControl( versionBeacons: versionBeacons, sealedRootBlockHeight: atomic.NewUint64(sealedRootBlockHeight), lastProcessedHeight: atomic.NewUint64(latestFinalizedBlockHeight), - finalizedHeight: counters.NewMonotonousCounter(latestFinalizedBlockHeight), + finalizedHeight: counters.NewMonotonicCounter(latestFinalizedBlockHeight), finalizedHeightNotifier: engine.NewNotifier(), startHeight: atomic.NewUint64(NoHeight), endHeight: atomic.NewUint64(NoHeight), diff --git a/engine/consensus/compliance/core.go b/engine/consensus/compliance/core.go index 5ab6d26c269..c3d945eb784 100644 --- a/engine/consensus/compliance/core.go +++ b/engine/consensus/compliance/core.go @@ -46,8 +46,8 @@ type Core struct { payloads storage.Payloads state protocol.ParticipantState // track latest finalized view/height - used to efficiently drop outdated or too-far-ahead blocks - finalizedView counters.StrictMonotonousCounter - finalizedHeight counters.StrictMonotonousCounter + finalizedView counters.StrictMonotonicCounter + finalizedHeight counters.StrictMonotonicCounter pending module.PendingBlockBuffer // pending block cache sync module.BlockRequester hotstuff module.HotStuff diff --git a/engine/consensus/sealing/core.go b/engine/consensus/sealing/core.go index f40b730d88c..69686187cbf 100644 --- a/engine/consensus/sealing/core.go +++ b/engine/consensus/sealing/core.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog" "go.opentelemetry.io/otel/attribute" otelTrace "go.opentelemetry.io/otel/trace" + "go.uber.org/atomic" "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/consensus" @@ -34,13 +35,12 @@ import ( // - pre-validating approvals (if they are outdated or non-verifiable) // - pruning already processed collectorTree type Core struct { - unit *engine.Unit workerPool *workerpool.WorkerPool // worker pool used by collectors log zerolog.Logger // used to log relevant actions with context collectorTree *approvals.AssignmentCollectorTree // levelled forest for assignment collectors approvalsCache *approvals.LruCache // in-memory cache of approvals that weren't verified - counterLastSealedHeight counters.StrictMonotonousCounter // monotonous counter for last sealed block height - counterLastFinalizedHeight counters.StrictMonotonousCounter // monotonous counter for last finalized block height + counterLastSealedHeight counters.StrictMonotonicCounter // monotonic counter for last sealed block height + counterLastFinalizedHeight counters.StrictMonotonicCounter // monotonic 
counter for last finalized block height headers storage.Headers // used to access block headers in storage state protocol.State // used to access protocol state seals storage.Seals // used to get last sealed block @@ -50,6 +50,7 @@ type Core struct { sealingTracker consensus.SealingTracker // logic-aware component for tracking sealing progress. tracer module.Tracer // used to trace execution sealingConfigsGetter module.SealingConfigsGetter // used to access configs for sealing conditions + reporter *gatedSealingObservationReporter // used to avoid excess resource usage by sealing observation completions } func NewCore( @@ -58,7 +59,6 @@ func NewCore( tracer module.Tracer, conMetrics module.ConsensusMetrics, sealingTracker consensus.SealingTracker, - unit *engine.Unit, headers storage.Headers, state protocol.State, sealsDB storage.Seals, @@ -79,16 +79,16 @@ func NewCore( tracer: tracer, metrics: conMetrics, sealingTracker: sealingTracker, - unit: unit, approvalsCache: approvals.NewApprovalsLRUCache(1000), - counterLastSealedHeight: counters.NewMonotonousCounter(lastSealed.Height), - counterLastFinalizedHeight: counters.NewMonotonousCounter(lastSealed.Height), + counterLastSealedHeight: counters.NewMonotonicCounter(lastSealed.Height), + counterLastFinalizedHeight: counters.NewMonotonicCounter(lastSealed.Height), headers: headers, state: state, seals: sealsDB, sealsMempool: sealsMempool, requestTracker: approvals.NewRequestTracker(headers, 10, 30), sealingConfigsGetter: sealingConfigsGetter, + reporter: newGatedSealingObservationReporter(), } factoryMethod := func(result *flow.ExecutionResult) (approvals.AssignmentCollector, error) { @@ -561,7 +561,9 @@ func (c *Core) ProcessFinalizedBlock(finalizedBlockID flow.Identifier) error { // observes the latest state of `sealingObservation`. // * The `sealingObservation` lives in the scope of this function. Hence, when this goroutine exits // this function, `sealingObservation` lives solely in the scope of the newly-created goroutine. - c.unit.Launch(sealingObservation.Complete) + // We do this call asynchronously because we are in the hot path, and it is not required to progress, + // and the call may involve database transactions that would unnecessarily delay sealing. + c.reporter.reportAsync(sealingObservation) return nil } @@ -654,7 +656,7 @@ func (c *Core) getOutdatedBlockIDsFromRootSealingSegment(rootHeader *flow.Header } knownBlockIDs := make(map[flow.Identifier]struct{}) // track block IDs in the sealing segment - var outdatedBlockIDs flow.IdentifierList + outdatedBlockIDs := make(flow.IdentifierList, 0) for _, block := range rootSealingSegment.Blocks { knownBlockIDs[block.ID()] = struct{}{} for _, result := range block.Payload.Results { @@ -666,3 +668,25 @@ func (c *Core) getOutdatedBlockIDsFromRootSealingSegment(rootHeader *flow.Header } return outdatedBlockIDs.Lookup(), nil } + +// gatedSealingObservationReporter is a utility for gating asynchronous completion of sealing observations. +type gatedSealingObservationReporter struct { + reporting *atomic.Bool // true when a sealing observation is actively being asynchronously completed +} + +func newGatedSealingObservationReporter() *gatedSealingObservationReporter { + return &gatedSealingObservationReporter{ + reporting: atomic.NewBool(false), + } +} + +// reportAsync only allows one in-flight observation completion at a time. +// Any extra observations are dropped. 
+func (reporter *gatedSealingObservationReporter) reportAsync(observation consensus.SealingObservation) { + if reporter.reporting.CompareAndSwap(false, true) { + go func() { + observation.Complete() + reporter.reporting.Store(false) + }() + } +} diff --git a/engine/consensus/sealing/core_test.go b/engine/consensus/sealing/core_test.go index 71d503e0c0b..4376c040b34 100644 --- a/engine/consensus/sealing/core_test.go +++ b/engine/consensus/sealing/core_test.go @@ -70,7 +70,7 @@ func (s *ApprovalProcessingCoreTestSuite) SetupTest() { setter := unittest.NewSealingConfigs(flow.DefaultChunkAssignmentAlpha) var err error - s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, engine.NewUnit(), s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter) + s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter) require.NoError(s.T(), err) s.setter = setter } @@ -327,7 +327,7 @@ func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_EmergencySealing( true, // enable emergency sealing ) require.NoError(s.T(), err) - s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, engine.NewUnit(), s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter) + s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter) require.NoError(s.T(), err) s.setter = setter @@ -748,7 +748,7 @@ func (s *ApprovalProcessingCoreTestSuite) TestRepopulateAssignmentCollectorTree( finalSnapShot.On("Descendants").Return(blockChildren, nil) s.State.On("Final").Return(finalSnapShot) - core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, engine.NewUnit(), + core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, s.Headers, s.State, s.sealsDB, assigner, s.SigHasher, s.SealsPL, s.Conduit, s.setter) require.NoError(s.T(), err) @@ -827,7 +827,7 @@ func (s *ApprovalProcessingCoreTestSuite) TestRepopulateAssignmentCollectorTree_ }, nil) s.State.On("Final").Return(finalSnapShot) - core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, engine.NewUnit(), + core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, s.Headers, s.State, s.sealsDB, assigner, s.SigHasher, s.SealsPL, s.Conduit, s.setter) require.NoError(s.T(), err) diff --git a/engine/consensus/sealing/engine.go b/engine/consensus/sealing/engine.go index cf379a90db5..352a0e423f3 100644 --- a/engine/consensus/sealing/engine.go +++ b/engine/consensus/sealing/engine.go @@ -13,6 +13,8 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/messages" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/mempool" "github.com/onflow/flow-go/module/metrics" msig "github.com/onflow/flow-go/module/signature" @@ -20,6 +22,7 @@ import ( "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/utils/logging" ) type Event struct { @@ -60,10 +63,10 @@ type ( // Engine is 
a wrapper for approval processing `Core` which implements logic for // queuing and filtering network messages which later will be processed by sealing engine. -// Purpose of this struct is to provide an efficient way how to consume messages from network layer and pass -// them to `Core`. Engine runs 2 separate gorourtines that perform pre-processing and consuming messages by Core. +// Purpose of this struct is to provide an efficient way to consume messages from the network layer and pass +// them to `Core`. Engine runs multiple workers for pre-processing messages and executing `sealing.Core` business logic. type Engine struct { - unit *engine.Unit + component.Component workerPool *workerpool.WorkerPool core consensus.SealingCore log zerolog.Logger @@ -85,7 +88,7 @@ type Engine struct { rootHeader *flow.Header } -// NewEngine constructs new `Engine` which runs on it's own unit. +// NewEngine constructs a new Sealing Engine which runs on its own component. func NewEngine(log zerolog.Logger, tracer module.Tracer, conMetrics module.ConsensusMetrics, @@ -106,9 +109,7 @@ func NewEngine(log zerolog.Logger, ) (*Engine, error) { rootHeader := state.Params().FinalizedRoot() - unit := engine.NewUnit() e := &Engine{ - unit: unit, workerPool: workerpool.New(defaultAssignmentCollectorsWorkerPoolCapacity), log: log.With().Str("engine", "sealing.Engine").Logger(), me: me, @@ -131,6 +132,8 @@ func NewEngine(log zerolog.Logger, return nil, fmt.Errorf("could not initialize message handler for untrusted inputs: %w", err) } + e.Component = e.buildComponentManager() + // register engine with the approval provider _, err = net.Register(channels.ReceiveApprovals, e) if err != nil { @@ -144,7 +147,7 @@ func NewEngine(log zerolog.Logger, } signatureHasher := msig.NewBLSHasher(msig.ResultApprovalTag) - core, err := NewCore(log, e.workerPool, tracer, conMetrics, sealingTracker, unit, headers, state, sealsDB, assigner, signatureHasher, sealsMempool, approvalConduit, requiredApprovalsForSealConstructionGetter) + core, err := NewCore(log, e.workerPool, tracer, conMetrics, sealingTracker, headers, state, sealsDB, assigner, signatureHasher, sealsMempool, approvalConduit, requiredApprovalsForSealConstructionGetter) if err != nil { return nil, fmt.Errorf("failed to init sealing engine: %w", err) } @@ -158,6 +161,30 @@ func NewEngine(log zerolog.Logger, return e, nil } +// buildComponentManager creates the component manager with the necessary workers. +// It must only be called during initialization of the sealing engine, and the only +// reason it is factored out from NewEngine is so that it can be used in tests. +func (e *Engine) buildComponentManager() *component.ComponentManager { + builder := component.NewComponentManagerBuilder() + for i := 0; i < defaultSealingEngineWorkers; i++ { + builder.AddWorker(e.loop) + } + builder.AddWorker(e.finalizationProcessingLoop) + builder.AddWorker(e.blockIncorporatedEventsProcessingLoop) + builder.AddWorker(e.waitUntilWorkersFinish) + return builder.Build() +} + +// waitUntilWorkersFinish ensures that the Sealing Engine only finishes shutting down +// once the workerPool used by the Sealing Core has been shut down (after waiting +// for any pending tasks to complete). 
+func (e *Engine) waitUntilWorkersFinish(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + <-ctx.Done() + // After receiving shutdown signal, wait for the workerPool + e.workerPool.StopWait() +} + // setupTrustedInboundQueues initializes inbound queues for TRUSTED INPUTS (from other components within the // consensus node). We deliberately separate the queues for trusted inputs from the MessageHandler, which // handles external, untrusted inputs. This reduces the attack surface, as it makes it impossible for an external @@ -262,17 +289,22 @@ func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, eve e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel) return nil } - return fmt.Errorf("unexpected error while processing engine message: %w", err) + // An unexpected exception should never happen here, because the `messageHandler` only puts the events into the + // respective queues depending on their type or returns an `IncompatibleInputTypeError` for events with unknown type. + // We cannot return the error here, because the networking layer calling `Process` will just log that error and + // continue on a best-effort basis, which is not safe in case of an unexpected exception. + e.log.Fatal().Err(err).Msg("unexpected error while processing engine message") } return nil } // processAvailableMessages is processor of pending events which drives events from networking layer to business logic in `Core`. // Effectively consumes messages from networking layer and dispatches them into corresponding sinks which are connected with `Core`. -func (e *Engine) processAvailableMessages() error { +// No errors expected during normal operations. +func (e *Engine) processAvailableMessages(ctx irrecoverable.SignalerContext) error { for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return nil default: } @@ -310,53 +342,59 @@ func (e *Engine) processAvailableMessages() error { } } -// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events -func (e *Engine) finalizationProcessingLoop() { +// finalizationProcessingLoop contains the logic for processing of block finalization events. +// This method is intended to be executed by a single worker goroutine. +func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { finalizationNotifier := e.finalizationEventsNotifier.Channel() + ready() for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return case <-finalizationNotifier: finalized, err := e.state.Final().Head() if err != nil { - e.log.Fatal().Err(err).Msg("could not retrieve last finalized block") + ctx.Throw(fmt.Errorf("could not retrieve last finalized block: %w", err)) } err = e.core.ProcessFinalizedBlock(finalized.ID()) if err != nil { - e.log.Fatal().Err(err).Msgf("could not process finalized block %v", finalized.ID()) + ctx.Throw(fmt.Errorf("could not process finalized block %v: %w", finalized.ID(), err)) } } } } -// blockIncorporatedEventsProcessingLoop is a separate goroutine for processing block incorporated events -func (e *Engine) blockIncorporatedEventsProcessingLoop() { +// blockIncorporatedEventsProcessingLoop contains the logic for processing block incorporated events. +// This method is intended to be executed by a single worker goroutine. 
+func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { c := e.blockIncorporatedNotifier.Channel() - + ready() for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return case <-c: - err := e.processBlockIncorporatedEvents() + err := e.processBlockIncorporatedEvents(ctx) if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing block incorporated queued message") + ctx.Throw(fmt.Errorf("internal error processing block incorporated queued message: %w", err)) } } } } -func (e *Engine) loop() { +// loop contains the logic for processing incorporated results and result approvals via sealing.Core's +// business logic. This method is intended to be executed by multiple loop worker goroutines concurrently. +func (e *Engine) loop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { notifier := e.inboundEventsNotifier.Channel() + ready() for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return case <-notifier: - err := e.processAvailableMessages() + err := e.processAvailableMessages(ctx) if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing queued message") + ctx.Throw(fmt.Errorf("internal error processing queued message: %w", err)) } } } @@ -365,69 +403,31 @@ func (e *Engine) loop() { // processIncorporatedResult is a function that creates incorporated result and submits it for processing // to sealing core. In phase 2, incorporated result is incorporated at same block that is being executed. // This will be changed in phase 3. +// No errors expected during normal operations. func (e *Engine) processIncorporatedResult(incorporatedResult *flow.IncorporatedResult) error { err := e.core.ProcessIncorporatedResult(incorporatedResult) e.engineMetrics.MessageHandled(metrics.EngineSealing, metrics.MessageExecutionReceipt) return err } +// onApproval checks that the result approval is valid before forwarding it to the Core for processing in a blocking way. +// No errors expected during normal operations. func (e *Engine) onApproval(originID flow.Identifier, approval *flow.ResultApproval) error { - // don't process approval if originID is mismatched + // don't process (silently ignore) approval if originID is mismatched if originID != approval.Body.ApproverID { + e.log.Warn().Bool(logging.KeyProtocolViolation, true). + Msgf("result approval generated by node %v received from different originID %v", approval.Body.ApproverID, originID) return nil } err := e.core.ProcessApproval(approval) e.engineMetrics.MessageHandled(metrics.EngineSealing, metrics.MessageResultApproval) if err != nil { - return fmt.Errorf("fatal internal error in sealing core logic") + return irrecoverable.NewExceptionf("fatal internal error in sealing core logic: %w", err) } return nil } -// SubmitLocal submits an event originating on the local node. -func (e *Engine) SubmitLocal(event interface{}) { - err := e.ProcessLocal(event) - if err != nil { - // receiving an input of incompatible type from a trusted internal component is fatal - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// Submit submits the given event from the node with the given origin ID -// for processing in a non-blocking manner. It returns instantly and logs -// a potential processing error internally when done. 
-func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - err := e.Process(channel, originID, event) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// ProcessLocal processes an event originating on the local node. -func (e *Engine) ProcessLocal(event interface{}) error { - return e.messageHandler.Process(e.me.NodeID(), event) -} - -// Ready returns a ready channel that is closed once the engine has fully -// started. For the propagation engine, we consider the engine up and running -// upon initialization. -func (e *Engine) Ready() <-chan struct{} { - // launch as many workers as we need - for i := 0; i < defaultSealingEngineWorkers; i++ { - e.unit.Launch(e.loop) - } - e.unit.Launch(e.finalizationProcessingLoop) - e.unit.Launch(e.blockIncorporatedEventsProcessingLoop) - return e.unit.Ready() -} - -func (e *Engine) Done() <-chan struct{} { - return e.unit.Done(func() { - e.workerPool.StopWait() - }) -} - // OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer` // It informs sealing.Core about finalization of respective block. // @@ -500,10 +500,10 @@ func (e *Engine) processIncorporatedBlock(incorporatedBlockID flow.Identifier) e // processBlockIncorporatedEvents performs processing of block incorporated hot stuff events // No errors expected during normal operations. -func (e *Engine) processBlockIncorporatedEvents() error { +func (e *Engine) processBlockIncorporatedEvents(ctx irrecoverable.SignalerContext) error { for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return nil default: } diff --git a/engine/consensus/sealing/engine_test.go b/engine/consensus/sealing/engine_test.go index 7815f424157..f83ada42144 100644 --- a/engine/consensus/sealing/engine_test.go +++ b/engine/consensus/sealing/engine_test.go @@ -1,18 +1,20 @@ package sealing import ( + "context" "sync" "testing" "time" + "github.com/gammazero/workerpool" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/onflow/flow-go/consensus/hotstuff/model" - "github.com/onflow/flow-go/engine" mockconsensus "github.com/onflow/flow-go/engine/consensus/mock" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/messages" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" mockmodule "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" @@ -36,6 +38,7 @@ type SealingEngineSuite struct { // Sealing Engine engine *Engine + cancel context.CancelFunc } func (s *SealingEngineSuite) SetupTest() { @@ -58,7 +61,7 @@ func (s *SealingEngineSuite) SetupTest() { s.engine = &Engine{ log: unittest.Logger(), - unit: engine.NewUnit(), + workerPool: workerpool.New(defaultAssignmentCollectorsWorkerPoolCapacity), core: s.core, me: me, engineMetrics: metrics, @@ -75,10 +78,22 @@ func (s *SealingEngineSuite) SetupTest() { err = s.engine.setupMessageHandler(unittest.NewSealingConfigs(RequiredApprovalsForSealConstructionTestingValue)) require.NoError(s.T(), err) - <-s.engine.Ready() + // setup ComponentManager and start the engine + s.engine.Component = s.engine.buildComponentManager() + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), context.Background()) + s.cancel = cancel + s.engine.Start(ctx) + unittest.AssertClosesBefore(s.T(), s.engine.Ready(), 10*time.Millisecond) } -// TestOnFinalizedBlock tests if finalized block gets processed when send through `Engine`. 
+func (s *SealingEngineSuite) TearDownTest() { + if s.cancel != nil { + s.cancel() + unittest.AssertClosesBefore(s.T(), s.engine.Done(), 10*time.Millisecond) + } +} + +// TestOnFinalizedBlock tests if finalized block gets processed when sent through [Engine]. // Tests the whole processing pipeline. func (s *SealingEngineSuite) TestOnFinalizedBlock() { @@ -95,7 +110,7 @@ func (s *SealingEngineSuite) TestOnFinalizedBlock() { s.core.AssertExpectations(s.T()) } -// TestOnBlockIncorporated tests if incorporated block gets processed when send through `Engine`. +// TestOnBlockIncorporated tests if incorporated block gets processed when sent through [Engine]. // Tests the whole processing pipeline. func (s *SealingEngineSuite) TestOnBlockIncorporated() { parentBlock := unittest.BlockHeaderFixture() @@ -201,15 +216,11 @@ func (s *SealingEngineSuite) TestApprovalInvalidOrigin() { s.core.AssertNumberOfCalls(s.T(), "ProcessApproval", 0) } -// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type +// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type // was submitted from network layer. func (s *SealingEngineSuite) TestProcessUnsupportedMessageType() { invalidEvent := uint64(42) err := s.engine.Process("ch", unittest.IdentifierFixture(), invalidEvent) // shouldn't result in error since byzantine inputs are expected require.NoError(s.T(), err) - // in case of local processing error cannot be consumed since all inputs are trusted - err = s.engine.ProcessLocal(invalidEvent) - require.Error(s.T(), err) - require.True(s.T(), engine.IsIncompatibleInputTypeError(err)) } diff --git a/engine/testutil/mock/nodes.go b/engine/testutil/mock/nodes.go index 71f991d0334..56f26418a17 100644 --- a/engine/testutil/mock/nodes.go +++ b/engine/testutil/mock/nodes.go @@ -179,14 +179,31 @@ type ConsensusNode struct { MatchingEngine *matching.Engine } -func (cn ConsensusNode) Ready() { - <-cn.IngestionEngine.Ready() - <-cn.SealingEngine.Ready() +func (cn ConsensusNode) Start(t *testing.T) { + go unittest.FailOnIrrecoverableError(t, cn.Ctx.Done(), cn.Errs) + cn.IngestionEngine.Start(cn.Ctx) + cn.SealingEngine.Start(cn.Ctx) } -func (cn ConsensusNode) Done() { - <-cn.IngestionEngine.Done() - <-cn.SealingEngine.Done() +func (cn ConsensusNode) Ready() <-chan struct{} { + return util.AllReady( + cn.IngestionEngine, + cn.SealingEngine, + ) +} + +func (cn ConsensusNode) Done() <-chan struct{} { + done := make(chan struct{}) + go func() { + cn.GenericNode.Cancel() + <-util.AllDone( + cn.IngestionEngine, + cn.SealingEngine, + ) + cn.GenericNode.Done() + close(done) + }() + return done } // ExecutionNode implements a mocked execution node for tests. 
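
As background for the Unit to ComponentManager migration above, here is a minimal sketch of the worker pattern the sealing engine now follows. It uses only APIs that already appear in this diff (ComponentManagerBuilder, SignalerContext, ReadyFunc); buildComponent and myWorker are illustrative names, not part of this change.

package example

import (
	"github.com/onflow/flow-go/module/component"
	"github.com/onflow/flow-go/module/irrecoverable"
)

// buildComponent wires one worker into a ComponentManager, mirroring how the
// sealing engine's buildComponentManager registers its loops.
func buildComponent() component.Component {
	builder := component.NewComponentManagerBuilder()
	builder.AddWorker(myWorker)
	return builder.Build()
}

// myWorker is an illustrative worker: it signals readiness, then blocks until
// shutdown. Irrecoverable failures would be surfaced via ctx.Throw, as in
// finalizationProcessingLoop above.
func myWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	ready()
	<-ctx.Done()
}

At runtime the component is driven exactly as in the updated engine test: start it with a SignalerContext, wait on Ready(), cancel the context, and wait on Done().
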
diff --git a/integration/tests/access/cohort4/grpc_state_stream_test.go b/integration/tests/access/cohort4/grpc_state_stream_test.go index 9f930702772..e3f5cb07ee9 100644 --- a/integration/tests/access/cohort4/grpc_state_stream_test.go +++ b/integration/tests/access/cohort4/grpc_state_stream_test.go @@ -231,7 +231,7 @@ func (s *GrpcStateStreamSuite) TestHappyPath() { foundANTxControlCount := 0 foundONTxCount := 0 - messageIndex := counters.NewMonotonousCounter(0) + messageIndex := counters.NewMonotonicCounter(0) r := NewResponseTracker(compareEventsResponse, 3) diff --git a/module/counters/monotonous_counter.go b/module/counters/monotonous_counter.go index 17874299d84..a96498d8327 100644 --- a/module/counters/monotonous_counter.go +++ b/module/counters/monotonous_counter.go @@ -2,24 +2,24 @@ package counters import "sync/atomic" -// StrictMonotonousCounter is a helper struct which implements a strict monotonous counter. -// StrictMonotonousCounter is implemented using atomic operations and doesn't allow to set a value +// StrictMonotonicCounter is a helper struct which implements a strict monotonic counter. +// StrictMonotonicCounter is implemented using atomic operations and doesn't allow to set a value // which is lower or equal to the already stored one. The counter is implemented // solely with non-blocking atomic operations for concurrency safety. -type StrictMonotonousCounter struct { +type StrictMonotonicCounter struct { atomicCounter uint64 } -// NewMonotonousCounter creates new counter with initial value -func NewMonotonousCounter(initialValue uint64) StrictMonotonousCounter { - return StrictMonotonousCounter{ +// NewMonotonicCounter creates new counter with initial value +func NewMonotonicCounter(initialValue uint64) StrictMonotonicCounter { + return StrictMonotonicCounter{ atomicCounter: initialValue, } } // Set updates value of counter if and only if it's strictly larger than value which is already stored. // Returns true if update was successful or false if stored value is larger. 
-func (c *StrictMonotonousCounter) Set(newValue uint64) bool { +func (c *StrictMonotonicCounter) Set(newValue uint64) bool { for { oldValue := c.Value() if newValue <= oldValue { @@ -32,12 +32,12 @@ func (c *StrictMonotonousCounter) Set(newValue uint64) bool { } // Value returns value which is stored in atomic variable -func (c *StrictMonotonousCounter) Value() uint64 { +func (c *StrictMonotonicCounter) Value() uint64 { return atomic.LoadUint64(&c.atomicCounter) } // Increment atomically increments counter and returns updated value -func (c *StrictMonotonousCounter) Increment() uint64 { +func (c *StrictMonotonicCounter) Increment() uint64 { for { oldValue := c.Value() newValue := oldValue + 1 diff --git a/module/counters/monotonous_counter_test.go b/module/counters/monotonous_counter_test.go index f16fc48c0d7..cfda762fb34 100644 --- a/module/counters/monotonous_counter_test.go +++ b/module/counters/monotonous_counter_test.go @@ -9,7 +9,7 @@ import ( ) func TestSet(t *testing.T) { - counter := NewMonotonousCounter(3) + counter := NewMonotonicCounter(3) require.True(t, counter.Set(4)) require.Equal(t, uint64(4), counter.Value()) require.False(t, counter.Set(2)) @@ -17,15 +17,15 @@ func TestSet(t *testing.T) { } func TestIncrement(t *testing.T) { - counter := NewMonotonousCounter(1) + counter := NewMonotonicCounter(1) require.Equal(t, uint64(2), counter.Increment()) require.Equal(t, uint64(3), counter.Increment()) } -// TestIncrementConcurrently tests that the MonotonousCounter's Increment method +// TestIncrementConcurrently tests that the MonotonicCounter's Increment method // works correctly when called concurrently from multiple goroutines func TestIncrementConcurrently(t *testing.T) { - counter := NewMonotonousCounter(0) + counter := NewMonotonicCounter(0) unittest.Concurrently(100, func(i int) { counter.Increment() @@ -35,7 +35,7 @@ func TestIncrementConcurrently(t *testing.T) { } func TestFuzzy(t *testing.T) { - counter := NewMonotonousCounter(3) + counter := NewMonotonicCounter(3) require.True(t, counter.Set(4)) require.False(t, counter.Set(2)) require.True(t, counter.Set(7)) @@ -54,7 +54,7 @@ func TestFuzzy(t *testing.T) { } func TestConcurrent(t *testing.T) { - counter := NewMonotonousCounter(3) + counter := NewMonotonicCounter(3) unittest.Concurrently(100, func(i int) { counter.Set(uint64(i)) diff --git a/module/counters/persistent_strict_monotonic_counter.go b/module/counters/persistent_strict_monotonic_counter.go index 1ca881d04b6..95c2e3e8aae 100644 --- a/module/counters/persistent_strict_monotonic_counter.go +++ b/module/counters/persistent_strict_monotonic_counter.go @@ -17,7 +17,7 @@ type PersistentStrictMonotonicCounter struct { consumerProgress storage.ConsumerProgress // used to skip heights that are lower than the current height - counter StrictMonotonousCounter + counter StrictMonotonicCounter } // NewPersistentStrictMonotonicCounter creates a new PersistentStrictMonotonicCounter. @@ -36,7 +36,7 @@ func NewPersistentStrictMonotonicCounter(consumerProgress storage.ConsumerProgre return nil, fmt.Errorf("failed to get processed index: %w", err) } - m.counter = NewMonotonousCounter(value) + m.counter = NewMonotonicCounter(value) return m, nil } @@ -44,7 +44,7 @@ func NewPersistentStrictMonotonicCounter(consumerProgress storage.ConsumerProgre // Set sets the processed index, ensuring it is strictly monotonically increasing. 
// // Expected errors during normal operation: -// - codes.ErrIncorrectValue - if stored value is >= processed (requirement of strict monotonous increase is violated). +// - codes.ErrIncorrectValue - if stored value is >= processed (requirement of strict monotonic increase is violated). // - generic error in case of unexpected failure from the database layer or // encoding failure. func (m *PersistentStrictMonotonicCounter) Set(processed uint64) error { diff --git a/module/metrics/access.go b/module/metrics/access.go index aacfe316c76..577e8c34405 100644 --- a/module/metrics/access.go +++ b/module/metrics/access.go @@ -51,7 +51,7 @@ type AccessCollector struct { maxReceiptHeight prometheus.Gauge // used to skip heights that are lower than the current max height - maxReceiptHeightValue counters.StrictMonotonousCounter + maxReceiptHeightValue counters.StrictMonotonicCounter } var _ module.AccessMetrics = (*AccessCollector)(nil) @@ -112,7 +112,7 @@ func NewAccessCollector(opts ...AccessCollectorOpts) *AccessCollector { Subsystem: subsystemIngestion, Help: "gauge to track the maximum block height of execution receipts received", }), - maxReceiptHeightValue: counters.NewMonotonousCounter(0), + maxReceiptHeightValue: counters.NewMonotonicCounter(0), } for _, opt := range opts { diff --git a/module/metrics/execution.go b/module/metrics/execution.go index e269a70de64..4a06a5895c9 100644 --- a/module/metrics/execution.go +++ b/module/metrics/execution.go @@ -83,7 +83,7 @@ type ExecutionCollector struct { stateSyncActive prometheus.Gauge blockDataUploadsInProgress prometheus.Gauge blockDataUploadsDuration prometheus.Histogram - maxCollectionHeightData counters.StrictMonotonousCounter + maxCollectionHeightData counters.StrictMonotonicCounter maxCollectionHeight prometheus.Gauge computationResultUploadedCount prometheus.Counter computationResultUploadRetriedCount prometheus.Counter @@ -703,7 +703,7 @@ func NewExecutionCollector(tracer module.Tracer) *ExecutionCollector { Help: "the number of times a program was found in the cache", }), - maxCollectionHeightData: counters.NewMonotonousCounter(0), + maxCollectionHeightData: counters.NewMonotonicCounter(0), maxCollectionHeight: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "max_collection_height", diff --git a/network/test/cohort1/network_test.go b/network/test/cohort1/network_test.go index 5846167f934..d79a9a7204d 100644 --- a/network/test/cohort1/network_test.go +++ b/network/test/cohort1/network_test.go @@ -484,7 +484,7 @@ func (suite *NetworkTestSuite) TestUnicastRateLimit_Bandwidth() { _, err = newNet.Register(channels.TestNetworkChannel, newEngine) require.NoError(suite.T(), err) - callCount := counters.NewMonotonousCounter(0) + callCount := counters.NewMonotonicCounter(0) newEngine.On("Process", channels.TestNetworkChannel, suite.ids[0].NodeID, mockery.Anything).Run(func(args mockery.Arguments) { _ = callCount.Increment() }).Return(nil)
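
Because this change renames the counter at every call site, a small usage sketch of counters.StrictMonotonicCounter may help when reviewing the intended semantics. It relies only on the constructor and methods shown in module/counters/monotonous_counter.go above; the main wrapper and printed values are illustrative.

package main

import (
	"fmt"

	"github.com/onflow/flow-go/module/counters"
)

func main() {
	// Start at an arbitrary example value.
	c := counters.NewMonotonicCounter(3)

	// Set succeeds only for strictly larger values.
	fmt.Println(c.Set(4))  // true:  4 > 3
	fmt.Println(c.Set(4))  // false: equal values are rejected
	fmt.Println(c.Set(2))  // false: smaller values are rejected
	fmt.Println(c.Value()) // 4

	// Increment atomically bumps the counter and returns the new value.
	fmt.Println(c.Increment()) // 5
}

The same strictly increasing semantics underpin the messageIndex sequencing in the websocket data providers and the lowestRetainedView pruning in the vote and timeout aggregators touched above.
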