event_reader.reader
Documentation for eth_defi.event_reader.reader Python module.
High performance EVM event reader.
For further reading see:
Functions
extract_events | Perform eth_getLogs call over a block range.
extract_events_concurrent | Concurrency happy event extractor.
extract_timestamps_json_rpc | Get block timestamps from block headers.
prepare_filter | Creates internal filter to match contract events.
read_events | Reads multiple events from the blockchain.
read_events_concurrent | Reads multiple events from the blockchain parallel using a thread pool for IO.
Classes
ProgressUpdate | Informs any listener about the state of an event scan.
Web3EventReader | Pass the event reader callable around.
Exceptions
BadTimestampValueReturned | Timestamp does not look good.
ReadingLogsFailed | eth_getLogs call failed.
TimestampNotFound | Timestamp service does not have a timestamp for a given block.
- exception TimestampNotFound
Bases: Exception
Timestamp service does not have a timestamp for a given block.
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception BadTimestampValueReturned
Bases: Exception
Timestamp does not look good.
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception ReadingLogsFailed
Bases: Exception
eth_getLogs call failed.
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- ReaderConnection
How to pass a connection to the event readers.
Single-threaded readers take a Web3 instance as is, because this is the simplest approach.
Multithreaded readers set up their own connection pools behind the scenes, so passing a single connection around is not meaningful.
alias of Optional[web3.main.Web3]
- class ProgressUpdate
Bases: Protocol
Informs any listener about the state of an event scan.
Called before a new block is processed.
Hook this up with tqdm for an interactive progress bar.
- __init__(*args, **kwargs)
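A progress listener can be any callable object. The sketch below assumes the callback receives the current block together with the start and end blocks of the scan. These argument names are illustrative and are not taken from this page, so check the actual ProgressUpdate signature before wiring it up. It tracks a percentage with a plain attribute instead of tqdm to stay dependency-free.

```python
from dataclasses import dataclass


@dataclass
class PercentProgress:
    """Hypothetical progress listener; the argument names are assumptions."""

    last_percent: float = 0.0

    def __call__(self, current_block: int, start_block: int, end_block: int, *args, **kwargs) -> None:
        # Block ranges are inclusive on both ends, hence the +1
        total = end_block - start_block + 1
        done = current_block - start_block
        self.last_percent = 100.0 * done / total


progress = PercentProgress()
progress(current_block=150, start_block=100, end_block=199)
print(f"{progress.last_percent:.0f}% scanned")
```

With tqdm, the same callable would update a progress bar instead of storing the percentage.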
- class Web3EventReader
Bases: Protocol
Pass the event reader callable around.
An interface over event reader iterator.
Helps to type-annotate an event reader callable in function arguments.
The event reader implementation may be single-threaded, multithreaded, async based, etc.
For concrete implementation see
Example by using a single-thread reader:
    from functools import partial
    from typing import cast

    from eth_defi.event_reader.reader import extract_events, Web3EventReader
    from eth_defi.enzyme.events import fetch_vault_balance_events

    # Wrap the single-threaded reader so it can be passed around as a Web3EventReader
    read_events: Web3EventReader = cast(Web3EventReader, partial(extract_events))
    balance_events = list(fetch_vault_balance_events(vault, start_block, end_block, read_events))
- __init__(*args, **kwargs)
- extract_timestamps_json_rpc(web3, start_block, end_block)
Get block timestamps from block headers.
Use slow JSON-RPC block headers call to get this information.
TODO: This is an old code path. It has been replaced by the more robust ReorganisationMonitor implementation.
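To illustrate why this path is slow, here is a minimal sketch of collecting timestamps one block header at a time. The `fetch_header` callable is a hypothetical stand-in for an `eth_getBlockByNumber` request, and the block hash to timestamp mapping is an assumption about a convenient return shape, not a statement of what this function returns.

```python
from typing import Callable, Dict


def collect_block_timestamps(
    fetch_header: Callable[[int], dict],
    start_block: int,
    end_block: int,
) -> Dict[str, int]:
    """Map block hash -> UNIX timestamp for an inclusive block range."""
    timestamps = {}
    for block_number in range(start_block, end_block + 1):
        # One round trip per block: this is the expensive part
        header = fetch_header(block_number)
        # JSON-RPC encodes quantities as hex strings
        timestamps[header["hash"]] = int(header["timestamp"], 16)
    return timestamps


def fake_header(block_number: int) -> dict:
    # Stub node for the example: one block every 12 seconds
    return {
        "hash": f"0x{block_number:064x}",
        "timestamp": hex(1_600_000_000 + 12 * block_number),
    }


timestamps = collect_block_timestamps(fake_header, 10, 12)
```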
- extract_events(web3, start_block, end_block, filter, context=None, extract_timestamps=<function extract_timestamps_json_rpc>, reorg_mon=None, attempts=5, throttle_sleep=15)
Perform eth_getLogs call over a block range.
You should use read_events() unless you know the block range is something your node can handle.
- Parameters
start_block (int) – First block to process (inclusive)
end_block (int) – Last block to process (inclusive)
filter (eth_defi.event_reader.filter.Filter) – Internal filter used to match logs
extract_timestamps (Optional[Callable]) –
Method to get the block timestamps.
This might need to use the expensive `eth_getBlockByNumber` JSON-RPC API call, which will seriously slow down event reading. Set extract_timestamps to None to skip timestamps and get fast event lookups.
context (Optional[eth_defi.event_reader.logresult.LogContext]) – Passed to all generated logs
reorg_mon (Optional[eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor]) – If passed, use this instance to monitor and raise chain reorganisation exceptions.
web3 (web3.main.Web3) –
- Returns
Iterable for the raw event data
- Return type
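The signature above carries `attempts` and `throttle_sleep` arguments that this page does not describe. One plausible reading, offered only as an assumption, is a retry loop that sleeps between failed eth_getLogs calls and finally raises ReadingLogsFailed. The sketch below shows that pattern with stand-in names: `get_logs_with_retry`, `LogsReadFailed` and the injected `sleep` are all hypothetical.

```python
import time
from typing import Callable, List


class LogsReadFailed(Exception):
    """Illustrative stand-in for ReadingLogsFailed."""


def get_logs_with_retry(
    fetch_logs: Callable[[], List[dict]],
    attempts: int = 5,
    throttle_sleep: float = 15,
    sleep: Callable[[float], None] = time.sleep,
) -> List[dict]:
    last_error = None
    for attempt in range(attempts):
        try:
            return fetch_logs()
        except ValueError as e:
            last_error = e
            if attempt < attempts - 1:
                # Back off before the next try
                sleep(throttle_sleep)
    raise LogsReadFailed(f"eth_getLogs failed after {attempts} attempts") from last_error


calls = {"n": 0}


def flaky_fetch() -> List[dict]:
    # Fails twice, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("node is throttling")
    return [{"blockNumber": 6}]


# Inject a no-op sleep so the example runs instantly
logs = get_logs_with_retry(flaky_fetch, sleep=lambda seconds: None)
```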
- extract_events_concurrent(start_block, end_block, filter, context=None, extract_timestamps=<function extract_timestamps_json_rpc>)
Concurrency happy event extractor.
You should use read_events_concurrent() unless you know the block range is something your node can handle.
Called by the thread pool - you probably do not want to call this directly.
Assumes the web3 connection is preset when the concurrent worker has been created, see get_worker_web3().
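The idea of a connection that is preset per worker can be sketched with thread-local storage. This example uses the standard library `concurrent.futures.ThreadPoolExecutor` rather than the futureproof executor this module takes, and `FakeConnection`, `init_worker` and `get_worker_connection` are hypothetical names. The library's own worker setup may differ.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_thread_local = threading.local()


class FakeConnection:
    """Hypothetical stand-in for a per-thread Web3 connection."""

    def __init__(self, url: str):
        self.url = url
        self.owner = threading.get_ident()


def init_worker(url: str) -> None:
    # Runs once in each worker thread when the pool starts it
    _thread_local.connection = FakeConnection(url)


def get_worker_connection() -> FakeConnection:
    return _thread_local.connection


def scan_chunk(start_block: int, end_block: int) -> tuple:
    conn = get_worker_connection()
    # A real worker would issue eth_getLogs over conn here
    return (start_block, end_block, conn.owner == threading.get_ident())


with ThreadPoolExecutor(max_workers=4, initializer=init_worker, initargs=("http://localhost:8545",)) as pool:
    results = list(pool.map(scan_chunk, [1, 101, 201], [100, 200, 300]))
```

Each task sees only the connection created in its own thread, which is why the extractor itself does not take a web3 argument.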
- Parameters
start_block (int) –
end_block (int) –
filter (eth_defi.event_reader.filter.Filter) –
context (Optional[eth_defi.event_reader.logresult.LogContext]) –
- Return type
- prepare_filter(events)
Creates internal filter to match contract events.
- Parameters
events (List[web3.contract.contract.ContractEvent]) –
- Return type
- read_events(web3, start_block, end_block, events=None, notify=None, chunk_size=100, context=None, extract_timestamps=<function extract_timestamps_json_rpc>, filter=None, reorg_mon=None)
Reads multiple events from the blockchain.
Optimized to read multiple events from test blockchains.
Note
For a much faster event reader check eth_defi.event_reader.multithread.MultithreadEventReader. This implementation is mostly good with EVM test backends or very small block ranges.
Scans chains block by block
Returns events as a dict for optimal performance
Supports interactive progress bar
Reads all the events matching signature - any filtering must be done by the reader
See scripts/read-uniswap-v2-pairs-and-swaps.py for a full example.
Example:
    import os

    from web3 import Web3, HTTPProvider

    from eth_defi.event_reader.filter import Filter
    from eth_defi.event_reader.logresult import LogResult
    from eth_defi.event_reader.reader import read_events

    # get_contract, decode_data, the convert_* helpers and fetch_erc20_details
    # are assumed to be imported from eth_defi; factory_address is defined elsewhere.

    json_rpc_url = os.environ["JSON_RPC_URL"]
    web3 = Web3(HTTPProvider(json_rpc_url))
    web3.middleware_onion.clear()

    # Get contracts
    Factory = get_contract(web3, "sushi/UniswapV2Factory.json")

    start_block = 1
    end_block = web3.eth.block_number

    filter = Filter.create_filter(
        factory_address,
        [Factory.events.PairCreated],
    )

    # Read through all the events, all the chain, using a single threaded slow loop.
    # Only suitable for test EVM backends.
    pairs = []
    log: LogResult
    for log in read_events(
        web3,
        start_block,
        end_block,
        filter=filter,
        extract_timestamps=None,
    ):
        # Signature this
        #
        #  event PairCreated(address indexed token0, address indexed token1, address pair, uint);
        #
        # topic 0 = keccak(event signature)
        # topic 1 = token 0
        # topic 2 = token 1
        # argument 0 = pair
        # argument 1 = pair id
        #
        # log for EthereumTester backend is
        #
        # {'type': 'mined',
        #  'logIndex': 0,
        #  'transactionIndex': 0,
        #  'transactionHash': HexBytes('0x2cf4563f8c275e5b5d7a4e5496bfbaf15cc00d530f15f730ac4a0decbc01d963'),
        #  'blockHash': HexBytes('0x7c0c6363bc8f4eac452a37e45248a720ff09f330117cdfac67640d31d140dc38'),
        #  'blockNumber': 6,
        #  'address': '0xF2E246BB76DF876Cef8b38ae84130F4F55De395b',
        #  'data': HexBytes('0x00000000000000000000000068931307edcb44c3389c507dab8d5d64d242e58f0000000000000000000000000000000000000000000000000000000000000001'),
        #  'topics': [HexBytes('0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9'),
        #   HexBytes('0x0000000000000000000000002946259e0334f33a064106302415ad3391bed384'),
        #   HexBytes('0x000000000000000000000000b9816fc57977d5a786e654c7cf76767be63b966e')],
        #  'context': None,
        #  'event': web3._utils.datatypes.PairCreated,
        #  'chunk_id': 1,
        #  'timestamp': None}
        #
        arguments = decode_data(log["data"])
        topics = log["topics"]
        token0 = convert_uint256_hex_string_to_address(topics[1])
        token1 = convert_uint256_hex_string_to_address(topics[2])
        pair_address = convert_uint256_bytes_to_address(arguments[0])
        pair_id = convert_int256_bytes_to_int(arguments[1])

        token0_details = fetch_erc20_details(web3, token0)
        token1_details = fetch_erc20_details(web3, token1)
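The reader walks the block range in chunks of `chunk_size` blocks, one eth_getLogs call per chunk. As a rough sketch of that arithmetic (the helper name `iter_chunks` is hypothetical and the library's internal loop may be organised differently), an inclusive range can be split like this:

```python
from typing import Iterator, Tuple


def iter_chunks(start_block: int, end_block: int, chunk_size: int) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (first, last) block pairs covering start_block..end_block."""
    current = start_block
    while current <= end_block:
        # The final chunk is clipped to end_block
        last = min(current + chunk_size - 1, end_block)
        yield current, last
        current = last + 1


chunks = list(iter_chunks(1, 250, 100))
```

Because both ends are inclusive, consecutive chunks never share a block and none is skipped.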
- Parameters
web3 (web3.main.Web3) – Web3 instance
events (Optional[List[web3.contract.contract.ContractEvent]]) –
List of Web3.py contract event classes to scan for.
Pass this or filter.
notify (Optional[eth_defi.event_reader.reader.ProgressUpdate]) – Optional callback to be called before starting to scan each chunk
start_block (int) – First block to process (inclusive)
end_block (int) – Last block to process (inclusive)
extract_timestamps (Optional[Callable]) –
Override for different block timestamp extraction methods.
This might need to use the expensive `eth_getBlockByNumber` JSON-RPC API call, which will seriously slow down event reading. Set extract_timestamps to None to skip timestamps and get fast event lookups.
chunk_size (int) – How many blocks to scan in one eth_getLogs call
context (Optional[eth_defi.event_reader.logresult.LogContext]) – Passed to all generated logs
filter (Optional[eth_defi.event_reader.filter.Filter]) –
Pass a custom event filter for the readers
Pass this or events.
reorg_mon (Optional[eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor]) – If passed, use this instance to monitor and raise chain reorganisation exceptions.
- Returns
Iterate over LogResult instances for each event matched in the filter.
- Return type
- read_events_concurrent(executor, start_block, end_block, events=None, notify=None, chunk_size=100, context=None, extract_timestamps=<function extract_timestamps_json_rpc>, filter=None, reorg_mon=None)
Reads multiple events from the blockchain parallel using a thread pool for IO.
Optimized to read multiple events fast.
Uses a thread worker pool for concurrency
Even though we receive data from the JSON-RPC API in random order, the iterable results are always in the correct order (and processed in a single thread)
Returns events as a dict for optimal performance
Can resume scan
Supports interactive progress bar
Reads all the events matching signature - any filtering must be done by the reader
See scripts/read-uniswap-v2-pairs-and-swaps-concurrent.py for a full example.
Example:
    import os

    import requests

    json_rpc_url = os.environ["JSON_RPC_URL"]
    token_cache = TokenCache()
    threads = 16
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=threads, pool_maxsize=threads)
    web3_factory = TunedWeb3Factory(json_rpc_url, http_adapter)
    web3 = web3_factory(token_cache)
    executor = create_thread_pool_executor(web3_factory, context=token_cache, max_workers=threads)

    # Get contracts
    Factory = get_contract(web3, "UniswapV2Factory.json")

    events = [
        Factory.events.PairCreated,
    ]

    start_block = 10_000_835  # Uni deployed
    end_block = 10_009_000  # The first pair created before this block

    # Read through the block range
    out = []
    for log_result in read_events_concurrent(
        executor,
        start_block,
        end_block,
        events,
        None,
        chunk_size=100,
        context=token_cache,
        extract_timestamps=None,
    ):
        out.append(decode_pair_created(log_result))
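The ordering guarantee described above (workers finish in arbitrary order, the consumer still sees chunks in block order) can be demonstrated with the standard library alone. This is a sketch of the general technique, not of this function's internals: `fetch_chunk` is a hypothetical worker and `concurrent.futures` stands in for the futureproof executor.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_chunk(first_block: int) -> int:
    # Simulate network latency so chunks finish in arbitrary order
    time.sleep(random.uniform(0, 0.02))
    return first_block


chunk_starts = list(range(0, 1000, 100))

with ThreadPoolExecutor(max_workers=8) as pool:
    # map() yields results in submission order, not completion order
    ordered = list(pool.map(fetch_chunk, chunk_starts))
```

The consuming loop runs in the calling thread, so any decoding done on the results does not need locking.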
- Parameters
executor (futureproof.executors.ThreadPoolExecutor) – Thread pool executor created with eth_defi.event_reader.web3worker.create_thread_pool_executor()
events (Optional[List[web3.contract.contract.ContractEvent]]) – List of Web3.py contract event classes to scan for
notify (Optional[eth_defi.event_reader.reader.ProgressUpdate]) – Optional callback to be called before starting to scan each chunk
start_block (int) – First block to process (inclusive)
end_block (int) – Last block to process (inclusive)
extract_timestamps (Optional[Callable]) – Override for different block timestamp extraction methods
chunk_size (int) – How many blocks to scan in one eth_getLogs call
context (Optional[eth_defi.event_reader.logresult.LogContext]) – Passed to all generated logs
filter (Optional[eth_defi.event_reader.filter.Filter]) – Pass a custom event filter for the readers
reorg_mon (Optional[eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor]) – If passed, use this instance to monitor and raise chain reorganisation exceptions.
- Returns
Iterate over LogResult instances for each event matched in the filter.
- Return type