diff --git a/fork_choice_control/src/mutator.rs b/fork_choice_control/src/mutator.rs
index db066f5b..9a448d98 100644
--- a/fork_choice_control/src/mutator.rs
+++ b/fork_choice_control/src/mutator.rs
@@ -441,7 +441,7 @@ where
         if self.store.is_forward_synced()
             && misc::slots_since_epoch_start::<P>(tick.slot) == 0
         {
             if tick.kind == TickKind::AttestFourth {
-                self.prune_old_blob_sidecars()?;
+                self.prune_old_records()?;
             }
             if let Some(metrics) = self.metrics.as_ref() {
@@ -2336,27 +2336,50 @@ where
         Ok(())
     }

-    fn prune_old_blob_sidecars(&self) -> Result<()> {
+    fn prune_old_records(&self) -> Result<()> {
         let storage = self.storage.clone_arc();
-        let current_epoch = misc::compute_epoch_at_slot::<P>(self.store.slot());
-        let up_to_epoch = current_epoch.saturating_sub(
-            self.store
-                .chain_config()
-                .min_epochs_for_blob_sidecars_requests,
-        );
-        let up_to_slot = misc::compute_start_slot_at_epoch::<P>(up_to_epoch);
+        let blobs_up_to_epoch = self.store.min_checked_data_availability_epoch();
+        let blobs_up_to_slot = misc::compute_start_slot_at_epoch::<P>(blobs_up_to_epoch);
+        let blocks_up_to_epoch = self.store.min_checked_block_availability_epoch();
+        let blocks_up_to_slot = misc::compute_start_slot_at_epoch::<P>(blocks_up_to_epoch);

         Builder::new()
-            .name("old-blob-pruner".to_owned())
+            .name("old-data-pruner".to_owned())
             .spawn(move || {
-                debug!("pruning old blob sidecards from storage up to slot {up_to_slot}…");
+                debug!("pruning old blob sidecars from storage up to slot {blobs_up_to_slot}…");
+
+                match storage.prune_old_blob_sidecars(blobs_up_to_slot) {
+                    Ok(()) => {
+                        debug!(
+                            "pruned old blob sidecars from storage up to slot {blobs_up_to_slot}"
+                        );
+                    }
+                    Err(error) => {
+                        error!("pruning old blob sidecars from storage failed: {error:?}")
+                    }
+                }
+
+                debug!("pruning old blocks and states from storage up to slot {blocks_up_to_slot}…");
+
+                match storage.prune_old_blocks_and_states(blocks_up_to_slot) {
+                    Ok(()) => {
+                        debug!(
+                            "pruned old blocks and states from storage up to slot {blocks_up_to_slot}"
+                        );
+                    }
+                    Err(error) => {
+                        error!("pruning old blocks and states from storage failed: {error:?}")
+                    }
+                }
+
+                debug!("pruning old state roots from storage up to slot {blocks_up_to_slot}…");

-                match storage.prune_old_blob_sidecars(up_to_slot) {
+                match storage.prune_old_state_roots(blocks_up_to_slot) {
                     Ok(()) => {
-                        debug!("pruned old blob sidecards from storage up to slot {up_to_slot}");
+                        debug!("pruned old state roots from storage up to slot {blocks_up_to_slot}");
                     }
                     Err(error) => {
-                        error!("pruning old blob sidecards from storage failed: {error:?}")
+                        error!("pruning old state roots from storage failed: {error:?}")
                     }
                 }
             })?;
diff --git a/fork_choice_control/src/queries.rs b/fork_choice_control/src/queries.rs
index 1b35ef28..bac7e082 100644
--- a/fork_choice_control/src/queries.rs
+++ b/fork_choice_control/src/queries.rs
@@ -554,6 +554,14 @@ where
             storage: self.storage(),
         }
     }
+
+    pub fn min_checked_block_availability_epoch(&self) -> Epoch {
+        self.store_snapshot().min_checked_block_availability_epoch()
+    }
+
+    pub fn min_checked_data_availability_epoch(&self) -> Epoch {
+        self.store_snapshot().min_checked_data_availability_epoch()
+    }
 }

 #[cfg(test)]
diff --git a/fork_choice_control/src/storage.rs b/fork_choice_control/src/storage.rs
index f4a0c947..3fbf78ab 100644
--- a/fork_choice_control/src/storage.rs
+++ b/fork_choice_control/src/storage.rs
@@ -397,6 +397,71 @@ impl<P: Preset> Storage<P> {
         Ok(())
     }

+    pub(crate) fn prune_old_blocks_and_states(&self, up_to_slot: Slot) -> Result<()> {
+        let mut block_roots_to_remove = vec![];
+        let mut keys_to_remove = vec![];
+
+        let results = self
+            .database
+            .iterator_descending(..=BlockRootBySlot(up_to_slot.saturating_sub(1)).to_string())?;
+
+        for result in results {
+            let (key_bytes, value_bytes) = result?;
+
+            if !BlockRootBySlot::has_prefix(&key_bytes) {
+                break;
+            }
+
+            block_roots_to_remove.push(H256::from_ssz_default(value_bytes)?);
+            keys_to_remove.push(key_bytes.into_owned());
+        }
+
+        for block_root in block_roots_to_remove {
+            let key = FinalizedBlockByRoot(block_root).to_string();
+            self.database.delete(key)?;
+
+            let key = UnfinalizedBlockByRoot(block_root).to_string();
+            self.database.delete(key)?;
+
+            let key = StateByBlockRoot(block_root).to_string();
+            self.database.delete(key)?;
+        }
+
+        for key in keys_to_remove {
+            self.database.delete(key)?;
+        }
+
+        Ok(())
+    }
+
+    pub(crate) fn prune_old_state_roots(&self, up_to_slot: Slot) -> Result<()> {
+        let mut keys_to_remove = vec![];
+
+        let results = self
+            .database
+            .iterator_ascending(SlotByStateRoot(H256::zero()).to_string()..)?;
+
+        for result in results {
+            let (key_bytes, value_bytes) = result?;
+
+            if !SlotByStateRoot::has_prefix(&key_bytes) {
+                break;
+            }
+
+            let slot = Slot::from_ssz_default(value_bytes)?;
+
+            if slot < up_to_slot {
+                keys_to_remove.push(key_bytes.into_owned());
+            }
+        }
+
+        for key in keys_to_remove {
+            self.database.delete(key)?;
+        }
+
+        Ok(())
+    }
+
     pub(crate) fn checkpoint_state_slot(&self) -> Result<Option<Slot>> {
         if let Some(StateCheckpoint { head_slot, .. }) = self.load_state_checkpoint()? {
             return Ok(Some(head_slot));
@@ -704,6 +769,18 @@ impl<P: Preset> Storage<P> {

 #[cfg(test)]
 impl<P: Preset> Storage<P> {
+    pub fn block_root_by_slot_count(&self) -> Result<usize> {
+        let results = self
+            .database
+            .iterator_ascending(BlockRootBySlot(0).to_string()..)?;
+
+        itertools::process_results(results, |pairs| {
+            pairs
+                .take_while(|(key_bytes, _)| BlockRootBySlot::has_prefix(key_bytes))
+                .count()
+        })
+    }
+
     pub fn finalized_block_count(&self) -> Result<usize> {
         let results = self
             .database
@@ -719,10 +796,34 @@ impl<P: Preset> Storage<P> {
         })
     }

+    pub fn unfinalized_block_count(&self) -> Result<usize> {
+        let results = self
+            .database
+            .iterator_ascending(UnfinalizedBlockByRoot(H256::zero()).to_string()..)?;
+
+        itertools::process_results(results, |pairs| {
+            pairs
+                .take_while(|(key_bytes, _)| UnfinalizedBlockByRoot::has_prefix(key_bytes))
+                .count()
+        })
+    }
+
+    pub fn slot_by_state_root_count(&self) -> Result<usize> {
+        let results = self
+            .database
+            .iterator_ascending(SlotByStateRoot(H256::zero()).to_string()..)?;
+
+        itertools::process_results(results, |pairs| {
+            pairs
+                .take_while(|(key_bytes, _)| SlotByStateRoot::has_prefix(key_bytes))
+                .count()
+        })
+    }
+
     pub fn slot_by_blob_id_count(&self) -> Result<usize> {
         let results = self
             .database
-            .iterator_ascending((H256::zero()).to_string()..)?;
+            .iterator_ascending(SlotBlobId(0, H256::zero(), 0).to_string()..)?;

         itertools::process_results(results, |pairs| {
             pairs
@@ -731,14 +832,26 @@ impl<P: Preset> Storage<P> {
         })
     }

+    pub fn state_count(&self) -> Result<usize> {
+        let results = self
+            .database
+            .iterator_ascending(StateByBlockRoot(H256::zero()).to_string()..)?;
+
+        itertools::process_results(results, |pairs| {
+            pairs
+                .take_while(|(key_bytes, _)| StateByBlockRoot::has_prefix(key_bytes))
+                .count()
+        })
+    }
+
     pub fn blob_sidecar_by_blob_id_count(&self) -> Result<usize> {
         let results = self
             .database
-            .iterator_ascending((H256::zero()).to_string()..)?;
+            .iterator_ascending(BlobSidecarByBlobId(H256::zero(), 0).to_string()..)?;

         itertools::process_results(results, |pairs| {
             pairs
-                .filter(|(key_bytes, _)| BlobSidecarByBlobId::has_prefix(key_bytes))
+                .take_while(|(key_bytes, _)| BlobSidecarByBlobId::has_prefix(key_bytes))
                 .count()
         })
     }
@@ -931,10 +1044,70 @@ pub fn serialize(key: impl Display, value: impl SszWrite) -> Result<(String, Vec<u8>)> {
 mod tests {
     use bytesize::ByteSize;
     use tempfile::TempDir;
-    use types::preset::Mainnet;
+    use types::{
+        phase0::containers::SignedBeaconBlock as Phase0SignedBeaconBlock, preset::Mainnet,
+    };

     use super::*;

+    #[test]
+    fn test_prune_old_blocks_and_states() -> Result<()> {
+        let database = Database::persistent("test_db", TempDir::new()?, ByteSize::mib(10), false)?;
+        let block = SignedBeaconBlock::<Mainnet>::Phase0(Phase0SignedBeaconBlock::default());
+
+        database.put_batch(vec![
+            // Slot 1
+            serialize(BlockRootBySlot(1), H256::repeat_byte(1))?,
+            serialize(FinalizedBlockByRoot(H256::repeat_byte(1)), &block)?,
+            serialize(UnfinalizedBlockByRoot(H256::repeat_byte(1)), &block)?,
+            serialize(SlotByStateRoot(H256::repeat_byte(1)), 1_u64)?,
+            serialize(StateByBlockRoot(H256::repeat_byte(1)), 1_u64)?,
+            // Slot 3
+            serialize(BlockRootBySlot(3), H256::repeat_byte(3))?,
+            serialize(FinalizedBlockByRoot(H256::repeat_byte(3)), &block)?,
+            // Slot 5
+            serialize(BlockRootBySlot(5), H256::repeat_byte(5))?,
+            serialize(UnfinalizedBlockByRoot(H256::repeat_byte(5)), &block)?,
+            //Slot 6
+            serialize(BlockRootBySlot(6), H256::repeat_byte(6))?,
+            serialize(UnfinalizedBlockByRoot(H256::repeat_byte(6)), &block)?,
+            serialize(SlotByStateRoot(H256::repeat_byte(6)), 6_u64)?,
+            serialize(StateByBlockRoot(H256::repeat_byte(6)), 6_u64)?,
+            // Slot 10, test case that "10" < "3" is not true
+            serialize(BlockRootBySlot(10), H256::repeat_byte(10))?,
+            serialize(UnfinalizedBlockByRoot(H256::repeat_byte(10)), &block)?,
+            serialize(SlotByStateRoot(H256::repeat_byte(10)), 10_u64)?,
+            serialize(StateByBlockRoot(H256::repeat_byte(10)), 10_u64)?,
+        ])?;
+
+        let storage = Storage::<Mainnet>::new(
+            Arc::new(Config::mainnet()),
+            database,
+            nonzero!(64_u64),
+            true,
+        );
+
+        assert_eq!(storage.finalized_block_count()?, 2);
+        assert_eq!(storage.unfinalized_block_count()?, 4);
+        assert_eq!(storage.block_root_by_slot_count()?, 5);
+        assert_eq!(storage.slot_by_state_root_count()?, 3);
+        assert_eq!(storage.state_count()?, 3);
+
+        storage.prune_old_blocks_and_states(5)?;
+
+        assert_eq!(storage.finalized_block_count()?, 0);
+        assert_eq!(storage.unfinalized_block_count()?, 3);
+        assert_eq!(storage.block_root_by_slot_count()?, 3);
+        assert_eq!(storage.slot_by_state_root_count()?, 3);
+        assert_eq!(storage.state_count()?, 2);
+
+        storage.prune_old_state_roots(5)?;
+
+        assert_eq!(storage.slot_by_state_root_count()?, 2);
+
+        Ok(())
+    }
+
     #[test]
     #[expect(clippy::similar_names)]
     fn test_prune_old_blob_sidecars() -> Result<()> {
diff --git a/fork_choice_store/src/store.rs b/fork_choice_store/src/store.rs
index f1ebce3b..f6f5a2ba 100644
--- a/fork_choice_store/src/store.rs
+++ b/fork_choice_store/src/store.rs
@@ -3070,15 +3070,24 @@ impl<P: Preset> Store<P> {
         self.blob_cache.unpersisted_blob_sidecars()
     }

-    pub fn should_check_data_availability_at_slot(&self, slot: Slot) -> bool {
-        let min_checked_epoch = self.chain_config.deneb_fork_epoch.max(
+    pub fn min_checked_block_availability_epoch(&self) -> Epoch {
+        self.tick
+            .epoch::<P>()
+            .checked_sub(self.chain_config.min_epochs_for_block_requests)
+            .unwrap_or(GENESIS_EPOCH)
+    }
+
+    pub fn min_checked_data_availability_epoch(&self) -> Epoch {
+        self.chain_config.deneb_fork_epoch.max(
             self.tick
                 .epoch::<P>()
                 .checked_sub(self.chain_config.min_epochs_for_blob_sidecars_requests)
                 .unwrap_or(GENESIS_EPOCH),
-        );
+        )
+    }

-        misc::compute_epoch_at_slot::<P>(slot) >= min_checked_epoch
+    pub fn should_check_data_availability_at_slot(&self, slot: Slot) -> bool {
+        misc::compute_epoch_at_slot::<P>(slot) >= self.min_checked_data_availability_epoch()
     }

     pub fn state_cache(&self) -> Arc<StateCacheProcessor<P>> {
diff --git a/grandine-snapshot-tests b/grandine-snapshot-tests
index 74846d6a..16a40dfb 160000
--- a/grandine-snapshot-tests
+++ b/grandine-snapshot-tests
@@ -1 +1 @@
-Subproject commit 74846d6ac73b34f58521298661bc2d02820b8ccb
+Subproject commit 16a40dfb92c1fbb629802a592779bcd9671f06fc
diff --git a/types/src/config.rs b/types/src/config.rs
index cdbc44b4..c9ddb01c 100644
--- a/types/src/config.rs
+++ b/types/src/config.rs
@@ -143,6 +143,8 @@ pub struct Config {
     #[serde(with = "serde_utils::string_or_native")]
     pub min_epochs_for_blob_sidecars_requests: u64,
     #[serde(with = "serde_utils::string_or_native")]
+    pub min_epochs_for_block_requests: u64,
+    #[serde(with = "serde_utils::string_or_native")]
     pub blob_sidecar_subnet_count: NonZeroU64,
     #[serde(with = "serde_utils::string_or_native")]
     pub data_column_sidecar_subnet_count: u64,
@@ -240,6 +242,7 @@ impl Default for Config {
             max_request_blob_sidecars: 768,
             max_request_data_column_sidecars: 0x4000,
             min_epochs_for_blob_sidecars_requests: 4096,
+            min_epochs_for_block_requests: 33024,
             blob_sidecar_subnet_count: nonzero!(6_u64),
             data_column_sidecar_subnet_count: 64,
@@ -332,6 +335,9 @@ impl Config {
             deposit_contract_address: H160(hex!("1234567890123456789012345678901234567890")),
             deposit_network_id: 5,

+            // Networking
+            min_epochs_for_block_requests: 272,
+
             ..Self::default()
         }
     }