Skip to content

Commit

Permalink
fix: skip block scanning actions on Arbitrum and Avalanch networks (v…
Browse files Browse the repository at this point in the history
…27) (#3554)

* Hotfix to not observe TSS Addr direct TX.

* Remove whitespace

* invert logic to only include EVM chains currently deployed

* add polygon to direct TSS support for now

* add fix for declared but not used

* revert logger change

* add item to changelog.md

* bugfix: fix typo in function name called

* lint fixes

* lint fixes

* skip setting lastScannedTssRecvd to 0

* do scanning in parallel

* skip FilterTSSOutbound for ARB and AVAX as it time-consuming

* execute FilterTSSOutbound only when shouldScanTSSRecieve is true

* remove cruft comment

* cleanup naming and comments

---------

Co-authored-by: Alex Gartner <[email protected]>
Co-authored-by: Charlie Chen <[email protected]>
  • Loading branch information
3 people authored Feb 19, 2025
1 parent caa4804 commit c6f75bb
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 36 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# CHANGELOG

## v27.0.5
* [3554](https://github.com/zeta-chain/node/pull/3554) - disable observation of direct to TSS Address deposits on Arbitrum and Avalanch networks

## v27.0.4

### Fixes
Expand Down
138 changes: 102 additions & 36 deletions zetaclient/chains/evm/observer/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"slices"
"sort"
"strings"
"sync"

sdkmath "cosmossdk.io/math"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/zeta-chain/protocol-contracts/pkg/erc20custody.sol"
"github.com/zeta-chain/protocol-contracts/pkg/zetaconnector.non-eth.sol"

"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/coin"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/memo"
Expand Down Expand Up @@ -131,62 +133,126 @@ func (ob *Observer) ObserveInbound(ctx context.Context) error {
return nil
}

// get last scanned block height (we simply use same height for all 3 events ZetaSent, Deposited, TssRecvd)
var (
lastScannedZetaSent uint64
lastScannedDeposited uint64
lastScannedGatewayDeposit uint64
lastScannedGatewayCall uint64
lastScannedGatewayDepositAndCall uint64
lastScannedTssRecvd uint64
)
// Note: using different heights for each event incurs more complexity (metrics, db, etc) and not worth it
startBlock, toBlock := ob.calcBlockRangeToScan(confirmedBlockNum, lastScanned, config.MaxBlocksPerPeriod)

wg := sync.WaitGroup{}

// task 1: query evm chain for zeta sent logs (read at most 100 blocks in one go)
lastScannedZetaSent, err := ob.ObserveZetaSent(ctx, startBlock, toBlock)
if err != nil {
return errors.Wrap(err, "unable to observe ZetaSent")
}
wg.Add(1)
go func() {
defer wg.Done()
var err error
lastScannedZetaSent, err = ob.ObserveZetaSent(ctx, startBlock, toBlock)
if err != nil {
ob.Logger().Inbound.Error().
Err(err).
Msgf("ObserveInbound: error observing zeta sent")
}
}()

// task 2: query evm chain for deposited logs (read at most 100 blocks in one go)
lastScannedDeposited := ob.ObserveERC20Deposited(ctx, startBlock, toBlock)
wg.Add(1)
go func() {
defer wg.Done()
lastScannedDeposited = ob.ObserveERC20Deposited(ctx, startBlock, toBlock)
}()

// task 3: query the incoming tx to TSS address (read at most 100 blocks in one go)
lastScannedTssRecvd, err := ob.ObserverTSSReceive(ctx, startBlock, toBlock)
if err != nil {
return errors.Wrap(err, "unable to observe TSSReceive")
// skip slow block scanning actions for ARB, AVAX, and their testnets
chainID := ob.Chain().ChainId
shouldBlockScan := chainID != chains.ArbitrumMainnet.ChainId &&
chainID != chains.ArbitrumSepolia.ChainId &&
chainID != chains.AvalancheMainnet.ChainId &&
chainID != chains.AvalancheTestnet.ChainId

if shouldBlockScan {
wg.Add(1)
go func() {
defer wg.Done()
var err error
// task 3: query the incoming tx to TSS address (read at most 100 blocks in one go)
lastScannedTssRecvd, err = ob.ObserverTSSReceive(ctx, startBlock, toBlock)
if err != nil {
ob.Logger().Inbound.Error().
Err(err).
Msgf("ObserveInbound: error observe TSSReceive")
}
// task 4: filter the outbounds from TSS address to supplement outbound trackers
// task 3 and task 4 both use the block cache so there little speedup running in parallel
// TODO: make this a separate go routine in outbound.go after switching to smart contract V2
ob.FilterTSSOutbound(ctx, startBlock, toBlock)
}()
}

// task 4: filter the outbounds from TSS address to supplement outbound trackers
// TODO: make this a separate go routine in outbound.go after switching to smart contract V2
//
ob.FilterTSSOutbound(ctx, startBlock, toBlock)

// query the gateway logs
// TODO: refactor in a more declarative design. Example: storing the list of contract and events to listen in an array
// https://github.com/zeta-chain/node/issues/2493
lastScannedGatewayDeposit, err := ob.ObserveGatewayDeposit(ctx, startBlock, toBlock)
if err != nil {
ob.Logger().Inbound.Error().
Err(err).
Msgf("ObserveInbound: error observing deposit events from Gateway contract")
}
lastScannedGatewayCall, err := ob.ObserveGatewayCall(ctx, startBlock, toBlock)
if err != nil {
ob.Logger().Inbound.Error().
Err(err).
Msgf("ObserveInbound: error observing call events from Gateway contract")
}
lastScannedGatewayDepositAndCall, err := ob.ObserveGatewayDepositAndCall(ctx, startBlock, toBlock)
if err != nil {
ob.Logger().Inbound.Error().
Err(err).
Msgf("ObserveInbound: error observing depositAndCall events from Gateway contract")
}
wg.Add(1)
go func() {
defer wg.Done()
var err error
lastScannedGatewayDeposit, err = ob.ObserveGatewayDeposit(ctx, startBlock, toBlock)
if err != nil {
ob.Logger().Inbound.Error().
Err(err).
Msgf("ObserveInbound: error observing deposit events from Gateway contract")
}
}()
wg.Add(1)
go func() {
defer wg.Done()
var err error
lastScannedGatewayCall, err = ob.ObserveGatewayCall(ctx, startBlock, toBlock)
if err != nil {
ob.Logger().Inbound.Error().
Err(err).
Msgf("ObserveInbound: error observing call events from Gateway contract")
}
}()
wg.Add(1)
go func() {
defer wg.Done()
var err error
lastScannedGatewayDepositAndCall, err = ob.ObserveGatewayDepositAndCall(ctx, startBlock, toBlock)
if err != nil {
ob.Logger().Inbound.Error().
Err(err).
Msgf("ObserveInbound: error observing depositAndCall events from Gateway contract")
}
}()
wg.Wait()

// note: using the lowest height for all events is not perfect,
// but it's simple and good enough
lowestLastScannedBlock := slices.Min([]uint64{
scannedBlocks := []uint64{
lastScannedZetaSent,
lastScannedDeposited,
lastScannedTssRecvd,
lastScannedGatewayDeposit,
lastScannedGatewayCall,
lastScannedGatewayDepositAndCall,
})
}
// only include lastScannedTssRecvd if it was set
if shouldBlockScan {
scannedBlocks = append(scannedBlocks, lastScannedTssRecvd)
}
// calculate the lowest last scanned block
lowestLastScannedBlock := slices.Min(scannedBlocks)

highestLastScannedBlock := slices.Max(scannedBlocks)
if highestLastScannedBlock-lowestLastScannedBlock > 10 {
ob.Logger().Inbound.Warn().
Uint64("observer.last_scanned_lowest", lowestLastScannedBlock).
Uint64("observer.highest_scanned_lowest", highestLastScannedBlock).
Msg("ObserveInbound: high scanned block delta")
}

// update last scanned block height for all 3 events (ZetaSent, Deposited, TssRecvd), ignore db error
if lowestLastScannedBlock > lastScanned {
Expand Down

0 comments on commit c6f75bb

Please sign in to comment.