Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for reorged logs in eth_Logs Subscription #5851

Closed
wants to merge 6 commits into from

Conversation

Arindam2407
Copy link
Contributor

Addressing issue #5677

Updated EthFilter to create new task update_reorged_logs that is spawned when the EthFilter is initialized. The new task reacts to reorged blocks taking a clone of EthPubSub, checks impacted log filters and stores the reorged logs in the log filter in a new field called reorged_logs.

@Arindam2407 Arindam2407 changed the title Reorged logs Support for reorged logs in eth_Logs Subscription Dec 23, 2023
Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks,
I have a few suggestions to simplify this a bit:

  • move the shared state into the ActiveFilter and not the rpc type
  • no additional generics on the EthFilter type, not required because we want to spawn a new task that instead takes a clone of EthFilter

crates/rpc/rpc-types/Cargo.toml Outdated Show resolved Hide resolved
crates/rpc/rpc-types/src/eth/filter.rs Outdated Show resolved Hide resolved
crates/rpc/rpc/src/eth/filter.rs Outdated Show resolved Hide resolved
crates/rpc/rpc/src/eth/filter.rs Outdated Show resolved Hide resolved
Comment on lines 157 to 163
/// Reacts to reorged blocks, checks impacted log filters, stores reorged logs in the log filter
pub async fn reorged_logs_stream(
&self,
pubsub: EthPubSub<Provider, Pool, Events, Network>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not exactly sure what this does.

this looks a bit convoluted.

how the on reorg logic should work is:

  1. listen for reorged blocks:
  2. acquire a lock on the shared state
  3. try to find matching filters, if so put the logs into the ActiveFilter

Copy link
Contributor Author

@Arindam2407 Arindam2407 Jan 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • The temp_reorged_blocks stores consecutive removed blocks as they come. The matching logs for the given filter are stored in reverted_logs.
  • If the hash of one of these blocks matches with the last polled hash of the filter, we know that the filter previously served all the blocks stored in temp_reorged_blocks. We can push the reverted logs in the reorged_logs vector along with the filter id.
  • If the last polled hash does not match before the block subscription receives a newly included block, we can simply clear the temp_reorged_blocks.

crates/rpc/rpc/src/eth/filter.rs Show resolved Hide resolved
@mattsse mattsse added the A-rpc Related to the RPC implementation label Dec 27, 2023
@Arindam2407 Arindam2407 closed this Jan 1, 2024
@Arindam2407 Arindam2407 reopened this Jan 1, 2024
@Arindam2407 Arindam2407 requested a review from mattsse January 2, 2024 12:53
Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for the delay,
this is great progress!

could you please rebase this so we can proceed here?

@Arindam2407
Copy link
Contributor Author

Please, could you review @mattsse ? Have made the requested changes

Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

terribly sorry for the late review...

this is going in the right direction, now we can simplify a few more things. lmk if you need additional pointers

provider: Provider,
pool: Pool,
eth_cache: EthStateCache,
config: EthFilterConfig,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
pubsub: EthPubSub<Provider, Pool, Events, Network>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need this type, we only need events

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain this further?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only need the events trait not the pubsub isntance

if removed {
temp_reorged_blocks.push(block_receipts);

let filters = executor::block_on(self.active_filters().inner.lock());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no executor block,

this needs to be refactored so we can have async code

I suggest not do the flat_map stuff and instead move all of this to async code in the while loop


filters.iter().for_each(|(id, active_filter)| {
if let FilterKind::Log(ref filter) = active_filter.kind {
let mut reverted_logs: Vec<Log> = Vec::new();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to check here first if the filter is even affected by this, by checking the block number and only check for removed logs if the filter is at the same height

Comment on lines 150 to 204
let mut filters = executor::block_on(self.active_filters().inner.lock());
let active_filter =
filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id)).unwrap();
let mut guard = active_filter.reorged_logs.lock().await;
if let Some(reorged_logs_vec) = guard.as_mut() {
reorged_logs_vec.append(&mut logs);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we should handle the Reorg events, basically the flat map logic from below

/// Last time this filter was polled.
last_poll_timestamp: Instant,
/// What kind of filter it is.
kind: FilterKind,
/// Reorged logs
reorged_logs: Arc<Mutex<Option<Vec<Log>>>>,
Copy link
Collaborator

@mattsse mattsse Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we already lock the active filters set, we don't need a mutex here, instead we can push removed logs in here directly when iterating over all active filters

@Arindam2407 Arindam2407 reopened this Feb 8, 2024
@Arindam2407 Arindam2407 requested a review from mattsse February 8, 2024 18:13
@github-actions github-actions bot added the S-stale This issue/PR is stale and will close with no further activity label Mar 8, 2024
@github-actions github-actions bot closed this Mar 15, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-rpc Related to the RPC implementation S-stale This issue/PR is stale and will close with no further activity
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants