From f1ccc05ec73223bc9d6d1e10a186541cc5668380 Mon Sep 17 00:00:00 2001 From: Reza Rahemtola Date: Mon, 1 Jun 2026 02:28:18 +0200 Subject: [PATCH] fix(eth): bound sync RPC calls with timeout and recycle wedged web3 client The single long-lived AsyncWeb3 client can wedge on a stale TCP connection, making eth.get_logs / eth.block_number hang forever and freezing the Ethereum sync loop. Bound those awaits with asyncio.wait_for and, on timeout, recycle the web3 client so the next retry uses a fresh connection. Harden the recycle path: - Bound the provider disconnect with asyncio.wait_for so a wedged provider's own disconnect() cannot hang the recovery. - Rebuild the web3 client and contract atomically (build into locals, then swap both together) so a failure in get_contract cannot leave a new web3 client paired with the stale contract. - Annotate client_timeout as float to match its use in asyncio.wait_for. - Add a pure-mock test exercising the rebuild path (disconnect awaited, both web3_client and contract replaced). --- src/aleph/chains/ethereum.py | 123 +++++++++++++++--- tests/chains/test_ethereum_timeout.py | 173 ++++++++++++++++++++++++++ 2 files changed, 281 insertions(+), 15 deletions(-) create mode 100644 tests/chains/test_ethereum_timeout.py diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py index 41aa18e9a..3b7e9e1df 100644 --- a/src/aleph/chains/ethereum.py +++ b/src/aleph/chains/ethereum.py @@ -2,7 +2,7 @@ import importlib.resources import json import logging -from typing import Any, AsyncIterator, Dict, List, Literal, Self, Tuple, Union +from typing import Any, AsyncIterator, Dict, List, Literal, Optional, Self, Tuple, Union from aleph_message.models import Chain from configmanager import Config @@ -46,7 +46,7 @@ class TooManyLogsInRange(GetLogsException): end_block: BlockNumber | Literal["latest"] -def make_web3_client(rpc_url: URI, chain_id: int, timeout: int) -> AsyncWeb3: +def make_web3_client(rpc_url: URI, chain_id: int, timeout: float) -> AsyncWeb3: web3 = AsyncWeb3( AsyncHTTPProvider( rpc_url, @@ -91,6 +91,14 @@ def __init__( session_factory: DbSessionFactory, pending_tx_publisher: PendingTxPublisher, chain_data_service: ChainDataService, + # Parameters needed to rebuild the web3 client if it wedges on a stale + # connection. Safe defaults keep minimal/test constructions working: if + # rpc_url/chain_id/contract_address are unset, the reset path skips the + # rebuild (no network access) while the timeout still surfaces. + rpc_url: Optional[URI] = None, + chain_id: Optional[int] = None, + client_timeout: float = 60, + contract_address: Optional[Union[Address, ChecksumAddress, ENS]] = None, ): self.web3_client = web3_client self.contract = contract @@ -101,6 +109,10 @@ def __init__( self.session_factory = session_factory self.pending_tx_publisher = pending_tx_publisher self.chain_data_service = chain_data_service + self.rpc_url = rpc_url + self.chain_id = chain_id + self.client_timeout = client_timeout + self.contract_address = contract_address self.indexer_reader = AlephIndexerReader( chain=Chain.ETH, @@ -112,8 +124,17 @@ async def __aenter__(self) -> Self: return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: - # Closes the aiohttp ClientSession cached by AsyncHTTPProvider. - await self.web3_client.provider.disconnect() + # Closes the aiohttp ClientSession cached by AsyncHTTPProvider. Bound it + # so a wedged provider cannot hang shutdown. + try: + await asyncio.wait_for( + self.web3_client.provider.disconnect(), + timeout=self.client_timeout, + ) + except Exception as exc: + LOGGER.warning( + "Could not cleanly disconnect web3 provider on exit: %s", exc + ) @classmethod async def new( @@ -123,12 +144,17 @@ async def new( pending_tx_publisher: PendingTxPublisher, chain_data_service: ChainDataService, ) -> Self: + rpc_url = config.ethereum.api_url.value + chain_id = config.ethereum.chain_id.value + client_timeout = config.ethereum.client_timeout.value + contract_address = config.ethereum.sync_contract.value + web3_client = make_web3_client( - rpc_url=config.ethereum.api_url.value, - chain_id=config.ethereum.chain_id.value, - timeout=config.ethereum.client_timeout.value, + rpc_url=rpc_url, + chain_id=chain_id, + timeout=client_timeout, ) - contract = await get_contract(web3_client, config.ethereum.sync_contract.value) + contract = await get_contract(web3_client, contract_address) return cls( web3_client=web3_client, contract=contract, @@ -139,8 +165,49 @@ async def new( session_factory=session_factory, pending_tx_publisher=pending_tx_publisher, chain_data_service=chain_data_service, + rpc_url=rpc_url, + chain_id=chain_id, + client_timeout=client_timeout, + contract_address=contract_address, ) + async def _reset_web3_client(self) -> None: + """ + Rebuild the web3 client after a hung RPC call. + + The single long-lived AsyncWeb3 client can wedge on a stale TCP + connection, causing ``eth.get_logs`` / ``eth.block_number`` to hang + forever. When that happens we best-effort disconnect the old provider + and build a fresh client so the next retry uses a new connection. + """ + try: + # Bound the disconnect: a wedged provider's disconnect() can itself + # hang, which would freeze the recovery path we are trying to run. + await asyncio.wait_for( + self.web3_client.provider.disconnect(), + timeout=self.client_timeout, + ) + except Exception as exc: + # Best-effort: the old provider may itself be wedged. + LOGGER.warning("Could not cleanly disconnect old web3 provider: %s", exc) + + if ( + self.rpc_url is None + or self.chain_id is None + or self.contract_address is None + ): + # Minimal/test constructions cannot rebuild a real client. + return + + # Build into locals first, then swap both together: a failure in + # get_contract must not leave a new web3_client paired with the old + # contract. + new_client = make_web3_client(self.rpc_url, self.chain_id, self.client_timeout) + new_contract = await get_contract(new_client, self.contract_address) + self.web3_client = new_client + self.contract = new_contract + LOGGER.info("Rebuilt Ethereum web3 client after RPC timeout") + async def get_last_height(self, sync_type: ChainEventType) -> BlockNumber: """Returns the last height for which we already have the ethereum data.""" with self.session_factory() as session: @@ -164,14 +231,28 @@ async def _get_logs_in_block_range( LOGGER.info(f"Fetching logs in range {start_block}..{end_block}") try: - logs = await self.web3_client.eth.get_logs( - { - "address": self.contract.address, - "fromBlock": start_block, - "toBlock": end_block, - } + # Bound the RPC call: the web3 client can wedge on a stale + # connection and hang here forever, freezing the sync. A timeout is + # NOT a TooManyLogsInRange (handled below) - it must propagate. + logs = await asyncio.wait_for( + self.web3_client.eth.get_logs( + { + "address": self.contract.address, + "fromBlock": start_block, + "toBlock": end_block, + } + ), + timeout=self.client_timeout, ) return logs + except asyncio.TimeoutError: + LOGGER.warning( + "Timed out fetching logs in range %s..%s, recycling web3 client", + start_block, + end_block, + ) + await self._reset_web3_client() + raise except Web3RPCError as e: # Handle limit exceptions if rpc_response := e.rpc_response: @@ -190,7 +271,19 @@ async def _get_all_logs_in_batches( block_range = max_block_range while True: - last_eth_block = await self.web3_client.eth.block_number + try: + # Bound the RPC call: a wedged web3 client can hang here + # forever, freezing the sync. + last_eth_block = await asyncio.wait_for( + self.web3_client.eth.block_number, + timeout=self.client_timeout, + ) + except asyncio.TimeoutError: + LOGGER.warning( + "Timed out fetching latest block number, recycling web3 client" + ) + await self._reset_web3_client() + raise # Note: the range in get_logs is [start, end]. end_block = min(last_eth_block, BlockNumber(start_block + block_range - 1)) diff --git a/tests/chains/test_ethereum_timeout.py b/tests/chains/test_ethereum_timeout.py new file mode 100644 index 000000000..06bfbf58f --- /dev/null +++ b/tests/chains/test_ethereum_timeout.py @@ -0,0 +1,173 @@ +""" +Tests for the Ethereum sync RPC timeout + web3 client recycle behaviour. + +Root cause covered here: the single long-lived AsyncWeb3 client can wedge on a +stale TCP connection, making ``eth.get_logs`` / ``eth.block_number`` hang +forever. The connector must bound those awaits with ``asyncio.wait_for`` and, +on timeout, rebuild its web3 client so the next retry uses a fresh connection. + +Every async test that exercises a potential hang is guarded with an outer +``asyncio.wait_for(..., timeout=5)`` so the suite can never hang. +""" + +import asyncio + +import pytest +from eth_typing import BlockNumber + +from aleph.chains.ethereum import EthereumConnector + + +def _make_connector(mocker, get_logs=None, block_number=None): + """ + Build an EthereumConnector with a mock web3 client, using the safe + defaults added to ``__init__`` for the fields a minimal construction does + not need. + """ + web3_client = mocker.MagicMock() + if get_logs is not None: + web3_client.eth.get_logs = get_logs + + # block_number is awaited as an attribute (``await ...eth.block_number``), + # so assign an awaitable directly on this mock *instance*. Do not patch + # ``type(web3_client.eth)`` — that mutates the shared MagicMock class and + # leaks the property across tests. + if block_number is not None: + web3_client.eth.block_number = block_number() + + # provider.disconnect() is awaited in the reset path; make it awaitable so + # the disconnect actually runs instead of being swallowed as a TypeError. + web3_client.provider.disconnect = mocker.AsyncMock() + + contract = mocker.MagicMock() + contract.address = "0x" + "0" * 40 + + connector = EthereumConnector( + web3_client=web3_client, + contract=contract, + authorized_emitters=[], + max_gas_price=0, + start_height=BlockNumber(0), + max_block_range=100, + session_factory=mocker.MagicMock(), + pending_tx_publisher=mocker.AsyncMock(), + chain_data_service=mocker.AsyncMock(), + # Intentionally leave rpc_url/chain_id/contract_address unset so the + # reset path skips rebuilding (no real network), while the timeout + # still surfaces. + client_timeout=0.05, + ) + return connector + + +@pytest.mark.asyncio +async def test_get_logs_hang_raises_timeout_and_triggers_reset(mocker): + async def hang(*_args, **_kwargs): + await asyncio.Event().wait() + + connector = _make_connector(mocker, get_logs=hang) + reset_spy = mocker.spy(connector, "_reset_web3_client") + + # The hung get_logs must surface as a timeout, not as TooManyLogsInRange. + with pytest.raises(asyncio.TimeoutError) as exc_info: + await asyncio.wait_for( + connector._get_logs_in_block_range(BlockNumber(0), BlockNumber(10)), + timeout=5, + ) + + # A timeout must trigger the client reset. The actual client rebuild is + # covered by test_reset_rebuilds_client_when_params_set; here rpc_url etc. + # are unset so the reset is a no-op beyond being invoked. + assert reset_spy.await_count == 1 + # The timeout must be catchable by `except Exception` in the retry loop. + assert isinstance(exc_info.value, Exception) + + +@pytest.mark.asyncio +async def test_block_number_hang_raises_timeout_and_triggers_reset(mocker): + async def hang(): + await asyncio.Event().wait() + + connector = _make_connector(mocker, block_number=hang) + reset_spy = mocker.spy(connector, "_reset_web3_client") + + async def drain(): + async for _log in connector._get_all_logs_in_batches( + BlockNumber(0), max_block_range=100 + ): + pass + + with pytest.raises(asyncio.TimeoutError) as exc_info: + await asyncio.wait_for(drain(), timeout=5) + + # A timeout must trigger the client reset (full rebuild covered by + # test_reset_rebuilds_client_when_params_set). + assert reset_spy.await_count == 1 + assert isinstance(exc_info.value, Exception) + + +@pytest.mark.asyncio +async def test_timeout_propagates_as_exception_for_retry_loop(mocker): + async def hang(*_args, **_kwargs): + await asyncio.Event().wait() + + connector = _make_connector(mocker, get_logs=hang) + + raised: BaseException | None = None + try: + await asyncio.wait_for( + connector._get_logs_in_block_range(BlockNumber(0), BlockNumber(10)), + # The inner client_timeout (0.05s) is what must fire; a tight outer + # safety net makes that explicit while still preventing a hang. + timeout=1, + ) + except Exception as e: # mirrors fetch_sync_events_task's handler + raised = e + + assert raised is not None + assert isinstance(raised, asyncio.TimeoutError) + assert isinstance(raised, Exception) + + +@pytest.mark.asyncio +async def test_reset_rebuilds_client_when_params_set(mocker): + """ + With rpc_url/chain_id/contract_address set, ``_reset_web3_client`` must + disconnect the old provider and replace BOTH the web3 client and the + contract with freshly built ones. Pure-mock, no network. + """ + old_web3_client = mocker.MagicMock() + old_web3_client.provider.disconnect = mocker.AsyncMock() + old_contract = mocker.MagicMock() + + new_web3_client = mocker.MagicMock() + new_contract = mocker.MagicMock() + + mocker.patch("aleph.chains.ethereum.make_web3_client", return_value=new_web3_client) + mocker.patch( + "aleph.chains.ethereum.get_contract", + new=mocker.AsyncMock(return_value=new_contract), + ) + + connector = EthereumConnector( + web3_client=old_web3_client, + contract=old_contract, + authorized_emitters=[], + max_gas_price=0, + start_height=BlockNumber(0), + max_block_range=100, + session_factory=mocker.MagicMock(), + pending_tx_publisher=mocker.AsyncMock(), + chain_data_service=mocker.AsyncMock(), + # Dummy-but-set values so the rebuild path runs. + rpc_url="http://localhost:8545", + chain_id=1, + client_timeout=0.05, + contract_address="0x" + "0" * 40, + ) + + await connector._reset_web3_client() + + old_web3_client.provider.disconnect.assert_awaited_once() + assert connector.web3_client is new_web3_client + assert connector.contract is new_contract