event_reader.multicall_batcher

Documentation for eth_defi.event_reader.multicall_batcher Python module.

Multicall3 contract helpers.

For usage, see the read_multicall_chunked() and read_multicall_historical_stateful() functions.

Warning

See Multicall private key leak hack warning.

Module Attributes

MULTICALL_DEPLOY_ADDRESS

Default Multicall3 address

MULTICALL_CHAIN_ADDRESSES

Per-chain Multicall3 deployments

WTF_RETRY_EXCEPTIONS_MESSAGE_CLUES

Exception message fragments that hint a failure is worth retrying

Functions

call_multicall(multicall_contract, calls, ...)

Call a multicall contract.

call_multicall_batched_single_thread(...[, ...])

Call Multicall contract with a payload.

call_multicall_debug_single_thread(...)

Skip Multicall contract and try eth_call directly.

call_multicall_encoded(multicall_contract, ...)

Call a multicall contract.

get_multicall_block_number(chain_id)

When the multicall contract was deployed for a chain.

get_multicall_contract(web3[, address, ...])

Return a multicall smart contract instance.

read_multicall_chunked(chain_id, ...[, ...])

Read current data using multiple processes in parallel for speedup.

read_multicall_historical(chain_id, ...[, ...])

Read historical data using multiple threads in parallel for speedup.

read_multicall_historical_stateful(chain_id, ...)

Read historical data using multicall with reading state and adaptive frequency filtering.

Classes

BatchCallState

Allow multicall calls to maintain state over multiple invocations.

CombinedEncodedCallResult

Historical read result of multiple multicalls.

EncodedCall

Multicall payload, minified implementation.

EncodedCallResult

Result of a single multicall.

MulticallHistoricalTask

Pickled task sent between the multicall reader loop and subprocesses.

MulticallWrapper

Wrap a call going through the Multicall contract.

MultiprocessMulticallReader

An instance created in a subprocess to do calls.

Exceptions

MulticallNonRetryable

These errors need manual investigation.

MulticallRetryable

Out of gas.

MulticallStateProblem

TODO

CallData

Address, arguments tuples

alias of tuple[Union[str, eth_typing.evm.HexAddress], tuple]

MULTICALL_DEPLOY_ADDRESS: Final[str] = '0xca11bde05977b3631167028862be2a173976ca11'

Default Multicall3 address

MULTICALL_CHAIN_ADDRESSES = {324: '0xF9cda624FBC7e059355ce98a31693d299FACd963'}

Per-chain Multicall3 deployments
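A lookup that prefers a per-chain override and falls back to the default address might look like the sketch below. The helper name resolve_multicall_address is hypothetical and not part of the module; the two constants are copied from the values listed above.

```python
from typing import Final

# Values copied from the module attributes documented above.
MULTICALL_DEPLOY_ADDRESS: Final[str] = "0xca11bde05977b3631167028862be2a173976ca11"
MULTICALL_CHAIN_ADDRESSES = {324: "0xF9cda624FBC7e059355ce98a31693d299FACd963"}


def resolve_multicall_address(chain_id: int) -> str:
    """Hypothetical helper: per-chain override first, default address otherwise."""
    return MULTICALL_CHAIN_ADDRESSES.get(chain_id, MULTICALL_DEPLOY_ADDRESS)


print(resolve_multicall_address(324))
print(resolve_multicall_address(1))
```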

exception MulticallStateProblem

Bases: Exception

TODO

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception MulticallRetryable

Bases: Exception

Out of gas.

  • Broken contract in a gas loop

Try to decrease batch size.
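The advice to decrease the batch size can be sketched as a loop that halves the batch whenever a request is rejected. This is only an illustration: RetryableError stands in for MulticallRetryable so the code runs without eth_defi, and send_with_shrinking_batches and fake_send are hypothetical names.

```python
from typing import Callable, Sequence


class RetryableError(Exception):
    """Stand-in for MulticallRetryable so the sketch runs without eth_defi."""


def send_with_shrinking_batches(
    calls: Sequence[str],
    send_batch: Callable[[Sequence[str]], list[str]],
    batch_size: int,
) -> list[str]:
    """Send calls in batches, halving the batch size whenever a batch is rejected."""
    results: list[str] = []
    position = 0
    while position < len(calls):
        batch = calls[position : position + batch_size]
        try:
            results.extend(send_batch(batch))
        except RetryableError:
            if batch_size == 1:
                raise  # A single call still fails: shrinking cannot help
            batch_size = max(1, batch_size // 2)
            continue  # Retry from the same position with a smaller batch
        position += len(batch)
    return results


def fake_send(batch: Sequence[str]) -> list[str]:
    """Pretend RPC that runs out of gas for more than 4 calls per request."""
    if len(batch) > 4:
        raise RetryableError("out of gas")
    return [call.upper() for call in batch]


print(send_with_shrinking_batches(list("abcdefghij"), fake_send, batch_size=15))
```

Halving is one possible policy; the module may use a different one.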

__init__(message, status_code=None, headers=None)
Parameters
  • message (str) –

  • status_code (int) –

  • headers (dict | None) –

__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception MulticallNonRetryable

Bases: Exception

These errors need manual investigation.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

get_multicall_block_number(chain_id)

When the multicall contract was deployed for a chain.

Parameters

chain_id (int) –

Return type

int | None
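Because the return type is int | None, callers have to handle a missing deployment block. The sketch below clamps a requested start block to the deployment block. The helper first_readable_block is hypothetical, the block numbers are made up, and treating None as "no deployment block known" is an assumption.

```python
from typing import Optional


def first_readable_block(start_block: int, deployed_at: Optional[int]) -> int:
    """Hypothetical helper: never start reading before Multicall3 exists.

    deployed_at is the value a lookup such as get_multicall_block_number()
    would return; None is taken to mean that no deployment block is known.
    """
    if deployed_at is None:
        return start_block  # Nothing known, keep the caller's choice
    return max(start_block, deployed_at)


# The block numbers below are made up for illustration.
print(first_readable_block(start_block=500, deployed_at=1_000))
print(first_readable_block(start_block=2_000, deployed_at=1_000))
print(first_readable_block(start_block=500, deployed_at=None))
```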

get_multicall_contract(web3, address=None, block_identifier=None)

Return a multicall smart contract instance.

  • Get IMulticall3 compiled with Forge

  • Use multicall3 ABI.

Return type

web3.contract.contract.Contract

call_multicall(multicall_contract, calls, block_identifier)

Call a multicall contract.

Return type

dict[Hashable, Any]

call_multicall_encoded(multicall_contract, calls, block_identifier)

Call a multicall contract.

Return type

dict[Hashable, Any]

call_multicall_batched_single_thread(multicall_contract, calls, block_identifier, batch_size=15)

Call Multicall contract with a payload.

  • Single threaded

Parameters
  • web3_factory

    • Each thread will get its own web3 instance

  • batch_size – Do not send more than this many calls in one RPC request.

  • multicall_contract (web3.contract.contract.Contract) –

  • calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

Return type

dict[Hashable, Any]
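The effect of batch_size can be illustrated with a plain slicing helper. This is a sketch of the general batching idea, not the function's actual implementation; split_into_batches is a hypothetical name.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def split_into_batches(calls: Sequence[T], batch_size: int = 15) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most batch_size calls each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(calls), batch_size):
        yield calls[start : start + batch_size]


batches = list(split_into_batches(list(range(35)), batch_size=15))
print([len(batch) for batch in batches])
```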

call_multicall_debug_single_thread(multicall_contract, calls, block_identifier)

Skip Multicall contract and try eth_call directly.

  • For debugging problems

  • Perform normal eth_call

  • Log which calls are going out, to help diagnose issues

class MulticallWrapper

Bases: abc.ABC

Wrap a call going through the Multicall contract.

call: web3.contract.contract.ContractFunction

Bound web3.py function with its arguments in place

debug: bool

Set for extensive info logging

abstract get_key()

Get key that will identify this call in the result dictionary

Return type

Hashable

abstract handle(succeed, raw_return_value)

Parse the call result.

Parameters
  • succeed (bool) – True if the call succeeded, False if it reverted

  • raw_return_value (bytes) – Undecoded bytes from the Solidity function call

Returns

The value placed in the return dict

Return type

Any

get_human_args()

Get Solidity args as human readable string for debugging.

Return type

str

multicall_callback(succeed, raw_return_value)

Convert the raw Solidity function call result to a denominated token amount.

  • Multicall library callback

Returns

The token amount in the reserve currency we get on the market sell.

None if this path was not supported (Solidity reverted).

Parameters
  • succeed (bool) –

  • raw_return_value (Any) –

Return type

Any

__init__(call, debug)
Parameters
  • call (web3.contract.contract.ContractFunction) –

  • debug (bool) –

Return type

None
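A subclass has to supply get_key() and handle(). The sketch below shows the shape of such a subclass for a call returning a single uint256. To stay runnable without eth_defi it uses a stand-in base class, WrapperInterface, that only mirrors the two abstract methods; Uint256Read and its contract_address field are hypothetical.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Hashable


class WrapperInterface(ABC):
    """Stand-in mirroring the two abstract methods documented above."""

    @abstractmethod
    def get_key(self) -> Hashable: ...

    @abstractmethod
    def handle(self, succeed: bool, raw_return_value: bytes) -> Any: ...


@dataclass
class Uint256Read(WrapperInterface):
    """Hypothetical wrapper for a call that returns a single uint256."""

    contract_address: str

    def get_key(self) -> Hashable:
        # The key identifies this call in the result dictionary
        return self.contract_address.lower()

    def handle(self, succeed: bool, raw_return_value: bytes) -> Any:
        if not succeed:
            return None  # The call reverted, there is nothing to decode
        return int.from_bytes(raw_return_value[:32], byteorder="big")


wrapper = Uint256Read("0xCA11bde05977b3631167028862bE2a173976CA11")
results = {wrapper.get_key(): wrapper.handle(True, (1234).to_bytes(32, "big"))}
print(results)
```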

class BatchCallState

Bases: abc.ABC

Allow multicall calls to maintain state over multiple invocations.

  • Mostly useful for historical multicall reads and frequency management

abstract should_invoke(call, block_identifier, timestamp)

Check the condition if this multicall is good to go.

Return type

bool

abstract save()

Persist state across multiple runs.

Returns

Pickleable Python object

Return type

dict

abstract load(data)

Restore state persisted by save().

Parameters

data (dict) –
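The three methods above can be illustrated with a state object that lets a call through at most once per time interval. MinIntervalState is a hypothetical class that mirrors the documented method names without subclassing BatchCallState, so it runs without eth_defi. Updating the state inside should_invoke() is a simplification assumed for this sketch.

```python
import datetime
from typing import Any, Optional


class MinIntervalState:
    """Hypothetical state: allow a call at most once per min_interval."""

    def __init__(self, min_interval: datetime.timedelta):
        self.min_interval = min_interval
        self.last_invoked: Optional[datetime.datetime] = None

    def should_invoke(self, call: Any, block_identifier: int, timestamp: datetime.datetime) -> bool:
        if self.last_invoked is not None and timestamp - self.last_invoked < self.min_interval:
            return False  # Too soon since the previous read
        self.last_invoked = timestamp
        return True

    def save(self) -> dict:
        # Only plain, picklable values go into the saved state
        return {"last_invoked": self.last_invoked}

    def load(self, data: dict) -> None:
        self.last_invoked = data["last_invoked"]


state = MinIntervalState(datetime.timedelta(hours=1))
start = datetime.datetime(2024, 1, 1)
# Six consecutive blocks, assumed to be 20 minutes apart
decisions = [
    state.should_invoke(None, block, start + datetime.timedelta(minutes=20 * i))
    for i, block in enumerate(range(100, 106))
]
print(decisions)
```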

class EncodedCall

Bases: object

Multicall payload, minified implementation.

  • Designed for multiprocessing and historical reads

  • Only carry encoded data, not ABI etc. metadata

  • Contains extra_data, which allows routing the results of several calls to one handler class

Example:

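import eth_abi
from web3 import Web3

from eth_defi.event_reader.multicall_batcher import EncodedCall

# address and share_probe_amount are supplied by the caller
# ABI-encode the single uint256 argument
convert_to_shares_payload = eth_abi.encode(["uint256"], [share_probe_amount])

share_price_call = EncodedCall.from_keccak_signature(
    address=address,
    # The first four bytes of the keccak hash form the function selector
    signature=Web3.keccak(text="convertToShares(uint256)")[0:4],
    function="convertToShares",
    data=convert_to_shares_payload,
    extra_data=None,
)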
func_name: str

Store the ABI function name for debugging purposes

address: eth_typing.evm.HexAddress

Contract address

data: bytes

ABI-encoded call payload

extra_data: dict | None

Use this to match the reader

first_block_number: int | None

First block hint when doing historical multicall reading.

Skip calls for blocks that are earlier than this block number.

call_id: int

Running counter call id for debugging purposes

get_debug_info()

Get human-readable details for debugging.

  • Punch into Tenderly simulator

  • Data contains both function signature and data payload

Return type

str
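The note that the data contains both the function signature and the payload refers to the usual Solidity calldata layout: a 4-byte selector followed by 32-byte ABI words. The sketch below builds such calldata with the standard library only, using the well-known ERC-20 balanceOf(address) selector. Whether get_debug_info() prints exactly this concatenation is an assumption, and encode_address_word and the holder address are illustrative.

```python
# Well-known 4-byte selector of the ERC-20 balanceOf(address) function
BALANCE_OF_SELECTOR = bytes.fromhex("70a08231")


def encode_address_word(address: str) -> bytes:
    """Left-pad a 20-byte hex address to a 32-byte ABI word."""
    raw = bytes.fromhex(address.removeprefix("0x"))
    if len(raw) != 20:
        raise ValueError("expected a 20-byte address")
    return raw.rjust(32, b"\x00")


# Placeholder holder address used only for illustration
holder = "0x" + "11" * 20
calldata = BALANCE_OF_SELECTOR + encode_address_word(holder)

print(len(calldata))
print("0x" + calldata.hex())
```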

get_curl_info(block_number)

Get human-readable details for debugging.

  • Punch into Tenderly simulator

  • Data contains both function signature and data payload

Parameters

block_number (int) –

Return type

str

static from_contract_call(call, extra_data=None, first_block_number=None)

Create poller call from Web3.py Contract proxy object

Parameters
  • call (web3.contract.contract.ContractFunction) –

  • extra_data (dict | None) –

  • first_block_number (int | None) –

Return type

eth_defi.event_reader.multicall_batcher.EncodedCall

static from_keccak_signature(address, function, signature, data, extra_data, first_block_number=None, ignore_errors=False, state=None)

Create poller call directly from a raw function signature

Return type

eth_defi.event_reader.multicall_batcher.EncodedCall

call(web3, block_identifier, from_='0x0000000000000000000000000000000000000000', gas=None, ignore_error=False, silent_error=False, attempts=3, retry_sleep=30.0)

Return raw results of the call.

Example of how to read:

erc_7575_call = EncodedCall.from_keccak_signature(
    address=self.vault_address,
    signature=Web3.keccak(text="share()")[0:4],
    function="share",
    data=b"",
    extra_data=None,
)

result = erc_7575_call.call(self.web3, block_identifier="latest")
share_token_address = convert_uint256_bytes_to_address(result)
Parameters
  • ignore_error – Set to True to tell the middleware that this call is expected to fail sometimes, so that it is neither logged as a failed call nor retried.

  • attempts (int) –

    Use built-in retry mechanism for flaky RPC.

    This works regardless of middleware installed. Set to zero to ignore.

    Cannot be used with ignore_error.

  • gas (int) –

    Gas limit.

    If not given, a 15M limit is used, except on Mantle, where 99M is used.

  • web3 (web3.main.Web3) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

Returns

Raw call results as bytes

Raises

ValueError – If the call reverts

Return type

bytes
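The attempts and retry_sleep parameters describe a fixed-delay retry loop. A generic version of that pattern is sketched below. call_with_retries is a hypothetical helper, not the method's implementation, and treating attempts=0 as a single try without retries is an assumption about what "set to zero to ignore" means.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    attempts: int = 3,
    retry_sleep: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: try func up to attempts times, sleeping in between."""
    total = max(1, attempts)  # Assumption: zero means one plain try, no retries
    for attempt in range(1, total + 1):
        try:
            return func()
        except Exception:
            if attempt == total:
                raise  # Out of attempts, surface the last error
            sleep(retry_sleep)
    raise AssertionError("unreachable")


outcomes = iter([TimeoutError("request timed out"), TimeoutError("request timed out"), b"\x01"])


def flaky_call() -> bytes:
    """Fail twice, then return raw bytes, imitating a flaky RPC endpoint."""
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


print(call_with_retries(flaky_call, attempts=3, retry_sleep=0.0))
```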

transact(from_, gas_limit)

Build a transaction payload for this call.

Example:

gas_limit = 15_000_000

# function settleDeposit(uint256 _newTotalAssets) public virtual;
call = EncodedCall.from_keccak_signature(
    address=vault.address,
    function="settleDeposit()",
    signature=Web3.keccak(text="settleDeposit(uint256)")[0:4],
    data=convert_uin256_to_bytes(raw_nav),
    extra_data=None,
)
tx_data = call.transact(
    from_=asset_manager,
    gas_limit=gas_limit,
)
tx_hash = web3.eth.send_transaction(tx_data)
assert_transaction_success_with_explanation(web3, tx_hash)
Return type

dict

call_as_result(web3, block_identifier, from_='0x0000000000000000000000000000000000000000', gas=15000000, ignore_error=False)

Perform RPC call and return the result as an EncodedCallResult.

See call() for info.

Parameters
  • gas

    eth_call RPC gas limit.

    Defaults to 15M, which is assumed to be safe on every chain.

  • web3 (web3.main.Web3) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

Return type

eth_defi.event_reader.multicall_batcher.EncodedCallResult

__init__(func_name, address, data, extra_data, first_block_number=None, call_id=<factory>, _hash=None)
Return type

None

class EncodedCallResult

Bases: object

Result of a single multicall.

Example:

# File 21 of 47 : PlasmaVaultStorageLib.sol
#     /// @custom:storage-location erc7201:io.ipor.PlasmaVaultPerformanceFeeData
#     struct PerformanceFeeData {
#         address feeManager;
#         uint16 feeInPercentage;
#     }
data = call_by_name["getPerformanceFeeData"].result
performance_fee = int.from_bytes(data[32:64], byteorder="big") / 10_000
block_identifier: Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]

Block number

timestamp: datetime.datetime | None

Timestamp of the block (if available)

revert_exception: Exception | None

Not available in multicalls, only through EncodedCall.call_as_result()

state: eth_defi.event_reader.multicall_batcher.BatchCallState | None

Copy the state reference in stateful reading

__init__(call, success, result, block_identifier, timestamp=None, revert_exception=None, state=None)
Return type

None
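The performance fee example in this class slices data[32:64] because each returned struct member occupies one 32-byte ABI word. The sketch below decodes both words from synthetic return data. decode_performance_fee_data is a hypothetical helper and the input bytes are made up for illustration.

```python
def decode_performance_fee_data(data: bytes) -> tuple[str, float]:
    """Decode (address feeManager, uint16 feeInPercentage) from two ABI words."""
    if len(data) < 64:
        raise ValueError("expected at least two 32-byte words")
    # Word 0: address, right-aligned in 32 bytes, so keep the last 20 bytes
    fee_manager = "0x" + data[12:32].hex()
    # Word 1: uint16, also right-aligned; the example above divides by 10_000
    fee = int.from_bytes(data[32:64], byteorder="big") / 10_000
    return fee_manager, fee


# Synthetic return data: a placeholder address and a raw fee value of 1000
raw = bytes.fromhex("ab" * 20).rjust(32, b"\x00") + (1000).to_bytes(32, "big")
print(decode_performance_fee_data(raw))
```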

class CombinedEncodedCallResult

Bases: object

Historical read result of multiple multicalls.

Returns a whole block's worth of call results when iterating over the chain block by block.

__init__(block_number, timestamp, results)
Return type

None

WTF_RETRY_EXCEPTIONS_MESSAGE_CLUES = {'api key is not allowed to access blockchain', 'evm timeout', 'exceeds block gas limit', 'failed to call: invalidtransaction', 'failsafe timeout policy exceeded', 'historical state', 'incorrect response body', 'intrinsic gas too high', 'intrinsic gas too low', 'out of gas', 'request timed out', 'request timeout', "state histories haven't been fully indexed yet"}

Exception message fragments that hint a failure is worth retrying
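All listed fragments are lowercase, which suggests a case-insensitive substring match against the exception message. The sketch below shows that idea. It is an assumption about how the set is used, and looks_retryable is a hypothetical name.

```python
# A few of the clues listed above, copied verbatim
RETRY_CLUES = frozenset({"out of gas", "request timed out", "evm timeout", "historical state"})


def looks_retryable(error: Exception, clues: frozenset[str] = RETRY_CLUES) -> bool:
    """Hypothetical check: case-insensitive substring match against the clues."""
    message = str(error).lower()
    return any(clue in message for clue in clues)


print(looks_retryable(RuntimeError("Execution reverted: Out Of Gas")))
print(looks_retryable(ValueError("execution reverted: insufficient balance")))
```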

class MultiprocessMulticallReader

Bases: object

An instance created in a subprocess to do calls.

  • Specific to a chain (connection is married with a chain, otherwise stateless)

  • Initialises the web3 connection at the start of the process

  • If you try to read using multicall when the contract is not yet deployed (see get_multicall_block_number()) then you get no results

__init__(web3factory, batch_size=40, backswitch_threshold=100, too_many_requets_sleep=61.0)

Create subprocess worker instance.

Parameters
  • web3factory (eth_defi.event_reader.web3factory.Web3Factory | web3.main.Web3) – Initialise connection within the subprocess

  • batch_size

    How many calls we pack into the multicall.

    Tune this number manually if your RPC nodes start to fail because they hit their internal time limits.

last_switch

How many calls ago we switched the fallback provider.

backswitch_threshold

Try to switch back from the fallback provider to the main provider after this many calls.

get_gas_hint(chain_id, batch_calls)

Fix non-standard out of gas issues

Return type

int | None

get_batch_size(web3, chain_id, block_identifier)

Fix non-standard out of gas issues.

TODO: Move these rules to their own module.

Parameters
  • web3 (web3.main.Web3) –

  • chain_id (int) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

Return type

int | None

call_multicall_with_batch_size(multicall_contract, block_identifier, batch_size, encoded_calls, require_multicall_result)

Communicate with Multicall3 contract.

  • Fail safes for ugly situations

Parameters
  • multicall_contract (web3.contract.contract.Contract) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

  • batch_size (int) –

  • encoded_calls (list[tuple[eth_typing.evm.HexAddress, bytes]]) –

  • require_multicall_result (bool) –

Return type

list[tuple[bool, bytes]]

process_calls(block_identifier, calls, require_multicall_result=False, timestamp=None, min_fallback_retries=5)

Work a chunk of calls in the subprocess.

  • Divide an unbounded number of calls into batches that we think Multicall3 and the RPC node can handle

  • If a single batch fail

Parameters
  • require_multicall_result – Headache debug flag.

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) – Block number

  • timestamp (datetime.datetime | None) – Block timestamp

  • min_fallback_retries – Try all RPCs at least this many times when attempting to make progress.

  • calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) –

Return type

Iterable[eth_defi.event_reader.multicall_batcher.EncodedCallResult]

read_multicall_historical(chain_id, web3factory, calls, start_block, end_block, step, max_workers=8, timeout=1800, display_progress=True, progress_suffix=None, require_multicall_result=False, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'))

Read historical data using multiple threads in parallel for speedup.

  • Run over period of time (blocks)

  • Use multicall to harvest data from a single block at a time

  • Show a progress bar using tqdm

Parameters
  • chain_id (int) – Which chain we are targeting with calls.

  • web3factory (eth_defi.event_reader.web3factory.Web3Factory) – The connection factory for subprocesses

  • start_block (int) – Block range to scoop

  • end_block (int) – Block range to scoop

  • step (int) – How many blocks we iterate at once

  • timeout – Joblib timeout to wait for a result from an individual task

  • progress_suffix (Optional[Callable]) – Allow caller to decorate the progress bar

  • require_multicall_result – Debug parameter to crash the reader if we start to get invalid replies from Multicall3 contract.

  • display_progress (bool | str) –

    Whether to display progress bar or not.

    Set to string to have a progress bar label.

  • hypersync_client (HypersyncClient | None) – Not used in this reader

  • calls (Iterable[eth_defi.event_reader.multicall_batcher.EncodedCall]) –

  • timestamp_cache_file (pathlib.Path) –

Return type

Iterable[eth_defi.event_reader.multicall_batcher.CombinedEncodedCallResult]
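How start_block, end_block and step select the sampled blocks can be sketched with a plain range. Two assumptions are made here that the documentation does not confirm: step is the distance between sampled blocks, and end_block is inclusive. blocks_to_sample is a hypothetical helper.

```python
def blocks_to_sample(start_block: int, end_block: int, step: int) -> list[int]:
    """List the block numbers visited when walking a range in fixed steps.

    Assumption: end_block is inclusive; the documentation does not say.
    """
    if step < 1:
        raise ValueError("step must be at least 1")
    return list(range(start_block, end_block + 1, step))


print(blocks_to_sample(start_block=100, end_block=130, step=10))
```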

read_multicall_historical_stateful(chain_id, web3factory, calls, start_block, end_block, step, max_workers=8, timeout=1800, display_progress=True, progress_suffix=None, require_multicall_result=False, chunk_size=48, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'))

Read historical data using multicall with reading state and adaptive frequency filtering.

  • Allow adaptive frequency with read state

  • Slower loop than the dumb read_multicall_historical() as it has to maintain state

  • Because of state, we need to do block by block reading, as we need to evaluate state to see which calls are needed for which block, and the state depends on the result of the previous blocks

Return type

Iterable[eth_defi.event_reader.multicall_batcher.CombinedEncodedCallResult]

read_multicall_chunked(chain_id, web3factory, calls, block_identifier, max_workers=8, timeout=1800, chunk_size=40, progress_bar_desc=None, timestamped_results=True, backend='loky')

Read current data using multiple processes in parallel for speedup.

  • All calls hit the same block number

  • Show a progress bar using tqdm

Example:

# Generate an encoded call for each token contract we want to query
balance_of_signature = Web3.keccak(text="balanceOf(address)")[0:4]


def _gen_calls(addresses: Iterable[str]) -> Iterable[EncodedCall]:
    for _token_address in addresses:
        yield EncodedCall.from_keccak_signature(
            address=_token_address.lower(),
            signature=balance_of_signature,
            data=convert_address_to_bytes32(out_address),
            extra_data={},
            ignore_errors=True,
            function="balanceOf",
        )


web3factory = MultiProviderWeb3Factory(web3.provider.endpoint_uri, hint="fetch_erc20_balances_multicall")

# Execute calls for all token balance reads at a specific block.
# read_multicall_chunked() will automatically split calls to multiple chunks
# if we are querying too many.
results = read_multicall_chunked(
    chain_id=chain_id,
    web3factory=web3factory,
    calls=list(_gen_calls(tokens)),
    block_identifier=block_identifier,
    max_workers=max_workers,
    timestamped_results=False,
)

results = list(results)

addr_to_balance = LowercaseDict()

for result in results:
    token_address = result.call.address

    if not result.result:
        if raise_on_error:
            raise BalanceFetchFailed(f"Could not read token balance for ERC-20: {token_address} for address {out_address}")
        value = None
    else:
        raw_value = convert_int256_bytes_to_int(result.result)
        if decimalise:
            token = fetch_erc20_details(web3, token_address, cache=token_cache, chain_id=chain_id)
            value = token.convert_to_decimals(raw_value)
        else:
            value = raw_value

    addr_to_balance[token_address] = value
Parameters
  • chain_id (int) – Which EVM chain we are targeting with calls.

  • web3factory (eth_defi.event_reader.web3factory.Web3Factory) – The connection factory for subprocesses

  • calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) – List of calls to perform against Multicall3.

  • chunk_size (int) – Max calls per one chunk sent to Multicall contract, to stay below JSON-RPC read gas limit.

  • max_workers – How many parallel processes to use.

  • timeout – Joblib timeout to wait for a result from an individual task.

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

    Block number to read.

    • Can be a block number or “latest” or “earliest”

  • progress_bar_desc (str | None) – If set, display a TQDM progress bar for the process.

  • timestamped_results

    Need timestamp of the block number in each result.

    Causes very slow eth_getBlock call, use only if needed.

  • backend

    Joblib backend to use.

    Either “loky” or “threading”.

Returns

Iterable of results.

One entry per each call.

Results may be in a different order than the calls were originally given.

Return type

Iterable[eth_defi.event_reader.multicall_batcher.EncodedCallResult]
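Because results can arrive in any order, it is safer to key them by call identity than by position, as the example above does with result.call.address. The sketch below reproduces the effect with the standard library: worker threads finish in arbitrary order and the results are re-keyed by address. FakeCall and fake_worker are stand-ins, not library classes.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeCall:
    """Stand-in for an encoded call; only the fields used here."""

    call_id: int
    address: str


def fake_worker(chunk: list[FakeCall]) -> list[tuple[FakeCall, bytes]]:
    """Pretend to execute one chunk and return (call, raw result) pairs."""
    return [(call, call.call_id.to_bytes(32, "big")) for call in chunk]


calls = [FakeCall(call_id=i, address=f"0x{i:040x}") for i in range(10)]
chunks = [calls[i : i + 4] for i in range(0, len(calls), 4)]

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(fake_worker, chunk) for chunk in chunks]
    # as_completed yields chunks in completion order, not submission order
    unordered = [pair for future in as_completed(futures) for pair in future.result()]

# Key the results by call identity instead of relying on position
by_address = {call.address: raw for call, raw in unordered}
restored = [by_address[call.address] for call in calls]
print([int.from_bytes(raw, "big") for raw in restored])
```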

class MulticallHistoricalTask

Bases: object

Pickled task sent between the multicall reader loop and subprocesses.

Send a batch of calls to a specific block.

chain_id: int

Track which chain this call belongs to

web3factory: eth_defi.event_reader.web3factory.Web3Factory

Used to initialise web3 connection in the subprocess

block_number: Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]

Block number to scan

calls: list[eth_defi.event_reader.multicall_batcher.EncodedCall]

Multicalls to perform

require_multicall_result: bool

Debug parameter to early abort if we get invalid replies from Multicall contract

timestamp: datetime.datetime

Fetch the timestamp if not given.

Otherwise it has been prefetched.

task_id: int

Running counter for task ids, for serialisation checks

__init__(chain_id, web3factory, block_number, calls, require_multicall_result=False, timestamp=None, task_id=<factory>)
Return type

None
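The task_id=<factory> default in the signature above indicates a dataclass field with a default factory. One way to get a running counter from such a factory, and to check that a task survives pickling, is sketched below. SketchTask is a stand-in with a subset of the documented fields, and the counter implementation is an assumption. Only the task's fields are pickled, as a plain dictionary, to keep the sketch independent of how the class is imported.

```python
import itertools
import pickle
from dataclasses import asdict, dataclass, field

# Module-level counter so that every new task receives the next id
_task_counter = itertools.count(1)


def _next_task_id() -> int:
    return next(_task_counter)


@dataclass
class SketchTask:
    """Stand-in for a picklable task; field names follow the listing above."""

    chain_id: int
    block_number: int
    calls: list
    require_multicall_result: bool = False
    task_id: int = field(default_factory=_next_task_id)


first = SketchTask(chain_id=1, block_number=100, calls=[b"\x70\xa0\x82\x31"])
second = SketchTask(chain_id=1, block_number=101, calls=[])

# Round-trip the task's fields through pickle, as when crossing a process boundary
clone = SketchTask(**pickle.loads(pickle.dumps(asdict(first))))
print(second.task_id - first.task_id, clone == first)
```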