Integrate engine_getBlobsV1 with PeerDAS
* Republish data column sidecars only once
hangleang committed Feb 5, 2025
1 parent c6c5d0b commit b3acd5a
Showing 8 changed files with 254 additions and 147 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions eth1_api/Cargo.toml
@@ -12,6 +12,7 @@ arc-swap = { workspace = true }
 bls = { workspace = true }
 dedicated_executor = { workspace = true }
 derive_more = { workspace = true }
+eip_7594 = { workspace = true }
 either = { workspace = true }
 enum-iterator = { workspace = true }
 ethereum-types = { workspace = true }
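The new eip_7594 workspace dependency brings in the PeerDAS helpers (try_convert_to_cells_and_kzg_proofs and construct_data_column_sidecars) that tasks.rs starts calling below.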
103 changes: 74 additions & 29 deletions eth1_api/src/tasks.rs
@@ -49,12 +49,18 @@ async fn download_blobs<P: Preset, W: Wait>(
     blob_indices: Vec<BlobIndex>,
 ) {
     if let Some(body) = block.message().body().post_deneb() {
-        let kzg_commitments = body
-            .blob_kzg_commitments()
-            .iter()
-            .zip(0..)
-            .filter(|(_, index)| blob_indices.contains(index))
-            .collect::<Vec<_>>();
+        let kzg_commitments = if block.phase().is_peerdas_activated() {
+            body.blob_kzg_commitments()
+                .iter()
+                .zip(0..)
+                .collect::<Vec<_>>()
+        } else {
+            body.blob_kzg_commitments()
+                .iter()
+                .zip(0..)
+                .filter(|(_, index)| blob_indices.contains(index))
+                .collect::<Vec<_>>()
+        };
 
         let versioned_hashes = kzg_commitments
             .iter()
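This hunk changes which blobs download_blobs asks the execution layer for. Before PeerDAS, only the commitments whose blob indices the node is still missing are kept; once PeerDAS is active, every commitment is kept, because data column sidecars are assembled from cells of all blobs in the block, and a partial blob set cannot be turned into columns. A minimal sketch of the selection rule, using a hypothetical free function (indices_to_fetch is not a name from the codebase):

fn indices_to_fetch(commitment_count: u64, missing: &[u64], peerdas_activated: bool) -> Vec<u64> {
    if peerdas_activated {
        // Column sidecars need a cell from every blob, so fetch all of them.
        (0..commitment_count).collect()
    } else {
        // Pre-PeerDAS, only the blobs the node is still missing are useful.
        (0..commitment_count)
            .filter(|index| missing.contains(index))
            .collect()
    }
}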
@@ -66,30 +72,69 @@
         Ok(blobs_and_proofs) => {
             let block_header = block.to_header();
 
-            for (blob_and_proof, kzg_commitment, index) in blobs_and_proofs
-                .into_iter()
-                .zip(kzg_commitments.into_iter())
-                .filter_map(|(blob_and_proof, (kzg_commitment, index))| {
-                    blob_and_proof.map(|blob_and_proof| (blob_and_proof, kzg_commitment, index))
-                })
-            {
-                let BlobAndProofV1 { blob, proof } = blob_and_proof;
-
-                match misc::construct_blob_sidecar(
-                    &block,
-                    block_header,
-                    index,
-                    blob,
-                    *kzg_commitment,
-                    proof,
-                ) {
-                    Ok(blob_sidecar) => {
-                        controller.on_el_blob_sidecar(Arc::new(blob_sidecar));
-                    }
-                    Err(error) => warn!(
-                        "failed to construct blob sidecar with blob and proof \
-                         received from execution layer: {error:?}"
-                    ),
-                }
-            }
+            if block.phase().is_peerdas_activated() {
+                let blobs = blobs_and_proofs
+                    .into_iter()
+                    .filter_map(|blob_and_proof| {
+                        blob_and_proof.map(|blob_and_proof| blob_and_proof.blob)
+                    })
+                    .collect::<Vec<_>>();
+
+                if blobs.len() == kzg_commitments.len() {
+                    match eip_7594::try_convert_to_cells_and_kzg_proofs::<P>(blobs.into_iter())
+                    {
+                        Ok(cells_and_kzg_proofs) => {
+                            match eip_7594::construct_data_column_sidecars(
+                                &block,
+                                &cells_and_kzg_proofs,
+                                controller.chain_config(),
+                            ) {
+                                Ok(data_column_sidecars) => {
+                                    for data_column_sidecar in data_column_sidecars {
+                                        controller.on_el_data_column_sidecar(Arc::new(
+                                            data_column_sidecar,
+                                        ));
+                                    }
+                                }
+                                Err(error) => warn!(
+                                    "failed to construct data column sidecars with \
+                                     cells and kzg proofs: {error:?}"
+                                ),
+                            }
+                        }
+                        Err(error) => warn!(
+                            "failed to convert blobs received from execution layer \
+                             into cells and kzg proofs: {error:?}"
+                        ),
+                    }
+                }
+            } else {
+                for (blob_and_proof, kzg_commitment, index) in blobs_and_proofs
+                    .into_iter()
+                    .zip(kzg_commitments.into_iter())
+                    .filter_map(|(blob_and_proof, (kzg_commitment, index))| {
+                        blob_and_proof
+                            .map(|blob_and_proof| (blob_and_proof, kzg_commitment, index))
+                    })
+                {
+                    let BlobAndProofV1 { blob, proof } = blob_and_proof;
+
+                    match misc::construct_blob_sidecar(
+                        &block,
+                        block_header,
+                        index,
+                        blob,
+                        *kzg_commitment,
+                        proof,
+                    ) {
+                        Ok(blob_sidecar) => {
+                            controller.on_el_blob_sidecar(Arc::new(blob_sidecar));
+                        }
+                        Err(error) => warn!(
+                            "failed to construct blob sidecar with blob and proof \
+                             received from execution layer: {error:?}"
+                        ),
+                    }
+                }
+            }
         }
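In the PeerDAS branch above, column sidecars are only constructed when the execution layer returned every blob (blobs.len() == kzg_commitments.len()); engine_getBlobsV1 may return null for blobs missing from the EL's pool, and columns cannot be assembled from a partial set. Conceptually, each blob is extended into a row of cells, and column sidecar j collects cell j from every row. A toy sketch of that regrouping, with plain byte vectors standing in for the real cell and proof types from the eip_7594 crate:

type Cell = Vec<u8>;

// rows[i][j] is cell j of blob i; output element j is column j,
// i.e. cell j taken from every blob in block order.
fn group_into_columns(rows: &[Vec<Cell>]) -> Vec<Vec<Cell>> {
    let width = rows.first().map_or(0, Vec::len);
    (0..width)
        .map(|j| rows.iter().map(|row| row[j].clone()).collect())
        .collect()
}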
8 changes: 8 additions & 0 deletions fork_choice_control/src/controller.rs
@@ -446,6 +446,14 @@ where
         self.spawn_blob_sidecar_task(blob_sidecar, true, BlobSidecarOrigin::ExecutionLayer)
     }
 
+    pub fn on_el_data_column_sidecar(&self, data_column_sidecar: Arc<DataColumnSidecar<P>>) {
+        self.spawn_data_column_sidecar_task(
+            data_column_sidecar,
+            true,
+            DataColumnSidecarOrigin::ExecutionLayer,
+        )
+    }
+
     pub fn on_reconstruct_data_column_sidecar(
         &self,
         data_column_sidecar: Arc<DataColumnSidecar<P>>,
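The new entry point mirrors on_el_blob_sidecar: column sidecars built from EL blobs go through the same task machinery as gossiped ones, but tagged with an ExecutionLayer origin so the mutator can treat them differently. A plausible shape of the origin check used later in mutator.rs, purely illustrative since the real enum carries payloads such as gossip and peer ids:

enum DataColumnSidecarOrigin {
    Gossip,
    ExecutionLayer,
    Reconstruction,
    Requested,
}

impl DataColumnSidecarOrigin {
    fn is_from_el_or_reconstruction(&self) -> bool {
        matches!(self, Self::ExecutionLayer | Self::Reconstruction)
    }
}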
188 changes: 101 additions & 87 deletions fork_choice_control/src/mutator.rs
@@ -559,97 +559,111 @@ where
             submission_time,
         };
 
-        if pending_block.block.phase().is_peerdas_activated() {
-            let missing_column_indices = self
-                .store
-                .indices_of_missing_data_columns(&pending_block.block);
-            let available_columns_count = self
-                .store
-                .sampling_columns_count()
-                .saturating_sub(missing_column_indices.len());
-            let number_of_columns = self.store.chain_config().number_of_columns();
-            debug!(
-                "missing columns: [{}] at slot: {}",
-                missing_column_indices.iter().join(", "),
-                slot,
-            );
-
-            if missing_column_indices.is_empty() {
-                self.retry_block(wait_group, pending_block);
-            } else if available_columns_count * 2 >= number_of_columns {
-                self.handle_reconstructing_data_column_sidecars(wait_group, pending_block);
-            } else {
-                debug!(
-                    "block delayed until sufficient data column sidecars are available \
-                     (column indices: {missing_column_indices:?}, pending block root: {block_root:?})",
-                );
-
-                if let Some(gossip_id) = pending_block.origin.gossip_id() {
-                    P2pMessage::Accept(gossip_id).send(&self.p2p_tx);
-                }
-
-                let pending_block = reply_delayed_block_validation_result(
-                    pending_block,
-                    Ok(ValidationOutcome::Ignore(false)),
-                );
-
-                // we need to request those columns from peers through RPC for fullnode,
-                // but for super-fullnode, we only need to request some columns enough to
-                // reconstruct the rest. this would significantly reduce bandwidth usage.
-                let request_length =
-                    if missing_column_indices.len() * 2 >= number_of_columns {
-                        number_of_columns
-                            .saturating_div(2)
-                            .saturating_sub(available_columns_count)
-                    } else {
-                        missing_column_indices.len()
-                    };
-
-                let column_ids = missing_column_indices
-                    .into_iter()
-                    .take(request_length)
-                    .map(|index| DataColumnIdentifier { block_root, index })
-                    .collect_vec();
-
-                let peer_id = pending_block.origin.peer_id();
-
-                P2pMessage::DataColumnsNeeded(column_ids, slot, peer_id).send(&self.p2p_tx);
-
-                self.delay_block_until_blobs(block_root, pending_block);
-            }
-        } else {
-            let missing_blob_indices =
-                self.store.indices_of_missing_blobs(&pending_block.block);
-
-            if missing_blob_indices.is_empty() {
-                self.retry_block(wait_group, pending_block);
-            } else {
-                debug!("block delayed until blobs: {pending_block:?}");
-
-                if let Some(gossip_id) = pending_block.origin.gossip_id() {
-                    P2pMessage::Accept(gossip_id).send(&self.p2p_tx);
-                }
-
-                let pending_block = reply_delayed_block_validation_result(
-                    pending_block,
-                    Ok(ValidationOutcome::Ignore(false)),
-                );
-
-                self.request_blobs_from_execution_engine(
-                    pending_block.block.clone_arc(),
-                    missing_blob_indices.clone(),
-                );
-
-                let blob_ids = missing_blob_indices
-                    .into_iter()
-                    .map(|index| BlobIdentifier { block_root, index })
-                    .collect_vec();
-
-                let peer_id = pending_block.origin.peer_id();
-
-                P2pMessage::BlobsNeeded(blob_ids, slot, peer_id).send(&self.p2p_tx);
-
-                self.delay_block_until_blobs(block_root, pending_block);
-            }
-        }
+        if let Some(body) = pending_block.block.message().body().post_deneb() {
+            let blob_count = body.blob_kzg_commitments().len();
+
+            if pending_block.block.phase().is_peerdas_activated() {
+                let missing_column_indices = self
+                    .store
+                    .indices_of_missing_data_columns(&pending_block.block);
+                let available_columns_count = self
+                    .store
+                    .sampling_columns_count()
+                    .saturating_sub(missing_column_indices.len());
+                let number_of_columns = self.store.chain_config().number_of_columns();
+                debug!(
+                    "missing columns: [{}] at slot: {}",
+                    missing_column_indices.iter().join(", "),
+                    slot,
+                );
+
+                if missing_column_indices.is_empty() {
+                    self.retry_block(wait_group, pending_block);
+                } else if available_columns_count * 2 >= number_of_columns {
+                    self.handle_reconstructing_data_column_sidecars(
+                        wait_group,
+                        pending_block,
+                        blob_count,
+                    );
+                } else {
+                    debug!(
+                        "block delayed until sufficient data column sidecars are available \
+                         (column indices: {missing_column_indices:?}, pending block root: {block_root:?})",
+                    );
+
+                    if let Some(gossip_id) = pending_block.origin.gossip_id() {
+                        P2pMessage::Accept(gossip_id).send(&self.p2p_tx);
+                    }
+
+                    let pending_block = reply_delayed_block_validation_result(
+                        pending_block,
+                        Ok(ValidationOutcome::Ignore(false)),
+                    );
+
+                    self.request_blobs_from_execution_engine(
+                        pending_block.block.clone_arc(),
+                        (0..blob_count as BlobIndex).collect(),
+                    );
+
+                    // we need to request those columns from peers through RPC for fullnode,
+                    // but for super-fullnode, we only need to request some columns enough to
+                    // reconstruct the rest. this would significantly reduce bandwidth usage.
+                    let request_length =
+                        if missing_column_indices.len() * 2 >= number_of_columns {
+                            number_of_columns
+                                .saturating_div(2)
+                                .saturating_sub(available_columns_count)
+                        } else {
+                            missing_column_indices.len()
+                        };
+
+                    let column_ids = missing_column_indices
+                        .into_iter()
+                        .take(request_length)
+                        .map(|index| DataColumnIdentifier { block_root, index })
+                        .collect_vec();
+
+                    let peer_id = pending_block.origin.peer_id();
+
+                    P2pMessage::DataColumnsNeeded(column_ids, slot, peer_id)
+                        .send(&self.p2p_tx);
+
+                    self.delay_block_until_blobs(block_root, pending_block);
+                }
+            } else {
+                let missing_blob_indices =
+                    self.store.indices_of_missing_blobs(&pending_block.block);
+
+                if missing_blob_indices.is_empty() {
+                    self.retry_block(wait_group, pending_block);
+                } else {
+                    debug!("block delayed until blobs: {pending_block:?}");
+
+                    if let Some(gossip_id) = pending_block.origin.gossip_id() {
+                        P2pMessage::Accept(gossip_id).send(&self.p2p_tx);
+                    }
+
+                    let pending_block = reply_delayed_block_validation_result(
+                        pending_block,
+                        Ok(ValidationOutcome::Ignore(false)),
+                    );
+
+                    self.request_blobs_from_execution_engine(
+                        pending_block.block.clone_arc(),
+                        missing_blob_indices.clone(),
+                    );
+
+                    let blob_ids = missing_blob_indices
+                        .into_iter()
+                        .map(|index| BlobIdentifier { block_root, index })
+                        .collect_vec();
+
+                    let peer_id = pending_block.origin.peer_id();
+
+                    P2pMessage::BlobsNeeded(blob_ids, slot, peer_id).send(&self.p2p_tx);
+
+                    self.delay_block_until_blobs(block_root, pending_block);
+                }
+            }
+        }
     }
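Two things change in this hunk. First, the availability check now runs under post_deneb(), so blob_count is computed once and threaded into handle_reconstructing_data_column_sidecars. Second, when columns are missing, the mutator additionally asks the execution engine for all blob_count blobs; if the EL already has them, the node can build the columns itself via engine_getBlobsV1 instead of waiting on peers. The peer request size keeps the existing 50% rule: with the 2x erasure extension, any half of the columns suffices to reconstruct the rest, so a node missing most columns only tops up to half. A sketch of that rule, assuming the spec default of 128 columns:

fn request_length(missing: usize, available: usize, number_of_columns: usize) -> usize {
    if missing * 2 >= number_of_columns {
        // Supernode path: any number_of_columns / 2 columns allow full
        // reconstruction, so only top up to that threshold.
        (number_of_columns / 2).saturating_sub(available)
    } else {
        // Fullnode path: fetch exactly the gaps.
        missing
    }
}

// request_length(120, 8, 128) == 56: top up to 64 columns, then reconstruct the rest.
// request_length(3, 125, 128) == 3: request just the three missing columns.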
@@ -1266,7 +1280,11 @@
     ) {
         match result {
             Ok(DataColumnSidecarAction::Accept(data_column_sidecar)) => {
-                if origin.is_from_reconstruction() {
+                if origin.is_from_el_or_reconstruction() {
+                    self.store_mut()
+                        .mark_as_republished(data_column_sidecar.slot(), data_column_sidecar.index);
+                    self.update_store_snapshot();
+
                     P2pMessage::PublishDataColumnSidecar(data_column_sidecar.clone_arc())
                         .send(&self.p2p_tx);
                 }
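This is the commit-message bullet in action: a column sidecar accepted from the EL or from reconstruction is republished to gossip, and mark_as_republished records its (slot, index) in the store first, so the same column is not published again when it later arrives from another source. A minimal sketch of such a guard, assuming a set-backed tracker rather than the real fork choice store:

use std::collections::HashSet;

#[derive(Default)]
struct RepublishTracker {
    seen: HashSet<(u64, u64)>, // (slot, column index)
}

impl RepublishTracker {
    // Returns true only the first time a (slot, index) pair is recorded.
    fn mark_as_republished(&mut self, slot: u64, index: u64) -> bool {
        self.seen.insert((slot, index))
    }
}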
@@ -1454,19 +1472,14 @@
         &mut self,
         wait_group: W,
         pending_block: PendingBlock<P>,
+        blob_count: usize,
     ) {
         let block = pending_block.block.clone_arc();
         let block_root = block.message().hash_tree_root();
         if !self.store.is_data_columns_reconstructed(block_root) {
-            let body = block
-                .message()
-                .body()
-                .post_deneb()
-                .expect("cannot compute post deneb block body");
-
-            let blob_count = body.blob_kzg_commitments().len();
-
             let missing_indices = self.store.indices_of_missing_data_columns(&block);
 
             if missing_indices.is_empty() {
                 info!("all column sidecars are available, ignored reconstruction");
                 self.retry_block(wait_group, pending_block);
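With blob_count now supplied by the caller, which already holds the post-Deneb body, the post_deneb().expect(...) lookup disappears from this function, and with it a potential panic on pre-Deneb blocks.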
@@ -1990,13 +2003,14 @@
 
         self.update_store_snapshot();
 
-        if let Some(pending_block) = self.delayed_until_blobs.get(&block_root) {
-            self.retry_block(wait_group.clone(), pending_block.clone());
+        if let Some(pending_block) = self.take_delayed_until_blobs(block_root) {
+            self.retry_block(wait_group.clone(), pending_block);
         }
 
         self.event_channels
             .send_data_column_sidecar_event(block_root, data_column_sidecar);
 
+        // TODO(feature/fulu): only persist custody columns
         self.spawn(PersistDataColumnSidecarsTask {
             store_snapshot: self.owned_store(),
             storage: self.storage.clone_arc(),
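Switching from delayed_until_blobs.get(&block_root) to take_delayed_until_blobs(block_root) removes the entry while handing it back, so a block delayed on blobs is retried at most once instead of on every later sidecar arrival, and the clone of the pending block goes away. The difference in miniature, assuming a HashMap-backed delay queue:

use std::collections::HashMap;

fn main() {
    let mut delayed = HashMap::from([("block_root", "pending block")]);

    // `remove` hands the entry back exactly once; later lookups are no-ops.
    assert_eq!(delayed.remove("block_root"), Some("pending block"));
    assert_eq!(delayed.remove("block_root"), None);
}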