Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: PRT - fix reconnect attempt on retry from blocked providers #1976

Merged
merged 4 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,6 @@ func (csm *ConsumerSessionManager) validatePairingListNotEmpty(addon string, ext
return numberOfResets
}

// getValidAddressesLengthForExtensionOrAddon reports how many addresses
// getValidAddresses yields for the given addon and extensions. The manager's
// read lock is held for the duration of the lookup.
func (csm *ConsumerSessionManager) getValidAddressesLengthForExtensionOrAddon(addon string, extensions []string) int {
	csm.lock.RLock()
	defer csm.lock.RUnlock()

	validAddresses := csm.getValidAddresses(addon, extensions)
	return len(validAddresses)
}

func (csm *ConsumerSessionManager) getSessionWithProviderOrError(usedProviders UsedProvidersInf, tempIgnoredProviders *ignoredProviders, cuNeededForSession uint64, requestedBlock int64, addon string, extensionNames []string, stateful uint32, virtualEpoch uint64) (sessionWithProviderMap SessionWithProviderMap, err error) {
sessionWithProviderMap, err = csm.getValidConsumerSessionsWithProvider(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch)
if err != nil {
Expand Down Expand Up @@ -448,10 +442,12 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
providers: initUnwantedProviders,
currentEpoch: csm.atomicReadCurrentEpoch(),
}
utils.LavaFormatTrace("GetSessions tempIgnoredProviders", utils.LogAttr("tempIgnoredProviders", tempIgnoredProviders))

// Get a valid consumerSessionsWithProvider
sessionWithProviderMap, err := csm.getSessionWithProviderOrError(usedProviders, tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch)
if err != nil {
utils.LavaFormatTrace("GetSessions error", utils.LogAttr("error", err.Error()))
return nil, err
}

Expand All @@ -466,7 +462,7 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
sessionEpoch := sessionWithProvider.CurrentEpoch

// Get a valid Endpoint from the provider chosen
connected, endpoints, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, false, false, addon, extensionNames)
connected, endpoints, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, sessionWithProvider.retryConnecting, false, addon, extensionNames)
if err != nil {
// verify err is AllProviderEndpointsDisabled and report.
if AllProviderEndpointsDisabledError.Is(err) {
Expand Down Expand Up @@ -697,6 +693,7 @@ func (csm *ConsumerSessionManager) tryGetConsumerSessionWithProviderFromBlockedP
for _, providerAddress := range csm.currentlyBlockedProviderAddresses {
// check if we have this provider already.
if _, providerExistInIgnoredProviders := ignoredProviders.providers[providerAddress]; providerExistInIgnoredProviders {
utils.LavaFormatTrace("[continue] provider already in ignored providers", utils.LogAttr("providerAddress", providerAddress))
continue
}
consumerSessionsWithProvider := csm.pairing[providerAddress]
Expand All @@ -707,11 +704,13 @@ func (csm *ConsumerSessionManager) tryGetConsumerSessionWithProviderFromBlockedP
// validate this provider has enough cu to be used
if err := consumerSessionsWithProvider.validateComputeUnits(cuNeededForSession, virtualEpoch); err != nil {
// we already added to ignored we can just continue to the next provider
utils.LavaFormatTrace("[continue] no compute units", utils.LogAttr("providerAddress", providerAddress))
continue
}

// validate this provider supports the required extension or addon
if !consumerSessionsWithProvider.IsSupportingAddon(addon) || !consumerSessionsWithProvider.IsSupportingExtensions(extensions) {
utils.LavaFormatTrace("[continue] no addon or extensions", utils.LogAttr("providerAddress", providerAddress))
continue
}

Expand All @@ -721,6 +720,7 @@ func (csm *ConsumerSessionManager) tryGetConsumerSessionWithProviderFromBlockedP
providerAddress: &SessionWithProvider{
SessionsWithProvider: consumerSessionsWithProvider,
CurrentEpoch: currentEpoch,
retryConnecting: true,
},
}, nil
}
Expand Down Expand Up @@ -925,17 +925,6 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
blockProvider = true
}

if sdkerrors.IsOf(errorReceived, BlockEndpointError) {
utils.LavaFormatTrace("Got BlockEndpointError, blocking endpoint and session",
utils.LogAttr("error", errorReceived),
utils.LogAttr("sessionID", consumerSession.SessionId),
)

// Block the endpoint and the consumer session from future usages
consumerSession.EndpointConnection.blockListed.Store(true)
consumerSession.BlockListed = true
}

