Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 108 additions & 15 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import importlib.resources
import json
import logging
from typing import Any, AsyncIterator, Dict, List, Literal, Self, Tuple, Union
from typing import Any, AsyncIterator, Dict, List, Literal, Optional, Self, Tuple, Union

from aleph_message.models import Chain
from configmanager import Config
Expand Down Expand Up @@ -46,7 +46,7 @@ class TooManyLogsInRange(GetLogsException):
end_block: BlockNumber | Literal["latest"]


def make_web3_client(rpc_url: URI, chain_id: int, timeout: int) -> AsyncWeb3:
def make_web3_client(rpc_url: URI, chain_id: int, timeout: float) -> AsyncWeb3:
web3 = AsyncWeb3(
AsyncHTTPProvider(
rpc_url,
Expand Down Expand Up @@ -91,6 +91,14 @@ def __init__(
session_factory: DbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
chain_data_service: ChainDataService,
# Parameters needed to rebuild the web3 client if it wedges on a stale
# connection. Safe defaults keep minimal/test constructions working: if
# rpc_url/chain_id/contract_address are unset, the reset path skips the
# rebuild (no network access) while the timeout still surfaces.
rpc_url: Optional[URI] = None,
chain_id: Optional[int] = None,
client_timeout: float = 60,
contract_address: Optional[Union[Address, ChecksumAddress, ENS]] = None,
):
self.web3_client = web3_client
self.contract = contract
Expand All @@ -101,6 +109,10 @@ def __init__(
self.session_factory = session_factory
self.pending_tx_publisher = pending_tx_publisher
self.chain_data_service = chain_data_service
self.rpc_url = rpc_url
self.chain_id = chain_id
self.client_timeout = client_timeout
self.contract_address = contract_address

self.indexer_reader = AlephIndexerReader(
chain=Chain.ETH,
Expand All @@ -112,8 +124,17 @@ async def __aenter__(self) -> Self:
return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
# Closes the aiohttp ClientSession cached by AsyncHTTPProvider.
await self.web3_client.provider.disconnect()
# Closes the aiohttp ClientSession cached by AsyncHTTPProvider. Bound it
# so a wedged provider cannot hang shutdown.
try:
await asyncio.wait_for(
self.web3_client.provider.disconnect(),
timeout=self.client_timeout,
)
except Exception as exc:
LOGGER.warning(
"Could not cleanly disconnect web3 provider on exit: %s", exc
)

@classmethod
async def new(
Expand All @@ -123,12 +144,17 @@ async def new(
pending_tx_publisher: PendingTxPublisher,
chain_data_service: ChainDataService,
) -> Self:
rpc_url = config.ethereum.api_url.value
chain_id = config.ethereum.chain_id.value
client_timeout = config.ethereum.client_timeout.value
contract_address = config.ethereum.sync_contract.value

web3_client = make_web3_client(
rpc_url=config.ethereum.api_url.value,
chain_id=config.ethereum.chain_id.value,
timeout=config.ethereum.client_timeout.value,
rpc_url=rpc_url,
chain_id=chain_id,
timeout=client_timeout,
)
contract = await get_contract(web3_client, config.ethereum.sync_contract.value)
contract = await get_contract(web3_client, contract_address)
return cls(
web3_client=web3_client,
contract=contract,
Expand All @@ -139,8 +165,49 @@ async def new(
session_factory=session_factory,
pending_tx_publisher=pending_tx_publisher,
chain_data_service=chain_data_service,
rpc_url=rpc_url,
chain_id=chain_id,
client_timeout=client_timeout,
contract_address=contract_address,
)

async def _reset_web3_client(self) -> None:
"""
Rebuild the web3 client after a hung RPC call.

The single long-lived AsyncWeb3 client can wedge on a stale TCP
connection, causing ``eth.get_logs`` / ``eth.block_number`` to hang
forever. When that happens we best-effort disconnect the old provider
and build a fresh client so the next retry uses a new connection.
"""
try:
# Bound the disconnect: a wedged provider's disconnect() can itself
# hang, which would freeze the recovery path we are trying to run.
await asyncio.wait_for(
self.web3_client.provider.disconnect(),
timeout=self.client_timeout,
)
except Exception as exc:
# Best-effort: the old provider may itself be wedged.
LOGGER.warning("Could not cleanly disconnect old web3 provider: %s", exc)

if (
self.rpc_url is None
or self.chain_id is None
or self.contract_address is None
):
# Minimal/test constructions cannot rebuild a real client.
return

# Build into locals first, then swap both together: a failure in
# get_contract must not leave a new web3_client paired with the old
# contract.
new_client = make_web3_client(self.rpc_url, self.chain_id, self.client_timeout)
new_contract = await get_contract(new_client, self.contract_address)
self.web3_client = new_client
self.contract = new_contract
LOGGER.info("Rebuilt Ethereum web3 client after RPC timeout")

async def get_last_height(self, sync_type: ChainEventType) -> BlockNumber:
"""Returns the last height for which we already have the ethereum data."""
with self.session_factory() as session:
Expand All @@ -164,14 +231,28 @@ async def _get_logs_in_block_range(

LOGGER.info(f"Fetching logs in range {start_block}..{end_block}")
try:
logs = await self.web3_client.eth.get_logs(
{
"address": self.contract.address,
"fromBlock": start_block,
"toBlock": end_block,
}
# Bound the RPC call: the web3 client can wedge on a stale
# connection and hang here forever, freezing the sync. A timeout is
# NOT a TooManyLogsInRange (handled below) - it must propagate.
logs = await asyncio.wait_for(
self.web3_client.eth.get_logs(
{
"address": self.contract.address,
"fromBlock": start_block,
"toBlock": end_block,
}
),
timeout=self.client_timeout,
)
return logs
except asyncio.TimeoutError:
LOGGER.warning(
"Timed out fetching logs in range %s..%s, recycling web3 client",
start_block,
end_block,
)
await self._reset_web3_client()
raise
except Web3RPCError as e:
# Handle limit exceptions
if rpc_response := e.rpc_response:
Expand All @@ -190,7 +271,19 @@ async def _get_all_logs_in_batches(
block_range = max_block_range

while True:
last_eth_block = await self.web3_client.eth.block_number
try:
# Bound the RPC call: a wedged web3 client can hang here
# forever, freezing the sync.
last_eth_block = await asyncio.wait_for(
self.web3_client.eth.block_number,
timeout=self.client_timeout,
)
except asyncio.TimeoutError:
LOGGER.warning(
"Timed out fetching latest block number, recycling web3 client"
)
await self._reset_web3_client()
raise
# Note: the range in get_logs is [start, end].
end_block = min(last_eth_block, BlockNumber(start_block + block_range - 1))

Expand Down
173 changes: 173 additions & 0 deletions tests/chains/test_ethereum_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
Tests for the Ethereum sync RPC timeout + web3 client recycle behaviour.

Root cause covered here: the single long-lived AsyncWeb3 client can wedge on a
stale TCP connection, making ``eth.get_logs`` / ``eth.block_number`` hang
forever. The connector must bound those awaits with ``asyncio.wait_for`` and,
on timeout, rebuild its web3 client so the next retry uses a fresh connection.

Every async test that exercises a potential hang is guarded with an outer
``asyncio.wait_for(..., timeout=5)`` so the suite can never hang.
"""

import asyncio

import pytest
from eth_typing import BlockNumber

from aleph.chains.ethereum import EthereumConnector


def _make_connector(mocker, get_logs=None, block_number=None):
"""
Build an EthereumConnector with a mock web3 client, using the safe
defaults added to ``__init__`` for the fields a minimal construction does
not need.
"""
web3_client = mocker.MagicMock()
if get_logs is not None:
web3_client.eth.get_logs = get_logs

# block_number is awaited as an attribute (``await ...eth.block_number``),
# so assign an awaitable directly on this mock *instance*. Do not patch
# ``type(web3_client.eth)`` — that mutates the shared MagicMock class and
# leaks the property across tests.
if block_number is not None:
web3_client.eth.block_number = block_number()

# provider.disconnect() is awaited in the reset path; make it awaitable so
# the disconnect actually runs instead of being swallowed as a TypeError.
web3_client.provider.disconnect = mocker.AsyncMock()

contract = mocker.MagicMock()
contract.address = "0x" + "0" * 40

connector = EthereumConnector(
web3_client=web3_client,
contract=contract,
authorized_emitters=[],
max_gas_price=0,
start_height=BlockNumber(0),
max_block_range=100,
session_factory=mocker.MagicMock(),
pending_tx_publisher=mocker.AsyncMock(),
chain_data_service=mocker.AsyncMock(),
# Intentionally leave rpc_url/chain_id/contract_address unset so the
# reset path skips rebuilding (no real network), while the timeout
# still surfaces.
client_timeout=0.05,
)
return connector


@pytest.mark.asyncio
async def test_get_logs_hang_raises_timeout_and_triggers_reset(mocker):
async def hang(*_args, **_kwargs):
await asyncio.Event().wait()

connector = _make_connector(mocker, get_logs=hang)
reset_spy = mocker.spy(connector, "_reset_web3_client")

# The hung get_logs must surface as a timeout, not as TooManyLogsInRange.
with pytest.raises(asyncio.TimeoutError) as exc_info:
await asyncio.wait_for(
connector._get_logs_in_block_range(BlockNumber(0), BlockNumber(10)),
timeout=5,
)

# A timeout must trigger the client reset. The actual client rebuild is
# covered by test_reset_rebuilds_client_when_params_set; here rpc_url etc.
# are unset so the reset is a no-op beyond being invoked.
assert reset_spy.await_count == 1
# The timeout must be catchable by `except Exception` in the retry loop.
assert isinstance(exc_info.value, Exception)


@pytest.mark.asyncio
async def test_block_number_hang_raises_timeout_and_triggers_reset(mocker):
async def hang():
await asyncio.Event().wait()

connector = _make_connector(mocker, block_number=hang)
reset_spy = mocker.spy(connector, "_reset_web3_client")

async def drain():
async for _log in connector._get_all_logs_in_batches(
BlockNumber(0), max_block_range=100
):
pass

with pytest.raises(asyncio.TimeoutError) as exc_info:
await asyncio.wait_for(drain(), timeout=5)

# A timeout must trigger the client reset (full rebuild covered by
# test_reset_rebuilds_client_when_params_set).
assert reset_spy.await_count == 1
assert isinstance(exc_info.value, Exception)


@pytest.mark.asyncio
async def test_timeout_propagates_as_exception_for_retry_loop(mocker):
async def hang(*_args, **_kwargs):
await asyncio.Event().wait()

connector = _make_connector(mocker, get_logs=hang)

raised: BaseException | None = None
try:
await asyncio.wait_for(
connector._get_logs_in_block_range(BlockNumber(0), BlockNumber(10)),
# The inner client_timeout (0.05s) is what must fire; a tight outer
# safety net makes that explicit while still preventing a hang.
timeout=1,
)
except Exception as e: # mirrors fetch_sync_events_task's handler
raised = e

assert raised is not None
assert isinstance(raised, asyncio.TimeoutError)
assert isinstance(raised, Exception)


@pytest.mark.asyncio
async def test_reset_rebuilds_client_when_params_set(mocker):
"""
With rpc_url/chain_id/contract_address set, ``_reset_web3_client`` must
disconnect the old provider and replace BOTH the web3 client and the
contract with freshly built ones. Pure-mock, no network.
"""
old_web3_client = mocker.MagicMock()
old_web3_client.provider.disconnect = mocker.AsyncMock()
old_contract = mocker.MagicMock()

new_web3_client = mocker.MagicMock()
new_contract = mocker.MagicMock()

mocker.patch("aleph.chains.ethereum.make_web3_client", return_value=new_web3_client)
mocker.patch(
"aleph.chains.ethereum.get_contract",
new=mocker.AsyncMock(return_value=new_contract),
)

connector = EthereumConnector(
web3_client=old_web3_client,
contract=old_contract,
authorized_emitters=[],
max_gas_price=0,
start_height=BlockNumber(0),
max_block_range=100,
session_factory=mocker.MagicMock(),
pending_tx_publisher=mocker.AsyncMock(),
chain_data_service=mocker.AsyncMock(),
# Dummy-but-set values so the rebuild path runs.
rpc_url="http://localhost:8545",
chain_id=1,
client_timeout=0.05,
contract_address="0x" + "0" * 40,
)

await connector._reset_web3_client()

old_web3_client.provider.disconnect.assert_awaited_once()
assert connector.web3_client is new_web3_client
assert connector.contract is new_contract
Loading