[Access] Properly handle subscription errors in data providers #7046
Distinguish between `context.Canceled` errors originating from the streamer and those triggered by the DataProvider’s `Close()` method. Use `wasClosedByClient()` to suppress expected cancellations while propagating unexpected ones
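A minimal sketch of the idea, for readers following the thread. It assumes `wasClosedByClient()` is a non-blocking check on a close-signal channel; the PR names the function but does not show its body here, so the helper `filterCanceled` and the standalone function signatures are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// wasClosedByClient reports, without blocking, whether closedChan has been
// closed. Assumption: the PR's method performs an equivalent check.
func wasClosedByClient(closedChan <-chan struct{}) bool {
	select {
	case <-closedChan:
		return true
	default:
		return false
	}
}

// filterCanceled (hypothetical helper) drops context.Canceled only when the
// client asked for the shutdown; every other error is passed through.
func filterCanceled(err error, closedChan <-chan struct{}) error {
	if errors.Is(err, context.Canceled) && wasClosedByClient(closedChan) {
		return nil
	}
	return err
}

// cancelSuppressed reports whether a context.Canceled error is swallowed
// for the given close-signal channel.
func cancelSuppressed(closedChan <-chan struct{}) bool {
	return filterCanceled(context.Canceled, closedChan) == nil
}

func main() {
	closedChan := make(chan struct{})

	// Cancellation that did not come from the client is propagated.
	fmt.Println(filterCanceled(context.Canceled, closedChan)) // context canceled

	// After the client closes the provider, the same error is expected.
	close(closedChan)
	fmt.Println(filterCanceled(context.Canceled, closedChan)) // <nil>
}
```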
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #7046 +/- ##
==========================================
+ Coverage 41.23% 41.24% +0.01%
==========================================
Files 2138 2138
Lines 188585 188622 +37
==========================================
+ Hits 77764 77799 +35
- Misses 104327 104329 +2
Partials 6494 6494
Looks good to me!
After discussing with Yurii, we agreed to use a different approach here.
As this will change, I will re-approve the final implementation.
We use it to distinguish where the `context.Canceled` error comes from. I also refactored each data provider's `Run()` function; it is now more readable and clear.
Why is this refactor needed? Can you point out the important changes? It's not clear to me.
Hey. I updated the PR's description and added context on what has been done and why.
// The block counter increments until either:
// 1. The account emits events
// 2. The heartbeat interval is reached
*blocksSinceLastMessage += 1
All places with `blocksSinceLastMessage` were refactored.
Your refactored variant should work as expected; I would just add to the comment that:
// Only send a response if there's meaningful data to send or the heartbeat interval limit is reached
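The counter logic discussed above can be sketched roughly as follows. This is an illustration only: the helper names `shouldSend` and `simulate`, the `heartbeatInterval` parameter, and the exact ordering of increment and reset are assumptions, not the PR's actual code.

```go
package main

import "fmt"

// shouldSend (hypothetical) counts the current block and decides whether a
// response should go out. Only send a response if there's meaningful data to
// send or the heartbeat interval limit is reached.
func shouldSend(blocksSinceLastMessage *uint64, heartbeatInterval uint64, hasData bool) bool {
	*blocksSinceLastMessage += 1
	if !hasData && *blocksSinceLastMessage < heartbeatInterval {
		return false
	}
	// A message is sent, so the counter starts over.
	*blocksSinceLastMessage = 0
	return true
}

// simulate feeds one entry per block (true = block carries data) through
// shouldSend and records the decision for each block.
func simulate(heartbeatInterval uint64, hasData []bool) []bool {
	var counter uint64
	out := make([]bool, 0, len(hasData))
	for _, d := range hasData {
		out = append(out, shouldSend(&counter, heartbeatInterval, d))
	}
	return out
}

func main() {
	// Heartbeat every 3 blocks; only the fourth block carries data.
	fmt.Println(simulate(3, []bool{false, false, false, true, false}))
	// prints: [false false true true false]
}
```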
}

var accountStatusesPayload models.AccountStatusesResponse
defer messageIndex.Increment()
All places with the `messageIndex` increment were refactored.
@@ -15,6 +17,9 @@ type baseDataProvider struct {
	cancel context.CancelFunc
	send chan<- interface{}
	subscription subscription.Subscription
	// Ensures the closedChan has been closed once.
	closedFlag sync.Once
Added the `closedChan` channel.
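A small sketch of why the `sync.Once` guard matters: closing an already closed channel panics in Go, so `Close()` has to be safe when called more than once or from several goroutines. The `closer` type and `closeConcurrently` helper are hypothetical stand-ins for the real provider.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for baseDataProvider, reduced to the
// two fields shown in the diff.
type closer struct {
	// Ensures the closedChan has been closed once.
	closedFlag sync.Once
	closedChan chan struct{}
}

func newCloser() *closer {
	return &closer{closedChan: make(chan struct{})}
}

// Close may be called any number of times; only the first call closes the
// channel, so a double close cannot panic.
func (c *closer) Close() {
	c.closedFlag.Do(func() { close(c.closedChan) })
}

// closeConcurrently calls Close from n goroutines and reports whether the
// channel is observed as closed afterwards.
func closeConcurrently(n int) bool {
	c := newCloser()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close()
		}()
	}
	wg.Wait()

	select {
	case <-c.closedChan:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(closeConcurrently(10)) // true
}
```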
) error {
	for {
		select {
		case <-closedChan:
`run()` is aware of `closedChan` and returns nil if it was closed.
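The shape of such a loop might look like the sketch below. Only the names `run`, `sendResponseCallback`, and `closedChan` come from the diff; the parameter list, the plain `data` channel standing in for the subscription, and the `streamErr` function are assumptions made to keep the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

type sendResponseCallback[T any] func(T) error

// run forwards values from data to send until one of three things happens:
// closedChan is closed (client shutdown, returns nil), data is closed
// (returns whatever streamErr reports), or send fails.
func run[T any](
	closedChan <-chan struct{},
	data <-chan T,
	streamErr func() error,
	send sendResponseCallback[T],
) error {
	for {
		select {
		case <-closedChan:
			return nil
		case v, ok := <-data:
			if !ok {
				return streamErr()
			}
			if err := send(v); err != nil {
				return err
			}
		}
	}
}

// demoClientClose: the client closes before any data arrives.
func demoClientClose() error {
	closedChan := make(chan struct{})
	close(closedChan)
	data := make(chan int) // never written to
	return run[int](closedChan, data, func() error { return nil }, func(int) error { return nil })
}

// demoStreamEnd: two values are delivered, then the stream ends with an error.
func demoStreamEnd() (int, error) {
	data := make(chan int, 2)
	data <- 1
	data <- 2
	close(data)
	delivered := 0
	err := run[int](
		make(chan struct{}), // never closed by the client
		data,
		func() error { return errors.New("stream failed") },
		func(int) error { delivered++; return nil },
	)
	return delivered, err
}

func main() {
	fmt.Println(demoClientClose()) // <nil>
	n, err := demoStreamEnd()
	fmt.Println(n, err) // 2 stream failed
}
```

The caller can then treat a nil return as an expected shutdown and any non-nil error as something to report.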
@peterargue I also pointed out the most important lines of code.
@@ -33,6 +38,8 @@ func newBaseDataProvider(
	cancel: cancel,
	send: send,
	subscription: subscription,
	closedFlag: sync.Once{},
	closedChan: make(chan struct{}, 1),
Could we use an unbuffered channel here? It is only needed for closing; we do not write to it anywhere.
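To illustrate the point: a receive on a closed channel returns immediately whether or not the channel has a buffer, so the buffer size makes no difference for a close-only signal. The helper name `receivesAfterClose` is hypothetical.

```go
package main

import "fmt"

// receivesAfterClose closes ch and reports whether a receive then completes
// immediately with ok == false, which is how receivers see a closed channel.
func receivesAfterClose(ch chan struct{}) bool {
	close(ch)
	select {
	case _, ok := <-ch:
		return !ok
	default:
		return false
	}
}

func main() {
	fmt.Println(receivesAfterClose(make(chan struct{})))    // true (unbuffered)
	fmt.Println(receivesAfterClose(make(chan struct{}, 1))) // true (buffered)
}
```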

type sendResponseCallback[T any] func(T) error

func run[T any](
It is self-explanatory, but I would add a function comment anyway.
p.subscription,
subscription.HandleResponse(p.send, func(b *flow.BlockDigest) (interface{}, error) {
Is this `subscription.HandleResponse` used anywhere? If not, I think we could remove it.
The first round of review - DONE! I have a few small comments.
Distinguish between `context.Canceled` errors originating from the streamer and those triggered by the DataProvider's `Close()` method.
- Added `closeChan`, which is used in the `DataProvider.Close()` method to indicate to `DataProvider.Run()` that users of data providers (the WebSocket controller in our case) want to finish receiving data.
- The `HandleSubscription()` function is replaced by a `run()` function that is aware of `closeChan`. I made a new function for it because `HandleSubscription()` is widely used in the access package (`HandleRPCSubscription` has 22 usages at the moment).
- `run()` returns nil if `closeChan` was closed. `HandleSubscription` returned `ctx.Canceled`, which led to confusion because `ctx.Canceled` could come from 2 sources (the streamer and the WebSocket controller).
- Refactored the `sendResponse()` functions to make them more readable.

Closes #7040 #7047