Skip to content

Commit

Permalink
Move data column topics to core topics, fix data columns syncing
Browse files Browse the repository at this point in the history
* Move data column topics subscription to core topics

* Fix data columns syncing

* Fix columns syncing

* Refactor forward sync batch count for batches before Fulu
  • Loading branch information
hangleang committed Feb 4, 2025
1 parent a84c15c commit 15db5ad
Show file tree
Hide file tree
Showing 15 changed files with 290 additions and 302 deletions.
2 changes: 1 addition & 1 deletion binary_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn initialize_logger(module_path: &str, always_write_style: bool) -> Result<
.filter_module("liveness_tracker", LevelFilter::Info)
.filter_module("metrics", LevelFilter::Info)
.filter_module("operation_pools", LevelFilter::Info)
.filter_module("p2p", LevelFilter::Info)
.filter_module("p2p", LevelFilter::Debug)
.filter_module("prometheus_metrics", LevelFilter::Info)
.filter_module("runtime", LevelFilter::Info)
.filter_module("signer", LevelFilter::Info)
Expand Down
3 changes: 1 addition & 2 deletions eip_7594/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,10 @@ pub fn recover_matrix(

let mut matrix = vec![];
for blob_index in 0..blob_count {
// TODO(feature/fulu): use cell reference, remove clone
let (cell_indices, cells): (Vec<_>, Vec<_>) = partial_matrix
.iter()
.filter(|&e| (e.row_index == blob_index as u64))
.map(|e| (e.column_index, e.cell.clone()))
.map(|e| (e.column_index, &e.cell))
.unzip();

let (recovered_cells, recovered_proofs) =
Expand Down
2 changes: 1 addition & 1 deletion eth2_libp2p
2 changes: 1 addition & 1 deletion fork_choice_control/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub enum P2pMessage<P: Preset> {
BlockNeeded(H256, Option<PeerId>),
BlobsNeeded(Vec<BlobIdentifier>, Slot, Option<PeerId>),
DataColumnsNeeded(Vec<DataColumnIdentifier>, Slot, Option<PeerId>),
DataColumnReconstructed(Arc<DataColumnSidecar<P>>),
DataColumnReconstructed(Vec<Arc<DataColumnSidecar<P>>>),
FinalizedCheckpoint(Checkpoint),
HeadState(#[cfg_attr(test, derivative(Debug = "ignore"))] Arc<BeaconState<P>>),
}
Expand Down
25 changes: 12 additions & 13 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1505,27 +1505,26 @@ where
if !missing_indices.is_empty() {
let cells_and_kzg_proofs =
eip_7594::construct_cells_and_kzg_proofs(full_matrix, blob_count)?;
let columns_to_store =
let mut columns_to_store =
eip_7594::construct_data_column_sidecars(block, &cells_and_kzg_proofs, config)?
.into_iter()
.filter(|column| missing_indices.contains(&column.index));
.filter(|column| missing_indices.contains(&column.index))
.map(Arc::new)
.collect::<Vec<_>>();

// > The following data column sidecars, where they exist, MUST be sent in (slot, column_index) order.
columns_to_store.sort_by_key(|sidecar| (sidecar.slot(), sidecar.index));

info!(
"storing data column sidecars from reconstruction (block: {}, columns: [{}])",
block.message().hash_tree_root(),
missing_indices.iter().join(", "),
columns_to_store
.iter()
.map(|column| column.index)
.join(", "),
);

for data_column_sidecar in columns_to_store {
let data_column_sidecar = Arc::new(data_column_sidecar);

debug!(
"storing data column sidecar from reconstruction (slot: {}, data_column_sidecar: {data_column_sidecar:?})",
data_column_sidecar.slot(),
);

P2pMessage::DataColumnReconstructed(data_column_sidecar).send(&self.p2p_tx);
}
P2pMessage::DataColumnReconstructed(columns_to_store).send(&self.p2p_tx);
}

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions fork_choice_control/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use execution_engine::{ExecutionEngine, NullExecutionEngine};
use features::Feature;
use fork_choice_store::{
AggregateAndProofOrigin, AttestationItem, AttestationOrigin, AttesterSlashingOrigin,
BlobSidecarOrigin, BlockAction, BlockOrigin, DataColumnSidecarOrigin, StateCacheProcessor,
Store,
BlobSidecarOrigin, BlockAction, BlockOrigin, DataColumnSidecarAction, DataColumnSidecarOrigin,
StateCacheProcessor, Store,
};
use futures::channel::mpsc::Sender as MultiSender;
use helper_functions::{
Expand Down Expand Up @@ -392,7 +392,7 @@ impl<P: Preset, W> Run for DataColumnSidecarTask<P, W> {
let result =
store_snapshot.validate_data_column_sidecar(data_column_sidecar, block_seen, &origin);

if result.is_ok() {
if let Ok(DataColumnSidecarAction::Accept(_)) = result {
if let Some(metrics) = metrics.as_ref() {
metrics.verified_gossip_data_column_sidecar.inc();
}
Expand Down
10 changes: 5 additions & 5 deletions fork_choice_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1134,9 +1134,10 @@ impl<P: Preset> Store<P> {
}

let missing_indices = self.indices_of_missing_data_columns(block);
if missing_indices.len() * 2 >= self.chain_config.number_of_columns()
|| (self.sampling_columns_count() * 2 < self.chain_config.number_of_columns()
&& !missing_indices.is_empty())
let number_of_columns = self.chain_config.number_of_columns();
if !missing_indices.is_empty()
&& (self.sampling_columns_count() * 2 < number_of_columns
|| missing_indices.len() * 2 >= number_of_columns)
{
return Ok(BlockAction::DelayUntilBlobs(block.clone_arc()));
}
Expand Down Expand Up @@ -1943,7 +1944,6 @@ impl<P: Preset> Store<P> {
state_fn: impl FnOnce() -> Result<Arc<BeaconState<P>>>,
) -> Result<DataColumnSidecarAction<P>> {
let block_header = data_column_sidecar.signed_block_header.message;
let is_from_reconstruction = matches!(origin, DataColumnSidecarOrigin::Reconstruction);

// [REJECT] The sidecar's index is consistent with NUMBER_OF_COLUMNS -- i.e. sidecar.index < NUMBER_OF_COLUMNS.
ensure!(
Expand Down Expand Up @@ -2063,7 +2063,7 @@ impl<P: Preset> Store<P> {
// [REJECT] The sidecar's kzg_commitments field inclusion proof is valid as verified by verify_data_column_sidecar_inclusion_proof(sidecar).
//
// Skip inclusion proof and kzg proofs verification for reconstructed columns
if !is_from_reconstruction {
if !origin.is_from_reconstruction() {
ensure!(
verify_sidecar_inclusion_proof(&data_column_sidecar),
Error::DataColumnSidecarInvalidInclusionProof {
Expand Down
4 changes: 2 additions & 2 deletions kzg_utils/src/eip_7594.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ pub fn compute_cells_and_kzg_proofs<P: Preset>(
Ok((cells, proofs.into_iter().map(Into::into)))
}

pub fn recover_cells_and_kzg_proofs(
pub fn recover_cells_and_kzg_proofs<'cell>(
cell_indices: impl IntoIterator<Item = CellIndex>,
cells: impl IntoIterator<Item = Cell>,
cells: impl IntoIterator<Item = &'cell Cell>,
) -> Result<(
impl IntoIterator<Item = [u8; BytesPerCell::USIZE]>,
impl IntoIterator<Item = KzgProof>,
Expand Down
7 changes: 4 additions & 3 deletions kzg_utils/src/spec_tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use kzg::{eip_4844::bytes_to_blob, Fr as _, G1 as _};
use spec_test_utils::Case;
use test_generator::test_resources;
use types::preset::Mainnet;
use types::{fulu::primitives::Cell, preset::Mainnet};

use crate::{
eip_4844::{
Expand Down Expand Up @@ -276,7 +276,7 @@ fn test_recover_cells_and_kzg_proofs(case: Case) {
.cells
.iter()
.map(|cell| deserialize(cell))
.collect::<Result<Vec<_>, _>>()
.collect::<Result<Vec<Cell>, _>>()
{
Ok(cells) => cells,
Err(_) => {
Expand All @@ -293,7 +293,8 @@ fn test_recover_cells_and_kzg_proofs(case: Case) {
}
};

match recover_cells_and_kzg_proofs(cell_indices, cells) {
let result = recover_cells_and_kzg_proofs(cell_indices, cells.iter());
match result {
Ok((cells, proofs)) => {
assert_eq!(cells.into_iter().collect::<Vec<_>>(), expected_cells);
assert_eq!(proofs.into_iter().collect::<Vec<_>>(), expected_proofs);
Expand Down
3 changes: 0 additions & 3 deletions operation_pools/src/sync_committee_agg_pool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,6 @@ impl<P: Preset, W: Wait> HandleExternalContributionTask<P, W> {

let beacon_state = controller.preprocessed_state_at_current_slot()?;

// TODO(feature/fulu): there is an issue when trying to sync from genesis with data column
// sidecars: grandine gets a lot of `invalid_sync_committee_message` errors; need to
// double-check with the next devnet
let is_valid = validate_external_contribution_and_proof(
controller.chain_config(),
signed_contribution_and_proof,
Expand Down
Loading

0 comments on commit 15db5ad

Please sign in to comment.