-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for reorged logs in eth_Logs Subscription #5851
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks,
I have a few suggestions to simplify this a bit:
- move the shared state into the ActiveFilter and not the rpc type
- no additional generics on the EthFilter type, not required because we want to spawn a new task that instead takes a clone of EthFilter
crates/rpc/rpc/src/eth/filter.rs
Outdated
/// Reacts to reorged blocks, checks impacted log filters, stores reorged logs in the log filter | ||
pub async fn reorged_logs_stream( | ||
&self, | ||
pubsub: EthPubSub<Provider, Pool, Events, Network>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not exactly sure what this does.
this looks a bit convoluted.
how the on reorg logic should work is:
- listen for reorged blocks:
- acquire a lock on the shared state
- try to find matching filters, if so put the logs into the ActiveFilter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- The temp_reorged_blocks stores consecutive removed blocks as they come. The matching logs for the given filter are stored in reverted_logs.
- If the hash of one of these blocks matches with the last polled hash of the filter, we know that the filter previously served all the blocks stored in temp_reorged_blocks. We can push the reverted logs in the reorged_logs vector along with the filter id.
- If the last polled hash does not match before the block subscription receives a newly included block, we can simply clear the temp_reorged_blocks.
c3e4d38
to
8dff31e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry for the delay,
this is great progress!
could you please rebase this so we can proceed here?
3fbd7d6
to
1cc3576
Compare
Please, could you review @mattsse ? Have made the requested changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
terribly sorry for the late review...
this is going in the right direction, now we can simplify a few more things. lmk if you need additional pointers
crates/rpc/rpc/src/eth/filter.rs
Outdated
provider: Provider, | ||
pool: Pool, | ||
eth_cache: EthStateCache, | ||
config: EthFilterConfig, | ||
task_spawner: Box<dyn TaskSpawner>, | ||
) -> Self { | ||
pubsub: EthPubSub<Provider, Pool, Events, Network>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need this type, we only need events
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain this further?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we only need the events trait not the pubsub isntance
crates/rpc/rpc/src/eth/filter.rs
Outdated
if removed { | ||
temp_reorged_blocks.push(block_receipts); | ||
|
||
let filters = executor::block_on(self.active_filters().inner.lock()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no executor block,
this needs to be refactored so we can have async code
I suggest not do the flat_map stuff and instead move all of this to async code in the while loop
crates/rpc/rpc/src/eth/filter.rs
Outdated
|
||
filters.iter().for_each(|(id, active_filter)| { | ||
if let FilterKind::Log(ref filter) = active_filter.kind { | ||
let mut reverted_logs: Vec<Log> = Vec::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to check here first if the filter is even affected by this, by checking the block number and only check for removed logs if the filter is at the same height
crates/rpc/rpc/src/eth/filter.rs
Outdated
let mut filters = executor::block_on(self.active_filters().inner.lock()); | ||
let active_filter = | ||
filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id)).unwrap(); | ||
let mut guard = active_filter.reorged_logs.lock().await; | ||
if let Some(reorged_logs_vec) = guard.as_mut() { | ||
reorged_logs_vec.append(&mut logs); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we should handle the Reorg events, basically the flat map logic from below
crates/rpc/rpc/src/eth/filter.rs
Outdated
/// Last time this filter was polled. | ||
last_poll_timestamp: Instant, | ||
/// What kind of filter it is. | ||
kind: FilterKind, | ||
/// Reorged logs | ||
reorged_logs: Arc<Mutex<Option<Vec<Log>>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we already lock the active filters set, we don't need a mutex here, instead we can push removed logs in here directly when iterating over all active filters
Added improved implementation for reorged logs subscription
7513fde
to
51d367a
Compare
Addressing issue #5677
Updated EthFilter to create new task update_reorged_logs that is spawned when the EthFilter is initialized. The new task reacts to reorged blocks taking a clone of EthPubSub, checks impacted log filters and stores the reorged logs in the log filter in a new field called reorged_logs.