Skip to content

Commit

Permalink
feat(rooch-da): add granular control options for unpacking (#3307)
Browse files Browse the repository at this point in the history
Add support for chunk-specific unpacking and force unpacking with new CLI flags. Refactored logic to verify transaction order only when required and improved error messages for better debugging.
  • Loading branch information
popcnt1 authored Feb 12, 2025
1 parent a622c86 commit 79babc1
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 28 deletions.
5 changes: 4 additions & 1 deletion crates/rooch-types/src/da/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ impl DABatch {
for mut tx in tx_list {
let tx_order = tx.sequence_info.tx_order;
if tx_order != last_order {
return Err(anyhow::anyhow!("tx order mismatch"));
return Err(anyhow::anyhow!(
"Transaction order is not strictly incremental for block {}: last_tx_order: {}, tx_order: {}",
self.meta.block_range.block_number, last_order, tx_order
));
}

let tx_hash = tx.data.tx_hash();
Expand Down
2 changes: 1 addition & 1 deletion crates/rooch/src/commands/da/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ the easiest way to download segments is using `getda` tool from [here](https://g
segments from cloud storage.

```shell
getda --output={segment-dir} --url={da-cloud-storage-path} --last_chunk={max-chunk-id-expected} --max_goroutines={max-goroutines}
getda --output={segment-dir} --url={da-cloud-storage-path} --last_chunk={max-chunk-id-expected}
```

#### unpack segments
Expand Down
54 changes: 42 additions & 12 deletions crates/rooch/src/commands/da/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,27 @@ impl SequencedTxStore {
}
}

/// Enumerate the segment numbers present in `segment_dir` for the given chunk.
///
/// Segment files are probed in ascending order starting at 0; enumeration
/// stops at the first missing file. Returns an error when the chunk has no
/// segment 0 at all (i.e. the chunk is absent from the directory).
pub(crate) fn collect_chunk(segment_dir: PathBuf, chunk_id: u128) -> anyhow::Result<Vec<u64>> {
    let mut found = Vec::new();
    let mut segment_number: u64 = 0;
    loop {
        let segment_path = segment_dir.join(
            SegmentID {
                chunk_id,
                segment_number,
            }
            .to_string(),
        );
        if segment_path.exists() {
            found.push(segment_number);
            segment_number += 1;
        } else if found.is_empty() {
            // No segment 0 means the chunk itself does not exist here.
            return Err(anyhow::anyhow!("No segment found in chunk: {}", chunk_id));
        } else {
            return Ok(found);
        }
    }
}

// collect all the chunks from segment_dir.
// each segment is stored in a file named by the segment_id.
// each chunk may contain multiple segments.
Expand Down Expand Up @@ -197,6 +218,7 @@ pub(crate) fn get_tx_list_from_chunk(
segment_dir: PathBuf,
chunk_id: u128,
segment_numbers: Vec<u64>,
verify_order: bool,
) -> anyhow::Result<Vec<LedgerTransaction>> {
let mut segments = Vec::new();
for segment_number in segment_numbers {
Expand All @@ -211,7 +233,7 @@ pub(crate) fn get_tx_list_from_chunk(
}
let chunk = chunk_from_segments(segments)?;
let batch = chunk.get_batches().into_iter().next().unwrap();
batch.verify(true)?;
batch.verify(verify_order)?;
Ok(batch.get_tx_list())
}

Expand Down Expand Up @@ -419,6 +441,7 @@ impl LedgerTxGetter {
self.segment_dir.clone(),
chunk_id,
segment_numbers.clone(),
true,
)?;
Ok(Some(tx_list))
},
Expand All @@ -436,17 +459,24 @@ impl LedgerTxGetter {
let tx_info = resp.into_iter().next().flatten().ok_or_else(|| {
anyhow!("No transaction info found for tx: {:?}", last_tx_hash)
})?;
let tx_state_root = tx_info
.execution_info
.ok_or(anyhow!(
"No execution info found for tx: {:?}",
last_tx_hash
))?
.state_root
.0;
let tx_accumulator_root = tx_info.transaction.sequence_info.tx_accumulator_root.0;
let mut exp_roots = exp_roots.write().await;
exp_roots.insert(tx_order, (tx_state_root, tx_accumulator_root));
let tx_order_in_resp = tx_info.transaction.sequence_info.tx_order.0;
if tx_order_in_resp != tx_order {
return Err(anyhow!(
"failed to request tx by RPC: Tx order mismatch, expect: {}, actual: {}",
tx_order,
tx_order_in_resp
));
} else {
let execution_info_opt = tx_info.execution_info;
// not every sequenced tx is executed successfully, so execution info may be absent
if let Some(execution_info) = execution_info_opt {
let tx_state_root = execution_info.state_root.0;
let tx_accumulator_root =
tx_info.transaction.sequence_info.tx_accumulator_root.0;
let mut exp_roots = exp_roots.write().await;
exp_roots.insert(tx_order, (tx_state_root, tx_accumulator_root));
}
}
}
Ok(Some(tx_list))
} else {
Expand Down
38 changes: 24 additions & 14 deletions crates/rooch/src/commands/da/commands/unpack.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::commands::da::commands::{collect_chunks, get_tx_list_from_chunk};
use crate::commands::da::commands::{collect_chunk, collect_chunks, get_tx_list_from_chunk};
use clap::Parser;
use rooch_types::error::RoochResult;
use std::collections::{BinaryHeap, HashMap, HashSet};
Expand All @@ -18,8 +18,15 @@ pub struct UnpackCommand {
pub segment_dir: PathBuf,
#[clap(long = "batch-dir")]
pub batch_dir: PathBuf,
#[clap(
long = "chunk-id",
help = "Only unpack the specified chunk_id, otherwise unpack all chunks"
)]
pub chunk_id: Option<u128>,
#[clap(long = "stats-only", help = "Only print L2Tx size stats, no unpacking")]
pub stats_only: bool,
#[clap(long = "force", help = "Force unpacking, even if the batch has issues")]
pub force: bool,
}

impl UnpackCommand {
Expand All @@ -31,7 +38,7 @@ impl UnpackCommand {
batch_dir: self.batch_dir,
stats_only: self.stats_only,
};
unpacker.unpack()?;
unpacker.unpack(self.force, self.chunk_id)?;

Ok(())
}
Expand Down Expand Up @@ -76,19 +83,29 @@ impl UnpackInner {
Ok(())
}

fn collect_chunks(&mut self) -> anyhow::Result<()> {
let (chunks, _min_chunk_id, _max_chunk_id) = collect_chunks(self.segment_dir.clone())?;
fn collect_chunks(&mut self, unpack_chunk_id_opt: Option<u128>) -> anyhow::Result<()> {
let chunks = if let Some(chunk_id) = unpack_chunk_id_opt {
let segment_numbers = collect_chunk(self.segment_dir.clone(), chunk_id)?;
let mut chunks = HashMap::new();
chunks.insert(chunk_id, segment_numbers);
chunks
} else {
let (chunks, _min_chunk_id, _max_chunk_id) = collect_chunks(self.segment_dir.clone())?;
chunks
};

self.chunks = chunks;

Ok(())
}

// unpack batches from segment_dir to batch_dir.
// warning: only ChunkV0 is supported at present
fn unpack(&mut self) -> anyhow::Result<()> {
fn unpack(&mut self, force: bool, unpack_chunk_id_opt: Option<u128>) -> anyhow::Result<()> {
const TOP_N: usize = 20;

self.collect_unpacked()?;
self.collect_chunks()?;
self.collect_chunks(unpack_chunk_id_opt)?;

let mut new_unpacked = HashSet::new();

Expand All @@ -108,18 +125,11 @@ impl UnpackInner {
self.segment_dir.clone(),
*chunk_id,
segment_numbers.clone(),
!force,
)?;

let mut last_tx_order = 0; // the first tx_order in DA is 1
for tx in &tx_list {
let tx_order = tx.sequence_info.tx_order;
if last_tx_order != 0 && tx_order != last_tx_order + 1 {
return Err(anyhow::anyhow!(
"Transaction order is not strictly incremental for block {}: last_tx_order: {}, tx_order: {}",
chunk_id, last_tx_order, tx_order
));
}
last_tx_order = tx_order;
if let rooch_types::transaction::LedgerTxData::L2Tx(tx) = &tx.data {
let tx_size = tx.tx_size();
l2tx_hist.record(tx_order, tx_size)?;
Expand Down

0 comments on commit 79babc1

Please sign in to comment.