consumerSession.QoSManager.AddFailedRelay(consumerSession.epoch, consumerSession.SessionId)
consumerSession.ConsecutiveErrors = append(consumerSession.ConsecutiveErrors, errorReceived)
// copy consecutive errors for report.
Expand Down
9 changes: 6 additions & 3 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,9 +855,12 @@ func TestAllProvidersEndpointsDisabled(t *testing.T) {
pairingList := createPairingList("", false)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
cs, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.Nil(t, cs)
require.Error(t, err)
usedProviders := NewUsedProviders(nil)
cs, err := csm.GetSessions(ctx, cuForFirstRequest, usedProviders, servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for key := range cs {
require.Contains(t, csm.currentlyBlockedProviderAddresses, key)
}
}

func TestUpdateAllProviders(t *testing.T) {
Expand Down
11 changes: 9 additions & 2 deletions protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ type EndpointConnection struct {
Client pairingtypes.RelayerClient
connection *grpc.ClientConn
numberOfSessionsUsingThisConnection uint64
blockListed atomic.Bool
lbUniqueId string
// blockListed - currently unused; use it carefully, as it will block this provider's endpoint until the next epoch without forgiveness.
// Can be used in cases of data reliability, self-provider conflict, etc.
blockListed atomic.Bool
lbUniqueId string
// In case we got disconnected, we can't reconnect as we might lose stickiness
// with the provider, if it's using a load balancer
disconnected bool
Expand Down Expand Up @@ -165,6 +167,7 @@ func (e *Endpoint) CheckSupportForServices(addon string, extensions []string) (s
// SessionWithProvider bundles a provider's ConsumerSessionsWithProvider with
// the epoch value recorded when the entry was built. It is the value type of
// SessionWithProviderMap.
type SessionWithProvider struct {
// SessionsWithProvider is the per-provider session container for this entry.
SessionsWithProvider *ConsumerSessionsWithProvider
// CurrentEpoch is the epoch stored by whoever constructed this entry.
CurrentEpoch uint64
// retryConnecting is set when the entry is built from the blocked-providers
// list and is forwarded by GetSessions when fetching an endpoint connection.
// NOTE(review): presumably it maps to the retryDisabledEndpoints argument,
// letting disabled endpoints be retried - confirm against the callee signature.
retryConnecting bool
}

type SessionWithProviderMap map[string]*SessionWithProvider // key is the provider address
Expand Down Expand Up @@ -482,6 +485,9 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
if !retryDisabledEndpoints && !endpoint.Enabled {
continue
}
if retryDisabledEndpoints {
utils.LavaFormatDebug("retrying to connect to disabled endpoint", utils.LogAttr("endpoint", endpoint.NetworkAddress), utils.LogAttr("provider", cswp.PublicLavaAddress))
}

// check endpoint supports the requested addons
supported := endpoint.CheckSupportForServices(addon, extensionNames)
Expand All @@ -496,6 +502,7 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
if endpointConnection.Client != nil && endpointConnection.connection != nil && !endpointConnection.disconnected {
// Check if the endpoint is not blocked
if endpointConnection.blockListed.Load() {
utils.LavaFormatDebug("Skipping provider's endpoint as its block listed", utils.LogAttr("address", endpoint.NetworkAddress), utils.LogAttr("PublicLavaAddress", cswp.PublicLavaAddress))
continue
}
connectionState := endpointConnection.connection.GetState()
Expand Down
2 changes: 1 addition & 1 deletion protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe

if !singleConsumerSession.VerifyProviderUniqueIdAndStoreIfFirstTime(providerUniqueId[0]) {
return reply, 0, utils.LavaFormatError("provider unique id mismatch",
errors.Join(lavasession.SessionOutOfSyncError, lavasession.BlockEndpointError),
lavasession.SessionOutOfSyncError,
utils.LogAttr("GUID", ctx),
utils.LogAttr("sessionId", relayRequest.RelaySession.SessionId),
utils.LogAttr("provider", relayRequest.RelaySession.Provider),
Expand Down
22 changes: 15 additions & 7 deletions scripts/pre_setups/init_lava_only_with_node_three_providers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,22 @@ lavad tx gov vote 3 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --ga

screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \
$PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \
$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \
$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \
$PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ":7766" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25

screen -d -m -S provider2 bash -c "source ~/.bashrc; lavap rpcprovider \
$PROVIDER2_LISTENER LAV1 rest '$LAVA_REST' \
$PROVIDER2_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \
$PROVIDER2_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \
$PROVIDER2_LISTENER LAV1 grpc '$LAVA_GRPC' \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ":7756" 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25

screen -d -m -S provider3 bash -c "source ~/.bashrc; lavap rpcprovider \
$PROVIDER3_LISTENER LAV1 rest '$LAVA_REST' \
$PROVIDER3_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \
$PROVIDER3_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \
$PROVIDER3_LISTENER LAV1 grpc '$LAVA_GRPC' \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer3 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER3.log" && sleep 0.25
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer3 --chain-id lava --metrics-listen-address ":7746" 2>&1 | tee $LOGS_DIR/PROVIDER3.log" && sleep 0.25


wait_next_block

Expand All @@ -87,4 +88,11 @@ $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id la
echo "--- setting up screens done ---"
screen -ls

echo "lavap rpcprovider $PROVIDER3_LISTENER LAV1 rest '$LAVA_REST' $PROVIDER3_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' $PROVIDER3_LISTENER LAV1 grpc '$LAVA_GRPC' $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer3 --chain-id lava"
echo "Provider 1 command:"
echo "lavap rpcprovider $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ':7766'"

echo "Provider 2 command:"
echo "lavap rpcprovider $PROVIDER2_LISTENER LAV1 rest '$LAVA_REST' $PROVIDER2_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' $PROVIDER2_LISTENER LAV1 grpc '$LAVA_GRPC' $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ':7756'"

echo "Provider 3 command:"
echo "lavap rpcprovider $PROVIDER3_LISTENER LAV1 rest '$LAVA_REST' $PROVIDER3_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' $PROVIDER3_LISTENER LAV1 grpc '$LAVA_GRPC' $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer3 --chain-id lava --metrics-listen-address ':7746'"
1 change: 1 addition & 0 deletions scripts/useful_commands.sh
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ get_base_specs() {
"specs/mainnet-1/specs/ethereum.json"
"specs/mainnet-1/specs/solana.json"
"specs/mainnet-1/specs/aptos.json"
"specs/mainnet-1/specs/btc.json"
)

(IFS=,; echo "${priority_specs[*]}")
Expand Down
Loading