Prune additional data from storage, add `min_epochs_for_block_requests` to `Config`
Tumas committed Jan 22, 2025
1 parent eef7085 commit d556ebf
Showing 6 changed files with 242 additions and 23 deletions.
51 changes: 37 additions & 14 deletions fork_choice_control/src/mutator.rs
@@ -441,7 +441,7 @@ where

if self.store.is_forward_synced() && misc::slots_since_epoch_start::<P>(tick.slot) == 0 {
if tick.kind == TickKind::AttestFourth {
self.prune_old_blob_sidecars()?;
self.prune_old_records()?;
}

if let Some(metrics) = self.metrics.as_ref() {
@@ -2336,27 +2336,50 @@ where
Ok(())
}

fn prune_old_blob_sidecars(&self) -> Result<()> {
fn prune_old_records(&self) -> Result<()> {
let storage = self.storage.clone_arc();
let current_epoch = misc::compute_epoch_at_slot::<P>(self.store.slot());
let up_to_epoch = current_epoch.saturating_sub(
self.store
.chain_config()
.min_epochs_for_blob_sidecars_requests,
);
let up_to_slot = misc::compute_start_slot_at_epoch::<P>(up_to_epoch);
let blobs_up_to_epoch = self.store.min_checked_data_availability_epoch();
let blobs_up_to_slot = misc::compute_start_slot_at_epoch::<P>(blobs_up_to_epoch);
let blocks_up_to_epoch = self.store.min_checked_block_availability_epoch();
let blocks_up_to_slot = misc::compute_start_slot_at_epoch::<P>(blocks_up_to_epoch);

Builder::new()
.name("old-blob-pruner".to_owned())
.name("old-data-pruner".to_owned())
.spawn(move || {
debug!("pruning old blob sidecards from storage up to slot {up_to_slot}…");
debug!("pruning old blob sidecars from storage up to slot {blobs_up_to_slot}…");

match storage.prune_old_blob_sidecars(blobs_up_to_slot) {
Ok(()) => {
debug!(
"pruned old blob sidecars from storage up to slot {blobs_up_to_slot}"
);
}
Err(error) => {
error!("pruning old blob sidecars from storage failed: {error:?}")
}
}

debug!("pruning old blocks and states from storage up to slot {blocks_up_to_slot}…");

match storage.prune_old_blocks_and_states(blocks_up_to_slot) {
Ok(()) => {
debug!(
"pruned old blocks and states from storage up to slot {blocks_up_to_slot}"
);
}
Err(error) => {
error!("pruning old blocks and states from storage failed: {error:?}")
}
}

debug!("pruning old state roots from storage up to slot {blocks_up_to_slot}…");

match storage.prune_old_blob_sidecars(up_to_slot) {
match storage.prune_old_state_roots(blocks_up_to_slot) {
Ok(()) => {
debug!("pruned old blob sidecards from storage up to slot {up_to_slot}");
debug!("pruned old state roots from storage up to slot {blocks_up_to_slot}");
}
Err(error) => {
error!("pruning old blob sidecards from storage failed: {error:?}")
error!("pruning old state roots from storage failed: {error:?}")
}
}
})?;
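The hunk above replaces the single blob-sidecar pruner with `prune_old_records`, which derives two independent cut-offs: blob sidecars are pruned up to the start slot of `min_checked_data_availability_epoch()`, while blocks, states, and state roots are pruned up to the start slot of `min_checked_block_availability_epoch()`. A minimal standalone sketch of that arithmetic, not part of the commit; the helper names are illustrative stand-ins for the crate's `misc` functions, and the constants are the mainnet values from this diff plus the consensus-spec Deneb fork epoch:

const SLOTS_PER_EPOCH: u64 = 32;
const GENESIS_EPOCH: u64 = 0;

// Illustrative stand-in for `misc::compute_start_slot_at_epoch`.
fn start_slot(epoch: u64) -> u64 {
    epoch * SLOTS_PER_EPOCH
}

// Oldest epoch whose data is still retained, clamped to a fork floor.
fn min_checked_epoch(current_epoch: u64, retention_epochs: u64, floor: u64) -> u64 {
    current_epoch
        .checked_sub(retention_epochs)
        .unwrap_or(GENESIS_EPOCH)
        .max(floor)
}

fn main() {
    let current_epoch = 350_000_u64;

    // Blob sidecars: min_epochs_for_blob_sidecars_requests, floored at the Deneb fork epoch.
    let blobs_up_to_slot = start_slot(min_checked_epoch(current_epoch, 4_096, 269_568));

    // Blocks, states, state roots: min_epochs_for_block_requests, no fork floor.
    let blocks_up_to_slot = start_slot(min_checked_epoch(current_epoch, 33_024, GENESIS_EPOCH));

    println!("prune blobs below slot {blobs_up_to_slot}, blocks and states below slot {blocks_up_to_slot}");
}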
8 changes: 8 additions & 0 deletions fork_choice_control/src/queries.rs
@@ -554,6 +554,14 @@ where
storage: self.storage(),
}
}

pub fn min_checked_block_availability_epoch(&self) -> Epoch {
self.store_snapshot().min_checked_block_availability_epoch()
}

pub fn min_checked_data_availability_epoch(&self) -> Epoch {
self.store_snapshot().min_checked_data_availability_epoch()
}
}

#[cfg(test)]
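The two new getters simply expose the store's retention boundaries through the controller. A hypothetical caller-side check, not part of this commit, assuming the usual rule that a node serves blocks no older than the minimum checked epoch:

// Hypothetical helper: is a requested epoch still inside the block retention window?
fn within_block_retention(requested_epoch: u64, min_checked_block_availability_epoch: u64) -> bool {
    requested_epoch >= min_checked_block_availability_epoch
}

fn main() {
    assert!(within_block_retention(300_000, 280_000));
    assert!(!within_block_retention(100_000, 280_000));
}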
181 changes: 177 additions & 4 deletions fork_choice_control/src/storage.rs
@@ -397,6 +397,71 @@ impl<P: Preset> Storage<P> {
Ok(())
}

pub(crate) fn prune_old_blocks_and_states(&self, up_to_slot: Slot) -> Result<()> {
let mut block_roots_to_remove = vec![];
let mut keys_to_remove = vec![];

let results = self
.database
.iterator_descending(..=BlockRootBySlot(up_to_slot.saturating_sub(1)).to_string())?;

for result in results {
let (key_bytes, value_bytes) = result?;

if !BlockRootBySlot::has_prefix(&key_bytes) {
break;
}

block_roots_to_remove.push(H256::from_ssz_default(value_bytes)?);
keys_to_remove.push(key_bytes.into_owned());
}

for block_root in block_roots_to_remove {
let key = FinalizedBlockByRoot(block_root).to_string();
self.database.delete(key)?;

let key = UnfinalizedBlockByRoot(block_root).to_string();
self.database.delete(key)?;

let key = StateByBlockRoot(block_root).to_string();
self.database.delete(key)?;
}

for key in keys_to_remove {
self.database.delete(key)?;
}

Ok(())
}

pub(crate) fn prune_old_state_roots(&self, up_to_slot: Slot) -> Result<()> {
let mut keys_to_remove = vec![];

let results = self
.database
.iterator_ascending(SlotByStateRoot(H256::zero()).to_string()..)?;

for result in results {
let (key_bytes, value_bytes) = result?;

if !SlotByStateRoot::has_prefix(&key_bytes) {
break;
}

let slot = Slot::from_ssz_default(value_bytes)?;

if slot < up_to_slot {
keys_to_remove.push(key_bytes.into_owned());
}
}

for key in keys_to_remove {
self.database.delete(key)?;
}

Ok(())
}

pub(crate) fn checkpoint_state_slot(&self) -> Result<Option<Slot>> {
if let Some(StateCheckpoint { head_slot, .. }) = self.load_state_checkpoint()? {
return Ok(Some(head_slot));
@@ -704,6 +769,18 @@ impl<P: Preset> Storage<P> {

#[cfg(test)]
impl<P: Preset> Storage<P> {
pub fn block_root_by_slot_count(&self) -> Result<usize> {
let results = self
.database
.iterator_ascending(BlockRootBySlot(0).to_string()..)?;

itertools::process_results(results, |pairs| {
pairs
.take_while(|(key_bytes, _)| BlockRootBySlot::has_prefix(key_bytes))
.count()
})
}

pub fn finalized_block_count(&self) -> Result<usize> {
let results = self
.database
@@ -719,10 +796,34 @@ impl<P: Preset> Storage<P> {
})
}

pub fn unfinalized_block_count(&self) -> Result<usize> {
let results = self
.database
.iterator_ascending(UnfinalizedBlockByRoot(H256::zero()).to_string()..)?;

itertools::process_results(results, |pairs| {
pairs
.take_while(|(key_bytes, _)| UnfinalizedBlockByRoot::has_prefix(key_bytes))
.count()
})
}

pub fn slot_by_state_root_count(&self) -> Result<usize> {
let results = self
.database
.iterator_ascending(SlotByStateRoot(H256::zero()).to_string()..)?;

itertools::process_results(results, |pairs| {
pairs
.take_while(|(key_bytes, _)| SlotByStateRoot::has_prefix(key_bytes))
.count()
})
}

pub fn slot_by_blob_id_count(&self) -> Result<usize> {
let results = self
.database
.iterator_ascending((H256::zero()).to_string()..)?;
.iterator_ascending(SlotBlobId(0, H256::zero(), 0).to_string()..)?;

itertools::process_results(results, |pairs| {
pairs
Expand All @@ -731,14 +832,26 @@ impl<P: Preset> Storage<P> {
})
}

pub fn state_count(&self) -> Result<usize> {
let results = self
.database
.iterator_ascending(StateByBlockRoot(H256::zero()).to_string()..)?;

itertools::process_results(results, |pairs| {
pairs
.take_while(|(key_bytes, _)| StateByBlockRoot::has_prefix(key_bytes))
.count()
})
}

pub fn blob_sidecar_by_blob_id_count(&self) -> Result<usize> {
let results = self
.database
.iterator_ascending((H256::zero()).to_string()..)?;
.iterator_ascending(BlobSidecarByBlobId(H256::zero(), 0).to_string()..)?;

itertools::process_results(results, |pairs| {
pairs
.filter(|(key_bytes, _)| BlobSidecarByBlobId::has_prefix(key_bytes))
.take_while(|(key_bytes, _)| BlobSidecarByBlobId::has_prefix(key_bytes))
.count()
})
}
@@ -931,10 +1044,70 @@ pub fn serialize(key: impl Display, value: impl SszWrite) -> Result<(String, Vec
mod tests {
use bytesize::ByteSize;
use tempfile::TempDir;
use types::preset::Mainnet;
use types::{
phase0::containers::SignedBeaconBlock as Phase0SignedBeaconBlock, preset::Mainnet,
};

use super::*;

#[test]
fn test_prune_old_blocks_and_states() -> Result<()> {
let database = Database::persistent("test_db", TempDir::new()?, ByteSize::mib(10), false)?;
let block = SignedBeaconBlock::<Mainnet>::Phase0(Phase0SignedBeaconBlock::default());

database.put_batch(vec![
// Slot 1
serialize(BlockRootBySlot(1), H256::repeat_byte(1))?,
serialize(FinalizedBlockByRoot(H256::repeat_byte(1)), &block)?,
serialize(UnfinalizedBlockByRoot(H256::repeat_byte(1)), &block)?,
serialize(SlotByStateRoot(H256::repeat_byte(1)), 1_u64)?,
serialize(StateByBlockRoot(H256::repeat_byte(1)), 1_u64)?,
// Slot 3
serialize(BlockRootBySlot(3), H256::repeat_byte(3))?,
serialize(FinalizedBlockByRoot(H256::repeat_byte(3)), &block)?,
// Slot 5
serialize(BlockRootBySlot(5), H256::repeat_byte(5))?,
serialize(UnfinalizedBlockByRoot(H256::repeat_byte(5)), &block)?,
// Slot 6
serialize(BlockRootBySlot(6), H256::repeat_byte(6))?,
serialize(UnfinalizedBlockByRoot(H256::repeat_byte(6)), &block)?,
serialize(SlotByStateRoot(H256::repeat_byte(6)), 6_u64)?,
serialize(StateByBlockRoot(H256::repeat_byte(6)), 6_u64)?,
// Slot 10, test case that "10" < "3" is not true
serialize(BlockRootBySlot(10), H256::repeat_byte(10))?,
serialize(UnfinalizedBlockByRoot(H256::repeat_byte(10)), &block)?,
serialize(SlotByStateRoot(H256::repeat_byte(10)), 10_u64)?,
serialize(StateByBlockRoot(H256::repeat_byte(10)), 10_u64)?,
])?;

let storage = Storage::<Mainnet>::new(
Arc::new(Config::mainnet()),
database,
nonzero!(64_u64),
true,
);

assert_eq!(storage.finalized_block_count()?, 2);
assert_eq!(storage.unfinalized_block_count()?, 4);
assert_eq!(storage.block_root_by_slot_count()?, 5);
assert_eq!(storage.slot_by_state_root_count()?, 3);
assert_eq!(storage.state_count()?, 3);

storage.prune_old_blocks_and_states(5)?;

assert_eq!(storage.finalized_block_count()?, 0);
assert_eq!(storage.unfinalized_block_count()?, 3);
assert_eq!(storage.block_root_by_slot_count()?, 3);
assert_eq!(storage.slot_by_state_root_count()?, 3);
assert_eq!(storage.state_count()?, 2);

storage.prune_old_state_roots(5)?;

assert_eq!(storage.slot_by_state_root_count()?, 2);

Ok(())
}

#[test]
#[expect(clippy::similar_names)]
fn test_prune_old_blob_sidecars() -> Result<()> {
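The two new pruning routines use different scan shapes: `prune_old_blocks_and_states` walks `BlockRootBySlot` entries descending from the cut-off key (stopping at the first foreign prefix) and then fans out deletes to the per-root block and state entries, while `prune_old_state_roots` scans the whole `SlotByStateRoot` prefix ascending and filters by the stored slot, because those keys are ordered by state root rather than by slot. A self-contained sketch of the two patterns over a `BTreeMap` standing in for the string-keyed database; the key prefixes and zero-padding are assumptions for illustration only:

use std::collections::BTreeMap;

// Toy keys: zero-padding makes lexicographic order match numeric slot order,
// which is the property the slot-10 entry in the test above exercises.
fn block_root_by_slot_key(slot: u64) -> String {
    format!("r{slot:020}")
}

fn slot_by_state_root_key(fake_root: u8) -> String {
    format!("t{fake_root:02x}")
}

fn main() {
    let mut db: BTreeMap<String, u64> = BTreeMap::new();

    for slot in [1, 3, 5, 6, 10] {
        db.insert(block_root_by_slot_key(slot), slot);
        db.insert(slot_by_state_root_key(slot as u8), slot);
    }

    let up_to_slot = 5_u64;

    // Pattern 1: descending, range-bounded scan (mirrors `prune_old_blocks_and_states`),
    // stopping as soon as a key without the expected prefix appears.
    let keys: Vec<String> = db
        .range(..=block_root_by_slot_key(up_to_slot - 1))
        .rev()
        .take_while(|(key, _)| key.starts_with('r'))
        .map(|(key, _)| key.clone())
        .collect();
    for key in keys {
        db.remove(&key);
    }

    // Pattern 2: ascending prefix scan filtered by the stored slot value
    // (mirrors `prune_old_state_roots`).
    let keys: Vec<String> = db
        .range(slot_by_state_root_key(0)..)
        .take_while(|(key, _)| key.starts_with('t'))
        .filter(|(_, slot)| **slot < up_to_slot)
        .map(|(key, _)| key.clone())
        .collect();
    for key in keys {
        db.remove(&key);
    }

    // Entries for slots 5, 6, and 10 survive in both indices: 6 keys remain.
    assert_eq!(db.len(), 6);
}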
17 changes: 13 additions & 4 deletions fork_choice_store/src/store.rs
@@ -3070,15 +3070,24 @@ impl<P: Preset> Store<P> {
self.blob_cache.unpersisted_blob_sidecars()
}

pub fn should_check_data_availability_at_slot(&self, slot: Slot) -> bool {
let min_checked_epoch = self.chain_config.deneb_fork_epoch.max(
pub fn min_checked_block_availability_epoch(&self) -> Epoch {
self.tick
.epoch::<P>()
.checked_sub(self.chain_config.min_epochs_for_block_requests)
.unwrap_or(GENESIS_EPOCH)
}

pub fn min_checked_data_availability_epoch(&self) -> Epoch {
self.chain_config.deneb_fork_epoch.max(
self.tick
.epoch::<P>()
.checked_sub(self.chain_config.min_epochs_for_blob_sidecars_requests)
.unwrap_or(GENESIS_EPOCH),
);
)
}

misc::compute_epoch_at_slot::<P>(slot) >= min_checked_epoch
pub fn should_check_data_availability_at_slot(&self, slot: Slot) -> bool {
misc::compute_epoch_at_slot::<P>(slot) >= self.min_checked_data_availability_epoch()
}

pub fn state_cache(&self) -> Arc<StateCacheProcessor<P>> {
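Both helpers clamp with `checked_sub(...).unwrap_or(GENESIS_EPOCH)`, so early in a chain's life, before a full retention window has elapsed, the minimum checked epoch stays at genesis and nothing is pruned. A standalone illustration of that clamping, using the mainnet `min_epochs_for_block_requests` value from this commit:

const GENESIS_EPOCH: u64 = 0;

fn min_checked_block_availability_epoch(current_epoch: u64, min_epochs_for_block_requests: u64) -> u64 {
    current_epoch
        .checked_sub(min_epochs_for_block_requests)
        .unwrap_or(GENESIS_EPOCH)
}

fn main() {
    // For the first 33,024 epochs nothing is old enough to fall outside the window…
    assert_eq!(min_checked_block_availability_epoch(10_000, 33_024), GENESIS_EPOCH);
    // …after that the window slides forward one epoch per epoch.
    assert_eq!(min_checked_block_availability_epoch(40_000, 33_024), 6_976);
}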
6 changes: 6 additions & 0 deletions types/src/config.rs
@@ -143,6 +143,8 @@ pub struct Config {
#[serde(with = "serde_utils::string_or_native")]
pub min_epochs_for_blob_sidecars_requests: u64,
#[serde(with = "serde_utils::string_or_native")]
pub min_epochs_for_block_requests: u64,
#[serde(with = "serde_utils::string_or_native")]
pub blob_sidecar_subnet_count: NonZeroU64,
#[serde(with = "serde_utils::string_or_native")]
pub data_column_sidecar_subnet_count: u64,
@@ -240,6 +242,7 @@ impl Default for Config {
max_request_blob_sidecars: 768,
max_request_data_column_sidecars: 0x4000,
min_epochs_for_blob_sidecars_requests: 4096,
min_epochs_for_block_requests: 33024,
blob_sidecar_subnet_count: nonzero!(6_u64),
data_column_sidecar_subnet_count: 64,

@@ -332,6 +335,9 @@ impl Config {
deposit_contract_address: H160(hex!("1234567890123456789012345678901234567890")),
deposit_network_id: 5,

// Networking
min_epochs_for_block_requests: 272,

..Self::default()
}
}
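For scale: the mainnet default of 33024 epochs matches the consensus spec's MIN_EPOCHS_FOR_BLOCK_REQUESTS, and the 272 added in the second hunk matches the spec's minimal configuration. A rough conversion of those windows to wall-clock time, assuming the standard timing parameters (12-second slots with 32-slot epochs on mainnet, 6-second slots with 8-slot epochs in the minimal preset):

fn retention_days(epochs: u64, slots_per_epoch: u64, seconds_per_slot: u64) -> f64 {
    (epochs * slots_per_epoch * seconds_per_slot) as f64 / 86_400.0
}

fn main() {
    // Mainnet: 33,024 epochs × 32 slots × 12 s ≈ 146.8 days of blocks and states retained.
    println!("mainnet: {:.1} days", retention_days(33_024, 32, 12));

    // Minimal configuration: 272 epochs × 8 slots × 6 s ≈ 0.15 days (about 3.6 hours).
    println!("minimal: {:.2} days", retention_days(272, 8, 6));
}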
