From 3b2bc05bc3325b5c278b158a2e14f9666775dd86 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Wed, 26 Feb 2025 14:55:57 +0100 Subject: [PATCH 1/4] fix: PRT - fix reconnect attempt on retry from blocked providers --- .../lavasession/consumer_session_manager.go | 14 ++++++------ protocol/lavasession/consumer_types.go | 4 ++++ ...nit_lava_only_with_node_three_providers.sh | 22 +++++++++++++------ scripts/useful_commands.sh | 1 + 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 2cff6354e5..a802acf429 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -396,12 +396,6 @@ func (csm *ConsumerSessionManager) validatePairingListNotEmpty(addon string, ext return numberOfResets } -func (csm *ConsumerSessionManager) getValidAddressesLengthForExtensionOrAddon(addon string, extensions []string) int { - csm.lock.RLock() - defer csm.lock.RUnlock() - return len(csm.getValidAddresses(addon, extensions)) -} - func (csm *ConsumerSessionManager) getSessionWithProviderOrError(usedProviders UsedProvidersInf, tempIgnoredProviders *ignoredProviders, cuNeededForSession uint64, requestedBlock int64, addon string, extensionNames []string, stateful uint32, virtualEpoch uint64) (sessionWithProviderMap SessionWithProviderMap, err error) { sessionWithProviderMap, err = csm.getValidConsumerSessionsWithProvider(tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch) if err != nil { @@ -448,10 +442,12 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS providers: initUnwantedProviders, currentEpoch: csm.atomicReadCurrentEpoch(), } + utils.LavaFormatTrace("GetSessions tempIgnoredProviders", utils.LogAttr("tempIgnoredProviders", tempIgnoredProviders)) // Get a valid consumerSessionsWithProvider sessionWithProviderMap, err := csm.getSessionWithProviderOrError(usedProviders, tempIgnoredProviders, cuNeededForSession, requestedBlock, addon, extensionNames, stateful, virtualEpoch) if err != nil { + utils.LavaFormatTrace("GetSessions error", utils.LogAttr("error", err.Error())) return nil, err } @@ -466,7 +462,7 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS sessionEpoch := sessionWithProvider.CurrentEpoch // Get a valid Endpoint from the provider chosen - connected, endpoints, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, false, false, addon, extensionNames) + connected, endpoints, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, sessionWithProvider.retryConnecting, false, addon, extensionNames) if err != nil { // verify err is AllProviderEndpointsDisabled and report. if AllProviderEndpointsDisabledError.Is(err) { @@ -697,6 +693,7 @@ func (csm *ConsumerSessionManager) tryGetConsumerSessionWithProviderFromBlockedP for _, providerAddress := range csm.currentlyBlockedProviderAddresses { // check if we have this provider already. if _, providerExistInIgnoredProviders := ignoredProviders.providers[providerAddress]; providerExistInIgnoredProviders { + utils.LavaFormatTrace("[continue] provider already in ignored providers", utils.LogAttr("providerAddress", providerAddress)) continue } consumerSessionsWithProvider := csm.pairing[providerAddress] @@ -707,11 +704,13 @@ func (csm *ConsumerSessionManager) tryGetConsumerSessionWithProviderFromBlockedP // validate this provider has enough cu to be used if err := consumerSessionsWithProvider.validateComputeUnits(cuNeededForSession, virtualEpoch); err != nil { // we already added to ignored we can just continue to the next provider + utils.LavaFormatTrace("[continue] no compute units", utils.LogAttr("providerAddress", providerAddress)) continue } // validate this provider supports the required extension or addon if !consumerSessionsWithProvider.IsSupportingAddon(addon) || !consumerSessionsWithProvider.IsSupportingExtensions(extensions) { + utils.LavaFormatTrace("[continue] no addon or extensions", utils.LogAttr("providerAddress", providerAddress)) continue } @@ -721,6 +720,7 @@ func (csm *ConsumerSessionManager) tryGetConsumerSessionWithProviderFromBlockedP providerAddress: &SessionWithProvider{ SessionsWithProvider: consumerSessionsWithProvider, CurrentEpoch: currentEpoch, + retryConnecting: true, }, }, nil } diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 72b74c83ea..f30cebe2ce 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -165,6 +165,7 @@ func (e *Endpoint) CheckSupportForServices(addon string, extensions []string) (s type SessionWithProvider struct { SessionsWithProvider *ConsumerSessionsWithProvider CurrentEpoch uint64 + retryConnecting bool } type SessionWithProviderMap map[string]*SessionWithProvider // key is the provider address @@ -482,6 +483,9 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes if !retryDisabledEndpoints && !endpoint.Enabled { continue } + if retryDisabledEndpoints { + utils.LavaFormatDebug("retrying to connect to disabled endpoint", utils.LogAttr("endpoint", endpoint.NetworkAddress), utils.LogAttr("provider", cswp.PublicLavaAddress)) + } // check endpoint supports the requested addons supported := endpoint.CheckSupportForServices(addon, extensionNames) diff --git a/scripts/pre_setups/init_lava_only_with_node_three_providers.sh b/scripts/pre_setups/init_lava_only_with_node_three_providers.sh index 869b8ffb35..b632cb7461 100755 --- a/scripts/pre_setups/init_lava_only_with_node_three_providers.sh +++ b/scripts/pre_setups/init_lava_only_with_node_three_providers.sh @@ -62,21 +62,22 @@ lavad tx gov vote 3 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --ga screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ -$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \ +$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ":7766" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 screen -d -m -S provider2 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER2_LISTENER LAV1 rest '$LAVA_REST' \ -$PROVIDER2_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \ +$PROVIDER2_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER2_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ":7756" 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25 screen -d -m -S provider3 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER3_LISTENER LAV1 rest '$LAVA_REST' \ -$PROVIDER3_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \ +$PROVIDER3_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER3_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer3 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER3.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer3 --chain-id lava --metrics-listen-address ":7746" 2>&1 | tee $LOGS_DIR/PROVIDER3.log" && sleep 0.25 + wait_next_block @@ -87,4 +88,11 @@ $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id la echo "--- setting up screens done ---" screen -ls -echo "lavap rpcprovider $PROVIDER3_LISTENER LAV1 rest '$LAVA_REST' $PROVIDER3_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' $PROVIDER3_LISTENER LAV1 grpc '$LAVA_GRPC' $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer3 --chain-id lava" \ No newline at end of file +echo "Provider 1 command:" +echo "lavap rpcprovider $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ':7766'" + +echo "Provider 2 command:" +echo "lavap rpcprovider $PROVIDER2_LISTENER LAV1 rest '$LAVA_REST' $PROVIDER2_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' $PROVIDER2_LISTENER LAV1 grpc '$LAVA_GRPC' $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ':7756'" + +echo "Provider 3 command:" +echo "lavap rpcprovider $PROVIDER3_LISTENER LAV1 rest '$LAVA_REST' $PROVIDER3_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' $PROVIDER3_LISTENER LAV1 grpc '$LAVA_GRPC' $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer3 --chain-id lava --metrics-listen-address ':7746'" diff --git a/scripts/useful_commands.sh b/scripts/useful_commands.sh index 50ae105be8..0773d893c3 100755 --- a/scripts/useful_commands.sh +++ b/scripts/useful_commands.sh @@ -183,6 +183,7 @@ get_base_specs() { "specs/mainnet-1/specs/ethereum.json" "specs/mainnet-1/specs/solana.json" "specs/mainnet-1/specs/aptos.json" + "specs/mainnet-1/specs/btc.json" ) (IFS=,; echo "${priority_specs[*]}") From 7ea75649201ff5176300ac533617cdd927e0fe46 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 27 Feb 2025 15:03:52 +0100 Subject: [PATCH 2/4] fix a sneakybug --- protocol/lavasession/consumer_session_manager.go | 11 ----------- protocol/lavasession/consumer_session_manager_test.go | 9 ++++++--- protocol/lavasession/consumer_types.go | 7 +++++-- protocol/rpcconsumer/rpcconsumer_server.go | 2 +- scripts/init_chain.sh | 2 +- 5 files changed, 13 insertions(+), 18 deletions(-) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index a802acf429..703dc411e8 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -925,17 +925,6 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu blockProvider = true } - if sdkerrors.IsOf(errorReceived, BlockEndpointError) { - utils.LavaFormatTrace("Got BlockEndpointError, blocking endpoint and session", - utils.LogAttr("error", errorReceived), - utils.LogAttr("sessionID", consumerSession.SessionId), - ) - - // Block the endpoint and the consumer session from future usages - consumerSession.EndpointConnection.blockListed.Store(true) - consumerSession.BlockListed = true - } - consumerSession.QoSManager.AddFailedRelay(consumerSession.epoch, consumerSession.SessionId) consumerSession.ConsecutiveErrors = append(consumerSession.ConsecutiveErrors, errorReceived) // copy consecutive errors for report. diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index eaab7a794e..66dadfdb1c 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -855,9 +855,12 @@ func TestAllProvidersEndpointsDisabled(t *testing.T) { pairingList := createPairingList("", false) err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. require.NoError(t, err) - cs, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session - require.Nil(t, cs) - require.Error(t, err) + usedProviders := NewUsedProviders(nil) + cs, err := csm.GetSessions(ctx, cuForFirstRequest, usedProviders, servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session + require.NoError(t, err) + for key, _ := range cs { + require.Contains(t, csm.currentlyBlockedProviderAddresses, key) + } } func TestUpdateAllProviders(t *testing.T) { diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index f30cebe2ce..ae53df9c60 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -96,8 +96,10 @@ type EndpointConnection struct { Client pairingtypes.RelayerClient connection *grpc.ClientConn numberOfSessionsUsingThisConnection uint64 - blockListed atomic.Bool - lbUniqueId string + // blockListed - currently unused, use it carefully as it will block this provider's endpoint until next epoch without forgiveness. + // Can be used in cases of data reliability, self provider conflict etc.. + blockListed atomic.Bool + lbUniqueId string // In case we got disconnected, we cant reconnect as we might lose stickiness // with the provider, if its using a load balancer disconnected bool @@ -500,6 +502,7 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes if endpointConnection.Client != nil && endpointConnection.connection != nil && !endpointConnection.disconnected { // Check if the endpoint is not blocked if endpointConnection.blockListed.Load() { + utils.LavaFormatDebug("Skipping provider's endpoint as its block listed", utils.LogAttr("address", endpoint.NetworkAddress), utils.LogAttr("PublicLavaAddress", cswp.PublicLavaAddress)) continue } connectionState := endpointConnection.connection.GetState() diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 5f3f2ef613..5f58cd2ca7 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -1014,7 +1014,7 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe if !singleConsumerSession.VerifyProviderUniqueIdAndStoreIfFirstTime(providerUniqueId[0]) { return reply, 0, utils.LavaFormatError("provider unique id mismatch", - errors.Join(lavasession.SessionOutOfSyncError, lavasession.BlockEndpointError), + lavasession.SessionOutOfSyncError, utils.LogAttr("GUID", ctx), utils.LogAttr("sessionId", relayRequest.RelaySession.SessionId), utils.LogAttr("provider", relayRequest.RelaySession.Provider), diff --git a/scripts/init_chain.sh b/scripts/init_chain.sh index 756c77f702..eaa271f1a4 100755 --- a/scripts/init_chain.sh +++ b/scripts/init_chain.sh @@ -59,7 +59,7 @@ else | jq '.app_state.downtime.params.downtime_duration = "6s"' \ | jq '.app_state.downtime.params.epoch_duration = "10s"' \ | jq '.app_state.epochstorage.params.epochsToSave = "8"' \ - | jq '.app_state.epochstorage.params.epochBlocks = "20"' \ + | jq '.app_state.epochstorage.params.epochBlocks = "240"' \ | jq '.app_state.pairing.params.recommendedEpochNumToCollectPayment = "2"' \ ) fi From e631a63f20fd6ce106689209bf220d7bdf6fbd36 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 27 Feb 2025 15:04:23 +0100 Subject: [PATCH 3/4] undo init chain changes --- scripts/init_chain.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/init_chain.sh b/scripts/init_chain.sh index eaa271f1a4..756c77f702 100755 --- a/scripts/init_chain.sh +++ b/scripts/init_chain.sh @@ -59,7 +59,7 @@ else | jq '.app_state.downtime.params.downtime_duration = "6s"' \ | jq '.app_state.downtime.params.epoch_duration = "10s"' \ | jq '.app_state.epochstorage.params.epochsToSave = "8"' \ - | jq '.app_state.epochstorage.params.epochBlocks = "240"' \ + | jq '.app_state.epochstorage.params.epochBlocks = "20"' \ | jq '.app_state.pairing.params.recommendedEpochNumToCollectPayment = "2"' \ ) fi From de4b719741ec5c666005e67db5e43df3431591bd Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Thu, 27 Feb 2025 15:18:44 +0100 Subject: [PATCH 4/4] lintiya --- protocol/lavasession/consumer_session_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index 66dadfdb1c..c70e14c6ec 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -858,7 +858,7 @@ func TestAllProvidersEndpointsDisabled(t *testing.T) { usedProviders := NewUsedProviders(nil) cs, err := csm.GetSessions(ctx, cuForFirstRequest, usedProviders, servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) - for key, _ := range cs { + for key := range cs { require.Contains(t, csm.currentlyBlockedProviderAddresses, key) } }