read_multicall_chunked

Documentation for the eth_defi.event_reader.multicall_batcher.read_multicall_chunked function.

read_multicall_chunked(chain_id, web3factory, calls, block_identifier, max_workers=8, timeout=1800, chunk_size=40, progress_bar_desc=None, timestamped_results=True, backend='loky')

Read current data using multiple processes in parallel for speedup.

  • All calls hit the same block number

  • Shows a tqdm progress bar when progress_bar_desc is set

Example:

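# Excerpt: names such as out_address, tokens, chain_id, block_identifier,
# max_workers, raise_on_error, decimalise and token_cache are assumed to be
# defined by the surrounding code.
# Generate a packed multicall for each token contract we want to query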
balance_of_signature = Web3.keccak(text="balanceOf(address)")[0:4]


def _gen_calls(addresses: Iterable[str]) -> Iterable[EncodedCall]:
    for _token_address in addresses:
        yield EncodedCall.from_keccak_signature(
            address=_token_address.lower(),
            signature=balance_of_signature,
            data=convert_address_to_bytes32(out_address),
            extra_data={},
            ignore_errors=True,
            function="balanceOf",
        )


web3factory = MultiProviderWeb3Factory(web3.provider.endpoint_uri, hint="fetch_erc20_balances_multicall")

# Execute calls for all token balance reads at a specific block.
# read_multicall_chunked() will automatically split calls to multiple chunks
# if we are querying too many.
results = read_multicall_chunked(
    chain_id=chain_id,
    web3factory=web3factory,
    calls=list(_gen_calls(tokens)),
    block_identifier=block_identifier,
    max_workers=max_workers,
    timestamped_results=False,
)

results = list(results)

addr_to_balance = LowercaseDict()

for result in results:
    token_address = result.call.address

    if not result.result:
        if raise_on_error:
            raise BalanceFetchFailed(f"Could not read token balance for ERC-20: {token_address} for address {out_address}")
        value = None
    else:
        raw_value = convert_int256_bytes_to_int(result.result)
        if decimalise:
            token = fetch_erc20_details(web3, token_address, cache=token_cache, chain_id=chain_id)
            value = token.convert_to_decimals(raw_value)
        else:
            value = raw_value

    addr_to_balance[token_address] = value

Parameters
  • chain_id (int) – Which EVM chain we are targeting with calls.

  • web3factory (eth_defi.event_reader.web3factory.Web3Factory) – The connection factory for subprocesses

  • calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) – List of calls to perform against Multicall3.

  • chunk_size (int) – Maximum number of calls per chunk sent to the Multicall contract, to stay below the JSON-RPC read gas limit.

  • max_workers – How many parallel workers to use (processes with the “loky” backend, threads with “threading”).

  • timeout – Joblib timeout, in seconds, to wait for a result from an individual task.

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

    Block to read. All calls are executed against this block.

    • Can be a block number, or a tag such as “latest” or “earliest”

  • progress_bar_desc (str | None) – If set, display a TQDM progress bar for the process.

  • timestamped_results

    Whether to include the block timestamp in each result.

    This causes a very slow eth_getBlock call, so use it only if needed.

  • backend

    Joblib backend to use.

    Either “loky” or “threading”.
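To illustrate the chunk_size parameter above, here is a minimal sketch of how a list of calls could be split into chunks before each chunk is handed to a worker. It is not the library's implementation: chunked() is a hypothetical helper, and plain integers stand in for EncodedCall objects.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], chunk_size: int) -> Iterator[list[T]]:
    """Yield consecutive slices of at most chunk_size items (hypothetical helper)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(items), chunk_size):
        yield list(items[start:start + chunk_size])


# 100 placeholder calls split with the default chunk_size of 40
placeholder_calls = list(range(100))
chunk_lengths = [len(chunk) for chunk in chunked(placeholder_calls, 40)]
```

A smaller chunk_size means more, lighter Multicall requests; a larger one means fewer requests, each closer to the gas limit of the RPC node.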

Returns

Iterable of results.

One entry per call.

Results may be in a different order than the calls were originally given.

Return type

Iterable[eth_defi.event_reader.multicall_batcher.EncodedCallResult]
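
Because results can arrive in a different order than the calls, callers usually need to match each result back to its call. The sketch below shows one way to do that by keying on the call address, as the example above does. StubCall, StubResult and order_like_calls() are hypothetical stand-ins, not part of eth_defi, and the approach assumes each address appears in at most one call.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StubCall:
    """Hypothetical stand-in for EncodedCall; only the address is modelled."""
    address: str


@dataclass(frozen=True)
class StubResult:
    """Hypothetical stand-in for EncodedCallResult."""
    call: StubCall
    result: bytes


def order_like_calls(calls: list[StubCall], results: list[StubResult]) -> list[Optional[StubResult]]:
    """Return results in the same order as calls; None where a result is missing."""
    by_address = {r.call.address: r for r in results}
    return [by_address.get(c.address) for c in calls]


calls = [StubCall("0xaaa"), StubCall("0xbbb"), StubCall("0xccc")]
# Results deliberately given out of order, as parallel workers might return them
shuffled = [
    StubResult(calls[2], b"\x03"),
    StubResult(calls[0], b"\x01"),
    StubResult(calls[1], b"\x02"),
]
ordered = order_like_calls(calls, shuffled)
```

If the same address is queried more than once, a richer key is needed, for example the address combined with the call data.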