Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(pruner): delete history indices by changeset keys #9312

Merged
merged 16 commits into from
Jul 8, 2024
Merged
26 changes: 17 additions & 9 deletions crates/prune/prune/src/segments/account_history.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::collections::HashMap;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks boss

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you don't leave the white space, then std and core libs are ordered alphabetically with rest of deps. it's more mature rust to put std and core deps up top, for developers who care ab std/no-std code - which includes us since we included the no-std option in several crates, like reth-evm

Copy link
Collaborator Author

@shekhirin shekhirin Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I disagree, I also prefer to have all imports in one place ordered alphabetically.

for developers who care ab std/no-std code

#![no_std] exists for this reason, and developers shouldn't judge the crate no_std guarantees purely by deps it uses

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we really seem to not have any consistency with it 😅

reth git:(alexey/history-pruning-perf) ✗ rg --multiline --type rust 'use std.*;\nuse' | wc -l
     204reth git:(alexey/history-pruning-perf) ✗ rg --multiline --type rust 'use std.*;\n\nuse' | wc -l
     132

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't look like a rule that everyone follows even in core Rust team https://github.com/rust-lang/rust/blob/59a4f02f836f74c4cf08f47d76c9f6069a2f8276/compiler/rustc_ast/src/ast.rs#L21-L42

anyway, there are no std import in this file now, and I believe with #9141 (comment) we will have this as a fmt rule

use crate::{
segments::{
history::prune_history_indices, PruneInput, PruneOutput, PruneOutputCheckpoint, Segment,
},
PrunerError,
};
use itertools::Itertools;
use reth_db::tables;
use reth_db_api::{database::Database, models::ShardedKey};
use reth_provider::DatabaseProviderRW;
Expand Down Expand Up @@ -63,27 +66,32 @@ impl<DB: Database> Segment<DB> for AccountHistory {
))
}

let mut deleted_keys = HashMap::new();
let mut last_changeset_pruned_block = None;
let (pruned_changesets, done) = provider
.prune_table_with_range::<tables::AccountChangeSets>(
range,
&mut limiter,
|_| false,
|row| last_changeset_pruned_block = Some(row.0),
|(block_number, value)| {
let highest_block = deleted_keys.entry(value.address).or_insert(0);
if block_number > *highest_block {
*highest_block = block_number;
}
last_changeset_pruned_block = Some(block_number);
},
)?;
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let deleted_keys = deleted_keys
.into_iter()
.sorted_unstable()
.map(|(address, block_number)| ShardedKey::new(address, block_number));
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");

let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more account account changesets to prune, set the checkpoint block number
// to previous, so we could finish pruning its account changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);

let (processed, pruned_indices) = prune_history_indices::<DB, tables::AccountsHistory, _>(
provider,
last_changeset_pruned_block,
deleted_keys,
|a, b| a.key == b.key,
|key| ShardedKey::last(key.key),
)?;
trace!(target: "pruner", %processed, pruned = %pruned_indices, %done, "Pruned account history (history)");

Expand Down
180 changes: 112 additions & 68 deletions crates/prune/prune/src/segments/history.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use alloy_primitives::BlockNumber;
use reth_db::BlockNumberList;
use reth_db::{BlockNumberList, RawKey, RawTable, RawValue};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
Expand All @@ -10,103 +10,147 @@ use reth_db_api::{
};
use reth_provider::DatabaseProviderRW;

