Skip to content

Commit

Permalink
refactor(prune-types/prune): move PruneLimiter to reth-prune (parad…
Browse files Browse the repository at this point in the history
  • Loading branch information
lean-apple committed Dec 11, 2024
1 parent cedf7d8 commit 0bb8715
Show file tree
Hide file tree
Showing 15 changed files with 73 additions and 82 deletions.
2 changes: 1 addition & 1 deletion crates/prune/prune/src/db_ext.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{fmt::Debug, ops::RangeBounds};

use crate::PruneLimiter;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, RangeWalker},
table::{Table, TableRow},
transaction::DbTxMut,
DatabaseError,
};
use reth_prune_types::PruneLimiter;
use tracing::debug;

pub(crate) trait DbTxPruneExt: DbTxMut {
Expand Down
2 changes: 2 additions & 0 deletions crates/prune/prune/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
mod builder;
mod db_ext;
mod error;
mod limiter;
mod metrics;
mod pruner;
pub mod segments;

use crate::metrics::Metrics;
pub use builder::PrunerBuilder;
pub use error::PrunerError;
pub use limiter::PruneLimiter;
pub use pruner::{Pruner, PrunerResult, PrunerWithFactory, PrunerWithResult};

// Re-export prune types
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use reth_prune_types::{PruneInterruptReason, PruneProgress};
use std::{
num::NonZeroUsize,
time::{Duration, Instant},
Expand Down Expand Up @@ -119,6 +120,30 @@ impl PruneLimiter {
pub fn is_limit_reached(&self) -> bool {
self.is_deleted_entries_limit_reached() || self.is_time_limit_reached()
}

/// Creates new [`PruneInterruptReason`] based on the limiter's state.
pub fn interrupt_reason(&self) -> PruneInterruptReason {
if self.is_time_limit_reached() {
PruneInterruptReason::Timeout
} else if self.is_deleted_entries_limit_reached() {
PruneInterruptReason::DeletedEntriesLimitReached
} else {
PruneInterruptReason::Unknown
}
}

/// Creates new [`PruneProgress`].
///
/// If `done == true`, returns [`PruneProgress::Finished`], otherwise
/// [`PruneProgress::HasMoreData`] is returned with [`PruneInterruptReason`] according to the
/// limiter's state.
pub fn progress(&self, done: bool) -> PruneProgress {
if done {
PruneProgress::Finished
} else {
PruneProgress::HasMoreData(self.interrupt_reason())
}
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
use crate::{
segments::{PruneInput, Segment},
Metrics, PrunerError, PrunerEvent,
Metrics, PruneLimiter, PrunerError, PrunerEvent,
};
use alloy_primitives::BlockNumber;
use reth_exex_types::FinishedExExHeight;
use reth_provider::{
DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
};
use reth_prune_types::{PruneLimiter, PruneProgress, PrunedSegmentInfo, PrunerOutput};
use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
use reth_tokio_util::{EventSender, EventStream};
use std::time::{Duration, Instant};
use tokio::sync::watch;
Expand Down
6 changes: 2 additions & 4 deletions crates/prune/prune/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ mod set;
mod static_file;
mod user;

use crate::PrunerError;
use crate::{PruneLimiter, PrunerError};
use alloy_primitives::{BlockNumber, TxNumber};
use reth_provider::{errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter};
use reth_prune_types::{
PruneCheckpoint, PruneLimiter, PruneMode, PrunePurpose, PruneSegment, SegmentOutput,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
pub use set::SegmentSet;
pub use static_file::{
Headers as StaticFileHeaders, Receipts as StaticFileReceipts,
Expand Down
10 changes: 4 additions & 6 deletions crates/prune/prune/src/segments/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ use reth_provider::{
errors::provider::ProviderResult, BlockReader, DBProvider, NodePrimitivesProvider,
PruneCheckpointWriter, TransactionsProvider,
};
use reth_prune_types::{
PruneCheckpoint, PruneProgress, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_prune_types::{PruneCheckpoint, PruneSegment, SegmentOutput, SegmentOutputCheckpoint};
use tracing::trace;

pub(crate) fn prune<Provider>(
Expand Down Expand Up @@ -56,7 +54,7 @@ where
// so we could finish pruning its receipts on the next run.
.checked_sub(if done { 0 } else { 1 });

let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);

Ok(SegmentOutput {
progress,
Expand All @@ -83,7 +81,7 @@ pub(crate) fn save_checkpoint(

#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, SegmentOutput};
use crate::segments::{PruneInput, PruneLimiter, SegmentOutput};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
Expand All @@ -93,7 +91,7 @@ mod tests {
use reth_db::tables;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{
Expand Down
14 changes: 7 additions & 7 deletions crates/prune/prune/src/segments/static_file/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::num::NonZeroUsize;
use crate::{
db_ext::DbTxPruneExt,
segments::{PruneInput, Segment},
PrunerError,
PruneLimiter, PrunerError,
};
use alloy_primitives::BlockNumber;
use itertools::Itertools;
Expand All @@ -14,8 +14,7 @@ use reth_db::{
};
use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory};
use reth_prune_types::{
PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use tracing::trace;
Expand Down Expand Up @@ -92,7 +91,7 @@ impl<Provider: StaticFileProviderFactory + DBProvider<Tx: DbTxMut>> Segment<Prov
}

let done = last_pruned_block == Some(block_range_end);
let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);

Ok(SegmentOutput {
progress,
Expand Down Expand Up @@ -195,7 +194,8 @@ where
#[cfg(test)]
mod tests {
use crate::segments::{
static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, Segment, SegmentOutput,
static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
SegmentOutput,
};
use alloy_primitives::{BlockNumber, B256, U256};
use assert_matches::assert_matches;
Expand All @@ -206,8 +206,8 @@ mod tests {
StaticFileProviderFactory,
};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, SegmentOutputCheckpoint,
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
SegmentOutputCheckpoint,
};
use reth_stages::test_utils::TestStageDB;
use reth_testing_utils::{generators, generators::random_header_range};
Expand Down
10 changes: 5 additions & 5 deletions crates/prune/prune/src/segments/static_file/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_provider::{
TransactionsProvider,
};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use tracing::trace;
Expand Down Expand Up @@ -76,7 +76,7 @@ where
// so we could finish pruning its transactions on the next run.
.checked_sub(if done { 0 } else { 1 });

let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);

Ok(SegmentOutput {
progress,
Expand All @@ -91,7 +91,7 @@ where

#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, Segment};
use crate::segments::{PruneInput, PruneLimiter, Segment};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
Expand All @@ -104,8 +104,8 @@ mod tests {
StaticFileProviderFactory,
};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, SegmentOutput,
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
SegmentOutput,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
Expand Down
11 changes: 5 additions & 6 deletions crates/prune/prune/src/segments/user/account_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use reth_db::{tables, transaction::DbTxMut};
use reth_db_api::models::ShardedKey;
use reth_provider::DBProvider;
use reth_prune_types::{
PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
Expand Down Expand Up @@ -65,7 +64,7 @@ where
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
PruneInterruptReason::new(&limiter),
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
Expand Down Expand Up @@ -113,7 +112,7 @@ where
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");

let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);

Ok(SegmentOutput {
progress,
Expand All @@ -130,14 +129,14 @@ where
mod tests {
use crate::segments::{
user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
Segment, SegmentOutput,
PruneLimiter, Segment, SegmentOutput,
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{
Expand Down
10 changes: 5 additions & 5 deletions crates/prune/prune/src/segments/user/receipts_by_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use reth_provider::{
BlockReader, DBProvider, NodePrimitivesProvider, PruneCheckpointWriter, TransactionsProvider,
};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig,
SegmentOutput, MINIMUM_PRUNING_DISTANCE,
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput,
MINIMUM_PRUNING_DISTANCE,
};
use tracing::{instrument, trace};
#[derive(Debug)]
Expand Down Expand Up @@ -219,22 +219,22 @@ where
},
)?;

let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);

Ok(SegmentOutput { progress, pruned, checkpoint: None })
}
}

#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, ReceiptsByLogs, Segment};
use crate::segments::{PruneInput, PruneLimiter, ReceiptsByLogs, Segment};
use alloy_primitives::B256;
use assert_matches::assert_matches;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_primitives_traits::InMemorySize;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, TransactionsProvider};
use reth_prune_types::{PruneLimiter, PruneMode, PruneSegment, ReceiptsLogPruneConfig};
use reth_prune_types::{PruneMode, PruneSegment, ReceiptsLogPruneConfig};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{
self, random_block_range, random_eoa_account, random_log, random_receipt, BlockRangeParams,
Expand Down
8 changes: 4 additions & 4 deletions crates/prune/prune/src/segments/user/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use reth_db::{tables, transaction::DbTxMut};
use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use tracing::{instrument, trace};

Expand Down Expand Up @@ -67,7 +67,7 @@ where
// previous, so we could finish pruning its transaction senders on the next run.
.checked_sub(if done { 0 } else { 1 });

let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);

Ok(SegmentOutput {
progress,
Expand All @@ -82,7 +82,7 @@ where

#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, Segment, SegmentOutput, SenderRecovery};
use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
Expand All @@ -92,7 +92,7 @@ mod tests {
use reth_db::tables;
use reth_primitives_traits::SignedTransaction;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
use std::ops::Sub;
Expand Down
15 changes: 6 additions & 9 deletions crates/prune/prune/src/segments/user/storage_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use itertools::Itertools;
use reth_db::{tables, transaction::DbTxMut};
use reth_db_api::models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress};
use reth_provider::DBProvider;
use reth_prune_types::{
PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment,
SegmentOutputCheckpoint,
};
use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};

Expand Down Expand Up @@ -65,7 +62,7 @@ where
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
PruneInterruptReason::new(&limiter),
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
Expand Down Expand Up @@ -118,7 +115,7 @@ where
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");

let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);

Ok(SegmentOutput {
progress,
Expand All @@ -134,14 +131,14 @@ where
#[cfg(test)]
mod tests {
use crate::segments::{
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, Segment, SegmentOutput,
StorageHistory,
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
SegmentOutput, StorageHistory,
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
Expand Down
Loading

0 comments on commit 0bb8715

Please sign in to comment.