Skip to content

Commit 7faffd0

Browse files
authored
Merge pull request #7046 from onflow/illia-malachyn/7040-fix-context-canceled-error-propagation
[Access] Properly handle subscription errors in data providers
2 parents 82d7f1d + 429b961 commit 7faffd0

25 files changed

+529
-463
lines changed

engine/access/rest/common/models/block.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,68 @@ import (
88
const ExpandableFieldPayload = "payload"
99
const ExpandableExecutionResult = "execution_result"
1010

11+
func NewBlock(
12+
block *flow.Block,
13+
execResult *flow.ExecutionResult,
14+
link LinkGenerator,
15+
blockStatus flow.BlockStatus,
16+
expand map[string]bool,
17+
) (*Block, error) {
18+
self, err := SelfLink(block.ID(), link.BlockLink)
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
var result Block
24+
result.Header = NewBlockHeader(block.Header)
25+
26+
// add the payload to the response if it is specified as an expandable field
27+
result.Expandable = &BlockExpandable{}
28+
if expand[ExpandableFieldPayload] {
29+
var payload BlockPayload
30+
err := payload.Build(block.Payload)
31+
if err != nil {
32+
return nil, err
33+
}
34+
result.Payload = &payload
35+
} else {
36+
// else add the payload expandable link
37+
payloadExpandable, err := link.PayloadLink(block.ID())
38+
if err != nil {
39+
return nil, err
40+
}
41+
result.Expandable.Payload = payloadExpandable
42+
}
43+
44+
// execution result might not yet exist
45+
if execResult != nil {
46+
// add the execution result to the response if it is specified as an expandable field
47+
if expand[ExpandableExecutionResult] {
48+
var exeResult ExecutionResult
49+
err := exeResult.Build(execResult, link)
50+
if err != nil {
51+
return nil, err
52+
}
53+
result.ExecutionResult = &exeResult
54+
} else {
55+
// else add the execution result expandable link
56+
executionResultExpandable, err := link.ExecutionResultLink(execResult.ID())
57+
if err != nil {
58+
return nil, err
59+
}
60+
result.Expandable.ExecutionResult = executionResultExpandable
61+
}
62+
}
63+
64+
result.Links = self
65+
66+
var status BlockStatus
67+
status.Build(blockStatus)
68+
result.BlockStatus = &status
69+
70+
return &result, nil
71+
}
72+
1173
func (b *Block) Build(
1274
block *flow.Block,
1375
execResult *flow.ExecutionResult,
@@ -99,6 +161,16 @@ func (b *BlockPayload) Build(payload *flow.Payload) error {
99161
return nil
100162
}
101163

164+
func NewBlockHeader(header *flow.Header) *BlockHeader {
165+
return &BlockHeader{
166+
Id: header.ID().String(),
167+
ParentId: header.ParentID.String(),
168+
Height: util.FromUint(header.Height),
169+
Timestamp: header.Timestamp,
170+
ParentVoterSignature: util.ToBase64(header.ParentVoterSigData),
171+
}
172+
}
173+
102174
func (b *BlockHeader) Build(header *flow.Header) {
103175
b.Id = header.ID().String()
104176
b.ParentId = header.ParentID.String()

engine/access/rest/websockets/controller.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -476,9 +476,7 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
476476
c.dataProvidersGroup.Add(1)
477477
go func() {
478478
err = provider.Run()
479-
// return the error to the client for all errors except context.Canceled.
480-
// context.Canceled is returned during graceful shutdown of a subscription
481-
if err != nil && !errors.Is(err, context.Canceled) {
479+
if err != nil {
482480
err = fmt.Errorf("internal error: %w", err)
483481
c.writeErrorResponse(
484482
ctx,

engine/access/rest/websockets/controller_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
6262
// data provider might finish on its own or controller will close it via Close()
6363
dataProvider.On("Close").Return(nil).Maybe()
6464
dataProvider.
65-
On("Run").
65+
On("Run", mock.Anything).
6666
Run(func(args mock.Arguments) {
6767
<-done
6868
}).
@@ -206,7 +206,7 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
206206
// data provider might finish on its own or controller will close it via Close()
207207
dataProvider.On("Close").Return(nil).Maybe()
208208
dataProvider.
209-
On("Run").
209+
On("Run", mock.Anything).
210210
Run(func(args mock.Arguments) {}).
211211
Return(fmt.Errorf("error running data provider")).
212212
Once()
@@ -261,7 +261,7 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
261261
// data provider might finish on its own or controller will close it via Close()
262262
dataProvider.On("Close").Return(nil).Maybe()
263263
dataProvider.
264-
On("Run").
264+
On("Run", mock.Anything).
265265
Run(func(args mock.Arguments) {
266266
<-done
267267
}).
@@ -329,7 +329,7 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
329329
// data provider might finish on its own or controller will close it via Close()
330330
dataProvider.On("Close").Return(nil).Maybe()
331331
dataProvider.
332-
On("Run").
332+
On("Run", mock.Anything).
333333
Run(func(args mock.Arguments) {
334334
<-done
335335
}).
@@ -399,7 +399,7 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
399399
// data provider might finish on its own or controller will close it via Close()
400400
dataProvider.On("Close").Return(nil).Maybe()
401401
dataProvider.
402-
On("Run").
402+
On("Run", mock.Anything).
403403
Run(func(args mock.Arguments) {
404404
<-done
405405
}).
@@ -480,7 +480,7 @@ func (s *WsControllerSuite) TestListSubscriptions() {
480480
// data provider might finish on its own or controller will close it via Close()
481481
dataProvider.On("Close").Return(nil).Maybe()
482482
dataProvider.
483-
On("Run").
483+
On("Run", mock.Anything).
484484
Run(func(args mock.Arguments) {
485485
<-done
486486
}).

engine/access/rest/websockets/data_providers/account_statuses_provider.go

Lines changed: 63 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"fmt"
66

77
"github.com/rs/zerolog"
8-
"google.golang.org/grpc/codes"
9-
"google.golang.org/grpc/status"
108

119
"github.com/onflow/flow-go/engine/access/rest/http/request"
1210
"github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/models"
@@ -23,16 +21,16 @@ type accountStatusesArguments struct {
2321
StartBlockID flow.Identifier // ID of the block to start subscription from
2422
StartBlockHeight uint64 // Height of the block to start subscription from
2523
Filter state_stream.AccountStatusFilter // Filter applied to events for a given subscription
26-
HeartbeatInterval *uint64 // Maximum number of blocks message won't be sent. Nil if not set
24+
HeartbeatInterval uint64 // Maximum number of blocks message won't be sent
2725
}
2826

2927
type AccountStatusesDataProvider struct {
3028
*baseDataProvider
3129

32-
logger zerolog.Logger
33-
stateStreamApi state_stream.API
34-
35-
heartbeatInterval uint64
30+
arguments accountStatusesArguments
31+
messageIndex counters.StrictMonotonicCounter
32+
blocksSinceLastMessage uint64
33+
stateStreamApi state_stream.API
3634
}
3735

3836
var _ DataProvider = (*AccountStatusesDataProvider)(nil)
@@ -44,55 +42,86 @@ func NewAccountStatusesDataProvider(
4442
stateStreamApi state_stream.API,
4543
subscriptionID string,
4644
topic string,
47-
arguments wsmodels.Arguments,
45+
rawArguments wsmodels.Arguments,
4846
send chan<- interface{},
4947
chain flow.Chain,
5048
eventFilterConfig state_stream.EventFilterConfig,
51-
heartbeatInterval uint64,
49+
defaultHeartbeatInterval uint64,
5250
) (*AccountStatusesDataProvider, error) {
5351
if stateStreamApi == nil {
5452
return nil, fmt.Errorf("this access node does not support streaming account statuses")
5553
}
5654

57-
p := &AccountStatusesDataProvider{
58-
logger: logger.With().Str("component", "account-statuses-data-provider").Logger(),
59-
stateStreamApi: stateStreamApi,
60-
heartbeatInterval: heartbeatInterval,
61-
}
62-
63-
// Initialize arguments passed to the provider.
64-
accountStatusesArgs, err := parseAccountStatusesArguments(arguments, chain, eventFilterConfig)
55+
args, err := parseAccountStatusesArguments(rawArguments, chain, eventFilterConfig, defaultHeartbeatInterval)
6556
if err != nil {
6657
return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err)
6758
}
68-
if accountStatusesArgs.HeartbeatInterval != nil {
69-
p.heartbeatInterval = *accountStatusesArgs.HeartbeatInterval
70-
}
71-
72-
subCtx, cancel := context.WithCancel(ctx)
7359

74-
p.baseDataProvider = newBaseDataProvider(
60+
provider := newBaseDataProvider(
61+
ctx,
62+
logger.With().Str("component", "account-statuses-data-provider").Logger(),
63+
nil,
7564
subscriptionID,
7665
topic,
77-
arguments,
78-
cancel,
66+
rawArguments,
7967
send,
80-
p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments.
8168
)
8269

83-
return p, nil
70+
return &AccountStatusesDataProvider{
71+
baseDataProvider: provider,
72+
arguments: args,
73+
messageIndex: counters.NewMonotonicCounter(0),
74+
blocksSinceLastMessage: 0,
75+
stateStreamApi: stateStreamApi,
76+
}, nil
8477
}
8578

8679
// Run starts processing the subscription for events and handles responses.
80+
// Must be called once.
8781
//
88-
// Expected errors during normal operations:
89-
// - context.Canceled: if the operation is canceled, during an unsubscribe action.
82+
// No errors expected during normal operations.
9083
func (p *AccountStatusesDataProvider) Run() error {
91-
return subscription.HandleSubscription(p.subscription, p.handleResponse())
84+
return run(
85+
p.createAndStartSubscription(p.ctx, p.arguments),
86+
func(response *backend.AccountStatusesResponse) error {
87+
return p.sendResponse(response)
88+
},
89+
)
9290
}
9391

94-
// createSubscription creates a new subscription using the specified input arguments.
95-
func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, args accountStatusesArguments) subscription.Subscription {
92+
// sendResponse processes an account statuses message and sends it to data provider's channel.
93+
// This function is not safe to call concurrently.
94+
//
95+
// No errors are expected during normal operations
96+
func (p *AccountStatusesDataProvider) sendResponse(response *backend.AccountStatusesResponse) error {
97+
// Only send a response if there's meaningful data to send
98+
// or the heartbeat interval limit is reached
99+
p.blocksSinceLastMessage += 1
100+
accountEmittedEvents := len(response.AccountEvents) != 0
101+
reachedHeartbeatLimit := p.blocksSinceLastMessage >= p.arguments.HeartbeatInterval
102+
if !accountEmittedEvents && !reachedHeartbeatLimit {
103+
return nil
104+
}
105+
106+
accountStatusesPayload := models.NewAccountStatusesResponse(response, p.messageIndex.Value())
107+
resp := models.BaseDataProvidersResponse{
108+
SubscriptionID: p.ID(),
109+
Topic: p.Topic(),
110+
Payload: accountStatusesPayload,
111+
}
112+
p.send <- &resp
113+
114+
p.blocksSinceLastMessage = 0
115+
p.messageIndex.Increment()
116+
117+
return nil
118+
}
119+
120+
// createAndStartSubscription creates a new subscription using the specified input arguments.
121+
func (p *AccountStatusesDataProvider) createAndStartSubscription(
122+
ctx context.Context,
123+
args accountStatusesArguments,
124+
) subscription.Subscription {
96125
if args.StartBlockID != flow.ZeroID {
97126
return p.stateStreamApi.SubscribeAccountStatusesFromStartBlockID(ctx, args.StartBlockID, args.Filter)
98127
}
@@ -104,46 +133,12 @@ func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, ar
104133
return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter)
105134
}
106135

107-
// handleResponse processes an account statuses and sends the formatted response.
108-
//
109-
// No errors are expected during normal operations.
110-
func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesResponse *backend.AccountStatusesResponse) error {
111-
blocksSinceLastMessage := uint64(0)
112-
messageIndex := counters.NewMonotonicCounter(0)
113-
114-
return func(accountStatusesResponse *backend.AccountStatusesResponse) error {
115-
// check if there are any events in the response. if not, do not send a message unless the last
116-
// response was more than HeartbeatInterval blocks ago
117-
if len(accountStatusesResponse.AccountEvents) == 0 {
118-
blocksSinceLastMessage++
119-
if blocksSinceLastMessage < p.heartbeatInterval {
120-
return nil
121-
}
122-
}
123-
blocksSinceLastMessage = 0
124-
125-
index := messageIndex.Value()
126-
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
127-
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
128-
}
129-
130-
accountStatusesPayload := models.NewAccountStatusesResponse(accountStatusesResponse, index)
131-
response := models.BaseDataProvidersResponse{
132-
SubscriptionID: p.ID(),
133-
Topic: p.Topic(),
134-
Payload: accountStatusesPayload,
135-
}
136-
p.send <- &response
137-
138-
return nil
139-
}
140-
}
141-
142136
// parseAccountStatusesArguments validates and initializes the account statuses arguments.
143137
func parseAccountStatusesArguments(
144138
arguments wsmodels.Arguments,
145139
chain flow.Chain,
146140
eventFilterConfig state_stream.EventFilterConfig,
141+
defaultHeartbeatInterval uint64,
147142
) (accountStatusesArguments, error) {
148143
allowedFields := map[string]struct{}{
149144
"start_block_id": {},
@@ -168,7 +163,7 @@ func parseAccountStatusesArguments(
168163
args.StartBlockHeight = startBlockHeight
169164

170165
// Parse 'heartbeat_interval' argument
171-
heartbeatInterval, err := extractHeartbeatInterval(arguments)
166+
heartbeatInterval, err := extractHeartbeatInterval(arguments, defaultHeartbeatInterval)
172167
if err != nil {
173168
return accountStatusesArguments{}, err
174169
}

engine/access/rest/websockets/data_providers/account_statuses_provider_test.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,11 @@ func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_HappyPath
8888
}
8989

9090
func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_StateStreamNotConfigured() {
91-
ctx := context.Background()
9291
send := make(chan interface{})
93-
9492
topic := AccountStatusesTopic
9593

9694
provider, err := NewAccountStatusesDataProvider(
97-
ctx,
95+
context.Background(),
9896
s.log,
9997
nil,
10098
"dummy-id",
@@ -206,15 +204,13 @@ func (s *AccountStatusesProviderSuite) expectedAccountStatusesResponses(backendR
206204
// when invalid arguments are provided. It verifies that appropriate errors are returned
207205
// for missing or conflicting arguments.
208206
func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidArguments() {
209-
ctx := context.Background()
210207
send := make(chan interface{})
211-
212208
topic := AccountStatusesTopic
213209

214210
for _, test := range invalidAccountStatusesArgumentsTestCases() {
215211
s.Run(test.name, func() {
216212
provider, err := NewAccountStatusesDataProvider(
217-
ctx,
213+
context.Background(),
218214
s.log,
219215
s.api,
220216
"dummy-id",
@@ -234,7 +230,6 @@ func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidAr
234230

235231
// TestMessageIndexAccountStatusesProviderResponse_HappyPath tests that MessageIndex values in response are strictly increasing.
236232
func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderResponse_HappyPath() {
237-
ctx := context.Background()
238233
send := make(chan interface{}, 10)
239234
topic := AccountStatusesTopic
240235
accountStatusesCount := 4
@@ -258,7 +253,7 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
258253

259254
// Create the AccountStatusesDataProvider instance
260255
provider, err := NewAccountStatusesDataProvider(
261-
ctx,
256+
context.Background(),
262257
s.log,
263258
s.api,
264259
"dummy-id",

0 commit comments

Comments
 (0)