diff --git a/.gitignore b/.gitignore index 5e3d05b03..a35a7e42a 100644 --- a/.gitignore +++ b/.gitignore @@ -73,3 +73,5 @@ logs/* missing_tokens_df.csv tokens_and_fee_df.csv fastlane_bot/tests/nbtest/* + +.python-version diff --git a/fastlane_bot/bot.py b/fastlane_bot/bot.py index 9d9f347da..ae105e8d8 100644 --- a/fastlane_bot/bot.py +++ b/fastlane_bot/bot.py @@ -55,6 +55,7 @@ from typing import Generator, List, Dict, Tuple, Any, Callable from typing import Optional +import fastlane_bot from fastlane_bot.config import Config from fastlane_bot.helpers import ( TxRouteHandler, diff --git a/fastlane_bot/config/network.py b/fastlane_bot/config/network.py index 70b4e562a..687ab126b 100644 --- a/fastlane_bot/config/network.py +++ b/fastlane_bot/config/network.py @@ -429,6 +429,7 @@ class _ConfigNetworkMainnet(ConfigNetwork): RPC_ENDPOINT = "https://eth-mainnet.alchemyapi.io/v2/" WEB3_ALCHEMY_PROJECT_ID = os.environ.get("WEB3_ALCHEMY_PROJECT_ID") + WEBSOCKET_URL = f"wss://eth-mainnet.g.alchemy.com/v2/{WEB3_ALCHEMY_PROJECT_ID}" MULTICALL_CONTRACT_ADDRESS = "0xcA11bde05977b3631167028862bE2a173976CA11" # NATIVE_GAS_TOKEN_KEY = "ETH-EEeE" # WRAPPED_GAS_TOKEN_KEY = "WETH-6Cc2" @@ -622,6 +623,7 @@ class _ConfigNetworkBase(ConfigNetwork): RPC_ENDPOINT = "https://base-mainnet.g.alchemy.com/v2/" WEB3_ALCHEMY_PROJECT_ID = os.environ.get("WEB3_ALCHEMY_BASE") + WEBSOCKET_URL = f"wss://base-mainnet.g.alchemy.com/v2/{WEB3_ALCHEMY_PROJECT_ID}" GAS_ORACLE_ADDRESS = "0x420000000000000000000000000000000000000F" # source: https://docs.optimism.io/builders/tools/build/oracles#gas-oracle network_df = get_multichain_addresses(network="coinbase_base") FASTLANE_CONTRACT_ADDRESS = "0x2AE2404cD44c830d278f51f053a08F54b3756e1c" @@ -665,6 +667,7 @@ class _ConfigNetworkFantom(ConfigNetwork): RPC_ENDPOINT = "https://fantom.blockpi.network/v1/rpc/" WEB3_ALCHEMY_PROJECT_ID = os.environ.get("WEB3_FANTOM") + WEBSOCKET_URL = f"wss://fantom.blockpi.network/v1/ws/{WEB3_ALCHEMY_PROJECT_ID}" network_df = get_multichain_addresses(network=NETWORK_NAME) FASTLANE_CONTRACT_ADDRESS = "0xFe19CbA3aB1A189B7FC17cAa798Df64Ad2b54d4D" MULTICALL_CONTRACT_ADDRESS = "0xcA11bde05977b3631167028862bE2a173976CA11" @@ -708,6 +711,7 @@ class _ConfigNetworkMantle(ConfigNetwork): RPC_ENDPOINT = "https://lb.drpc.org/ogrpc?network=mantle&dkey=" WEB3_ALCHEMY_PROJECT_ID = os.environ.get("WEB3_MANTLE") + WEBSOCKET_URL = f"wss://lb.drpc.org/ogws?network=mantle&dkey={WEB3_ALCHEMY_PROJECT_ID}" GAS_ORACLE_ADDRESS = "0x420000000000000000000000000000000000000F" network_df = get_multichain_addresses(network=NETWORK_NAME) FASTLANE_CONTRACT_ADDRESS = "0xC7Dd38e64822108446872c5C2105308058c5C55C" @@ -792,6 +796,8 @@ class _ConfigNetworkSei(ConfigNetwork): RPC_ENDPOINT = "https://evm-rpc.arctic-1.seinetwork.io/" # TODO currently Sei devnet WEB3_ALCHEMY_PROJECT_ID = os.environ.get("WEB3_SEI") + + WEBSOCKET_URL = "wss://evm-ws.arctic-1.seinetwork.io" network_df = get_multichain_addresses(network=NETWORK_NAME) FASTLANE_CONTRACT_ADDRESS = "0xC7Dd38e64822108446872c5C2105308058c5C55C" #TODO - UPDATE WITH Mainnet MULTICALL_CONTRACT_ADDRESS = "0xcA11bde05977b3631167028862bE2a173976CA11" diff --git a/fastlane_bot/events/async_backdate_utils.py b/fastlane_bot/events/async_backdate_utils.py index cfe396b69..845f27a24 100644 --- a/fastlane_bot/events/async_backdate_utils.py +++ b/fastlane_bot/events/async_backdate_utils.py @@ -90,7 +90,6 @@ def async_handle_initial_iteration( last_block: int, mgr: Any, start_block: int, - current_block: int, ): if last_block == 0: non_multicall_rows_to_update = mgr.get_rows_to_update(start_block) @@ -105,7 +104,7 @@ def async_handle_initial_iteration( ) mgr.cfg.logger.info( - f"Backdating {len(other_pool_rows)} pools from {start_block} to {current_block}" + f"Backdating {len(other_pool_rows)} pools from {start_block}" ) start_time = time.time() async_backdate_from_contracts( diff --git a/fastlane_bot/events/async_event_update_utils.py b/fastlane_bot/events/async_event_update_utils.py index bbfa99273..a847599d0 100644 --- a/fastlane_bot/events/async_event_update_utils.py +++ b/fastlane_bot/events/async_event_update_utils.py @@ -27,6 +27,7 @@ from fastlane_bot.events.async_utils import get_contract_chunks from fastlane_bot.events.utils import update_pools_from_events from fastlane_bot.events.pools.utils import get_pool_cid +from .interfaces.event import Event nest_asyncio.apply() @@ -89,7 +90,7 @@ async def _get_missing_tkns(mgr: Any, c: List[Dict[str, Any]]) -> pd.DataFrame: return pd.concat(vals) -async def _get_token_and_fee(mgr: Any, exchange_name: str, ex: Any, address: str, contract: AsyncContract, event: Any): +async def _get_token_and_fee(mgr: Any, exchange_name: str, ex: Any, address: str, contract: AsyncContract, event: Event): """ This function uses the exchange object to get the tokens and fee for a given pool. @@ -97,7 +98,7 @@ async def _get_token_and_fee(mgr: Any, exchange_name: str, ex: Any, address: str ex(Any): The exchange object address(str): The pool address contract(AsyncContract): The contract object - event(Any): The event object + event(Event): The event object Returns: The tokens and fee info for the pool @@ -119,7 +120,7 @@ async def _get_token_and_fee(mgr: Any, exchange_name: str, ex: Any, address: str elif tkn1 == mgr.cfg.BNT_ADDRESS: tkn0 = connector_token - strategy_id = 0 if not ex.is_carbon_v1_fork else str(event["args"]["id"]) + strategy_id = 0 if not ex.is_carbon_v1_fork else str(event.args["id"]) pool_info = { "exchange_name": exchange_name, "address": address, @@ -319,7 +320,7 @@ def _get_pool_contracts(mgr: Any) -> List[Dict[str, Any]]: exchange_name = mgr.exchange_name_from_event(event) ex = mgr.exchanges[exchange_name] abi = ex.get_abi() - address = event["address"] + address = event.address contracts.append( { "exchange_name": exchange_name, diff --git a/fastlane_bot/events/exchanges/balancer.py b/fastlane_bot/events/exchanges/balancer.py index 01383cc51..791180f0c 100644 --- a/fastlane_bot/events/exchanges/balancer.py +++ b/fastlane_bot/events/exchanges/balancer.py @@ -17,8 +17,9 @@ from web3.contract import Contract from fastlane_bot.data.abi import BALANCER_VAULT_ABI, BALANCER_POOL_ABI_V1 -from fastlane_bot.events.exchanges.base import Exchange -from fastlane_bot.events.pools.base import Pool +from ..exchanges.base import Exchange +from ..pools.base import Pool +from ..interfaces.subscription import Subscription @dataclass @@ -47,6 +48,9 @@ def get_pool_abi(self): def get_events(self, contract: Contract) -> List[Type[Contract]]: return [contract.events.AuthorizerChanged] + def get_subscriptions(self, contract: Contract) -> List[Subscription]: + return [] + async def get_fee(self, pool_id: str, contract: Contract) -> Tuple[str, float]: pool = self.get_pool(pool_id) if pool: diff --git a/fastlane_bot/events/exchanges/bancor_pol.py b/fastlane_bot/events/exchanges/bancor_pol.py index 1211cc5cc..4e40c7397 100644 --- a/fastlane_bot/events/exchanges/bancor_pol.py +++ b/fastlane_bot/events/exchanges/bancor_pol.py @@ -17,9 +17,11 @@ from web3.contract import Contract from fastlane_bot.data.abi import BANCOR_POL_ABI -from fastlane_bot.events.exchanges.base import Exchange -from fastlane_bot.events.pools.base import Pool from fastlane_bot import Config +from ..exchanges.base import Exchange +from ..pools.base import Pool +from ..interfaces.event import Event +from ..interfaces.subscription import Subscription @dataclass @@ -46,14 +48,20 @@ def get_factory_abi(self): def get_events(self, contract: Contract) -> List[Type[Contract]]: return [contract.events.TokenTraded, contract.events.TradingEnabled] + def get_subscriptions(self, contract: Contract) -> List[Subscription]: + return [ + Subscription(contract.events.TokenTraded), + Subscription(contract.events.TradingEnabled), + ] + async def get_fee(self, address: str, contract: Contract) -> Tuple[str, float]: return "0.000", 0.000 - async def get_tkn0(self, address: str, contract: Contract, event: Any) -> str: - return event["args"]["token"] + async def get_tkn0(self, address: str, contract: Contract, event: Event) -> str: + return event.args["token"] - async def get_tkn1(self, address: str, contract: Contract, event: Any) -> str: - return self.ETH_ADDRESS if event["args"]["token"] not in self.ETH_ADDRESS else self.BNT_ADDRESS + async def get_tkn1(self, address: str, contract: Contract, event: Event) -> str: + return self.ETH_ADDRESS if event.args["token"] not in self.ETH_ADDRESS else self.BNT_ADDRESS def save_strategy( self, diff --git a/fastlane_bot/events/exchanges/bancor_v2.py b/fastlane_bot/events/exchanges/bancor_v2.py index 217b1297a..d1b4b149e 100644 --- a/fastlane_bot/events/exchanges/bancor_v2.py +++ b/fastlane_bot/events/exchanges/bancor_v2.py @@ -14,11 +14,14 @@ from dataclasses import dataclass from typing import List, Type, Tuple, Any +from web3 import AsyncWeb3 from web3.contract import Contract, AsyncContract from fastlane_bot.data.abi import BANCOR_V2_CONVERTER_ABI -from fastlane_bot.events.exchanges.base import Exchange -from fastlane_bot.events.pools.base import Pool +from ..exchanges.base import Exchange +from ..pools.base import Pool +from ..interfaces.event import Event +from ..interfaces.subscription import Subscription @dataclass @@ -44,6 +47,9 @@ def get_factory_abi(self): def get_events(self, contract: Contract) -> List[Type[Contract]]: return [contract.events.TokenRateUpdate] + def get_subscriptions(self, contract: Contract) -> List[Subscription]: + return [Subscription(contract.events.TokenRateUpdate)] + # def async convert_address(self, address: str, contract: Contract) -> str: # return @@ -59,15 +65,15 @@ async def get_fee(self, address: str, contract: AsyncContract) -> Tuple[str, flo fee_float = float(fee) / 1e6 return fee, fee_float - async def get_tkn0(self, address: str, contract: Contract, event: Any) -> str: + async def get_tkn0(self, address: str, contract: Contract, event: Event) -> str: if event: - return event["args"]["_token1"] - return await contract.caller.reserveTokens()[0] + return event.args["_token1"] + return await contract.functions.reserveTokens()[0] - async def get_tkn1(self, address: str, contract: Contract, event: Any) -> str: + async def get_tkn1(self, address: str, contract: Contract, event: Event) -> str: if event: - return event["args"]["_token2"] - return await contract.caller.reserveTokens()[1] + return event.args["_token2"] + return await contract.functions.reserveTokens()[1] async def get_anchor(self, contract: Contract) -> str: return await contract.caller.anchor() diff --git a/fastlane_bot/events/exchanges/bancor_v3.py b/fastlane_bot/events/exchanges/bancor_v3.py index 7e0e5b20d..da2c76733 100644 --- a/fastlane_bot/events/exchanges/bancor_v3.py +++ b/fastlane_bot/events/exchanges/bancor_v3.py @@ -17,8 +17,13 @@ from web3.contract import Contract from fastlane_bot.data.abi import BANCOR_V3_POOL_COLLECTION_ABI -from fastlane_bot.events.exchanges.base import Exchange -from fastlane_bot.events.pools.base import Pool +from ..exchanges.base import Exchange +from ..pools.base import Pool +from ..interfaces.event import Event +from ..interfaces.subscription import Subscription + + +LIQUIDITY_UPDATED_TOPIC = "0x6e96dc5343d067ec486a9920e0304c3610ed05c65e45cc029d9b9fe7ecfa7620" @dataclass @@ -44,15 +49,18 @@ def get_factory_abi(self): def get_events(self, contract: Contract) -> List[Type[Contract]]: return [contract.events.TradingLiquidityUpdated] + def get_subscriptions(self, contract: Contract) -> List[Subscription]: + return [Subscription(contract.events.TradingLiquidityUpdated, LIQUIDITY_UPDATED_TOPIC)] + async def get_fee(self, address: str, contract: Contract) -> Tuple[str, float]: return "0.000", 0.000 - async def get_tkn0(self, address: str, contract: Contract, event: Any) -> str: + async def get_tkn0(self, address: str, contract: Contract, event: Event) -> str: return self.BNT_ADDRESS - async def get_tkn1(self, address: str, contract: Contract, event: Any) -> str: + async def get_tkn1(self, address: str, contract: Contract, event: Event) -> str: return ( - event["args"]["pool"] - if event["args"]["pool"] != self.BNT_ADDRESS - else event["args"]["tkn_address"] + event.args["pool"] + if event.args["pool"] != self.BNT_ADDRESS + else event.args["tkn_address"] ) diff --git a/fastlane_bot/events/exchanges/base.py b/fastlane_bot/events/exchanges/base.py index a63c7ed29..ed641d974 100644 --- a/fastlane_bot/events/exchanges/base.py +++ b/fastlane_bot/events/exchanges/base.py @@ -17,7 +17,8 @@ from web3.contract import Contract, AsyncContract from fastlane_bot.config.constants import CARBON_V1_NAME -from fastlane_bot.events.pools.base import Pool +from ..pools.base import Pool +from ..interfaces.subscription import Subscription @dataclass @@ -96,6 +97,10 @@ def get_events(self, contract: Contract) -> List[Type[Contract]]: """ pass + @abstractmethod + def get_subscriptions(self, contract: Contract) -> List[Subscription]: + ... + @staticmethod @abstractmethod async def get_fee(address: str, contract: AsyncContract) -> float: diff --git a/fastlane_bot/events/exchanges/carbon_v1.py b/fastlane_bot/events/exchanges/carbon_v1.py index 3dcf93a98..2957e04cf 100644 --- a/fastlane_bot/events/exchanges/carbon_v1.py +++ b/fastlane_bot/events/exchanges/carbon_v1.py @@ -18,9 +18,16 @@ from web3.contract import Contract from fastlane_bot.data.abi import CARBON_CONTROLLER_ABI -from fastlane_bot.events.exchanges.base import Exchange -from fastlane_bot.events.pools.base import Pool -from fastlane_bot.events.pools.utils import get_pool_cid +from ..exchanges.base import Exchange +from ..pools.base import Pool +from ..interfaces.event import Event +from ..interfaces.subscription import Subscription +from ..pools.utils import get_pool_cid + + +STRATEGY_CREATED_TOPIC = "0xff24554f8ccfe540435cfc8854831f8dcf1cf2068708cfaf46e8b52a4ccc4c8d" +STRATEGY_UPDATED_TOPIC = "0x720da23a5c920b1d8827ec83c4d3c4d90d9419eadb0036b88cb4c2ffa91aef7d" +STRATEGY_DELETED_TOPIC = "0x4d5b6e0627ea711d8e9312b6ba56f50e0b51d41816fd6fd38643495ac81d38b6" @dataclass @@ -73,6 +80,16 @@ def get_events(self, contract: Contract) -> List[Type[Contract]]: contract.events.PairCreated, ] if self.exchange_initialized else [] + def get_subscriptions(self, contract: Contract) -> List[Subscription]: + return [ + Subscription(contract.events.StrategyCreated, STRATEGY_CREATED_TOPIC), + Subscription(contract.events.StrategyUpdated, STRATEGY_UPDATED_TOPIC), + Subscription(contract.events.StrategyDeleted, STRATEGY_DELETED_TOPIC), + Subscription(contract.events.PairTradingFeePPMUpdated), + Subscription(contract.events.TradingFeePPMUpdated), + Subscription(contract.events.PairCreated), + ] + async def get_fee( self, address: str, contract: Contract ) -> Tuple[str, float]: @@ -94,7 +111,7 @@ async def get_fee( fee = await contract.tradingFeePPM() return f"{fee}", fee / 1e6 - async def get_tkn0(self, address: str, contract: Contract, event: Any) -> str: + async def get_tkn0(self, address: str, contract: Contract, event: Event) -> str: """ Get the token0 address from the contract or event. @@ -116,9 +133,9 @@ async def get_tkn0(self, address: str, contract: Contract, event: Any) -> str: if event is None: return await contract.caller.token0() else: - return event["args"]["token0"] + return event.args["token0"] - async def get_tkn1(self, address: str, contract: Contract, event: Any) -> str: + async def get_tkn1(self, address: str, contract: Contract, event: Event) -> str: """ Get the token1 address from the contract or event. @@ -140,7 +157,7 @@ async def get_tkn1(self, address: str, contract: Contract, event: Any) -> str: if event is None: return await contract.caller.token1() else: - return event["args"]["token1"] + return event.args["token1"] def delete_strategy(self, id: str): """ diff --git a/fastlane_bot/events/exchanges/solidly_v2.py b/fastlane_bot/events/exchanges/solidly_v2.py index 0197f5315..d6f5aa02a 100644 --- a/fastlane_bot/events/exchanges/solidly_v2.py +++ b/fastlane_bot/events/exchanges/solidly_v2.py @@ -19,8 +19,10 @@ from fastlane_bot.data.abi import SOLIDLY_V2_POOL_ABI, VELOCIMETER_V2_FACTORY_ABI, SOLIDLY_V2_FACTORY_ABI, \ SCALE_V2_FACTORY_ABI, CLEOPATRA_V2_FACTORY_ABI, LYNEX_V2_FACTORY_ABI, NILE_V2_FACTORY_ABI, \ XFAI_V0_FACTORY_ABI, XFAI_V0_CORE_ABI, XFAI_V0_POOL_ABI -from fastlane_bot.events.exchanges.base import Exchange -from fastlane_bot.events.pools.base import Pool +from ..exchanges.base import Exchange +from ..pools.base import Pool +from ..interfaces.subscription import Subscription + async def _get_fee_1(address: str, contract: Contract, factory_contract: Contract) -> int: return await factory_contract.caller.getFee(address) @@ -104,6 +106,9 @@ def get_factory_abi(self): def get_events(self, contract: Contract) -> List[Type[Contract]]: return [contract.events.Sync] if self.exchange_initialized else [] + def get_subscriptions(self, contract: Contract) -> List[Subscription]: + return [Subscription(contract.events.Sync)] + async def get_fee(self, address: str, contract: AsyncContract) -> Tuple[str, float]: exchange_info = EXCHANGE_INFO[self.exchange_name] fee = await exchange_info["get_fee"](address, contract, self.factory_contract) diff --git a/fastlane_bot/events/exchanges/uniswap_v2.py b/fastlane_bot/events/exchanges/uniswap_v2.py index 198ce4a2f..870b938bf 100644 --- a/fastlane_bot/events/exchanges/uniswap_v2.py +++ b/fastlane_bot/events/exchanges/uniswap_v2.py @@ -17,8 +17,9 @@ from web3.contract import Contract, AsyncContract from fastlane_bot.data.abi import UNISWAP_V2_POOL_ABI, UNISWAP_V2_FACTORY_ABI -from fastlane_bot.events.exchanges.base import Exchange -from fastlane_bot.events.pools.base import Pool +from ..exchanges.base import Exchange +from ..pools.base import Pool +from ..interfaces.subscription import Subscription @dataclass @@ -49,6 +50,9 @@ def get_abi(self): def get_events(self, contract: Contract) -> List[Type[Contract]]: return [contract.events.Sync] if self.exchange_initialized else [] + def get_subscriptions(self, contract: Contract) -> List[Subscription]: + return [Subscription(contract.events.Sync)] + async def get_fee(self, address: str, contract: AsyncContract) -> Tuple[str, float]: return self.fee, self.fee_float diff --git a/fastlane_bot/events/exchanges/uniswap_v3.py b/fastlane_bot/events/exchanges/uniswap_v3.py index 97870bf68..2136a32a2 100644 --- a/fastlane_bot/events/exchanges/uniswap_v3.py +++ b/fastlane_bot/events/exchanges/uniswap_v3.py @@ -18,8 +18,9 @@ from fastlane_bot.config.constants import AGNI_V3_NAME, PANCAKESWAP_V3_NAME, FUSIONX_V3_NAME, ECHODEX_V3_NAME, SECTA_V3_NAME from fastlane_bot.data.abi import UNISWAP_V3_POOL_ABI, UNISWAP_V3_FACTORY_ABI, PANCAKESWAP_V3_POOL_ABI -from fastlane_bot.events.exchanges.base import Exchange -from fastlane_bot.events.pools.base import Pool +from ..exchanges.base import Exchange +from ..pools.base import Pool +from ..interfaces.subscription import Subscription @dataclass @@ -45,6 +46,9 @@ def get_factory_abi(self): def get_events(self, contract: Contract) -> List[Type[Contract]]: return [contract.events.Swap] if self.exchange_initialized else [] + def get_subscriptions(self, contract: Contract) -> List[Subscription]: + return [Subscription(contract.events.Swap)] + async def get_fee(self, address: str, contract: Contract) -> Tuple[str, float]: fee = await contract.caller.fee() fee_float = float(fee) / 1e6 diff --git a/fastlane_bot/events/interfaces/event.py b/fastlane_bot/events/interfaces/event.py new file mode 100644 index 000000000..98bb5c2fc --- /dev/null +++ b/fastlane_bot/events/interfaces/event.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from typing import Any, Dict + + +@dataclass +class Event: + args: Dict[str, Any] + event: str + log_index: int + transaction_index: int + transaction_hash: str + address: str + block_hash: str + block_number: int + + @classmethod + def from_dict(cls, data): + return cls( + args=data["args"], + event=data["event"], + log_index=data["logIndex"], + transaction_index=data["transactionIndex"], + transaction_hash=data["transactionHash"], + address=data["address"], + block_hash=data["blockHash"], + block_number=data["blockNumber"], + ) diff --git a/fastlane_bot/events/interfaces/subscription.py b/fastlane_bot/events/interfaces/subscription.py new file mode 100644 index 000000000..5d78bf8ab --- /dev/null +++ b/fastlane_bot/events/interfaces/subscription.py @@ -0,0 +1,46 @@ +from typing import Optional + +from web3 import AsyncWeb3, Web3 +from web3.contract.contract import ContractEvent + +from ..utils import complex_handler +from .event import Event + + +def _get_event_topic(event): + abi = event().abi + topic = Web3.keccak(text=f'{abi["name"]}({",".join([arg["type"] for arg in abi["inputs"]])})') + return topic.hex() + + +class Subscription: + def __init__(self, event: ContractEvent, topic: Optional[str] = None): + self._event = event + self._topic = _get_event_topic(event) if topic is None else topic + self._subscription_id = None + self._latest_event_index = (-1, -1) # (block_number, block_index) + + async def subscribe(self, w3: AsyncWeb3): + self._subscription_id = await w3.eth.subscribe("logs", {"topics": [self._topic]}) + + @property + def subscription_id(self): + return self._subscription_id + + def process_log(self, log) -> Optional[Event]: + if self._is_event_latest(log): + self._latest_event_index = (log["blockNumber"], log["transactionIndex"]) + return self._parse_log(log) + else: + return None + + def _parse_log(self, log) -> Event: + try: + event_data = complex_handler(self._event().process_log(log)) + except: + print(log) + raise + return Event.from_dict(event_data) + + def _is_event_latest(self, event) -> bool: + return (event["blockNumber"], event["transactionIndex"]) > self._latest_event_index diff --git a/fastlane_bot/events/listener.py b/fastlane_bot/events/listener.py new file mode 100644 index 000000000..11b7f553a --- /dev/null +++ b/fastlane_bot/events/listener.py @@ -0,0 +1,72 @@ +import asyncio +import time +from typing import AsyncGenerator, List + +from web3 import AsyncWeb3 + +from .utils import complex_handler +from .managers.base import BaseManager +from .interfaces.event import Event + + +class EventListener: + """ + The EventListener is the entry point to create & manage websocket event subscriptions. + """ + NEW_EVENT_TIMEOUT = 0.01 + + def __init__( + self, + manager: BaseManager, + base_exchanges: List[str], + w3: AsyncWeb3, + ): + """ Initializes the EventManager. + Args: + base_exchanges: The list of base exchanges for which to gather events. + carbon_controller_addresses: The list of Carbon Controller addresses for which to gather events. + w3: The connected AsyncWeb3 object. + """ + self._manager = manager + self._w3 = w3 + self._subscriptions = [] # Initialize the subscription list + self._subscription_by_id = {} + self._event_buffer = [] + self._last_event_ts = 0 + + for exchange_name, exchange in self._manager.exchanges.items(): + if exchange.base_exchange_name in base_exchanges: + self._subscriptions += exchange.get_subscriptions(self._manager.event_contracts[exchange_name]) + + async def pull_block_events(self) -> AsyncGenerator[List[Event], None]: + """ Collects latest events from Websocket. + + Returns: + List: A stream of processed event batches retrieved from the Websocket. + """ + for subscription in self._subscriptions: + await subscription.subscribe(self._w3) + self._subscription_by_id[subscription.subscription_id] = subscription + + asyncio.ensure_future(self._listen()) + async for batch in self._get_batched_events(): + yield batch + + async def _get_batched_events(self): + while True: + ts = time.time() + if ts >= self._last_event_ts + self.NEW_EVENT_TIMEOUT: + if self._event_buffer != []: + batch = self._event_buffer.copy() + self._event_buffer = [] + yield batch + await asyncio.sleep(min(self.NEW_EVENT_TIMEOUT, self._last_event_ts + self.NEW_EVENT_TIMEOUT - ts)) + + async def _listen(self): + async for response in self._w3.ws.process_subscriptions(): + self._last_event_ts = time.time() + subscription_id = response["subscription"] + log = response["result"] + event = self._subscription_by_id[subscription_id].process_log(log) + if event is not None: + self._event_buffer.append(complex_handler(event)) diff --git a/fastlane_bot/events/managers/base.py b/fastlane_bot/events/managers/base.py index b76a2d592..abb55223d 100644 --- a/fastlane_bot/events/managers/base.py +++ b/fastlane_bot/events/managers/base.py @@ -23,6 +23,7 @@ from fastlane_bot.events.exchanges.base import Exchange from fastlane_bot.events.pools.utils import get_pool_cid from fastlane_bot.events.pools import pool_factory +from ..interfaces.event import Event @dataclass @@ -241,13 +242,13 @@ def get_fee_pairs( ) return fee_pairs - def exchange_name_from_event(self, event: Dict[str, Any]) -> str: + def exchange_name_from_event(self, event: Event) -> str: """ Get the exchange name from the event. Parameters ---------- - event : Dict[str, Any] + event : Event The event. Returns @@ -255,8 +256,8 @@ def exchange_name_from_event(self, event: Dict[str, Any]) -> str: str The exchange name. """ - if 'id' in event['args']: - carbon_controller_address = event['address'] + if 'id' in event.args: + carbon_controller_address = event.address for ex in self.cfg.CARBON_CONTROLLER_MAPPING: if self.cfg.CARBON_CONTROLLER_MAPPING[ex] == carbon_controller_address: return ex @@ -270,7 +271,7 @@ def exchange_name_from_event(self, event: Dict[str, Any]) -> str: return None def check_forked_exchange_names( - self, exchange_name_default: str = None, address: str = None, event: Any = None + self, exchange_name_default: str = None, address: str = None, event: Event = None ) -> str: """ Check the forked exchange names. If the exchange name is forked (Sushiswap from UniswapV2, etc) return the @@ -282,7 +283,7 @@ def check_forked_exchange_names( The default exchange name. address : str, optional The address. - event : Any, optional + event : Event, optional The event. Returns @@ -793,11 +794,11 @@ def validate_pool_info( if key != "strategy_id" and (pool_info is None or not pool_info): # Uses method in ContractsManager.add_pool_info_from_contract class to get pool info from contract pool_info = self.add_pool_info_from_contract( - address=addr, event=event, block_number=event["blockNumber"] + address=addr, event=event, block_number=event.block_number ) # if addr in self.cfg.CARBON_CONTROLLER_MAPPING: - # cid = event["args"]["id"] if event is not None else pool_info["strategy_id"] + # cid = event.args["id"] if event is not None else pool_info["strategy_id"] # # for pool in self.pool_data: # if pool["cid"] == cid: @@ -810,14 +811,14 @@ def validate_pool_info( return pool_info def get_key_and_value( - self, event: Dict[str, Any], addr: str, ex_name: str + self, event: Event, addr: str, ex_name: str ) -> Tuple[str, Any]: """ Get the key and value. Parameters ---------- - event : Dict[str, Any] + event : Event The event. addr : str The address. @@ -831,38 +832,38 @@ def get_key_and_value( """ if ex_name == "bancor_pol": - return "token", event["args"]["token"] + return "token", event.args["token"] if ex_name in self.cfg.CARBON_V1_FORKS: - info = {'exchange_name': ex_name, 'strategy_id': event["args"]["id"]} + info = {'exchange_name': ex_name, 'strategy_id': event.args["id"]} return "cid", get_pool_cid(info, self.cfg.CARBON_V1_FORKS) if ex_name in self.cfg.ALL_FORK_NAMES_WITHOUT_CARBON: return "address", addr if ex_name == "bancor_v2": return ("tkn0_address", "tkn1_address"), ( - event["args"]["_token1"], - event["args"]["_token2"], + event.args["_token1"], + event.args["_token2"], ) if ex_name == "bancor_v3": value = ( - event["args"]["tkn_address"] - if event["args"]["tkn_address"] != self.cfg.BNT_ADDRESS - else event["args"]["pool"] + event.args["tkn_address"] + if event.args["tkn_address"] != self.cfg.BNT_ADDRESS + else event.args["pool"] ) return "tkn1_address", value raise ValueError( f"[managers.base.get_key_and_value] Exchange {ex_name} not supported" ) - def handle_strategy_deleted(self, event: Dict[str, Any]) -> None: + def handle_strategy_deleted(self, event: Event) -> None: """ Handle the strategy deleted event. Parameters ---------- - event : Dict[str, Any] + event : Event The event. """ - strategy_id = event["args"]["id"] + strategy_id = event.args["id"] exchange_name = self.exchange_name_from_event(event) cids = [p["cid"] for p in self.pool_data if p["strategy_id"] == strategy_id and p["exchange_name"] == exchange_name] @@ -901,13 +902,13 @@ def pool_key_value_from_event(key: str, event: Dict[str, Any]) -> Any: The pool key value. """ if key == "cid": - return event["args"]["id"] + return event.args["id"] elif key == "address": - return event["address"] + return event.address elif key == "tkn0_address": - return event["args"]["token0"] + return event.args["token0"] elif key == "tkn1_address": - return event["args"]["token1"] + return event.args["token1"] print_events = [] diff --git a/fastlane_bot/events/managers/contracts.py b/fastlane_bot/events/managers/contracts.py index 03a4cd0c1..1355d4ba9 100644 --- a/fastlane_bot/events/managers/contracts.py +++ b/fastlane_bot/events/managers/contracts.py @@ -28,6 +28,7 @@ ) from fastlane_bot.events.managers.base import BaseManager from fastlane_bot.events.pools.utils import get_pool_cid +from ..interfaces.event import Event class ContractsManager(BaseManager): @@ -203,7 +204,7 @@ def add_pool_info_from_contract( self, exchange_name: str = None, address: str = None, - event: Any = None, + event: Event = None, tenderly_exchanges: List[str] = None, ) -> Dict[str, Any]: """ @@ -244,8 +245,8 @@ def add_pool_info_from_contract( t0_addr = self.exchanges[exchange_name].get_tkn0(address, pool_contract, event) t1_addr = self.exchanges[exchange_name].get_tkn1(address, pool_contract, event) - block_number = event["blockNumber"] - strategy_id = event["args"]["id"] if exchange_name in self.cfg.CARBON_V1_FORKS else None + block_number = event.block_number + strategy_id = event.args["id"] if exchange_name in self.cfg.CARBON_V1_FORKS else None temp_pool_info = { "exchange_name": exchange_name, "fee": f"{fee}", diff --git a/fastlane_bot/events/managers/manager.py b/fastlane_bot/events/managers/manager.py index 998356c53..7b80e66f6 100644 --- a/fastlane_bot/events/managers/manager.py +++ b/fastlane_bot/events/managers/manager.py @@ -18,33 +18,33 @@ from fastlane_bot.events.managers.contracts import ContractsManager from fastlane_bot.events.managers.events import EventManager from fastlane_bot.events.managers.pools import PoolManager -from fastlane_bot.events.pools.utils import get_pool_cid +from ..interfaces.event import Event class Manager(PoolManager, EventManager, ContractsManager): - def update_from_event(self, event: Dict[str, Any]): + def update_from_event(self, event: Event): """ Updates the state of the pool data from an event. StrategyCreated and StrategyUpdated events are handled as the "default" event types to process. Args: - event (Dict[str, Any]): The event to process. + event (Event): The event to process. """ ex_name = self.exchange_name_from_event(event) - if event["event"] in ["TradingFeePPMUpdated", "PairTradingFeePPMUpdated"]: + if event.event in ["TradingFeePPMUpdated", "PairTradingFeePPMUpdated"]: self.handle_trading_fee_updated() return - if event["event"] == "PairCreated": + if event.event == "PairCreated": self.set_carbon_v1_fee_pairs() return - if event["event"] == "StrategyDeleted": + if event.event == "StrategyDeleted": self.handle_strategy_deleted(event) return - addr = self.web3.to_checksum_address(event["address"]) + addr = self.web3.to_checksum_address(event.address) if not ex_name: return @@ -63,9 +63,7 @@ def update_from_event(self, event: Dict[str, Any]): pool_info["descr"] = self.pool_descr_from_info(pool_info) pool = self.get_or_init_pool(pool_info) - data = pool.update_from_event( - event or {}, pool.get_common_data(event, pool_info) - ) + data = pool.update_from_event(event, pool.get_common_data(event, pool_info)) self.update_pool_data(pool_info, data) def update_from_pool_info( @@ -166,18 +164,18 @@ def update( def handle_pair_trading_fee_updated( self, - event: Dict[str, Any] = None, + event: Event = None, ): """ Handle the pair trading fee updated event by updating the fee pairs and pool info for the given pair. Parameters ---------- - event : Dict[str, Any], optional + event : Event, optional The event, by default None. """ - tkn0_address = event["args"]["token0"] - tkn1_address = event["args"]["token1"] + tkn0_address = event.args["token0"] + tkn1_address = event.args["token1"] for exchange_name in self.cfg.CARBON_V1_FORKS: if exchange_name in self.exchanges: @@ -244,3 +242,23 @@ def handle_trading_fee_updated(self): pool["fee_float"] = pool["fee"] / 1e6 pool["descr"] = self.pool_descr_from_info(pool) self.pool_data[idx] = pool + + + def update_remaining_pools(self): + remaining_pools = [] + all_events = [pool[2] for pool in self.pools_to_add_from_contracts] + for event in all_events: + addr = self.web3.to_checksum_address(event.address) + ex_name = self.exchange_name_from_event(event) + if not ex_name: + self.cfg.logger.warning("[update_remaining_pools] ex_name not found from event") + continue + + key, key_value = self.get_key_and_value(event, addr, ex_name) + pool_info = self.get_pool_info(key, key_value, ex_name) + + if not pool_info: + remaining_pools.append((addr, ex_name, event, key, key_value)) + + random.shuffle(remaining_pools) + self.pools_to_add_from_contracts = remaining_pools diff --git a/fastlane_bot/events/pools/balancer.py b/fastlane_bot/events/pools/balancer.py index 80f894aea..6f8cd4b2b 100644 --- a/fastlane_bot/events/pools/balancer.py +++ b/fastlane_bot/events/pools/balancer.py @@ -15,7 +15,7 @@ from web3.contract import Contract from .base import Pool -from fastlane_bot import Config +from ..interfaces.event import Event @dataclass @@ -46,7 +46,7 @@ def event_matches_format( return False def update_from_event( - self, event_args: Dict[str, Any], data: Dict[str, Any] + self, event: Event, data: Dict[str, Any] ) -> Dict[str, Any]: """ See base class. diff --git a/fastlane_bot/events/pools/bancor_pol.py b/fastlane_bot/events/pools/bancor_pol.py index 12acc7b95..c14b01caa 100644 --- a/fastlane_bot/events/pools/bancor_pol.py +++ b/fastlane_bot/events/pools/bancor_pol.py @@ -18,6 +18,7 @@ from fastlane_bot.data.abi import ERC20_ABI, BANCOR_POL_ABI from fastlane_bot.events.pools.base import Pool +from ..interfaces.event import Event from _decimal import Decimal @@ -41,14 +42,14 @@ def unique_key() -> str: @classmethod def event_matches_format( - cls, event: Dict[str, Any], static_pools: Dict[str, Any], exchange_name: str = None + cls, event: Event, static_pools: Dict[str, Any], exchange_name: str = None ) -> bool: """ Check if an event matches the format of a Bancor pol event. Parameters ---------- - event : Dict[str, Any] + event : Event The event arguments. Returns @@ -57,11 +58,11 @@ def event_matches_format( True if the event matches the format of a Bancor v3 event, False otherwise. """ - event_args = event["args"] + event_args = event.args return "token" in event_args and "token0" not in event_args def update_from_event( - self, event_args: Dict[str, Any], data: Dict[str, Any] + self, event: Event, data: Dict[str, Any] ) -> Dict[str, Any]: """ This updates the pool balance from TokenTraded events. @@ -69,12 +70,12 @@ def update_from_event( See base class. """ - event_type = event_args["event"] + event_type = event.event if event_type in "TradingEnabled": - data["tkn0_address"] = event_args["args"]["token"] - data["tkn1_address"] = "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE" if event_args["args"]["token"] not in "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE" else "0x1F573D6Fb3F13d689FF844B4cE37794d79a7FF1C" + data["tkn0_address"] = event.args["token"] + data["tkn1_address"] = "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE" if event.args["token"] not in "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE" else "0x1F573D6Fb3F13d689FF844B4cE37794d79a7FF1C" - if event_args["args"]["token"] == self.state["tkn0_address"] and event_type in [ + if event.args["token"] == self.state["tkn0_address"] and event_type in [ "TokenTraded" ]: # *** Balance now updated from multicall *** diff --git a/fastlane_bot/events/pools/bancor_v2.py b/fastlane_bot/events/pools/bancor_v2.py index 68068bea7..b0f993418 100644 --- a/fastlane_bot/events/pools/bancor_v2.py +++ b/fastlane_bot/events/pools/bancor_v2.py @@ -15,6 +15,7 @@ from web3.contract import Contract from fastlane_bot.events.pools.base import Pool +from ..interfaces.event import Event @dataclass @@ -34,14 +35,14 @@ def unique_key() -> str: @classmethod def event_matches_format( - cls, event: Dict[str, Any], static_pools: Dict[str, Any], exchange_name: str = None + cls, event: Event, static_pools: Dict[str, Any], exchange_name: str = None ) -> bool: """ Check if an event matches the format of a Bancor v2 event. Parameters ---------- - event : Dict[str, Any] + event : Event The event arguments. Returns @@ -50,21 +51,21 @@ def event_matches_format( True if the event matches the format of a Bancor v3 event, False otherwise. """ - event_args = event["args"] + event_args = event.args return "_rateN" in event_args def update_from_event( - self, event_args: Dict[str, Any], data: Dict[str, Any] + self, event: Event, data: Dict[str, Any] ) -> Dict[str, Any]: """ **** IMPORTANT **** Bancor V2 pools emit 3 events per trade. Only one of them contains the new token balances we want. The one we want is the one where _token1 and _token2 match the token addresses of the pool. """ - data["tkn0_address"] = event_args["args"]["_token1"] - data["tkn1_address"] = event_args["args"]["_token2"] - data["tkn0_balance"] = event_args["args"]["_rateD"] - data["tkn1_balance"] = event_args["args"]["_rateN"] + data["tkn0_address"] = event.args["_token1"] + data["tkn1_address"] = event.args["_token2"] + data["tkn0_balance"] = event.args["_rateD"] + data["tkn1_balance"] = event.args["_rateN"] for key, value in data.items(): self.state[key] = value diff --git a/fastlane_bot/events/pools/bancor_v3.py b/fastlane_bot/events/pools/bancor_v3.py index 1b2281662..b266a819d 100644 --- a/fastlane_bot/events/pools/bancor_v3.py +++ b/fastlane_bot/events/pools/bancor_v3.py @@ -15,6 +15,7 @@ from web3.contract import Contract from .base import Pool +from ..interfaces.event import Event @dataclass @@ -34,14 +35,14 @@ def unique_key() -> str: @classmethod def event_matches_format( - cls, event: Dict[str, Any], static_pools: Dict[str, Any], exchange_name: str = None + cls, event: Event, static_pools: Dict[str, Any], exchange_name: str = None ) -> bool: """ Check if an event matches the format of a Bancor v3 event. Parameters ---------- - event : Dict[str, Any] + event : Event The event arguments. Returns @@ -50,20 +51,19 @@ def event_matches_format( True if the event matches the format of a Bancor v3 event, False otherwise. """ - event_args = event["args"] + event_args = event.args return "pool" in event_args def update_from_event( - self, event_args: Dict[str, Any], data: Dict[str, Any] + self, event: Event, data: Dict[str, Any] ) -> Dict[str, Any]: """ See base class. """ - event_args = event_args["args"] - if event_args["tkn_address"] == "0x1F573D6Fb3F13d689FF844B4cE37794d79a7FF1C": - data["tkn0_balance"] = event_args["newLiquidity"] + if event.args["tkn_address"] == "0x1F573D6Fb3F13d689FF844B4cE37794d79a7FF1C": + data["tkn0_balance"] = event.args["newLiquidity"] else: - data["tkn1_balance"] = event_args["newLiquidity"] + data["tkn1_balance"] = event.args["newLiquidity"] for key, value in data.items(): self.state[key] = value diff --git a/fastlane_bot/events/pools/base.py b/fastlane_bot/events/pools/base.py index e7f5870bb..4aa92999a 100644 --- a/fastlane_bot/events/pools/base.py +++ b/fastlane_bot/events/pools/base.py @@ -18,6 +18,8 @@ from web3 import Web3 from web3.contract import Contract +from ..interfaces.event import Event + @dataclass class Pool(ABC): @@ -61,37 +63,37 @@ def event_matches_format( @staticmethod def get_common_data( - event: Dict[str, Any], pool_info: Dict[str, Any] + event: Event, pool_info: Dict[str, Any] ) -> Dict[str, Any]: """ Get common (common to all Pool child classes) data from an event and pool info. Args: - event (Dict[str, Any]): The event data. + event (Event): The event data. pool_info (Dict[str, Any]): The pool information. Returns: Dict[str, Any]: A dictionary containing common data extracted from the event and pool info. """ return { - "last_updated_block": event["blockNumber"], + "last_updated_block": event.block_number, "timestamp": datetime.now().strftime("%H:%M:%S"), "pair_name": pool_info["pair_name"], "descr": pool_info["descr"], - "address": event["address"], + "address": event.address, } @staticmethod @abstractmethod def update_from_event( - event_args: Dict[str, Any], data: Dict[str, Any] + event: Event, data: Dict[str, Any] ) -> Dict[str, Any]: """ Update the pool state from an event. Parameters ---------- - event_args : Dict[str, Any] + event : Event The event arguments. data : Dict[str, Any] The pool data. diff --git a/fastlane_bot/events/pools/carbon_v1.py b/fastlane_bot/events/pools/carbon_v1.py index 4d62da546..184b13e83 100644 --- a/fastlane_bot/events/pools/carbon_v1.py +++ b/fastlane_bot/events/pools/carbon_v1.py @@ -15,6 +15,7 @@ from web3.contract import Contract from .base import Pool +from ..interfaces.event import Event from fastlane_bot.events.pools.utils import get_pool_cid @@ -36,34 +37,31 @@ def unique_key() -> str: @classmethod def event_matches_format( - cls, event: Dict[str, Any], static_pools: Dict[str, Any], exchange_name: str = None + cls, event: Event, static_pools: Dict[str, Any], exchange_name: str = None ) -> bool: """ see base class. """ - event_args = event["args"] + event_args = event.args return "order0" in event_args def update_from_event( - self, event_args: Dict[str, Any], data: Dict[str, Any] + self, event: Event, data: Dict[str, Any] ) -> Dict[str, Any]: """ See base class. """ - event_type = event_args["event"] - assert event_type not in ["TradingFeePPMUpdated", "PairTradingFeePPMUpdated"], ( + assert event.event not in ["TradingFeePPMUpdated", "PairTradingFeePPMUpdated"], ( "This event should not be " "handled by this class." ) - data = CarbonV1Pool.parse_event(data, event_args, event_type) + data = CarbonV1Pool.parse_event(data, event) data["router"] = self.router_address for key, value in data.items(): self.state[key] = value return data @staticmethod - def parse_event( - data: Dict[str, Any], event_args: Dict[str, Any], event_type: str - ) -> Dict[str, Any]: + def parse_event(data: Dict[str, Any], event: Event) -> Dict[str, Any]: """ Parse the event args into a dict. @@ -71,18 +69,16 @@ def parse_event( ---------- data : Dict[str, Any] The data to update. - event_args : Dict[str, Any] - The event arguments. - event_type : str - The event type. + event_args : Event + The event object. Returns ------- Dict[str, Any] The updated data. """ - order0, order1 = CarbonV1Pool.parse_orders(event_args, event_type) - data["strategy_id"] = event_args["args"].get("id") + order0, order1 = CarbonV1Pool.parse_orders(event) + data["strategy_id"] = event.args.get("id") if isinstance(order0, list) and isinstance(order1, list): data["y_0"] = order0[0] data["z_0"] = order0[1] @@ -105,9 +101,7 @@ def parse_event( return data @staticmethod - def parse_orders( - event_args: Dict[str, Any], event_type: str - ) -> Tuple[List[int], List[int]]: + def parse_orders(event: Event) -> Tuple[List[int], List[int]]: """ Parse the orders from the event args. If the event type is StrategyDeleted, then the orders are set to 0. @@ -115,17 +109,15 @@ def parse_orders( ---------- event_args : Dict[str, Any] The event arguments. - event_type : str - The event type. Returns ------- Tuple[List[int], List[int]] The parsed orders. """ - if event_type not in ["StrategyDeleted"]: - order0 = event_args["args"].get("order0") - order1 = event_args["args"].get("order1") + if event.event not in ["StrategyDeleted"]: + order0 = event.args.get("order0") + order1 = event.args.get("order1") else: order0 = [0, 0, 0, 0] order1 = [0, 0, 0, 0] diff --git a/fastlane_bot/events/pools/solidly_v2.py b/fastlane_bot/events/pools/solidly_v2.py index 61a1acf03..8ca967a7f 100644 --- a/fastlane_bot/events/pools/solidly_v2.py +++ b/fastlane_bot/events/pools/solidly_v2.py @@ -15,6 +15,7 @@ from web3.contract import Contract from fastlane_bot.events.pools.base import Pool +from ..interfaces.event import Event def _balances_A(contract: Contract) -> List[int]: return contract.caller.getReserves() @@ -88,21 +89,20 @@ def event_matches_format( """ Check if an event matches the format of a Uniswap v2 event. """ - event_args = event["args"] + event_args = event.args return ( "reserve0" in event_args - and event["address"] in static_pools[f"{exchange_name}_pools"] + and event.address in static_pools[f"{exchange_name}_pools"] ) def update_from_event( - self, event_args: Dict[str, Any], data: Dict[str, Any] + self, event: Event, data: Dict[str, Any] ) -> Dict[str, Any]: """ See base class. """ - event_args = event_args["args"] - data["tkn0_balance"] = event_args["reserve0"] - data["tkn1_balance"] = event_args["reserve1"] + data["tkn0_balance"] = event.args["reserve0"] + data["tkn1_balance"] = event.args["reserve1"] for key, value in data.items(): self.state[key] = value diff --git a/fastlane_bot/events/pools/uniswap_v2.py b/fastlane_bot/events/pools/uniswap_v2.py index 55549e89a..c48639f59 100644 --- a/fastlane_bot/events/pools/uniswap_v2.py +++ b/fastlane_bot/events/pools/uniswap_v2.py @@ -15,6 +15,7 @@ from web3.contract import Contract from fastlane_bot.events.pools.base import Pool +from ..interfaces.event import Event @dataclass @@ -41,26 +42,25 @@ def unique_key() -> str: @classmethod def event_matches_format( - cls, event: Dict[str, Any], static_pools: Dict[str, Any], exchange_name: str = None + cls, event: Event, static_pools: Dict[str, Any], exchange_name: str = None ) -> bool: """ Check if an event matches the format of a Uniswap v2 event. """ - event_args = event["args"] + event_args = event.args return ( "reserve0" in event_args - and event["address"] in static_pools[f"{exchange_name}_pools"] + and event.address in static_pools[f"{exchange_name}_pools"] ) def update_from_event( - self, event_args: Dict[str, Any], data: Dict[str, Any] + self, event: Event, data: Dict[str, Any] ) -> Dict[str, Any]: """ See base class. """ - event_args = event_args["args"] - data["tkn0_balance"] = event_args["reserve0"] - data["tkn1_balance"] = event_args["reserve1"] + data["tkn0_balance"] = event.args["reserve0"] + data["tkn1_balance"] = event.args["reserve1"] for key, value in data.items(): self.state[key] = value diff --git a/fastlane_bot/events/pools/uniswap_v3.py b/fastlane_bot/events/pools/uniswap_v3.py index 265c72296..06e67f65f 100644 --- a/fastlane_bot/events/pools/uniswap_v3.py +++ b/fastlane_bot/events/pools/uniswap_v3.py @@ -14,6 +14,7 @@ from web3.contract import Contract from fastlane_bot.events.pools.base import Pool +from ..interfaces.event import Event @dataclass @@ -34,27 +35,26 @@ def unique_key() -> str: @classmethod def event_matches_format( - cls, event: Dict[str, Any], static_pools: Dict[str, Any], exchange_name: str = None + cls, event: Event, static_pools: Dict[str, Any], exchange_name: str = None ) -> bool: """ Check if an event matches the format of a Uniswap v3 event. """ - event_args = event["args"] + event_args = event.args return ( "sqrtPriceX96" in event_args - and event["address"] in static_pools[f"{exchange_name}_pools"] + and event.address in static_pools[f"{exchange_name}_pools"] ) def update_from_event( - self, event_args: Dict[str, Any], data: Dict[str, Any] + self, event: Event, data: Dict[str, Any] ) -> Dict[str, Any]: """ See base class. """ - event_args = event_args["args"] - data["liquidity"] = event_args["liquidity"] - data["sqrt_price_q96"] = event_args["sqrtPriceX96"] - data["tick"] = event_args["tick"] + data["liquidity"] = event.args["liquidity"] + data["sqrt_price_q96"] = event.args["sqrtPriceX96"] + data["tick"] = event.args["tick"] for key, value in data.items(): self.state[key] = value diff --git a/fastlane_bot/events/utils.py b/fastlane_bot/events/utils.py index 8f2e66618..63d3a6984 100644 --- a/fastlane_bot/events/utils.py +++ b/fastlane_bot/events/utils.py @@ -31,15 +31,15 @@ from fastlane_bot.data.abi import FAST_LANE_CONTRACT_ABI from fastlane_bot.exceptions import ReadOnlyException from fastlane_bot.events.interface import QueryInterface -from fastlane_bot.events.managers.manager import Manager from fastlane_bot.helpers import TxHelpers from fastlane_bot.utils import safe_int +from .interfaces.event import Event def filter_latest_events( - mgr: Manager, events: List[List[AttributeDict]] -) -> List[AttributeDict]: + mgr: Any, events: List[Event] +) -> List[Event]: """ This function filters out the latest events for each pool. Given a nested list of events, it iterates through all events and keeps track of the latest event (i.e., with the highest block number) for each pool. The key used to identify each pool @@ -54,19 +54,16 @@ def filter_latest_events( List[AttributeDict]: A list of events, each representing the latest event for its corresponding pool. """ latest_entry_per_pool = {} - all_events = [event for event_list in events for event in event_list] # Handles the case where multiple pools are created in the same block - all_events.reverse() + events.reverse() bancor_v2_anchor_addresses = { pool["anchor"] for pool in mgr.pool_data if pool["exchange_name"] == "bancor_v2" } - for event in all_events: - pool_type = mgr.pool_type_from_exchange_name( - mgr.exchange_name_from_event(event) - ) + for event in events: + pool_type = mgr.pool_type_from_exchange_name(mgr.exchange_name_from_event(event)) if pool_type: key = pool_type.unique_key() else: @@ -74,43 +71,33 @@ def filter_latest_events( if key == "cid": key = "id" elif key == "tkn1_address": - if event["args"]["pool"] != mgr.cfg.BNT_ADDRESS: + if event.args["pool"] != mgr.cfg.BNT_ADDRESS: key = "pool" else: key = "tkn_address" - unique_key = event[key] if key in event else event["args"][key] + unique_key = event.address if key == "address" else event.args[key] + # unique_key = event.args[key] # Skip events for Bancor v2 anchors if ( key == "address" - and "_token1" in event["args"] + and "_token1" in event.args and ( - event["args"]["_token1"] in bancor_v2_anchor_addresses - or event["args"]["_token2"] in bancor_v2_anchor_addresses + event.args["_token1"] in bancor_v2_anchor_addresses + or event.args["_token2"] in bancor_v2_anchor_addresses ) ): continue if unique_key in latest_entry_per_pool: - if event["blockNumber"] > latest_entry_per_pool[unique_key]["blockNumber"]: + if event.block_number > latest_entry_per_pool[unique_key].block_number: latest_entry_per_pool[unique_key] = event - elif ( - event["blockNumber"] == latest_entry_per_pool[unique_key]["blockNumber"] - ): - if ( - event["transactionIndex"] - == latest_entry_per_pool[unique_key]["transactionIndex"] - ): - if ( - event["logIndex"] - > latest_entry_per_pool[unique_key]["logIndex"] - ): + elif event.block_number == latest_entry_per_pool[unique_key].block_number: + if event.transaction_index == latest_entry_per_pool[unique_key].transaction_index: + if event.log_index > latest_entry_per_pool[unique_key].log_index: latest_entry_per_pool[unique_key] = event - elif ( - event["transactionIndex"] - > latest_entry_per_pool[unique_key]["transactionIndex"] - ): + elif event.transaction_index > latest_entry_per_pool[unique_key].transaction_index: latest_entry_per_pool[unique_key] = event else: continue @@ -138,7 +125,7 @@ def complex_handler(obj: Any) -> Union[Dict, str, List, Set, Any]: If the input object does not match any of the specified types, it is returned as is. """ if isinstance(obj, AttributeDict): - return dict(obj) + return complex_handler(dict(obj)) elif isinstance(obj, HexBytes): return obj.hex() elif isinstance(obj, bytes): @@ -148,7 +135,7 @@ def complex_handler(obj: Any) -> Union[Dict, str, List, Set, Any]: elif isinstance(obj, (list, tuple)): return [complex_handler(i) for i in obj] elif isinstance(obj, set): - return list(obj) + return complex_handler(list(obj)) else: return obj @@ -810,7 +797,7 @@ def save_events_to_json( mgr.cfg.logger.debug(f"[events.utils.save_events_to_json] Saved events to {path}") -def update_pools_from_events(n_jobs: int, mgr: Any, latest_events: List[Any]): +def update_pools_from_events(n_jobs: int, mgr: Any, latest_events: List[Event]): """ Updates the pools with the given events. @@ -1220,14 +1207,12 @@ def get_latest_events( # Get all event filters, events, and flatten them events = [ - complex_handler(event) - for event in [ - complex_handler(event) - for event in get_all_events( - n_jobs, - get_event_filters(n_jobs, mgr, start_block, current_block), - ) - ] + Event.from_dict(complex_handler(event)) + for events in get_all_events( + n_jobs, + get_event_filters(n_jobs, mgr, start_block, current_block), + ) + for event in events ] # Filter out the latest events per pool, save them to disk, and update the pools @@ -1246,7 +1231,7 @@ def get_latest_events( if not pool_type.event_matches_format(event) ] - carbon_pol_events = [event for event in latest_events if "token" in event["args"]] + carbon_pol_events = [event for event in latest_events if "token" in event.args] mgr.cfg.logger.info( f"[events.utils.get_latest_events] Found {len(latest_events)} new events, {len(carbon_pol_events)} carbon_pol_events" ) diff --git a/main.py b/main.py index 468909357..9ed651803 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,9 @@ (c) Copyright Bprotocol foundation 2023. Licensed under MIT """ +import asyncio +import logging +import websockets from fastlane_bot.exceptions import ReadOnlyException, FlashloanUnavailableException from fastlane_bot.events.version_utils import check_version_requirements @@ -14,11 +17,13 @@ import os, sys import time +from threading import Lock, Thread from traceback import format_exc import pandas as pd from dotenv import load_dotenv -from web3 import Web3, HTTPProvider +from web3 import AsyncWeb3, Web3, HTTPProvider +from web3.providers import WebsocketProviderV2 from fastlane_bot import __version__ as bot_version from fastlane_bot.events.async_backdate_utils import ( @@ -62,6 +67,12 @@ load_dotenv() +logger = logging.getLogger(__name__) + + +pool_data_lock = Lock() + + def process_arguments(args): """ Process and transform command line arguments. @@ -297,15 +308,63 @@ def main(args: argparse.Namespace) -> None: # Add initial pool data to the manager add_initial_pool_data(cfg, mgr, args.n_jobs) + handle_static_pools_update(mgr) + + # Handle the initial iteration (backdate pools, update pools from contracts, etc.) + async_handle_initial_iteration( + backdate_pools=args.backdate_pools, + last_block=0, + mgr=mgr, + start_block=mgr.web3.eth.block_number if args.replay_from_block is None else args.replay_from_block, + ) + + thread = Thread(target=run_event_listener, args=(mgr,), daemon=True) + thread.start() + # Run the main loop run(mgr, args) +def run_event_listener(mgr): + async def inner(mgr): + from fastlane_bot.events.listener import EventListener + + base_exchanges = ["carbon_v1", "uniswap_v3", "uniswap_v2", "bancor_pol", "bancor_v2", "bancor_v3", "solidly_v2"] + while True: + try: + async with AsyncWeb3.persistent_websocket(WebsocketProviderV2(mgr.cfg.network.WEBSOCKET_URL)) as w3: + event_listener = EventListener(manager=mgr, base_exchanges=base_exchanges, w3=w3) + async for events in event_listener.pull_block_events(): + with pool_data_lock: + for event in events: + print(event) + mgr.update_from_event(event) + + with pool_data_lock: + current_block = mgr.web3.eth.block_number + + # Update new pool events from contracts + if len(mgr.pools_to_add_from_contracts) > 0: + mgr.cfg.logger.info( + f"Adding {len(mgr.pools_to_add_from_contracts)} new pools from contracts, " + f"{len(mgr.pool_data)} total pools currently exist. Current block: {current_block}." + ) + _run_async_update_with_retries(mgr, current_block=current_block) + mgr.pools_to_add_from_contracts = [] + except websockets.exceptions.ConnectionClosedError: + logger.info("Websocket connection lost. Reconnecting ...") + mgr.cfg.logger.info("Websocket connection lost. Reconnecting ...") + print("Websocket connection lost. Reconnecting ...") + await asyncio.sleep(1) + + asyncio.run(inner(mgr)) + + def run(mgr, args, tenderly_uri=None) -> None: loop_idx = last_block = last_block_queried = total_iteration_time = 0 start_timeout = time.time() mainnet_uri = mgr.cfg.w3.provider.endpoint_uri - handle_static_pools_update(mgr) + while True: try: # ensure 'last_updated_block' is in pool_data for all pools @@ -332,9 +391,9 @@ def run(mgr, args, tenderly_uri=None) -> None: ) # Log the current start, end and last block - mgr.cfg.logger.info( - f"Fetching events from {start_block} to {current_block}... {last_block}" - ) + # mgr.cfg.logger.info( + # f"Fetching events from {start_block} to {current_block}... {last_block}" + # ) # Set the network connection to Mainnet if replaying from a block set_network_to_mainnet_if_replay( @@ -346,32 +405,22 @@ def run(mgr, args, tenderly_uri=None) -> None: args.use_cached_events, ) - # Get the events - latest_events = ( - get_cached_events(mgr, args.logging_path) - if args.use_cached_events - else get_latest_events( - current_block, - mgr, - args.n_jobs, - start_block, - args.cache_latest_only, - args.logging_path, - ) - ) + # latest_events = ( + # get_cached_events(mgr, args.logging_path) + # if args.use_cached_events + # else get_latest_events( + # current_block, + # mgr, + # args.n_jobs, + # start_block, + # args.cache_latest_only, + # args.logging_path, + # ) + # ) iteration_start_time = time.time() # Update the pools from the latest events - update_pools_from_events(args.n_jobs, mgr, latest_events) - - # Update new pool events from contracts - if len(mgr.pools_to_add_from_contracts) > 0: - mgr.cfg.logger.info( - f"Adding {len(mgr.pools_to_add_from_contracts)} new pools from contracts, " - f"{len(mgr.pool_data)} total pools currently exist. Current block: {current_block}." - ) - async_update_pools_from_contracts(mgr, current_block=current_block) - mgr.pools_to_add_from_contracts = [] + #update_pools_from_events(args.n_jobs, mgr, latest_events) # Increment the loop index loop_idx += 1 @@ -387,14 +436,6 @@ def run(mgr, args, tenderly_uri=None) -> None: tenderly_fork_id=args.tenderly_fork_id, ) - # Handle the initial iteration (backdate pools, update pools from contracts, etc.) - async_handle_initial_iteration( - backdate_pools=args.backdate_pools, - current_block=current_block, - last_block=last_block, - mgr=mgr, - start_block=start_block, - ) # Run multicall every iteration multicall_every_iteration(current_block=current_block, mgr=mgr) @@ -417,33 +458,40 @@ def run(mgr, args, tenderly_uri=None) -> None: # Re-initialize the bot bot = init_bot(mgr) - if args.use_specific_exchange_for_target_tokens is not None: - target_tokens = bot.get_tokens_in_exchange( - exchange_name=args.use_specific_exchange_for_target_tokens - ) - mgr.cfg.logger.info( - f"[main] Using only tokens in: {args.use_specific_exchange_for_target_tokens}, found {len(target_tokens)} tokens" + with pool_data_lock: + # Verify that the state has changed + verify_state_changed(bot=bot, initial_state=initial_state, mgr=mgr) + + # Verify that the minimum profit in BNT is respected + # verify_min_bnt_is_respected(bot=bot, mgr=mgr) + + if args.use_specific_exchange_for_target_tokens is not None: + target_tokens = bot.get_tokens_in_exchange( + exchange_name=args.use_specific_exchange_for_target_tokens + ) + mgr.cfg.logger.info( + f"[main] Using only tokens in: {args.use_specific_exchange_for_target_tokens}, found {len(target_tokens)} tokens" + ) + + if not mgr.read_only: + handle_tokens_csv(mgr, mgr.prefix_path) + + # Handle subsequent iterations + handle_subsequent_iterations( + arb_mode=args.arb_mode, + bot=bot, + flashloan_tokens=args.flashloan_tokens, + randomizer=args.randomizer, + run_data_validator=args.run_data_validator, + target_tokens=args.target_tokens, + loop_idx=loop_idx, + logging_path=args.logging_path, + replay_from_block=replay_from_block, + tenderly_uri=tenderly_uri, + mgr=mgr, + forked_from_block=forked_from_block, ) - if not mgr.read_only: - handle_tokens_csv(mgr, mgr.prefix_path) - - # Handle subsequent iterations - handle_subsequent_iterations( - arb_mode=args.arb_mode, - bot=bot, - flashloan_tokens=args.flashloan_tokens, - randomizer=args.randomizer, - run_data_validator=args.run_data_validator, - target_tokens=args.target_tokens, - loop_idx=loop_idx, - logging_path=args.logging_path, - replay_from_block=replay_from_block, - tenderly_uri=tenderly_uri, - mgr=mgr, - forked_from_block=forked_from_block, - ) - # Sleep for the polling interval if not replay_from_block and args.polling_interval > 0: mgr.cfg.logger.info(