event_reader.reorganisation_monitor
Documentation for eth_defi.event_reader.reorganisation_monitor Python module.
Chain reorganisation handling during chain data reading.
All EVM-based blockchains are subject to minor chain reorganisations, which happen when nodes around the world have not yet reached consensus on the chain tip.
Functions
create_reorganisation_monitor() – Set up a chain reorganisation monitor based on the APIs the node supports.
Classes
ChainReorganisationResolution – How did we fare getting hashes and timestamps for the latest blocks.
GraphQLReorganisationMonitor – Watch blockchain for reorgs using the GoEthereum /graphql API.
JSONRPCReorganisationMonitor – Watch blockchain for reorgs using the eth_getBlockByNumber JSON-RPC API.
MockChainAndReorganisationMonitor – A dummy reorganisation monitor for unit testing.
ReorganisationMonitor – Watch blockchain for reorgs.
Exceptions
BlockNotAvailable – Tried to ask timestamp data for a block that does not exist yet.
ReorganisationResolutionFailure – Could not figure out chain reorgs after multiple attempts.
TooLongRange – Reorg scan range is too long.
- class ChainReorganisationResolution
Bases: object
How did we fare getting hashes and timestamps for the latest blocks.
- last_live_block: int
What we know is the chain tip on our node
This is the latest block at the JSON-RPC node. We can read data up to this block.
- latest_block_with_good_data: int
What we know is the block for which we do not need to perform a rollback
This is the block number that does not need to be purged from your internal database. All previously read events that have a higher block number should be purged.
- get_read_range()
Get the range of blocks we should read on this poll cycle.
This range may overlap your previous event read range.
You should discard any data you have already stored for blocks within this range
You should be prepared to read the same events again
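Example (a minimal sketch of one poll cycle; reorg_mon stands for an already initialised monitor and purge_events_from() is a hypothetical function in your own storage layer):

resolution = reorg_mon.update_chain()
start, end = resolution.get_read_range()

# Blocks inside the range may belong to an orphaned fork,
# so drop what we stored for them and read them again (hypothetical helper)
purge_events_from(start)

for block_number in range(start, end + 1):
    ...  # re-read events for block_number and store them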
- exception ChainReorganisationDetected
Bases: Exception
- __init__(block_number, original_hash, new_hash)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception TooLongRange
Bases: Exception
Reorg scan range is too long.
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception ReorganisationResolutionFailure
Bases: Exception
Could not figure out chain reorgs after multiple attempts.
Node in a bad state?
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception BlockNotAvailable
Bases: Exception
Tried to ask timestamp data for a block that does not exist yet.
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class ReorganisationMonitor
Bases: abc.ABC
Watch blockchain for reorgs.
Most EVM blockchains have several minor chain reorganisations per day, when your node switches from one chain tip to another due to block propagation issues. Any application reading blockchain event data must be able to detect such reorganisations and purge incorrect data from its data feeds.
Abstract base class for the different ways to support chain reorganisations:
- Maintain the state of where our blockchain read cursor is, using get_last_block_read()
- Ingest and maintain the state of the last read blocks using update_chain()
- Check block headers for chain reorganisations when reading events from the chain using check_block_reorg()
- Manage the service for block timestamp lookups with get_block_timestamp()
- Save and load block header state to a disk cache, because APIs are slow, using load_pandas() and to_pandas()
Example:
import os
import time

from web3 import HTTPProvider, Web3

from eth_defi.abi import get_contract
from eth_defi.chain import install_chain_middleware
from eth_defi.event_reader.filter import Filter
from eth_defi.event_reader.reader import read_events, LogResult
from eth_defi.event_reader.reorganisation_monitor import JSONRPCReorganisationMonitor


def main():
    json_rpc_url = os.environ.get("JSON_RPC_POLYGON", "https://polygon-rpc.com")
    web3 = Web3(HTTPProvider(json_rpc_url))
    web3.middleware_onion.clear()
    install_chain_middleware(web3)

    # Get contracts
    Pair = get_contract(web3, "sushi/UniswapV2Pair.json")

    filter = Filter.create_filter(
        address=None,  # Listen to events from any smart contract
        event_types=[Pair.events.Swap],
    )

    reorg_mon = JSONRPCReorganisationMonitor(web3, check_depth=3)
    reorg_mon.load_initial_block_headers(block_count=5)

    processed_events = set()
    latest_block = None

    # Keep reading events as they land
    while True:
        chain_reorg_resolution = reorg_mon.update_chain()
        start, end = chain_reorg_resolution.get_read_range()

        if chain_reorg_resolution.reorg_detected:
            print("Chain reorg warning")

        evt: LogResult
        for evt in read_events(
            web3,
            start_block=start,
            end_block=end,
            filter=filter,
        ):
            # How to uniquely identify EVM logs
            key = evt["blockHash"] + evt["transactionHash"] + evt["logIndex"]

            # The reader may cause duplicate events as the chain tip reorganises
            if key not in processed_events:
                print(f"Swap at block {evt['blockNumber']:,} tx: {evt['transactionHash']}")
                processed_events.add(key)

        if end != latest_block:
            print(f"Latest block is {end:,}")
            latest_block = end

        time.sleep(0.5)


if __name__ == "__main__":
    main()
- block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]
Internal buffer of our block data
Block number -> Block header data
- last_block_read: int = 0
Last block served by update_chain() in the duty cycle
- check_depth: int = 20
How many blocks we replay from the blockchain to detect any chain reorganisations
Done by figure_reorganisation_and_new_blocks(). Adjust this for your EVM chain.
- max_cycle_tries = 10
How many times we try to re-read data from the blockchain in the case of reorganisation.
If our node constantly feeds us changing data, give up.
- reorg_wait_seconds = 5
How long we allow our node to catch up in the case there has been a change in the chain tip.
If our node constantly feeds us changing data, give up.
- get_block_by_number(block_number)
Get block header data for a specific block number from our memory buffer.
- Parameters
block_number (int) –
- Return type
- skip_to_block(block_number)
Skip scanning initial chain and directly start from a certain block.
- Parameters
block_number (int) –
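Example (a minimal sketch; the block number below is a placeholder for a block you already know to be final):

reorg_mon.skip_to_block(40_000_000)  # start tracking headers from this block onwards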
- load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)
Get the initial block buffer filled up.
You can call this during the application start up, or when you start the chain. This interface is designed to keep the application on hold until new blocks have been served.
- Parameters
block_count – How many latest blocks to load. Give start_block or block_count.
start_block – What is the first block to read. Give start_block or block_count.
tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar
save_callable (Optional[Callable]) – Save after every block. Called after every block. TODO: Hack. Design a better interface.
- Returns
The initial block range to start to work with
- Return type
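Example (a minimal sketch of priming the header buffer at application start-up, assuming the returned range unpacks to a (start, end) block number tuple):

from tqdm import tqdm

start, end = reorg_mon.load_initial_block_headers(block_count=100, tqdm=tqdm)
print(f"Buffered block headers {start:,} - {end:,}")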
- add_block(record)
Add new block to header tracking.
Blocks must be added in order.
- Parameters
- check_block_reorg(block_number, block_hash)
Check that a newly read block matches our record.
Called during the event reading:
The event reader gets the block number and hash with the event.
We have an initial block_map in memory, previously buffered.
We check if any of the blocks in the block map have different values than what our event producer gives us -> in this case we know there has been a chain reorganisation.
If we do not have records for a block, ignore it.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by the events.
- Parameters
- Return type
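Example (a minimal sketch of validating an incoming log record against the header buffer; evt stands for an event dictionary with blockNumber and blockHash keys, as produced by the event reader in the module example above):

from eth_defi.event_reader.reorganisation_monitor import ChainReorganisationDetected

try:
    reorg_mon.check_block_reorg(evt["blockNumber"], evt["blockHash"])
except ChainReorganisationDetected as exc:
    # The chain tip moved under us - purge stored data and re-read on the next cycle
    print(f"Reorg detected: {exc}")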
- truncate(latest_good_block)
Delete data after a block number because a chain reorg happened.
- Parameters
latest_good_block (int) – Delete all data starting after this block (exclusive)
- figure_reorganisation_and_new_blocks(max_range=1000000)
Compare the local block database against the live data from the chain.
Spot the differences in (block number, block header) tuples and determine a chain reorg.
- Parameters
max_range – Abort if we need to scan more than this number of blocks. This is because giving too long a block range to scan is likely to take forever on non-GraphQL nodes. Set to None to ignore.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by the events.
- get_block_timestamp(block_number)
Return UNIX UTC timestamp of a block.
- get_block_timestamp_as_pandas(block_number)
Return UNIX UTC timestamp of a block.
- Parameters
block_number (int) –
- Return type
pandas._libs.tslibs.timestamps.Timestamp
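Example (a minimal lookup sketch, assuming the block is already in the internal buffer, otherwise BlockNotAvailable is raised):

import datetime

block_number = reorg_mon.last_block_read
unix_ts = reorg_mon.get_block_timestamp(block_number)
print(datetime.datetime.utcfromtimestamp(unix_ts))

# The same lookup as a pandas Timestamp
pd_ts = reorg_mon.get_block_timestamp_as_pandas(block_number)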
- update_chain()
Update the internal memory buffer of block headers from the blockchain node.
Do several attempts to read data (as a fork can cause other forks).
Give up after some time if we detect the chain to be in a doom loop.
- Returns
What block range the consumer application should read.
What we think about the chain state.
- Return type
eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution
- to_pandas(partition_size=0)
Convert the data to Pandas DataFrame format for storing.
- Parameters
partition_size (int) –
To partition the outgoing data.
Set 0 to ignore.
- Return type
pandas.core.frame.DataFrame
- load_pandas(df)
Load block header data from Pandas data frame.
- Parameters
df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with
to_pandas().
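Example (a minimal persistence sketch, using Parquet as an example storage format; the file path is a placeholder):

import os
import pandas as pd

cache_path = "/tmp/block-headers.parquet"

# Warm the header buffer from a previous run, if we have one
if os.path.exists(cache_path):
    reorg_mon.load_pandas(pd.read_parquet(cache_path))

# ... run the scanner ...

# Save the buffer for the next run
reorg_mon.to_pandas().to_parquet(cache_path)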
- restore(block_map)
Restore the chain state from saved data.
- Parameters
block_map (dict) – Block number -> Block header dictionary
- abstract fetch_block_data(start_block, end_block)
Read the new block headers.
- Parameters
start_block – The first block to read (inclusive)
end_block – The last block to read (inclusive)
- Return type
- class JSONRPCReorganisationMonitor
Bases: eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor
Watch blockchain for reorgs using the eth_getBlockByNumber JSON-RPC API.
Uses the expensive eth_getBlockByNumber call to download block hash and timestamp from an Ethereum-compatible node.
- __init__(web3, **kwargs)
- Parameters
web3 (web3.main.Web3) –
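Example (a minimal construction sketch, see also the full module example above; the RPC URL is a placeholder):

from web3 import Web3, HTTPProvider
from eth_defi.event_reader.reorganisation_monitor import JSONRPCReorganisationMonitor

web3 = Web3(HTTPProvider("https://polygon-rpc.com"))
reorg_mon = JSONRPCReorganisationMonitor(web3, check_depth=3)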
- get_last_block_live()
Get last block number
- fetch_block_data(start_block, end_block)
Read the new block headers.
- Parameters
start_block – The first block to read (inclusive)
end_block – The last block to read (inclusive)
- Return type
- add_block(record)
Add new block to header tracking.
Blocks must be added in order.
- Parameters
- check_block_reorg(block_number, block_hash)
Check that a newly read block matches our record.
Called during the event reading:
The event reader gets the block number and hash with the event.
We have an initial block_map in memory, previously buffered.
We check if any of the blocks in the block map have different values than what our event producer gives us -> in this case we know there has been a chain reorganisation.
If we do not have records for a block, ignore it.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by the events.
- Parameters
- Return type
- figure_reorganisation_and_new_blocks(max_range=1000000)
Compare the local block database against the live data from the chain.
Spot the differences in (block number, block header) tuples and determine a chain reorg.
- Parameters
max_range – Abort if we need to scan more than this number of blocks. This is because giving too long a block range to scan is likely to take forever on non-GraphQL nodes. Set to None to ignore.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by the events.
- get_block_by_number(block_number)
Get block header data for a specific block number from our memory buffer.
- Parameters
block_number (int) –
- Return type
- get_block_timestamp(block_number)
Return UNIX UTC timestamp of a block.
- get_block_timestamp_as_pandas(block_number)
Return UNIX UTC timestamp of a block.
- Parameters
block_number (int) –
- Return type
pandas._libs.tslibs.timestamps.Timestamp
- load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)
Get the initial block buffer filled up.
You can call this during the application start up, or when you start the chain. This interface is designed to keep the application on hold until new blocks have been served.
- Parameters
block_count – How many latest blocks to load. Give start_block or block_count.
start_block – What is the first block to read. Give start_block or block_count.
tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar
save_callable (Optional[Callable]) – Save after every block. Called after every block. TODO: Hack. Design a better interface.
- Returns
The initial block range to start to work with
- Return type
- load_pandas(df)
Load block header data from Pandas data frame.
- Parameters
df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with
to_pandas().
- restore(block_map)
Restore the chain state from saved data.
- Parameters
block_map (dict) – Block number -> Block header dictionary
- skip_to_block(block_number)
Skip scanning initial chain and directly start from a certain block.
- Parameters
block_number (int) –
- to_pandas(partition_size=0)
Convert the data to Pandas DataFrame format for storing.
- Parameters
partition_size (int) –
To partition the outgoing data.
Set 0 to ignore.
- Return type
pandas.core.frame.DataFrame
- truncate(latest_good_block)
Delete data after a block number because a chain reorg happened.
- Parameters
latest_good_block (int) – Delete all data starting after this block (exclusive)
- update_chain()
Update the internal memory buffer of block headers from the blockchain node.
Do several attempts to read data (as a fork can cause other forks).
Give up after some time if we detect the chain to be in a doom loop.
- Returns
What block range the consumer application should read.
What we think about the chain state.
- Return type
eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution
- block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]
Internal buffer of our block data
Block number -> Block header data
- class GraphQLReorganisationMonitor
Bases: eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor
Watch blockchain for reorgs using the GoEthereum /graphql API.
This is ~10x - 100x faster than using the JSON-RPC API.
See https://geth.ethereum.org/docs/interacting-with-geth/rpc/graphql for details
- Parameters
graphql_url – Give this or an existing HTTPProvider
provider – Give this or graphql_url
- __init__(graphql_url=None, provider=None, **kwargs)
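Example (a minimal construction sketch, assuming a GoEthereum node with the /graphql endpoint enabled; the URL is a placeholder and check_depth is assumed to pass through **kwargs as with the JSON-RPC monitor):

from eth_defi.event_reader.reorganisation_monitor import GraphQLReorganisationMonitor

reorg_mon = GraphQLReorganisationMonitor(
    graphql_url="http://localhost:8545/graphql",
    check_depth=250,
)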
- get_last_block_live()
Get the chain tip using GraphQL.
See https://geth.ethereum.org/docs/interacting-with-geth/rpc/graphql for details
- Return type
- fetch_block_data(start_block, end_block)
Read the new block headers.
- Parameters
start_block – The first block to read (inclusive)
end_block – The last block to read (inclusive)
- Return type
- add_block(record)
Add new block to header tracking.
Blocks must be added in order.
- Parameters
- check_block_reorg(block_number, block_hash)
Check that a newly read block matches our record.
Called during the event reading:
The event reader gets the block number and hash with the event.
We have an initial block_map in memory, previously buffered.
We check if any of the blocks in the block map have different values than what our event producer gives us -> in this case we know there has been a chain reorganisation.
If we do not have records for a block, ignore it.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by the events.
- Parameters
- Return type
- figure_reorganisation_and_new_blocks(max_range=1000000)
Compare the local block database against the live data from the chain.
Spot the differences in (block number, block header) tuples and determine a chain reorg.
- Parameters
max_range – Abort if we need to scan more than this number of blocks. This is because giving too long a block range to scan is likely to take forever on non-GraphQL nodes. Set to None to ignore.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by the events.
- get_block_by_number(block_number)
Get block header data for a specific block number from our memory buffer.
- Parameters
block_number (int) –
- Return type
- get_block_timestamp(block_number)
Return UNIX UTC timestamp of a block.
- get_block_timestamp_as_pandas(block_number)
Return UNIX UTC timestamp of a block.
- Parameters
block_number (int) –
- Return type
pandas._libs.tslibs.timestamps.Timestamp
- load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)
Get the initial block buffer filled up.
You can call this during the application start up, or when you start the chain. This interface is designed to keep the application on hold until new blocks have been served.
- Parameters
block_count – How many latest blocks to load. Give start_block or block_count.
start_block – What is the first block to read. Give start_block or block_count.
tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar
save_callable (Optional[Callable]) – Save after every block. Called after every block. TODO: Hack. Design a better interface.
- Returns
The initial block range to start to work with
- Return type
- load_pandas(df)
Load block header data from Pandas data frame.
- Parameters
df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with
to_pandas().
- restore(block_map)
Restore the chain state from saved data.
- Parameters
block_map (dict) – Block number -> Block header dictionary
- skip_to_block(block_number)
Skip scanning initial chain and directly start from a certain block.
- Parameters
block_number (int) –
- to_pandas(partition_size=0)
Convert the data to Pandas DataFrame format for storing.
- Parameters
partition_size (int) –
To partition the outgoing data.
Set 0 to ignore.
- Return type
pandas.core.frame.DataFrame
- truncate(latest_good_block)
Delete data after a block number because a chain reorg happened.
- Parameters
latest_good_block (int) – Delete all data starting after this block (exclusive)
- update_chain()
Update the internal memory buffer of block headers from the blockchain node.
Do several attempts to read data (as a fork can cause other forks).
Give up after some time if we detect the chain to be in a doom loop.
- Returns
What block range the consumer application should read.
What we think about the chain state.
- Return type
eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution
- block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]
Internal buffer of our block data
Block number -> Block header data
- class MockChainAndReorganisationMonitor
Bases: eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor
A dummy reorganisation monitor for unit testing.
Simulate block production and chain reorgs by minor forks, like a real blockchain.
We get explicit control to introduce simulated forks.
- simulated_block_number
Next available block number
- produce_blocks(block_count=1)
Populate fake blocks in the mock chain.
These blocks will be “read” in figure_reorganisation_and_new_blocks().
- produce_fork(block_number, fork_marker='0x8888')
Mock a fork in the chain.
- Parameters
block_number (int) –
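Example (a minimal unit test sketch; the constructor is assumed to work with defaults and the exact resolution values after a fork depend on the implementation):

from eth_defi.event_reader.reorganisation_monitor import MockChainAndReorganisationMonitor

mock_mon = MockChainAndReorganisationMonitor()

# Produce 10 simulated blocks and ingest them
mock_mon.produce_blocks(10)
resolution = mock_mon.update_chain()
print(resolution.last_live_block)

# Rewrite one of the recent blocks with a different hash to simulate a minor fork
mock_mon.produce_fork(8)
resolution = mock_mon.update_chain()
print(resolution.latest_block_with_good_data)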
- get_last_block_live()
Get last block number
- add_block(record)
Add new block to header tracking.
Blocks must be added in order.
- Parameters
- check_block_reorg(block_number, block_hash)
Check that a newly read block matches our record.
Called during the event reading:
The event reader gets the block number and hash with the event.
We have an initial block_map in memory, previously buffered.
We check if any of the blocks in the block map have different values than what our event producer gives us -> in this case we know there has been a chain reorganisation.
If we do not have records for a block, ignore it.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by the events.
- Parameters
- Return type
- fetch_block_data(start_block, end_block)
Read the new block headers.
- Parameters
start_block – The first block to read (inclusive)
end_block – The last block to read (inclusive)
- Return type
- figure_reorganisation_and_new_blocks(max_range=1000000)
Compare the local block database against the live data from the chain.
Spot the differences in (block number, block header) tuples and determine a chain reorg.
- Parameters
max_range – Abort if we need to scan more than this number of blocks. This is because giving too long a block range to scan is likely to take forever on non-GraphQL nodes. Set to None to ignore.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by the events.
- get_block_by_number(block_number)
Get block header data for a specific block number from our memory buffer.
- Parameters
block_number (int) –
- Return type
- get_block_timestamp(block_number)
Return UNIX UTC timestamp of a block.
- get_block_timestamp_as_pandas(block_number)
Return UNIX UTC timestamp of a block.
- Parameters
block_number (int) –
- Return type
pandas._libs.tslibs.timestamps.Timestamp
- load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)
Get the initial block buffer filled up.
You can call this during the application start up, or when you start the chain. This interface is designed to keep the application on hold until new blocks have been served.
- Parameters
block_count – How many latest blocks to load. Give start_block or block_count.
start_block – What is the first block to read. Give start_block or block_count.
tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar
save_callable (Optional[Callable]) – Save after every block. Called after every block. TODO: Hack. Design a better interface.
- Returns
The initial block range to start to work with
- Return type
- load_pandas(df)
Load block header data from Pandas data frame.
- Parameters
df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with
to_pandas().
- restore(block_map)
Restore the chain state from saved data.
- Parameters
block_map (dict) – Block number -> Block header dictionary
- skip_to_block(block_number)
Skip scanning initial chain and directly start from a certain block.
- Parameters
block_number (int) –
- to_pandas(partition_size=0)
Convert the data to Pandas DataFrame format for storing.
- Parameters
partition_size (int) –
To partition the outgoing data.
Set 0 to ignore.
- Return type
pandas.core.frame.DataFrame
- truncate(latest_good_block)
Delete data after a block number because a chain reorg happened.
- Parameters
latest_good_block (int) – Delete all data starting after this block (exclusive)
- update_chain()
Update the internal memory buffer of block headers from the blockchain node.
Do several attempts to read data (as a fork can cause other forks).
Give up after some time if we detect the chain to be in a doom loop.
- Returns
What block range the consumer application should read.
What we think about the chain state.
- Return type
eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution
- block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]
Internal buffer of our block data
Block number -> Block header data
- create_reorganisation_monitor(web3, check_depth=250)
Set up a chain reorganisation monitor based on the APIs the node supports.
The chain reorganisation monitor detects if any of the blocks at the chain tip have changed since the last read, or during the read.
create_reorganisation_monitor() sets up a fast /graphql API endpoint based block scanner when the endpoint is offered by the node. This is 10x - 50x faster than JSON-RPC.
If the /graphql endpoint is not available, we fall back to slower JSON-RPC based reorganisation monitoring.
- Parameters
check_depth –
How many blocks in the past we check on the reorganisation scan to detect any changes.
If the reorganisation happens further in the past than this, the off-chain accounting will be broken.
The maximum Polygon reorganisation depth has been 157 blocks.
web3 (web3.main.Web3) –
- Returns
A reorg mon instance.
Either GraphQLReorganisationMonitor or JSONRPCReorganisationMonitor.
- Return type
eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor
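Example (a minimal usage sketch; the RPC URL is a placeholder):

from web3 import Web3, HTTPProvider
from eth_defi.event_reader.reorganisation_monitor import create_reorganisation_monitor

web3 = Web3(HTTPProvider("http://localhost:8545"))
reorg_mon = create_reorganisation_monitor(web3, check_depth=250)
print(f"Using {reorg_mon.__class__.__name__}")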