Skip to content

Commit

Permalink
Merge pull request #5632 from AndriiDiachuk/cleanup-streaming-errors-…
Browse files Browse the repository at this point in the history
…for-block-not-ready

Cleanup streaming errors for block not ready
  • Loading branch information
peterargue authored Apr 23, 2024
2 parents a762ee0 + f0adcfd commit 9117e90
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 16 deletions.
15 changes: 11 additions & 4 deletions engine/access/rpc/backend/backend_stream_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"context"
"errors"
"fmt"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -283,7 +284,7 @@ func (b *backendSubscribeBlocks) getBlockDigestResponse(blockStatus flow.BlockSt

// getBlockHeader returns the block header for the given block height.
// Expected errors during normal operation:
// - storage.ErrNotFound: block for the given block height is not available.
// - subscription.ErrBlockNotReady: block for the given block height is not available.
func (b *backendSubscribeBlocks) getBlockHeader(height uint64, expectedBlockStatus flow.BlockStatus) (*flow.Header, error) {
err := b.validateHeight(height, expectedBlockStatus)
if err != nil {
Expand All @@ -293,6 +294,9 @@ func (b *backendSubscribeBlocks) getBlockHeader(height uint64, expectedBlockStat
// since we are querying a finalized or sealed block header, we can use the height index and save an ID computation
header, err := b.headers.ByHeight(height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, fmt.Errorf("failed to retrieve block header for height %d: %w", height, subscription.ErrBlockNotReady)
}
return nil, err
}

Expand All @@ -301,7 +305,7 @@ func (b *backendSubscribeBlocks) getBlockHeader(height uint64, expectedBlockStat

// getBlock returns the block for the given block height.
// Expected errors during normal operation:
// - storage.ErrNotFound: block for the given block height is not available.
// - subscription.ErrBlockNotReady: block for the given block height is not available.
func (b *backendSubscribeBlocks) getBlock(height uint64, expectedBlockStatus flow.BlockStatus) (*flow.Block, error) {
err := b.validateHeight(height, expectedBlockStatus)
if err != nil {
Expand All @@ -311,6 +315,9 @@ func (b *backendSubscribeBlocks) getBlock(height uint64, expectedBlockStatus flo
// since we are querying a finalized or sealed block, we can use the height index and save an ID computation
block, err := b.blocks.ByHeight(height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, fmt.Errorf("failed to retrieve block for height %d: %w", height, subscription.ErrBlockNotReady)
}
return nil, err
}

Expand All @@ -319,7 +326,7 @@ func (b *backendSubscribeBlocks) getBlock(height uint64, expectedBlockStatus flo

// validateHeight checks if the given block height is valid and available based on the expected block status.
// Expected errors during normal operation:
// - storage.ErrNotFound: block for the given block height is not available.
// - subscription.ErrBlockNotReady when unable to retrieve the block by height.
func (b *backendSubscribeBlocks) validateHeight(height uint64, expectedBlockStatus flow.BlockStatus) error {
highestHeight, err := b.blockTracker.GetHighestHeight(expectedBlockStatus)
if err != nil {
Expand All @@ -330,7 +337,7 @@ func (b *backendSubscribeBlocks) validateHeight(height uint64, expectedBlockStat
// note: it's possible for the data to exist in the data store before the notification is
// received. this ensures a consistent view is available to all streams.
if height > highestHeight {
return fmt.Errorf("block %d is not available yet: %w", height, storage.ErrNotFound)
return fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
}

return nil
Expand Down
11 changes: 8 additions & 3 deletions engine/access/state_stream/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -12,7 +13,6 @@ import (
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
Expand Down Expand Up @@ -144,18 +144,23 @@ func New(

// getExecutionData returns the execution data for the given block height.
// Expected errors during normal operation:
// - storage.ErrNotFound or execution_data.BlobNotFoundError: execution data for the given block height is not available.
// - subscription.ErrBlockNotReady: execution data for the given block height is not available.
func (b *StateStreamBackend) getExecutionData(ctx context.Context, height uint64) (*execution_data.BlockExecutionDataEntity, error) {
highestHeight := b.ExecutionDataTracker.GetHighestHeight()
// fail early if no notification has been received for the given block height.
// note: it's possible for the data to exist in the data store before the notification is
// received. this ensures a consistent view is available to all streams.
if height > highestHeight {
return nil, fmt.Errorf("execution data for block %d is not available yet: %w", height, storage.ErrNotFound)
return nil, fmt.Errorf("execution data for block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
}

execData, err := b.execDataCache.ByHeight(ctx, height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) ||
execution_data.IsBlobNotFoundError(err) {
err = errors.Join(err, subscription.ErrBlockNotReady)
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
}
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
}

Expand Down
13 changes: 12 additions & 1 deletion engine/access/state_stream/backend/backend_account_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package backend

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

type AccountStatusesResponse struct {
Expand Down Expand Up @@ -54,7 +57,7 @@ func (b *AccountStatusesBackend) SubscribeAccountStatusesFromStartBlockID(
// SubscribeAccountStatusesFromStartHeight subscribes to the streaming of account status changes starting from
// a specific block height, with an optional status filter.
// Errors:
// - codes.ErrNotFound if could not get block by start height.
// - codes.ErrNotFound if could not get block by start height.
// - codes.Internal if there is an internal error.
func (b *AccountStatusesBackend) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
Expand Down Expand Up @@ -84,12 +87,20 @@ func (b *AccountStatusesBackend) SubscribeAccountStatusesFromLatestBlock(
}

// getAccountStatusResponseFactory returns a function that returns the account statuses response for a given height.
//
// Errors:
// - subscription.ErrBlockNotReady: If block header for the specified block height is not found.
// - error: An error, if any, encountered during getting events from storage or execution data.
func (b *AccountStatusesBackend) getAccountStatusResponseFactory(
filter state_stream.AccountStatusFilter,
) subscription.GetDataByHeightFunc {
return func(ctx context.Context, height uint64) (interface{}, error) {
eventsResponse, err := b.eventsRetriever.GetAllEventsResponse(ctx, height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) ||
errors.Is(err, storage.ErrHeightNotIndexed) {
return nil, fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
}
return nil, err
}
filteredProtocolEvents := filter.Filter(eventsResponse.Events)
Expand Down
11 changes: 9 additions & 2 deletions engine/access/state_stream/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package backend

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

type EventsBackend struct {
Expand Down Expand Up @@ -126,12 +129,16 @@ func (b *EventsBackend) SubscribeEventsFromLatest(ctx context.Context, filter st
// - filter: The event filter used to filter events.
//
// Expected errors during normal operation:
// - codes.NotFound: If block header for the specified block height is not found, if events for the specified block height are not found.
// - subscription.ErrBlockNotReady: execution data for the given block height is not available.
func (b *EventsBackend) getResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc {
return func(ctx context.Context, height uint64) (response interface{}, err error) {
eventsResponse, err := b.eventsRetriever.GetAllEventsResponse(ctx, height)
if err != nil {
return nil, err
if errors.Is(err, storage.ErrNotFound) ||
errors.Is(err, storage.ErrHeightNotIndexed) {
return nil, subscription.ErrBlockNotReady
}
return nil, fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
}

eventsResponse.Events = filter.Filter(eventsResponse.Events)
Expand Down
7 changes: 1 addition & 6 deletions engine/access/subscription/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"golang.org/x/time/rate"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/storage"
)

// ErrBlockNotReady represents an error indicating that a block is not yet available or ready.
Expand Down Expand Up @@ -105,10 +103,7 @@ func (s *Streamer) sendAllAvailable(ctx context.Context) error {
}

if err != nil {
if errors.Is(err, storage.ErrNotFound) ||
errors.Is(err, storage.ErrHeightNotIndexed) ||
execution_data.IsBlobNotFoundError(err) ||
errors.Is(err, ErrBlockNotReady) {
if errors.Is(err, ErrBlockNotReady) {
// no more available
return nil
}
Expand Down

0 comments on commit 9117e90

Please sign in to comment.