event_reader.reader

Documentation for eth_defi.event_reader.reader Python module.

High performance EVM event reader.

Functions

extract_events(web3, start_block, end_block, ...)

Perform eth_getLogs call over a block range.

extract_events_concurrent(start_block, ...)

Concurrency-friendly event extractor.

extract_timestamps_json_rpc(web3, ...)

Get block timestamps from block headers.

prepare_filter(events)

Creates internal filter to match contract events.

read_events(web3, start_block, end_block[, ...])

Reads multiple events from the blockchain.

read_events_concurrent(executor, ...[, ...])

Reads multiple events from the blockchain in parallel using a thread pool for IO.

Classes

ProgressUpdate

Informs any listener about the state of an event scan.

Web3EventReader

Pass the event reader callable around.

Exceptions

BadTimestampValueReturned

The returned timestamp value does not look valid.

ReadingLogsFailed

eth_getLogs call failed.

TimestampNotFound

Timestamp service does not have a timestamp for a given block.

exception TimestampNotFound

Bases: Exception

Timestamp service does not have a timestamp for a given block.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception BadTimestampValueReturned

Bases: Exception

The returned timestamp value does not look valid.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception ReadingLogsFailed

Bases: Exception

eth_getLogs call failed.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

ReaderConnection

How to pass a connection to the event readers

  • Single-threaded readers take Web3 instance as is, because this is the simplest

  • Multithreaded readers set up their own connection pools behind the scenes, and passing a single connection around is not meaningful

alias of Optional[web3.main.Web3]

class ProgressUpdate

Bases: Protocol

Informs any listener about the state of an event scan.

Called before a new block is processed.

Hook this up with tqdm for an interactive progress bar.
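A minimal sketch of a progress listener. The exact callback parameter list is an assumption based on the description above (called before a new block chunk is processed); check the Protocol definition for the real signature before relying on it.

```python
class PrintProgress:
    """Illustrative progress listener; parameter names are assumptions."""

    def __init__(self, total_blocks: int):
        self.total_blocks = total_blocks
        self.seen = 0  # Blocks scanned so far

    def __call__(self, current_block, start_block, end_block, *args, **kwargs):
        # Called by the reader before a new chunk of blocks is processed.
        self.seen = current_block - start_block
        pct = 100 * self.seen / max(end_block - start_block, 1)
        print(f"Scanning block {current_block:,} ({pct:.1f}%)")

progress = PrintProgress(total_blocks=1_000)
progress(current_block=500, start_block=0, end_block=1_000)
```

In real use you would wrap a tqdm bar instead of print, updating it by the number of blocks advanced since the previous callback.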

__init__(*args, **kwargs)
class Web3EventReader

Bases: Protocol

Pass the event reader callable around.

An interface over event reader iterator.

  • Helps to type decorate event reader callable in the function arguments

  • The event reader implementation may be single-threaded, multithreaded, async based, etc.

For concrete implementations, see the reader functions in this module.

Example using a single-threaded reader:

from functools import partial
from typing import cast

from eth_defi.event_reader.reader import extract_events, Web3EventReader
from eth_defi.enzyme.events import fetch_vault_balance_events

read_events: Web3EventReader = cast(Web3EventReader, partial(extract_events))
balance_events = list(fetch_vault_balance_events(vault, start_block, end_block, read_events))
__init__(*args, **kwargs)
extract_timestamps_json_rpc(web3, start_block, end_block)

Get block timestamps from block headers.

Use slow JSON-RPC block headers call to get this information.

TODO: This is an old code path. It has been superseded by the more robust ReorganisationMonitor implementation.

Returns

block hash -> UNIX timestamp mapping

Parameters
  • web3 (web3.main.Web3) –

  • start_block (int) –

  • end_block (int) –

Return type

Dict[str, int]
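
Any callable with a compatible shape can be passed as the extract_timestamps argument of the readers below. The sketch only illustrates the expected return shape (block hash -> UNIX timestamp mapping); the hash values derived from block numbers and the 12-second block time are made up for illustration, and a real implementation would fetch each block header over JSON-RPC.

```python
from typing import Dict

def fixed_timestamps(web3, start_block: int, end_block: int) -> Dict[str, int]:
    """Illustrative stand-in for extract_timestamps_json_rpc.

    Keys here are fake hashes built from block numbers; a real
    implementation returns actual block hashes from the headers.
    """
    base = 1_600_000_000
    return {
        f"0x{block:064x}": base + block * 12  # ~12 s Ethereum block time
        for block in range(start_block, end_block + 1)
    }

stamps = fixed_timestamps(None, 10, 12)  # Covers blocks 10, 11, 12 inclusive
```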

extract_events(web3, start_block, end_block, filter, context=None, extract_timestamps=<function extract_timestamps_json_rpc>, reorg_mon=None, attempts=5, throttle_sleep=15)

Perform eth_getLogs call over a block range.

You should use read_events() unless you know the block range is something your node can handle.

Parameters
Returns

Iterable for the raw event data

Return type

Iterable[eth_defi.event_reader.logresult.LogResult]
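
The higher-level read_events() handles chunking for you. This sketch only shows the idea behind it: split a large block range into node-friendly chunks before each eth_getLogs call. The inclusive chunk boundaries and the chunk size of 100 mirror the documented read_events default, but the internal chunking logic is an assumption.

```python
def iter_block_chunks(start_block: int, end_block: int, chunk_size: int = 100):
    """Yield (chunk_start, chunk_end) inclusive pairs covering the range."""
    block = start_block
    while block <= end_block:
        yield block, min(block + chunk_size - 1, end_block)
        block += chunk_size

# Split blocks 1..250 into chunks of at most 100 blocks
chunks = list(iter_block_chunks(1, 250, chunk_size=100))
```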

extract_events_concurrent(start_block, end_block, filter, context=None, extract_timestamps=<function extract_timestamps_json_rpc>)

Concurrency-friendly event extractor.

You should use read_events_concurrent() unless you know the block range is something your node can handle.

Called by the thread pool - you probably do not want to call this directly.

Assumes the web3 connection is preset when the concurrent worker has been created, see get_worker_web3().

Parameters
Return type

List[eth_defi.event_reader.logresult.LogResult]

prepare_filter(events)

Creates internal filter to match contract events.

Parameters

events (List[web3.contract.contract.ContractEvent]) –

Return type

eth_defi.event_reader.filter.Filter

read_events(web3, start_block, end_block, events=None, notify=None, chunk_size=100, context=None, extract_timestamps=<function extract_timestamps_json_rpc>, filter=None, reorg_mon=None)

Reads multiple events from the blockchain.

Optimized to read multiple events from test blockchains.

Note

For a much faster event reader check eth_defi.reader.multithread.MultithreadEventReader. This implementation is mostly good with EVM test backends or very small block ranges.

  • Scans chains block by block

  • Returns events as a dict for optimal performance

  • Supports interactive progress bar

  • Reads all the events matching the signature - any further filtering must be done by the code consuming the results

See scripts/read-uniswap-v2-pairs-and-swaps.py for a full example.

Example:

json_rpc_url = os.environ["JSON_RPC_URL"]
web3 = Web3(HTTPProvider(json_rpc_url))

web3.middleware_onion.clear()

# Get contracts
Factory = get_contract(web3, "sushi/UniswapV2Factory.json")

start_block = 1
end_block = web3.eth.block_number

filter = Filter.create_filter(
    factory_address,
    [Factory.events.PairCreated],
)

# Read through all the events, all the chain, using a single threaded slow loop.
# Only suitable for test EVM backends.
pairs = []
log: LogResult
for log in read_events(
    web3,
    start_block,
    end_block,
    filter=filter,
    extract_timestamps=None,
):
    # The event signature is
    #
    #  event PairCreated(address indexed token0, address indexed token1, address pair, uint);
    #
    # topic 0 = keccak(event signature)
    # topic 1 = token 0
    # topic 2 = token 1
    # argument 0 = pair
    # argument 1 = pair id
    #
    # log for EthereumTester backend is
    #
    # {'type': 'mined',
    #  'logIndex': 0,
    #  'transactionIndex': 0,
    #  'transactionHash': HexBytes('0x2cf4563f8c275e5b5d7a4e5496bfbaf15cc00d530f15f730ac4a0decbc01d963'),
    #  'blockHash': HexBytes('0x7c0c6363bc8f4eac452a37e45248a720ff09f330117cdfac67640d31d140dc38'),
    #  'blockNumber': 6,
    #  'address': '0xF2E246BB76DF876Cef8b38ae84130F4F55De395b',
    #  'data': HexBytes('0x00000000000000000000000068931307edcb44c3389c507dab8d5d64d242e58f0000000000000000000000000000000000000000000000000000000000000001'),
    #  'topics': [HexBytes('0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9'),
    #   HexBytes('0x0000000000000000000000002946259e0334f33a064106302415ad3391bed384'),
    #   HexBytes('0x000000000000000000000000b9816fc57977d5a786e654c7cf76767be63b966e')],
    #  'context': None,
    #  'event': web3._utils.datatypes.PairCreated,
    #  'chunk_id': 1,
    #  'timestamp': None}
    #
    arguments = decode_data(log["data"])
    topics = log["topics"]
    token0 = convert_uint256_hex_string_to_address(topics[1])
    token1 = convert_uint256_hex_string_to_address(topics[2])
    pair_address = convert_uint256_bytes_to_address(arguments[0])
    pair_id = convert_int256_bytes_to_int(arguments[1])

    token0_details = fetch_erc20_details(web3, token0)
    token1_details = fetch_erc20_details(web3, token1)
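
A sketch of what the topic-to-address conversion above does: an indexed address topic is a 32-byte word with the address in the low 20 bytes. The helper name is illustrative; the real convert_uint256_hex_string_to_address may additionally checksum the result.

```python
def topic_to_address(topic_hex: str) -> str:
    """Extract the low 20 bytes of a 32-byte topic as a hex address."""
    value = topic_hex[2:] if topic_hex.startswith("0x") else topic_hex
    assert len(value) == 64, "Expected a 32-byte hex-encoded topic"
    return "0x" + value[-40:]  # Keep the last 20 bytes (40 hex chars)

# Topic 1 from the example log above
addr = topic_to_address("0x0000000000000000000000002946259e0334f33a064106302415ad3391bed384")
```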
Parameters
Returns

Iterate over LogResult instances for each event matched in the filter.

Return type

Iterable[eth_defi.event_reader.logresult.LogResult]

read_events_concurrent(executor, start_block, end_block, events=None, notify=None, chunk_size=100, context=None, extract_timestamps=<function extract_timestamps_json_rpc>, filter=None, reorg_mon=None)

Reads multiple events from the blockchain in parallel using a thread pool for IO.

Optimized to read multiple events fast.

  • Uses a thread worker pool for concurrency

  • Even though we receive data from the JSON-RPC API in random order, the iterable results are always yielded in the correct order (and processed in a single thread)

  • Returns events as a dict for optimal performance

  • Can resume scan

  • Supports interactive progress bar

  • Reads all the events matching the signature - any further filtering must be done by the code consuming the results
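
The ordering guarantee above can be sketched as a reorder buffer: chunks finish on worker threads in arbitrary order, but results are held back and yielded strictly in chunk order by the single consuming thread. The actual implementation may differ; this is only an illustration of the technique.

```python
import heapq

def in_order(completed):
    """Yield results in chunk-id order, given (chunk_id, result) pairs
    arriving in arbitrary completion order."""
    heap = []
    next_id = 0
    for chunk_id, result in completed:
        heapq.heappush(heap, (chunk_id, result))
        # Drain every result that is now contiguous with what was yielded
        while heap and heap[0][0] == next_id:
            yield heapq.heappop(heap)[1]
            next_id += 1

# Chunk 2 finished first, but results still come out as a, b, c
out = list(in_order([(2, "c"), (0, "a"), (1, "b")]))
```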

See scripts/read-uniswap-v2-pairs-and-swaps-concurrent.py for a full example.

Example:

json_rpc_url = os.environ["JSON_RPC_URL"]
token_cache = TokenCache()
threads = 16
http_adapter = requests.adapters.HTTPAdapter(pool_connections=threads, pool_maxsize=threads)
web3_factory = TunedWeb3Factory(json_rpc_url, http_adapter)
web3 = web3_factory(token_cache)
executor = create_thread_pool_executor(web3_factory, context=token_cache, max_workers=threads)

# Get contracts
Factory = get_contract(web3, "UniswapV2Factory.json")

events = [
    Factory.events.PairCreated,
]

start_block = 10_000_835  # Uni deployed
end_block = 10_009_000  # The first pair created before this block

# Read through the block range
out = []
for log_result in read_events_concurrent(
    executor,
    start_block,
    end_block,
    events,
    None,
    chunk_size=100,
    context=token_cache,
    extract_timestamps=None,
):
    out.append(decode_pair_created(log_result))
Parameters
Returns

Iterate over LogResult instances for each event matched in the filter.

Return type

Iterable[eth_defi.event_reader.logresult.LogResult]