Skip to content

Commit

Permalink
fix(core): add retry logic for the grpc subscription (#4093)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs authored Feb 13, 2025
1 parent 5185fc8 commit 70d6351
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 268 deletions.
10 changes: 1 addition & 9 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {

cfg := DefaultTestConfig()
fetcher, cctx := createCoreFetcher(t, cfg)

generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
Expand Down Expand Up @@ -62,7 +61,6 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
require.NoError(t, err)
assert.True(t, has)
}
require.NoError(t, fetcher.Stop(ctx))
}

// TestExchange_DoNotStoreHistoric tests that the CoreExchange will not
Expand All @@ -73,7 +71,6 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {

cfg := DefaultTestConfig()
fetcher, cctx := createCoreFetcher(t, cfg)

generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
Expand Down Expand Up @@ -121,7 +118,6 @@ func TestExchange_StoreHistoricIfArchival(t *testing.T) {

cfg := DefaultTestConfig()
fetcher, cctx := createCoreFetcher(t, cfg)

generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
Expand Down Expand Up @@ -211,12 +207,8 @@ func generateNonEmptyBlocks(
// generate several non-empty blocks
generateCtx, generateCtxCancel := context.WithCancel(context.Background())

sub, err := fetcher.SubscribeNewBlockEvent(ctx)
sub, err := fetcher.SubscribeNewBlockEvent(generateCtx)
require.NoError(t, err)
defer func() {
err = fetcher.Stop(ctx)
require.NoError(t, err)
}()

go fillBlocks(t, generateCtx, cfg, cctx)

Expand Down
122 changes: 52 additions & 70 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
Expand All @@ -13,7 +12,6 @@ import (
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"github.com/tendermint/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/status"

libhead "github.com/celestiaorg/go-header"
)
Expand All @@ -34,10 +32,6 @@ var (

type BlockFetcher struct {
client coregrpc.BlockAPIClient

doneCh chan struct{}
cancel context.CancelFunc
isListeningForBlocks atomic.Bool
}

// NewBlockFetcher returns a new `BlockFetcher`.
Expand All @@ -47,18 +41,6 @@ func NewBlockFetcher(conn *grpc.ClientConn) (*BlockFetcher, error) {
}, nil
}

// Stop stops the block fetcher.
// The underlying gRPC connection needs to be stopped separately.
func (f *BlockFetcher) Stop(ctx context.Context) error {
f.cancel()
select {
case <-f.doneCh:
return nil
case <-ctx.Done():
return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err())
}
}

// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet.
func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) {
commit, err := f.Commit(ctx, height)
Expand Down Expand Up @@ -162,72 +144,72 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V
}

// SubscribeNewBlockEvent subscribes to new block events from Core, returning
// a new block event channel on success.
func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) {
if f.isListeningForBlocks.Load() {
return nil, fmt.Errorf("already subscribed to new blocks")
}
ctx, cancel := context.WithCancel(ctx)
f.cancel = cancel
f.doneCh = make(chan struct{})
f.isListeningForBlocks.Store(true)

subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{})
if err != nil {
close(f.doneCh)
f.isListeningForBlocks.Store(false)
return nil, err
}
// a new block event channel.
func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (chan types.EventDataSignedBlock, error) {
signedBlockCh := make(chan types.EventDataSignedBlock, 1)

log.Debug("created a subscription. Start listening for new blocks...")
signedBlockCh := make(chan types.EventDataSignedBlock)
go func() {
defer close(f.doneCh)
defer close(signedBlockCh)
defer func() { f.isListeningForBlocks.Store(false) }()
for {
select {
case <-ctx.Done():
return
default:
resp, err := subscription.Recv()
if err != nil {
log.Errorw("fetcher: error receiving new height", "err", err.Error())
_, ok := status.FromError(err) // parsing the gRPC error
if ok {
// ok means that err contains a gRPC status error.
// move on another round of resubscribing.
return
}
continue
}
withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second)
signedBlock, err := f.GetSignedBlock(withTimeout, resp.Height)
ctxCancel()
if err != nil {
log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error())
// sleeping a bit to avoid retrying instantly and give time for the gRPC connection
// to recover automatically.
time.Sleep(time.Second)
continue
}
select {
case signedBlockCh <- types.EventDataSignedBlock{
Header: *signedBlock.Header,
Commit: *signedBlock.Commit,
ValidatorSet: *signedBlock.ValidatorSet,
Data: *signedBlock.Data,
}:
case <-ctx.Done():
return
}
}

subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{})
if err != nil {
// try re-subscribe in case of any errors that can come during subscription. gRPC
// retry mechanism has a back off on retries, so we don't need timers anymore.
log.Warnw("fetcher: failed to subscribe to new block events", "err", err)
continue
}

log.Debug("fetcher: subscription created")
err = f.receive(ctx, signedBlockCh, subscription)
if err != nil {
log.Warnw("fetcher: error receiving new height", "err", err.Error())
continue
}
}
}()

return signedBlockCh, nil
}

func (f *BlockFetcher) receive(
ctx context.Context,
signedBlockCh chan types.EventDataSignedBlock,
subscription coregrpc.BlockAPI_SubscribeNewHeightsClient,
) error {
log.Debug("fetcher: started listening for new blocks")
for {
resp, err := subscription.Recv()
if err != nil {
return err
}

// TODO(@vgonkivs): make timeout configurable
withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second)
signedBlock, err := f.GetSignedBlock(withTimeout, resp.Height)
ctxCancel()
if err != nil {
log.Warnw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error())
continue
}

select {
case signedBlockCh <- types.EventDataSignedBlock{
Header: *signedBlock.Header,
Commit: *signedBlock.Commit,
ValidatorSet: *signedBlock.ValidatorSet,
Data: *signedBlock.Data,
}:
case <-ctx.Done():
return ctx.Err()
}
}
}

// IsSyncing returns the sync status of the Core connection: true for
// syncing, and false for already caught up. It can also return an error
// in the case of a failed status request.
Expand Down
3 changes: 0 additions & 3 deletions core/fetcher_no_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ func TestBlockFetcherHeaderValues(t *testing.T) {
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)

// generate some blocks
newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
// read once from channel to generate next block
Expand Down Expand Up @@ -56,5 +54,4 @@ func TestBlockFetcherHeaderValues(t *testing.T) {
// compare ValidatorSet hash to the ValidatorsHash from first block height
hexBytes := valSet.Hash()
assert.Equal(t, nextBlock.ValidatorSet.Hash(), hexBytes)
require.NoError(t, fetcher.Stop(ctx))
}
Loading

0 comments on commit 70d6351

Please sign in to comment.