event_reader.multicall_batcher
Documentation for eth_defi.event_reader.multicall_batcher Python module.
Multicall3 contract helpers.
For a tutorial, read "Multicall3: Doing fast chain data reading with Web3.py".
Perform several smart contract calls in one RPC request using Multicall contract
Increase smart contract call throughput using Multicall smart contract
Further increase call throughput using multiprocessing with joblib.Parallel
Do fast historical reads over several blocks with read_multicall_historical()
For usage, see the read_multicall_chunked() and read_multicall_historical_stateful() functions.
Warning
See Multicall private key leak hack warning.
Module Attributes
MULTICALL_DEPLOY_ADDRESS | Default Multicall3 address
MULTICALL_CHAIN_ADDRESSES | Per-chain Multicall3 deployments
WTF_RETRY_EXCEPTIONS_MESSAGE_CLUES | F**k EVM
Functions
call_multicall() | Call a multicall contract.
call_multicall_batched_single_thread() | Call Multicall contract with a payload.
call_multicall_debug_single_thread() | Skip Multicall contract and try eth_call directly.
call_multicall_encoded() | Call a multicall contract.
get_multicall_block_number() | When the multicall contract was deployed for a chain.
get_multicall_contract() | Return a multicall smart contract instance.
read_multicall_chunked() | Read current data using multiple processes in parallel for speedup.
read_multicall_historical() | Read historical data using multiple threads in parallel for speedup.
read_multicall_historical_stateful() | Read historical data using multicall with reading state and adaptive frequency filtering.
Classes
BatchCallState | Allow multicall calls to maintain state over multiple invocations.
CombinedEncodedCallResult | Historical read result of multiple multicalls.
EncodedCall | Multicall payload, minified implementation.
EncodedCallResult | Result of a single multicall.
MulticallHistoricalTask | Pickled task sent between the multicall reader loop and subprocesses.
MulticallWrapper | Wrap a call going through the Multicall contract.
MultiprocessMulticallReader | An instance created in a subprocess to do calls.
Exceptions
MulticallNonRetryable | Need to take a manual look at these errors.
MulticallRetryable | Out of gas.
MulticallStateProblem | TODO
- MULTICALL_DEPLOY_ADDRESS: Final[str] = '0xca11bde05977b3631167028862be2a173976ca11'
Default Multicall3 address
- MULTICALL_CHAIN_ADDRESSES = {324: '0xF9cda624FBC7e059355ce98a31693d299FACd963'}
Per-chain Multicall3 deployments
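As a rough illustration of how the two constants above could be combined, the sketch below prefers a per-chain deployment and falls back to the default address. The helper name `resolve_multicall_address` is hypothetical and not part of this module, and the fallback rule itself is an assumption; only the two constant values are copied from this page.

```python
from typing import Final

# Values copied from the module attributes documented above
MULTICALL_DEPLOY_ADDRESS: Final[str] = "0xca11bde05977b3631167028862be2a173976ca11"
MULTICALL_CHAIN_ADDRESSES = {324: "0xF9cda624FBC7e059355ce98a31693d299FACd963"}


def resolve_multicall_address(chain_id: int) -> str:
    """Hypothetical helper: use a per-chain deployment if listed, else the default."""
    return MULTICALL_CHAIN_ADDRESSES.get(chain_id, MULTICALL_DEPLOY_ADDRESS)


override_address = resolve_multicall_address(324)
default_address = resolve_multicall_address(1)
```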
- exception MulticallStateProblem
Bases: Exception
TODO
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception MulticallRetryable
Bases: Exception
Out of gas.
Broken contract in a gas loop
Try to decrease batch size.
- __init__(message, status_code=None, headers=None)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
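The advice "try to decrease batch size" can be sketched as a loop that halves the batch after a retryable failure. This is a standalone illustration, not the module's implementation: `RetryableBatchError`, `run_with_shrinking_batches` and `fake_execute` are hypothetical names, and the halving policy is an assumption.

```python
from typing import Callable, Sequence


class RetryableBatchError(Exception):
    """Local stand-in for a retryable failure such as out of gas."""


def run_with_shrinking_batches(
    calls: Sequence[str],
    execute: Callable[[Sequence[str]], list],
    batch_size: int,
) -> list:
    """Run calls in batches, halving the batch size after a retryable failure."""
    results = []
    position = 0
    while position < len(calls):
        batch = calls[position : position + batch_size]
        try:
            results.extend(execute(batch))
            position += len(batch)
        except RetryableBatchError:
            if batch_size == 1:
                raise  # A single call still fails, nothing left to shrink
            batch_size = max(1, batch_size // 2)
    return results


def fake_execute(batch: Sequence[str]) -> list:
    """Pretend RPC backend that runs out of gas above three calls per batch."""
    if len(batch) > 3:
        raise RetryableBatchError("out of gas")
    return [name.upper() for name in batch]


shrunk_results = run_with_shrinking_batches(list("abcdefgh"), fake_execute, batch_size=8)
```

With the fake backend above, the batch size drops from 8 to 4 to 2 before the calls start to succeed.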
- exception MulticallNonRetryable
Bases: Exception
Need to take a manual look at these errors.
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- get_multicall_block_number(chain_id)
When the multicall contract was deployed for a chain.
- get_multicall_contract(web3, address=None, block_identifier=None)
Return a multicall smart contract instance.
Get IMulticall3 compiled with Forge
Use multicall3 ABI.
- Parameters
web3 (web3.main.Web3) –
address (Optional[Union[eth_typing.evm.HexAddress, str]]) –
block_identifier (eth_typing.evm.BlockNumber) –
- Return type
web3.contract.contract.Contract
- call_multicall(multicall_contract, calls, block_identifier)
Call a multicall contract.
- Parameters
multicall_contract (web3.contract.contract.Contract) –
calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- Return type
- call_multicall_encoded(multicall_contract, calls, block_identifier)
Call a multicall contract.
- Parameters
multicall_contract (web3.contract.contract.Contract) –
calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- Return type
- call_multicall_batched_single_thread(multicall_contract, calls, block_identifier, batch_size=15)
Call Multicall contract with a payload.
Single threaded
- Parameters
web3_factory –
Each thread will get its own web3 instance
batch_size – Do not make more than this many calls per RPC request.
multicall_contract (web3.contract.contract.Contract) –
calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- Return type
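To make the batch_size parameter concrete, here is a minimal sketch of splitting a list of calls into consecutive batches. The helper `split_into_batches` is hypothetical; it only shows the slicing arithmetic and does not talk to any contract.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def split_into_batches(calls: Sequence[T], batch_size: int = 15) -> Iterator[Sequence[T]]:
    """Yield consecutive slices holding at most batch_size calls each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(calls), batch_size):
        yield calls[start : start + batch_size]


# 40 placeholder calls with the default batch size of 15
batch_lengths = [len(batch) for batch in split_into_batches(list(range(40)))]
```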
- call_multicall_debug_single_thread(multicall_contract, calls, block_identifier)
Skip Multicall contract and try eth_call directly.
For debugging problems
Perform normal eth_call
Log which calls are going out to diagnose issues
- Parameters
multicall_contract (web3.contract.contract.Contract) –
calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- class MulticallWrapper
Bases: abc.ABC
Wrap a call going through the Multicall contract.
Each call in the batch is represented by one instance of MulticallWrapper.
This class must be subclassed, and the subclass needs to implement get_key(), handle() and __repr__().
- call: web3.contract.contract.ContractFunction
Bound web3.py function with its arguments in place
- abstract get_key()
Get key that will identify this call in the result dictionary
- Return type
- abstract handle(succeed, raw_return_value)
Parse the call result.
- multicall_callback(succeed, raw_return_value)
Convert the raw Solidity function call result to a denominated token amount.
Multicall library callback
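The subclassing contract described above might look roughly like the following. To keep the sketch runnable without web3.py, it uses a local stand-in base class `WrapperSketch` rather than the real MulticallWrapper; the subclass `BalanceCallSketch` and its uint256 parsing are purely illustrative assumptions.

```python
import abc
from typing import Any, Hashable


class WrapperSketch(abc.ABC):
    """Stand-in with the abstract methods documented for MulticallWrapper."""

    @abc.abstractmethod
    def get_key(self) -> Hashable:
        """Key identifying this call in the result dictionary."""

    @abc.abstractmethod
    def handle(self, succeed: bool, raw_return_value: bytes) -> Any:
        """Parse the call result."""


class BalanceCallSketch(WrapperSketch):
    """Hypothetical subclass: parse a uint256 balance for one token."""

    def __init__(self, token_address: str):
        self.token_address = token_address

    def get_key(self) -> Hashable:
        return self.token_address.lower()

    def handle(self, succeed: bool, raw_return_value: bytes) -> Any:
        # Treat a failed call as "no value" instead of raising
        if not succeed:
            return None
        return int.from_bytes(raw_return_value[0:32], byteorder="big")

    def __repr__(self) -> str:
        return f"<BalanceCallSketch {self.token_address}>"


wrapper = BalanceCallSketch("0x" + "AB" * 20)
parsed_balance = wrapper.handle(True, (1234).to_bytes(32, byteorder="big"))
```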
- class BatchCallState
Bases: abc.ABC
Allow multicall calls to maintain state over multiple invocations.
Mostly useful for historical multicall reads and frequency management
- abstract should_invoke(call, block_identifier, timestamp)
Check the condition if this multicall is good to go.
- Parameters
call (eth_defi.event_reader.multicall_batcher.EncodedCall) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
timestamp (datetime.datetime) –
- Return type
- abstract save()
Persist state across multiple runs.
- Returns
Pickleable Python object
- Return type
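One way to picture frequency management is a state object that lets a call through at most once per time interval. The sketch below uses a local stand-in base class `StateSketch` with the two abstract methods documented above; `MinimumIntervalState` and its `record()` hook are hypothetical and not part of the documented interface.

```python
import abc
import datetime
from typing import Optional


class StateSketch(abc.ABC):
    """Stand-in mirroring the two abstract methods documented for BatchCallState."""

    @abc.abstractmethod
    def should_invoke(self, call, block_identifier, timestamp: datetime.datetime) -> bool:
        """Decide whether the call should run at this block."""

    @abc.abstractmethod
    def save(self) -> dict:
        """Return a pickleable snapshot of the state."""


class MinimumIntervalState(StateSketch):
    """Hypothetical state: run a call at most once per interval."""

    def __init__(self, interval: datetime.timedelta):
        self.interval = interval
        self.last_invoked_at: Optional[datetime.datetime] = None

    def should_invoke(self, call, block_identifier, timestamp: datetime.datetime) -> bool:
        if self.last_invoked_at is None:
            return True
        return timestamp - self.last_invoked_at >= self.interval

    def record(self, timestamp: datetime.datetime) -> None:
        # Hypothetical hook, not part of the documented interface
        self.last_invoked_at = timestamp

    def save(self) -> dict:
        # Only built-in types, so the snapshot can be pickled
        return {"interval_seconds": self.interval.total_seconds(), "last_invoked_at": self.last_invoked_at}


state = MinimumIntervalState(datetime.timedelta(hours=1))
start = datetime.datetime(2024, 1, 1)
decisions = []
for minutes in (0, 30, 60, 90):
    timestamp = start + datetime.timedelta(minutes=minutes)
    invoke = state.should_invoke(call=None, block_identifier=minutes, timestamp=timestamp)
    if invoke:
        state.record(timestamp)
    decisions.append(invoke)
```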
- class EncodedCall
Bases: object
Multicall payload, minified implementation.
Designed for multiprocessing and historical reads
Only carry encoded data, not ABI etc. metadata
Contain extra_data, which allows routing the results of several calls to one handler class
Example:
    convert_to_shares_payload = eth_abi.encode(["uint256"], [share_probe_amount])

    share_price_call = EncodedCall.from_keccak_signature(
        address=address,
        signature=Web3.keccak(text="convertToShares(uint256)")[0:4],
        function="convertToShares",
        data=convert_to_shares_payload,
        extra_data=None,
    )
- address: eth_typing.evm.HexAddress
Contract address
- first_block_number: int | None
First block hint when doing historical multicall reading.
Skip calls for blocks that are earlier than this block number.
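The first-block hint can be read as a simple filter applied per block. In the sketch below, `CallSketch` is a hypothetical stand-in holding only the two fields needed here, and `calls_active_at` is an illustrative helper rather than a function of this module.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class CallSketch:
    """Hypothetical stand-in carrying only the fields needed here."""

    address: str
    first_block_number: Optional[int] = None


def calls_active_at(calls: List[CallSketch], block_number: int) -> List[CallSketch]:
    """Drop calls whose first_block_number is later than block_number."""
    return [c for c in calls if c.first_block_number is None or c.first_block_number <= block_number]


candidate_calls = [CallSketch("0xaaa", 100), CallSketch("0xbbb", 200), CallSketch("0xccc")]
active_addresses = [c.address for c in calls_active_at(candidate_calls, 150)]
```

A call without a hint is always kept, since there is nothing to compare against.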
- get_debug_info()
Get human-readable details for debugging.
Punch into Tenderly simulator
Data contains both function signature and data payload
- Return type
- get_curl_info(block_number)
Get human-readable details for debugging.
Punch into Tenderly simulator
Data contains both function signature and data payload
- static from_contract_call(call, extra_data=None, first_block_number=None)
Create poller call from Web3.py Contract proxy object
- Parameters
- Return type
- static from_keccak_signature(address, function, signature, data, extra_data, first_block_number=None, ignore_errors=False, state=None)
Create poller call directly from a raw function signature
- Parameters
address (eth_typing.evm.HexAddress) –
function (str) –
signature (bytes) –
data (bytes) –
extra_data (dict | None) –
first_block_number (int | None) –
ignore_errors (bool) –
state (eth_defi.event_reader.multicall_batcher.BatchCallState | None) –
- Return type
- call(web3, block_identifier, from_='0x0000000000000000000000000000000000000000', gas=None, ignore_error=False, silent_error=False, attempts=3, retry_sleep=30.0)
Return raw results of the call.
Example how to read:
    erc_7575_call = EncodedCall.from_keccak_signature(
        address=self.vault_address,
        signature=Web3.keccak(text="share()")[0:4],
        function="share",
        data=b"",
        extra_data=None,
    )

    result = erc_7575_call.call(self.web3, block_identifier="latest")
    share_token_address = convert_uint256_bytes_to_address(result)
- Parameters
ignore_error – Set to True to inform middleware that it is normal for this call to fail, so that it is neither logged as a failed call nor retried.
attempts (int) –
Use the built-in retry mechanism for flaky RPC.
This works regardless of the middleware installed. Set to zero to ignore.
Cannot be used with ignore_errors.
gas (int) –
Gas limit.
If not given, use a 15M limit, except on Mantle, where 99M is used.
web3 (web3.main.Web3) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- Returns
Raw call results as bytes
- Raises
ValueError – If the call reverts
- Return type
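The attempts and retry_sleep parameters suggest a plain retry loop. The sketch below is an assumption about the general shape of such a loop, not the library's code: `call_with_retries` is a hypothetical helper, `ConnectionError` stands in for whatever a flaky RPC raises, and treating zero attempts as "one plain try" is a guess at what "set to zero to ignore" means.

```python
import time
from typing import Callable


def call_with_retries(
    do_call: Callable[[], bytes],
    attempts: int = 3,
    retry_sleep: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bytes:
    """Run do_call, sleeping and retrying after a connection-level failure."""
    tries = max(1, attempts)  # Assumption: zero disables retrying, leaving one plain try
    for attempt in range(1, tries + 1):
        try:
            return do_call()
        except ConnectionError:
            if attempt == tries:
                raise
            sleep(retry_sleep)
    raise AssertionError("unreachable")


# Fake RPC that fails twice and then answers
outcomes = iter([ConnectionError("request timed out"), ConnectionError("request timed out"), b"\x01"])


def flaky_call() -> bytes:
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


slept = []  # Record sleeps instead of actually waiting
retried_result = call_with_retries(flaky_call, attempts=3, retry_sleep=30.0, sleep=slept.append)
```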
- transact(from_, gas_limit)
Build a transaction payload for this call.
Example:
    gas_limit = 15_000_000

    # function settleDeposit(uint256 _newTotalAssets) public virtual;
    call = EncodedCall.from_keccak_signature(
        address=vault.address,
        function="settleDeposit()",
        signature=Web3.keccak(text="settleDeposit(uint256)")[0:4],
        data=convert_uin256_to_bytes(raw_nav),
        extra_data=None,
    )
    tx_data = call.transact(
        from_=asset_manager,
        gas_limit=gas_limit,
    )
    tx_hash = web3.eth.send_transaction(tx_data)
    assert_transaction_success_with_explanation(web3, tx_hash)
- Parameters
from_ (eth_typing.evm.HexAddress) –
gas_limit (int) –
- Return type
- call_as_result(web3, block_identifier, from_='0x0000000000000000000000000000000000000000', gas=15000000, ignore_error=False)
Perform an RPC call and return the result wrapped in an EncodedCallResult.
See call() for info.
- Parameters
- Return type
- class EncodedCallResult
Bases: object
Result of a single multicall.
Example:
    # File 21 of 47 : PlasmaVaultStorageLib.sol
    #     /// @custom:storage-location erc7201:io.ipor.PlasmaVaultPerformanceFeeData
    #     struct PerformanceFeeData {
    #         address feeManager;
    #         uint16 feeInPercentage;
    #     }
    data = call_by_name["getPerformanceFeeData"].result
    performance_fee = int.from_bytes(data[32:64], byteorder="big") / 10_000
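The byte slicing in the example above follows from ABI encoding, where each static struct field occupies one right-aligned 32-byte word. The following self-contained sketch decodes both fields from hand-built sample bytes; `decode_performance_fee_data` is a hypothetical helper, and the sample values are made up for illustration.

```python
from typing import Tuple


def decode_performance_fee_data(data: bytes) -> Tuple[str, float]:
    """Decode (address feeManager, uint16 feeInPercentage) from two 32-byte ABI words."""
    if len(data) < 64:
        raise ValueError("expected at least two 32-byte words")
    # An address is right-aligned in its word: the last 20 of 32 bytes
    fee_manager = "0x" + data[12:32].hex()
    # Small integers are right-aligned too, so the whole word parses as a big-endian int
    fee_raw = int.from_bytes(data[32:64], byteorder="big")
    return fee_manager, fee_raw / 10_000


# Made-up sample: fee manager 0x1111...11 and a raw fee value of 250
sample = bytes(12) + bytes.fromhex("11" * 20) + (250).to_bytes(32, byteorder="big")
fee_manager, performance_fee = decode_performance_fee_data(sample)
```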
- block_identifier: Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]
Block number
- timestamp: datetime.datetime | None
Timestamp of the block (if available)
- revert_exception: Exception | None
Not available in multicalls, only through EncodedCall.call_as_result()
- state: eth_defi.event_reader.multicall_batcher.BatchCallState | None
Copy the state reference in stateful reading
- __init__(call, success, result, block_identifier, timestamp=None, revert_exception=None, state=None)
- Parameters
call (eth_defi.event_reader.multicall_batcher.EncodedCall) –
success (bool) –
result (bytes) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
timestamp (datetime.datetime | None) –
revert_exception (Exception | None) –
state (eth_defi.event_reader.multicall_batcher.BatchCallState | None) –
- Return type
None
- class CombinedEncodedCallResult
Bases: object
Historical read result of multiple multicalls.
Return a whole block's worth of calls when iterating over the chain block by block.
- __init__(block_number, timestamp, results)
- Parameters
block_number (int) –
timestamp (datetime.datetime) –
results (list[eth_defi.event_reader.multicall_batcher.EncodedCallResult]) –
- Return type
None
- WTF_RETRY_EXCEPTIONS_MESSAGE_CLUES = {'api key is not allowed to access blockchain', 'evm timeout', 'exceeds block gas limit', 'failed to call: invalidtransaction', 'failsafe timeout policy exceeded', 'historical state', 'incorrect response body', 'intrinsic gas too high', 'intrinsic gas too low', 'out of gas', 'request timed out', 'request timeout', "state histories haven't been fully indexed yet"}
F**k EVM
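Since all the clues above are lowercase fragments, one plausible way to use them is a case-insensitive substring check against the error message. This matching rule is an assumption; the page does not document how the set is consumed. `looks_retryable` is a hypothetical helper and the sample set is a subset of the clues listed above.

```python
from typing import Iterable

# A few clues copied from WTF_RETRY_EXCEPTIONS_MESSAGE_CLUES above
RETRY_CLUES_SAMPLE = {"out of gas", "request timed out", "evm timeout", "historical state"}


def looks_retryable(error: Exception, clues: Iterable[str] = RETRY_CLUES_SAMPLE) -> bool:
    """Assumed rule: retry if any clue appears in the lowercased error message."""
    message = str(error).lower()
    return any(clue in message for clue in clues)


gas_error_retryable = looks_retryable(ValueError("Out of Gas while executing batch"))
revert_retryable = looks_retryable(ValueError("execution reverted"))
```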
- class MultiprocessMulticallReader
Bases: object
An instance created in a subprocess to do calls.
Specific to a chain (connection is married with a chain, otherwise stateless)
Initialises the web3 connection at the start of the process
If you try to read using multicall when the contract is not yet deployed (see get_multicall_block_number()), you get no results
Create subprocess worker instance.
- Parameters
web3factory – Initialise connection within the subprocess
batch_size –
How many calls we pack into the multicall.
Manually tuned number if your RPC nodes start to crap out, as they hit their internal time limits.
- __init__(web3factory, batch_size=40, backswitch_threshold=100, too_many_requets_sleep=61.0)
Create subprocess worker instance.
- Parameters
web3factory (eth_defi.event_reader.web3factory.Web3Factory | web3.main.Web3) – Initialise connection within the subprocess
batch_size –
How many calls we pack into the multicall.
Manually tuned number if your RPC nodes start to crap out, as they hit their internal time limits.
- last_switch
How many calls ago we switched the fallback provider.
- backswitch_threshold
Try to switch back from the fallback provider to the main provider after this many calls.
- get_gas_hint(chain_id, batch_calls)
Fix non-standard out of gas issues
- get_batch_size(web3, chain_id, block_identifier)
Fix non-standard out of gas issues.
TODO: Move these rules to their own module.
- call_multicall_with_batch_size(multicall_contract, block_identifier, batch_size, encoded_calls, require_multicall_result)
Communicate with Multicall3 contract.
Fail safes for ugly situations
- Parameters
multicall_contract (web3.contract.contract.Contract) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
batch_size (int) –
encoded_calls (list[tuple[eth_typing.evm.HexAddress, bytes]]) –
require_multicall_result (bool) –
- Return type
- process_calls(block_identifier, calls, require_multicall_result=False, timestamp=None, min_fallback_retries=5)
Work a chunk of calls in the subprocess.
Divide an unlimited number of calls into batches that we think Multicall3 and the RPC node can handle
If a single batch fail
- Parameters
require_multicall_result – Headache debug flag.
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) – Block number
timestamp (datetime.datetime | None) – Block timestamp
min_fallback_retries – Bang all RPCs at least this many times when attempting to make progress.
calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) –
- Return type
Iterable[eth_defi.event_reader.multicall_batcher.EncodedCallResult]
- read_multicall_historical(chain_id, web3factory, calls, start_block, end_block, step, max_workers=8, timeout=1800, display_progress=True, progress_suffix=None, require_multicall_result=False, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'))
Read historical data using multiple threads in parallel for speedup.
Run over period of time (blocks)
Use multicall to harvest data from a single block at a time
Show a progress bar using tqdm
- Parameters
chain_id (int) – Which chain we are targeting with calls.
web3factory (eth_defi.event_reader.web3factory.Web3Factory) – The connection factory for subprocesses
start_block (int) – Block range to scoop
end_block (int) – Block range to scoop
step (int) – How many blocks we iterate at once
timeout – Joblib timeout to wait for a result from an individual task
progress_suffix (Optional[Callable]) – Allow caller to decorate the progress bar
require_multicall_result – Debug parameter to crash the reader if we start to get invalid replies from Multicall3 contract.
display_progress (bool | str) –
Whether to display progress bar or not.
Set to string to have a progress bar label.
hypersync_client (HypersyncClient | None) – Not used in this reader
calls (Iterable[eth_defi.event_reader.multicall_batcher.EncodedCall]) –
timestamp_cache_file (pathlib.Path) –
- Return type
Iterable[eth_defi.event_reader.multicall_batcher.CombinedEncodedCallResult]
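The start_block, end_block and step parameters describe a sampled walk over a block range. The sketch below shows that arithmetic only; `iter_sample_blocks` is a hypothetical helper, and treating end_block as inclusive is an assumption, as the parameter description does not say either way.

```python
from typing import Iterator


def iter_sample_blocks(start_block: int, end_block: int, step: int) -> Iterator[int]:
    """Yield every step-th block from start_block up to and including end_block."""
    if step < 1:
        raise ValueError("step must be positive")
    # Assumption: end_block is inclusive
    yield from range(start_block, end_block + 1, step)


sampled_blocks = list(iter_sample_blocks(start_block=1_000, end_block=1_300, step=100))
```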
- read_multicall_historical_stateful(chain_id, web3factory, calls, start_block, end_block, step, max_workers=8, timeout=1800, display_progress=True, progress_suffix=None, require_multicall_result=False, chunk_size=48, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'))
Read historical data using multicall with reading state and adaptive frequency filtering.
Allow adaptive frequency with read state
Slower loop than the dumb read_multicall_historical(), as it has to maintain state
Because of state, we need to do block by block reading, as we need to evaluate state to see which calls are needed for which block, and the state depends on the result of the previous blocks
- Parameters
chunk_size –
We guarantee to update the reader state at least once per this many steps.
24 = 24 hours per day, assuming we update state once for every day of data read.
Between chunks we blindly push data to subprocesses for speedup and do not attempt to hear back from the subprocesses to update the state.
chain_id (int) –
web3factory (eth_defi.event_reader.web3factory.Web3Factory) –
calls (dict[eth_defi.event_reader.multicall_batcher.EncodedCall, eth_defi.event_reader.multicall_batcher.BatchCallState]) –
start_block (int) –
end_block (int) –
step (int) –
hypersync_client (HypersyncClient | None) –
timestamp_cache_file (pathlib.Path) –
- Return type
Iterable[eth_defi.event_reader.multicall_batcher.CombinedEncodedCallResult]
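One reading of the chunk_size description is that blocks are dispatched a chunk at a time and the state is refreshed once per chunk. The sketch below illustrates that reading only; `read_in_chunks` and its callbacks are hypothetical, and a list comprehension stands in for the parallel workers.

```python
from typing import Callable, List, Tuple


def read_in_chunks(
    blocks: List[int],
    chunk_size: int,
    read_block: Callable[[int], int],
    update_state: Callable[[List[int]], None],
) -> Tuple[List[int], int]:
    """Read a whole chunk first, then feed its results to the state in one go."""
    all_results: List[int] = []
    state_updates = 0
    for start in range(0, len(blocks), chunk_size):
        chunk = blocks[start : start + chunk_size]
        chunk_results = [read_block(b) for b in chunk]  # Stands in for parallel workers
        update_state(chunk_results)  # State only hears back once per chunk
        state_updates += 1
        all_results.extend(chunk_results)
    return all_results, state_updates


seen_chunks: List[List[int]] = []
stateful_results, update_count = read_in_chunks(
    blocks=list(range(100)),
    chunk_size=48,
    read_block=lambda block: block * 2,
    update_state=seen_chunks.append,
)
```

A larger chunk keeps more work in flight between state updates, at the cost of deciding which calls to make from older state.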
- read_multicall_chunked(chain_id, web3factory, calls, block_identifier, max_workers=8, timeout=1800, chunk_size=40, progress_bar_desc=None, timestamped_results=True, backend='loky')
Read current data using multiple processes in parallel for speedup.
All calls hit the same block number
Show a progress bar using tqdm
Example:
    # Generated packed multicall for each token contract we want to query
    balance_of_signature = Web3.keccak(text="balanceOf(address)")[0:4]

    def _gen_calls(addresses: Iterable[str]) -> Iterable[EncodedCall]:
        for _token_address in addresses:
            yield EncodedCall.from_keccak_signature(
                address=_token_address.lower(),
                signature=balance_of_signature,
                data=convert_address_to_bytes32(out_address),
                extra_data={},
                ignore_errors=True,
                function="balanceOf",
            )

    web3factory = MultiProviderWeb3Factory(web3.provider.endpoint_uri, hint="fetch_erc20_balances_multicall")

    # Execute calls for all token balance reads at a specific block.
    # read_multicall_chunked() will automatically split calls to multiple chunks
    # if we are querying too many.
    results = read_multicall_chunked(
        chain_id=chain_id,
        web3factory=web3factory,
        calls=list(_gen_calls(tokens)),
        block_identifier=block_identifier,
        max_workers=max_workers,
        timestamped_results=False,
    )

    results = list(results)

    addr_to_balance = LowercaseDict()

    for result in results:
        token_address = result.call.address

        if not result.result:
            if raise_on_error:
                raise BalanceFetchFailed(f"Could not read token balance for ERC-20: {token_address} for address {out_address}")
            value = None
        else:
            raw_value = convert_int256_bytes_to_int(result.result)
            if decimalise:
                token = fetch_erc20_details(web3, token_address, cache=token_cache, chain_id=chain_id)
                value = token.convert_to_decimals(raw_value)
            else:
                value = raw_value

        addr_to_balance[token_address] = value
- Parameters
chain_id (int) – Which EVM chain we are targeting with calls.
web3factory (eth_defi.event_reader.web3factory.Web3Factory) – The connection factory for subprocesses
calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) – List of calls to perform against Multicall3.
chunk_size (int) – Max calls per one chunk sent to Multicall contract, to stay below JSON-RPC read gas limit.
max_workers – How many parallel processes to use.
timeout – Joblib timeout to wait for a result from an individual task.
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
Block number to read.
Can be a block number or “latest” or “earliest”
progress_bar_desc (str | None) – If set, display a TQDM progress bar for the process.
timestamped_results –
Need timestamp of the block number in each result.
Causes very slow eth_getBlock call, use only if needed.
backend –
Joblib backend to use.
Either “loky” or “threading”.
- Returns
Iterable of results.
One entry per each call.
Results may arrive in a different order than the calls were originally given.
- Return type
Iterable[eth_defi.event_reader.multicall_batcher.EncodedCallResult]
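Because results can come back in any order, callers typically key them by something stable such as the contract address. The sketch below shows why, using the standard library's thread pool as a stand-in for the joblib backends mentioned above; `read_chunks_in_parallel` and `fake_worker` are hypothetical and make no RPC calls.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterator, List, Tuple


def read_chunks_in_parallel(
    calls: List[str],
    chunk_size: int,
    worker: Callable[[List[str]], List[Tuple[str, int]]],
    max_workers: int = 8,
) -> Iterator[Tuple[str, int]]:
    """Dispatch chunks to a thread pool and yield results as chunks finish."""
    chunks = [calls[i : i + chunk_size] for i in range(0, len(calls), chunk_size)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(worker, chunk) for chunk in chunks]
        for future in as_completed(futures):
            # Chunks finish in any order, so results are not in input order
            yield from future.result()


def fake_worker(chunk: List[str]) -> List[Tuple[str, int]]:
    """Pretend to execute calls: return (address, value) pairs."""
    return [(address, len(address)) for address in chunk]


addresses = [f"0x{i:040x}" for i in range(100)]
# Key results by address so arrival order does not matter
value_by_address = dict(read_chunks_in_parallel(addresses, chunk_size=40, worker=fake_worker))
```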
- class MulticallHistoricalTask
Bases: object
Pickled task sent between the multicall reader loop and subprocesses.
Send a batch of calls to a specific block.
- web3factory: eth_defi.event_reader.web3factory.Web3Factory
Used to initialise web3 connection in the subprocess
- block_number: Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]
Block number to scan
- calls: list[eth_defi.event_reader.multicall_batcher.EncodedCall]
Multicalls to perform
- require_multicall_result: bool
Debug parameter to early abort if we get invalid replies from Multicall contract
- timestamp: datetime.datetime
Fetch the timestamp if not given.
Otherwise it has been prefetched.
- __init__(chain_id, web3factory, block_number, calls, require_multicall_result=False, timestamp=None, task_id=<factory>)
- Parameters
chain_id (int) –
web3factory (eth_defi.event_reader.web3factory.Web3Factory) –
block_number (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) –
require_multicall_result (bool) –
timestamp (datetime.datetime) –
task_id (int) –
- Return type
None