/// Prune history indices up to the provided block, inclusive.
/// Prune history indices according to the provided list of deleted changesets.
///
/// Returns total number of processed (walked) and deleted entities.
pub(crate) fn prune_history_indices<DB, T, SK>(
provider: &DatabaseProviderRW<DB>,
to_block: BlockNumber,
deleted_changesets: impl IntoIterator<Item = T::Key>,
key_matches: impl Fn(&T::Key, &T::Key) -> bool,
last_key: impl Fn(&T::Key) -> T::Key,
) -> Result<(usize, usize), DatabaseError>
where
DB: Database,
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<SK>>,
SK: PartialEq,
{
let mut processed = 0;
let mut deleted = 0;
let mut cursor = provider.tx_ref().cursor_write::<T>()?;
let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;

// Prune history table:
// 1. If the shard has `highest_block_number` less than or equal to the target block number
// for pruning, delete the shard completely.
// 2. If the shard has `highest_block_number` greater than the target block number for
// pruning, filter block numbers inside the shard which are less than the target
// block number for pruning.
while let Some(result) = cursor.next()? {
let (key, blocks): (T::Key, BlockNumberList) = result;
for changeset_key in deleted_changesets {
// Seek to the shard that has the key >= the given changeset key
// TODO: optimize
let result = cursor
.seek(RawKey::new(changeset_key.clone()))?
.map(|(key, value)| Result::<_, DatabaseError>::Ok((key.key()?, value)))
.transpose()?;

// Get the highest block number that needs to be deleted for this changeset key
let to_block = changeset_key.as_ref().highest_block_number;

// If such shard doesn't exist, skip to the next changeset key
if result.as_ref().map_or(true, |(key, _)| !key_matches(key, &changeset_key)) {
continue
}

// If shard consists only of block numbers less than the target one, delete shard
// completely.
if key.as_ref().highest_block_number <= to_block {
cursor.delete_current()?;
deleted += 1;
if key.as_ref().highest_block_number == to_block {
// Shard contains only block numbers up to the target one, so we can skip to
// the last shard for this key. It is guaranteed that further shards for this
// sharded key will not contain the target block number, as it's in this shard.
cursor.seek_exact(last_key(&key))?;
// At this point, we're sure that the shard with the given changeset key exists
let (key, raw_blocks): (T::Key, RawValue<BlockNumberList>) = result.unwrap();

deleted += prune_shard(&mut cursor, key, raw_blocks, to_block, &key_matches)?;

while let Some((key, value)) = cursor
.next()?
.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
.transpose()?
{
if key_matches(&key, &changeset_key) {
deleted += prune_shard(&mut cursor, key, value, to_block, &key_matches)?;
} else {
break
}
}
// Shard contains block numbers that are higher than the target one, so we need to
// filter it. It is guaranteed that further shards for this sharded key will not
// contain the target block number, as it's in this shard.
else {
let higher_blocks =
blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();

// If there were blocks less than or equal to the target one
// (so the shard has changed), update the shard.
if blocks.len() as usize != higher_blocks.len() {
// If there will be no more blocks in the shard after pruning blocks below target
// block, we need to remove it, as empty shards are not allowed.
if higher_blocks.is_empty() {
if key.as_ref().highest_block_number == u64::MAX {
let prev_row = cursor.prev()?;
match prev_row {
// If current shard is the last shard for the sharded key that
// has previous shards, replace it with the previous shard.
Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
cursor.delete_current()?;
deleted += 1;
// Upsert will replace the last shard for this sharded key with
// the previous value.
cursor.upsert(key.clone(), prev_value)?;
}
// If there's no previous shard for this sharded key,
// just delete last shard completely.
_ => {
// If we successfully moved the cursor to a previous row,
// jump to the original last shard.
if prev_row.is_some() {
cursor.next()?;
}
// Delete shard.
cursor.delete_current()?;
deleted += 1;
processed += 1;
}

Ok((processed, deleted))
}

fn prune_shard<C, T, SK>(
shekhirin marked this conversation as resolved.
Show resolved Hide resolved
cursor: &mut C,
key: T::Key,
raw_blocks: RawValue<T::Value>,
to_block: BlockNumber,
key_matches: impl Fn(&T::Key, &T::Key) -> bool,
) -> Result<usize, DatabaseError>
where
C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<SK>>,
SK: PartialEq,
{
let mut deleted = 0;

// If shard consists only of block numbers less than the target one, delete shard
// completely.
if key.as_ref().highest_block_number <= to_block {
cursor.delete_current()?;
deleted += 1;
}
// Shard contains block numbers that are higher than the target one, so we need to
// filter it. It is guaranteed that further shards for this sharded key will not
// contain the target block number, as it's in this shard.
else {
let blocks = raw_blocks.value()?;
let higher_blocks =
blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();

// If there were blocks less than or equal to the target one
// (so the shard has changed), update the shard.
if blocks.len() as usize != higher_blocks.len() {
// If there will be no more blocks in the shard after pruning blocks below target
// block, we need to remove it, as empty shards are not allowed.
if higher_blocks.is_empty() {
if key.as_ref().highest_block_number == u64::MAX {
let prev_row = cursor
.prev()?
.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
.transpose()?;
match prev_row {
// If current shard is the last shard for the sharded key that
// has previous shards, replace it with the previous shard.
Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
cursor.delete_current()?;
deleted += 1;
// Upsert will replace the last shard for this sharded key with
// the previous value.
cursor.upsert(RawKey::new(key), prev_value)?;
}
// If there's no previous shard for this sharded key,
// just delete last shard completely.
_ => {
// If we successfully moved the cursor to a previous row,
// jump to the original last shard.
if prev_row.is_some() {
cursor.next()?;
}
// Delete shard.
cursor.delete_current()?;
deleted += 1;
}
}
// If current shard is not the last shard for this sharded key,
// just delete it.
else {
cursor.delete_current()?;
deleted += 1;
}
} else {
cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(higher_blocks))?;
}
}

// Jump to the last shard for this key, if current key isn't already the last shard.
if key.as_ref().highest_block_number != u64::MAX {
cursor.seek_exact(last_key(&key))?;
// If current shard is not the last shard for this sharded key,
// just delete it.
else {
cursor.delete_current()?;
deleted += 1;
}
} else {
cursor.upsert(
RawKey::new(key),
RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
)?;
}
}

processed += 1;
}

Ok((processed, deleted))
Ok(deleted)
}
27 changes: 18 additions & 9 deletions crates/prune/prune/src/segments/storage_history.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::collections::HashMap;

use crate::{
segments::{
history::prune_history_indices, PruneInput, PruneOutput, PruneOutputCheckpoint, Segment,
},
PrunerError,
};
use itertools::Itertools;
use reth_db::tables;
use reth_db_api::{
database::Database,
Expand Down Expand Up @@ -66,27 +69,33 @@ impl<DB: Database> Segment<DB> for StorageHistory {
))
}

let mut deleted_keys = HashMap::new();
let mut last_changeset_pruned_block = None;
let (pruned_changesets, done) = provider
.prune_table_with_range::<tables::StorageChangeSets>(
BlockNumberAddress::range(range),
&mut limiter,
|_| false,
|row| last_changeset_pruned_block = Some(row.0.block_number()),
|(BlockNumberAddress((block_number, address)), entry)| {
let highest_block = deleted_keys.entry((address, entry.key)).or_insert(0);
if block_number > *highest_block {
*highest_block = block_number;
}
last_changeset_pruned_block = Some(block_number);
},
)?;
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let deleted_keys = deleted_keys.into_iter().sorted_unstable().map(
|((address, storage_key), block_number)| {
StorageShardedKey::new(address, storage_key, block_number)
},
);
trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");

let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more storage storage changesets to prune, set the checkpoint block number
// to previous, so we could finish pruning its storage changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);

let (processed, pruned_indices) = prune_history_indices::<DB, tables::StoragesHistory, _>(
provider,
last_changeset_pruned_block,
deleted_keys,
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
|key| StorageShardedKey::last(key.address, key.sharded_key.key),
)?;
trace!(target: "pruner", %processed, deleted = %pruned_indices, %done, "Pruned storage history (history)");

Expand Down
Loading