# Extensions
## Add new Event
TODO: Description of how to add an event goes here
## Add new Contract ABI
TODO: Description of how to add a Contract ABI goes here
## Add new data collection mode
Adding a new data collection / processing mode can be useful if you want to take advantage of parallelization via Kafka workers. The steps below show how we added the `get_logs` data collection mode.
- Add a new `DataCollectionMode` enum value in `app/model/__init__.py` (sketched after this list):
  - `GET_LOGS = auto()`
  - Add documentation if needed
- Update `app/config.py` if needed:
  - e.g. the `get_logs` mode requires a `params` field (see the config sketch below)
- Update `app/producer.py` to account for the new `DataCollectionMode`:
  - Add a new method `_start_get_logs_producer()` to `DataProducer`.
  - Add a `match` case for the new `DataCollectionMode` in the `_start_producer_task()` method (example after this list).
  - Implement `_start_get_logs_producer()`: call `eth_getLogs` and then send all transactions to Kafka:

    ```python
    async def _start_get_logs_producer(self, data_collection_cfg) -> None:
        """Start a producer that uses the `eth_getLogs` RPC method to get all the transactions"""
        # Get logs
        logs = await self.node_connector.w3.eth.get_logs(
            filter_params=data_collection_cfg.params
        )
        # Send them to Kafka
        if logs:
            # Encode the logs as kafka events
            messages = [
                self.encode_kafka_event(log["transactionHash"].hex(), data_collection_cfg.mode)
                for log in logs
            ]
            # Send all the transaction hashes to Kafka so consumers can process them
            await self.kafka_manager.send_batch(msgs=messages)
        log.info(f"Finished collecting {len(logs)} logs")
    ```
- (Optional) Implement a new transaction processor for the new `DataCollectionMode` in `app/consumer/tx_processor.py`:
  - Create a new `GetLogsTransactionProcessor` class (skeleton below).
  - Update `self.tx_processors` in `DataConsumer` to use this new `GetLogsTransactionProcessor`.
- Update the JSON config and run the app (see the example config at the end of this section).
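
For illustration, here is a minimal sketch of the enum change in `app/model/__init__.py`. Only `GET_LOGS` comes from the steps above; the other member shown is a hypothetical placeholder:

```python
from enum import Enum, auto


class DataCollectionMode(Enum):
    """Mode that determines how transactions are collected."""

    # Placeholder for whatever modes already exist in the codebase
    FULL = auto()
    # New mode: collect transactions via the eth_getLogs RPC method
    GET_LOGS = auto()
```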
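The config change might look roughly like this, assuming a Pydantic-style settings model; the `DataCollectionConfig` class name and every field except `params` are assumptions for the sketch:

```python
from typing import Any

from pydantic import BaseModel

from app.model import DataCollectionMode


class DataCollectionConfig(BaseModel):
    """Per-mode data collection settings (shape assumed for illustration)."""

    mode: DataCollectionMode
    # Filter parameters forwarded verbatim to eth_getLogs
    # (address, topics, fromBlock, toBlock, ...); only the
    # get_logs mode needs them, hence optional.
    params: dict[str, Any] | None = None
```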
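The new `match` case in `_start_producer_task()` could look as follows; the surrounding method body and the handling of other modes are assumptions:

```python
# In app/producer.py, inside DataProducer
async def _start_producer_task(self, data_collection_cfg) -> None:
    """Dispatch to the producer coroutine for the configured mode."""
    match data_collection_cfg.mode:
        case DataCollectionMode.GET_LOGS:
            # New branch for the get_logs mode
            await self._start_get_logs_producer(data_collection_cfg)
        case _:
            # Existing modes are handled here (omitted)
            ...
```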
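A skeleton of the optional consumer-side change; the `TransactionProcessor` base class and its method name are assumptions based on the file layout above:

```python
# app/consumer/tx_processor.py (sketch; the base class API is assumed)
class TransactionProcessor:
    """Stand-in for the base class already present in the codebase."""

    async def process_transaction(self, tx_hash: str) -> None:
        raise NotImplementedError


class GetLogsTransactionProcessor(TransactionProcessor):
    """Processes transactions collected via the get_logs mode."""

    async def process_transaction(self, tx_hash: str) -> None:
        # Mode-specific handling of the transaction hash goes here.
        ...


# In DataConsumer, map the new mode to the processor
# (assuming self.tx_processors is keyed by DataCollectionMode):
# self.tx_processors[DataCollectionMode.GET_LOGS] = GetLogsTransactionProcessor()
```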
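Finally, a hypothetical JSON config entry enabling the new mode; the exact schema and key names depend on the app's config format, so treat this only as a shape to adapt:

```json
{
  "data_collection": [
    {
      "mode": "get_logs",
      "params": {
        "address": "0x0000000000000000000000000000000000000000",
        "topics": [],
        "fromBlock": "0x0",
        "toBlock": "latest"
      }
    }
  ]
}
```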
## Support more blockchains
TODO: