diff --git a/Cargo.lock b/Cargo.lock
index 7e615708e..05f7ccf35 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9073,6 +9073,7 @@ dependencies = [
  "blockstore",
  "byteorder",
  "bytes",
+ "clap",
  "criterion",
  "digest 0.10.7",
  "futures",
diff --git a/maat/tests/real_world.rs b/maat/tests/real_world.rs
index 93ac3c2fa..e85e9e07a 100644
--- a/maat/tests/real_world.rs
+++ b/maat/tests/real_world.rs
@@ -1,4 +1,4 @@
-use std::collections::BTreeSet;
+use std::{collections::BTreeSet, time::Duration};
 
 use cid::Cid;
 use maat::*;
@@ -35,7 +35,7 @@ where
     let result = client
         .register_storage_provider(
            charlie,
-            peer_id.clone(),
+            peer_id.clone().parse().expect("invalid peer_id"),
            primitives::proofs::RegisteredPoStProof::StackedDRGWindow2KiBV1P1,
            true,
        )
@@ -251,7 +251,7 @@ where
         assert_eq!(event.sectors.0, sectors_pre_commit_info);
     }
 
-    result.height
+    client.height(true).await.unwrap()
 }
 
 async fn prove_commit_sector(
@@ -325,10 +325,10 @@ where
             SubmitWindowedPoStParams {
                 deadline: 0,
                 partitions: vec![0],
-                proof: storagext::types::storage_provider::PoStProof {
+                proofs: vec![storagext::types::storage_provider::PoStProof {
                     post_proof: primitives::proofs::RegisteredPoStProof::StackedDRGWindow2KiBV1P1,
                     proof_bytes: "beef".as_bytes().to_vec(),
-                },
+                }],
             },
             true,
         )
@@ -408,9 +408,11 @@ async fn real_world_use_case() {
 
     tracing::debug!("base dir: {:?}", network.base_dir());
 
-    let collator = network.get_node(COLLATOR_NAME).unwrap();
-    let client =
-        storagext::Client::from(collator.wait_client::().await.unwrap());
+    let client = storagext::Client::new(
+        "ws://127.0.0.1:9944",
+        5,
+        Duration::from_secs(2)
+    ).await.unwrap();
 
     let alice_kp = pair_signer_from_str::("//Alice");
     let charlie_kp = pair_signer_from_str::("//Charlie");
diff --git a/mater/cli/src/convert.rs b/mater/cli/src/convert.rs
index 67c3a64fd..97c5f7670 100644
--- a/mater/cli/src/convert.rs
+++ b/mater/cli/src/convert.rs
@@ -3,22 +3,30 @@ use std::path::PathBuf;
 
 use mater::{create_filestore, Cid, Config};
 use tokio::fs::File;
 
-use crate::error::Error;
+use crate::{error::Error, ConvertConfig, WrapMode};
 
 /// Converts a file at location `input_path` to a CARv2 file at `output_path`
 pub(crate) async fn convert_file_to_car(
     input_path: &PathBuf,
     output_path: &PathBuf,
-    overwrite: bool,
+    config: ConvertConfig,
 ) -> Result<Cid, Error> {
     let source_file = File::open(input_path).await?;
-    let output_file = if overwrite {
+    let output_file = if config.overwrite {
         File::create(output_path).await
     } else {
         File::create_new(output_path).await
     }?;
-    let cid = create_filestore(source_file, output_file, Config::default()).await?;
+    let mater_config = match config.wrap_mode {
+        WrapMode::Raw => Config::default(),
+        WrapMode::UnixFS => Config::Balanced {
+            chunk_size: 64 * 1024,
+            tree_width: 2,
+            wrap_mode: WrapMode::UnixFS,
+        },
+    };
+    let cid = create_filestore(source_file, output_file, mater_config).await?;
 
     Ok(cid)
 }
@@ -27,16 +35,14 @@ pub(crate) async fn convert_file_to_car(
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
-
     use anyhow::Result;
     use mater::Cid;
     use tempfile::tempdir;
     use tokio::{fs::File, io::AsyncWriteExt};
-
-    use crate::{convert::convert_file_to_car, error::Error};
+    use crate::{convert::convert_file_to_car, error::Error, ConvertConfig, WrapMode};
 
     #[tokio::test]
-    async fn convert_file_to_car_success() -> Result<()> {
+    async fn convert_file_to_car_raw_success() -> Result<()> {
         // Setup: Create a dummy input file
         let temp_dir = tempdir()?;
         let input_path = temp_dir.path().join("test_input.txt");
@@ -49,18 +55,42 @@ mod tests {
         // Define output path
         let output_path = temp_dir.path().join("test_output.car");
 
-        // Call the function under test
-        let result = convert_file_to_car(&input_path, &output_path, false).await;
+        // Configure for raw mode
+        let config = ConvertConfig {
+            wrap_mode: WrapMode::Raw,
+            overwrite: false,
+        };
 
-        // Assert the result is Ok
+        // Call the function under test
+        let result = convert_file_to_car(&input_path, &output_path, config).await;
         assert!(result.is_ok());
-
-        // Verify that the CID is as expected
         assert_eq!(result?, expected_cid);
 
-        // Close temporary directory
         temp_dir.close()?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn convert_file_to_car_unixfs_success() -> Result<()> {
+        // Setup: Create a dummy input file
+        let temp_dir = tempdir()?;
+        let input_path = temp_dir.path().join("test_input.txt");
+        let mut input_file = File::create(&input_path).await?;
+        input_file.write_all(b"test data").await?;
+        // Define output path
+        let output_path = temp_dir.path().join("test_output.car");
+
+        // Configure for UnixFS mode
+        let config = ConvertConfig {
+            wrap_mode: WrapMode::UnixFS,
+            overwrite: false,
+        };
+
+        let result = convert_file_to_car(&input_path, &output_path, config).await;
+        assert!(result.is_ok());
+
+        temp_dir.close()?;
 
         Ok(())
     }
@@ -69,20 +99,20 @@ mod tests {
         // Define non-existent input path
         let temp_dir = tempdir()?;
         let input_path = temp_dir.path().join("non_existent_input.txt");
-        // Define output path
         let output_path = temp_dir.path().join("test_output.car");
 
-        // Call the function under test
-        let result = convert_file_to_car(&input_path, &output_path, false).await;
+        let config = ConvertConfig {
+            wrap_mode: WrapMode::Raw,
+            overwrite: false,
+        };
 
-        // Assert the result is an error
+        // Call the function under test
+        let result = convert_file_to_car(&input_path, &output_path, config).await;
         assert!(result.is_err());
         assert!(matches!(result, Err(Error::IoError(..))));
 
-        // Close temporary directory
         temp_dir.close()?;
-
         Ok(())
     }
 
@@ -97,18 +127,18 @@ mod tests {
         // Create output file
         let output_path = temp_dir.path().join("output_file");
         File::create_new(&output_path).await?;
-        println!("gets here");
 
-        // Call the function under test
-        let result = convert_file_to_car(&input_path, &output_path, false).await;
+        let config = ConvertConfig {
+            wrap_mode: WrapMode::Raw,
+            overwrite: false,
+        };
 
-        // Assert the result is an error
+        // Call the function under test
+        let result = convert_file_to_car(&input_path, &output_path, config).await;
         assert!(result.is_err());
         assert!(matches!(result, Err(Error::IoError(..))));
 
-        // Close temporary directory
         temp_dir.close()?;
-
         Ok(())
     }
 }
diff --git a/mater/cli/src/main.rs b/mater/cli/src/main.rs
index ec00f6068..aa7666333 100644
--- a/mater/cli/src/main.rs
+++ b/mater/cli/src/main.rs
@@ -1,6 +1,8 @@
 use std::path::PathBuf;
 
 use clap::Parser;
+use mater::ConvertConfig;
+use mater::WrapMode;
 
 use crate::{convert::convert_file_to_car, error::Error, extract::extract_file_from_car};
 
@@ -29,6 +31,11 @@ enum MaterCli {
         /// If enabled, the output will overwrite any existing files.
         #[arg(long, action)]
         overwrite: bool,
+
+        /// Determines how content should be wrapped in the CAR file.
+        /// 'raw' stores content directly, 'unixfs' wraps it in UnixFS format (default: raw)
+        #[arg(long, value_enum, default_value = "raw")]
+        wrap: WrapMode,
     },
     /// Convert a CARv2 file to its original format
     Extract {
@@ -47,13 +54,18 @@ async fn main() -> Result<(), Error> {
             output_path,
             quiet,
             overwrite,
+            wrap
         } => {
             let output_path = output_path.unwrap_or_else(|| {
                 let mut new_path = input_path.clone();
                 new_path.set_extension("car");
                 new_path
             });
-            let cid = convert_file_to_car(&input_path, &output_path, overwrite).await?;
+            let config = ConvertConfig {
+                wrap_mode: wrap,
+                overwrite,
+            };
+            let cid = convert_file_to_car(&input_path, &output_path, config).await?;
 
             if quiet {
                 println!("{}", cid);
diff --git a/mater/lib/Cargo.toml b/mater/lib/Cargo.toml
index 0191772cf..875f2e163 100644
--- a/mater/lib/Cargo.toml
+++ b/mater/lib/Cargo.toml
@@ -29,6 +29,7 @@ thiserror.workspace = true
 tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] }
 tokio-stream.workspace = true
 tokio-util = { workspace = true, features = ["io"] }
+clap = { workspace = true, features = ["derive"] }
 
 # Optional dependencies
 blockstore = { workspace = true, optional = true }
diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs
index cb60d2383..62d7990ea 100644
--- a/mater/lib/src/lib.rs
+++ b/mater/lib/src/lib.rs
@@ -17,10 +17,14 @@ mod unixfs;
 mod v1;
 mod v2;
 
+pub use stores::WrapMode;
+pub use stores::ConvertConfig;
+pub use stores::filestore::create_filestore;
+
 // We need to re-expose this because `read_block` returns `(Cid, Vec<u8>)`.
 pub use ipld_core::cid::Cid;
 pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
-pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
+pub use stores::{Blockstore, Config, FileBlockstore};
 pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
 pub use v2::{
     verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted,
diff --git a/mater/lib/src/stores/filestore.rs b/mater/lib/src/stores/filestore.rs
index b8e494a21..d48499abf 100644
--- a/mater/lib/src/stores/filestore.rs
+++ b/mater/lib/src/stores/filestore.rs
@@ -1,32 +1,41 @@
+use std::pin::Pin;
+
 use bytes::BytesMut;
 use futures::stream::StreamExt;
+use futures::Stream;
 use ipld_core::cid::Cid;
 use sha2::{Digest, Sha256};
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite};
-
-use super::Config;
+use futures::stream::Peekable;
+use crate::unixfs::stream_balanced_tree_unixfs;
+use super::{Config, WrapMode};
 use crate::{
     multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer,
     Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
 };
+use std::collections::HashMap;
+
+type SendableStream = Pin<Box<dyn Stream<Item = Result<(Cid, Vec<u8>), Error>> + Send>>;
 
 async fn balanced_import<Src, Out>(
     mut source: Src,
     mut output: Out,
     chunk_size: usize,
     tree_width: usize,
+    wrap_mode: WrapMode,
 ) -> Result<Cid, Error>
 where
-    Src: AsyncRead + Unpin,
-    Out: AsyncWrite + AsyncSeek + Unpin,
+    Src: AsyncRead + Unpin + Send + 'static,
+    Out: AsyncWrite + AsyncSeek + Unpin + Send + 'static,
 {
     // This custom stream gathers incoming buffers into a single byte chunk of `chunk_size`
     // `tokio_util::io::ReaderStream` does a very similar thing, however, it does not attempt
     // to fill it's buffer before returning, voiding the whole promise of properly sized chunks
     // There is an alternative implementation (untested & uses unsafe) in the following GitHub Gist:
     // https://gist.github.com/jmg-duarte/f606410a5e0314d7b5cee959a240b2d8
-    let chunker = async_stream::try_stream! {
+    let chunker = Box::pin(async_stream::try_stream! {
         let mut buf = BytesMut::with_capacity(chunk_size);
+        let mut total_chunks = 0;
 
         loop {
             if buf.capacity() < chunk_size {
@@ -45,28 +54,60 @@ where
             // this means there is no right way of knowing when the reader is fully exhausted!
             // If we need to support a case like that, we just need to track how many times
             // the reader returned 0 and break at a certain point
-            if source.read_buf(&mut buf).await? == 0 {
-                // EOF but there's still content to yield -> yield it
+            let read_bytes = source.read_buf(&mut buf).await?;
+            println!("Read {} bytes, buffer now has {} bytes", read_bytes, buf.len());
+
+            if read_bytes == 0 {
                 if buf.len() > 0 {
+                    total_chunks += 1;
+                    println!("Final chunk {}: {} bytes", total_chunks, buf.len());
                     let chunk = buf.split();
                     yield chunk.freeze();
                 }
-                break
-            } else if buf.len() >= chunk_size {
+                break;
+            }
+
+            // While we have enough for a full chunk
+            while buf.len() >= chunk_size {
+                total_chunks += 1;
+                println!("Full chunk {}: {} bytes", total_chunks, chunk_size);
                 // The buffer may have a larger capacity than chunk_size due to reserve
                 // this also means that our read may have read more bytes than we expected,
                 // thats why we check if the length if bigger than the chunk_size and if so
                 // we split the buffer to the chunk_size, then freeze and return
                 let chunk = buf.split_to(chunk_size);
                 yield chunk.freeze();
-            } // otherwise, the buffer is not full, so we don't do a thing
+            }
+
+            // If we reach EOF but still have a partial chunk, yield it
+            if read_bytes == 0 && buf.len() > 0 {
+                total_chunks += 1;
+                println!("Partial chunk {}: {} bytes", total_chunks, buf.len());
+                let chunk = buf.split();
+                yield chunk.freeze();
+            }
+        }
+        println!("Total chunks yielded: {}", total_chunks);
+    });
+
+    let nodes: Peekable<SendableStream> = match wrap_mode {
+        WrapMode::Raw => futures::stream::StreamExt::peekable(Box::pin(
+            stream_balanced_tree(chunker, tree_width).map(|result| {
+                result.map(|(cid, bytes)| (Cid::from(cid), bytes.to_vec()))
+            })
+        )),
+        WrapMode::UnixFS => {
+            futures::stream::StreamExt::peekable(Box::pin(
+                stream_balanced_tree_unixfs(chunker, tree_width)
+                    .map(|res| res)
+            ))
+        }
     };
-    let nodes = stream_balanced_tree(chunker, tree_width).peekable();
-    tokio::pin!(nodes);
+    tokio::pin!(nodes);
 
     let mut writer = CarV2Writer::new(&mut output);
+    let mut written_blocks = HashMap::new();
     let mut position = 0;
 
     let placeholder_header = CarV2Header::default();
@@ -78,12 +119,20 @@ where
     let mut root = None;
     let mut entries = vec![];
+
     while let Some(node) = nodes.next().await {
         let (node_cid, node_bytes) = node?;
-        let digest = node_cid.hash().digest().to_owned();
-        let entry = IndexEntry::new(digest, (position - car_v1_start) as u64);
-        entries.push(entry);
-        position += writer.write_block(&node_cid, &node_bytes).await?;
+
+        if let Some(existing_position) = written_blocks.get(&node_cid) {
+            let digest = node_cid.hash().digest().to_owned();
+            entries.push(IndexEntry::new(digest, (*existing_position - car_v1_start) as u64));
+        } else {
+            let digest = node_cid.hash().digest().to_owned();
+            let entry = IndexEntry::new(digest, (position - car_v1_start) as u64);
+            entries.push(entry);
+            position += writer.write_block(&node_cid, &node_bytes).await?;
+            written_blocks.insert(node_cid, position);
+        }
 
         if nodes.as_mut().peek().await.is_none() {
             root = Some(node_cid);
@@ -95,8 +144,11 @@ where
     };
 
     let index_offset = position;
-    let single_width_index =
-        SingleWidthIndex::new(Sha256::output_size() as u32, entries.len() as u64, entries);
+    let single_width_index = SingleWidthIndex::new(
+        Sha256::output_size() as u32,
+        entries.len() as u64,
+        entries,
+    );
     let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width(
         SHA_256_CODE,
         single_width_index.into(),
@@ -130,28 +182,28 @@ pub async fn create_filestore(
     config: Config,
 ) -> Result<Cid, Error>
 where
-    Src: AsyncRead + Unpin,
-    Out: AsyncWrite + AsyncSeek + Unpin,
+    Src: AsyncRead + Unpin + Send + 'static,
+    Out: AsyncWrite + AsyncSeek + Unpin + Send + 'static,
 {
     match config {
         Config::Balanced {
             chunk_size,
             tree_width,
-        } => balanced_import(source, output, chunk_size, tree_width).await,
+            wrap_mode,
+        } => balanced_import(source, output, chunk_size, tree_width, wrap_mode).await,
     }
 }
 
 #[cfg(test)]
 mod test {
-    use std::path::Path;
-
+    use std::collections::HashSet;
+    use super::*;
     use tempfile::tempdir;
     use tokio::fs::File;
-
-    use crate::{
-        stores::{filestore::create_filestore, Config},
-        test_utils::assert_buffer_eq,
-    };
+    use ipld_dagpb::{DagPbCodec, PbNode};
+    use quick_protobuf::MessageRead;
+    use ipld_core::codec::Codec;
+    use crate::unixfs::Data;
 
     async fn test_filestore_roundtrip<P1, P2>(original: P1, expected: P2)
     where
@@ -190,4 +242,113 @@ mod test {
         )
         .await
     }
+
+    #[tokio::test]
+    async fn test_filestore_unixfs_dag_structure() {
+        use rand::{thread_rng, Rng};
+
+        let temp_dir = tempdir().unwrap();
+        let input_path = temp_dir.path().join("input.bin");
+        let temp_path = temp_dir.path().join("temp.car");
+
+        // Create test file with random data to ensure unique chunks
+        let mut rng = thread_rng();
+        let test_data = (0..512*1024)
+            .map(|_| rng.gen::<u8>())
+            .collect::<Vec<u8>>();
+
+        println!("Creating test file of size: {} bytes", test_data.len());
+        println!("Expected chunks: {}", test_data.len() / (64 * 1024));
+        tokio::fs::write(&input_path, &test_data).await.unwrap();
+
+        let source_file = File::open(&input_path).await.unwrap();
+        let output_file = File::create(&temp_path).await.unwrap();
+
+        let config = Config::Balanced {
+            chunk_size: 64 * 1024, // 64KB chunks
+            tree_width: 2,         // Binary tree structure
+            wrap_mode: WrapMode::UnixFS,
+        };
+
+        let root_cid = create_filestore(source_file, output_file, config).await.unwrap();
+        println!("Root CID: {}", root_cid);
+
+        // Read back and verify structure
+        let file = File::open(&temp_path).await.unwrap();
+        let mut reader = crate::CarV2Reader::new(file);
+
+        reader.read_pragma().await.unwrap();
+        reader.read_header().await.unwrap();
+        reader.read_v1_header().await.unwrap();
+
+        // Track all unique blocks and statistics
+        let mut unique_blocks = HashSet::new();
+        let mut leaf_blocks = HashSet::new();
+        let mut parent_blocks = HashSet::new();
+        let mut total_blocks = 0;
+        let mut chunk_count = 0;
+        let mut intermediate_count = 0;
+        let mut total_size = 0;
+        let mut level_sizes = Vec::new();
+        let mut current_level_nodes = HashSet::new();
+        let mut current_level = 0;
+
+        while let Ok((cid, data)) = reader.read_block().await {
+            total_blocks += 1;
+            unique_blocks.insert(cid);
+            total_size += data.len();
+
+            let pb_node: PbNode = DagPbCodec::decode(&data[..]).unwrap();
+            let reader = &mut quick_protobuf::BytesReader::from_bytes(&pb_node.data.clone().unwrap());
+            let bytes = &pb_node.data.unwrap();
+            let unixfs_data = Data::from_reader(reader, bytes).unwrap();
+
+            if pb_node.links.is_empty() {
+                chunk_count += 1;
+                leaf_blocks.insert(cid);
+                println!("Found leaf node: {} (size: {})", cid, data.len());
+                println!("  Data size: {}", unixfs_data.Data.as_ref().map_or(0, |d| d.len()));
+                println!("  Blocksizes: {:?}", unixfs_data.blocksizes);
+
+                // New level if this is first leaf
+                if current_level_nodes.is_empty() {
+                    level_sizes.push(0);
+                    current_level = level_sizes.len() - 1;
+                }
+            } else {
+                intermediate_count += 1;
+                parent_blocks.insert(cid);
+
+                println!("Found parent node: {} with {} links (size: {})", cid, pb_node.links.len(), data.len());
+                println!("  Total filesize: {:?}", unixfs_data.filesize);
+                println!("  Blocksizes: {:?}", unixfs_data.blocksizes);
+
+                for link in &pb_node.links {
+                    println!("  -> Link to: {} (size: {:?})", link.cid, link.size);
+                }
+
+                // Track level changes
+                if !current_level_nodes.is_empty() && current_level_nodes.iter().any(|n| pb_node.links.iter().any(|l| l.cid == *n)) {
+                    level_sizes.push(0);
+                    current_level = level_sizes.len() - 1;
+                    current_level_nodes.clear();
+                }
+            }
+
+            level_sizes[current_level] += 1;
+            current_level_nodes.insert(cid);
+        }
+
+        // Verify structure
+        assert!(!leaf_blocks.is_empty(), "No leaf nodes found");
+        assert!(!parent_blocks.is_empty(), "No parent nodes found");
+        assert_eq!(unique_blocks.len(), total_blocks, "Found duplicate blocks");
+
+        // Verify we have the expected number of chunks
+        let expected_chunks = (test_data.len() + 64 * 1024 - 1) / (64 * 1024); // Round up division
+
+        assert_eq!(chunk_count, expected_chunks, "Unexpected number of chunks");
+        assert_eq!(leaf_blocks.len(), chunk_count, "Wrong total leaf-block count");
+        assert!(!parent_blocks.is_empty(), "No parent nodes found");
+    }
 }
diff --git a/mater/lib/src/stores/mod.rs b/mater/lib/src/stores/mod.rs
index 66920d97d..9c05a1c95 100644
--- a/mater/lib/src/stores/mod.rs
+++ b/mater/lib/src/stores/mod.rs
@@ -1,10 +1,9 @@
 mod blockstore;
 mod file;
-mod filestore;
+pub mod filestore;
 
 pub use blockstore::Blockstore;
 pub use file::FileBlockstore;
-pub use filestore::create_filestore;
 
 /// The default block size, as defined in
 /// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13).
@@ -16,23 +15,25 @@ pub(crate) const DEFAULT_TREE_WIDTH: usize = 174;
 
 /// Store configuration options.
 pub enum Config {
-    /// The store should use the balanced tree layout,
-    /// generating byte chunks of `chunk_size` and
-    /// generating parent nodes every `tree_width` nodes.
+    /// Creates a balanced tree structure by generating fixed-size chunks
+    /// and arranging them into a tree with a specified width.
     Balanced {
-        /// The size of the byte chunks.
+        /// Size of each chunk in bytes. Default is 256KiB.
         chunk_size: usize,
-        /// The number of children per parent node.
+        /// Maximum number of children per parent node. Default is 174 (IPFS default).
         tree_width: usize,
-    },
+        /// How the content should be wrapped. See [`WrapMode`] for details.
+        wrap_mode: WrapMode,
+    }
 }
 
 impl Config {
     /// Create a new [`Config::Balanced`].
-    pub fn balanced(chunk_size: usize, tree_width: usize) -> Self {
+    pub fn balanced(chunk_size: usize, tree_width: usize, wrap_mode: WrapMode) -> Self {
         Self::Balanced {
             chunk_size,
             tree_width,
+            wrap_mode,
         }
     }
 }
@@ -40,8 +41,37 @@ impl Config {
 impl Default for Config {
     fn default() -> Self {
         Self::Balanced {
-            chunk_size: DEFAULT_BLOCK_SIZE,
-            tree_width: DEFAULT_TREE_WIDTH,
+            chunk_size: 256 * 1024,  // 256KiB
+            tree_width: 174,         // Default from ipfs
+            wrap_mode: WrapMode::Raw // Default to raw like go-car
         }
     }
 }
+
+
+/// Determines how content is encoded in the CAR file.
+#[derive(clap::ValueEnum, Clone, Debug)]
+pub enum WrapMode {
+    /// Store content directly in the CAR file without additional metadata.
+    /// This is the most space-efficient option but provides minimal metadata.
+    Raw,
+    /// Wrap content in UnixFS format which includes file metadata and DAG structure.
+    /// This is compatible with IPFS and provides richer metadata about the content.
+    UnixFS,
+}
+
+impl Default for WrapMode {
+    fn default() -> Self {
+        Self::Raw
+    }
+}
+
+/// Configuration options for file conversion.
+pub struct ConvertConfig {
+    /// How the content should be wrapped in the CAR file.
+    /// See [`WrapMode`] for details.
+    pub wrap_mode: WrapMode,
+    /// Whether to overwrite existing files at the output path.
+    /// If false, will error if the output file already exists.
+    pub overwrite: bool,
+}
diff --git a/mater/lib/src/unixfs/mod.rs b/mater/lib/src/unixfs/mod.rs
index 1a84cfa66..6329e8e1b 100644
--- a/mater/lib/src/unixfs/mod.rs
+++ b/mater/lib/src/unixfs/mod.rs
@@ -2,6 +2,7 @@
 //! .
 
 mod unixfs_pb;
+pub use unixfs_pb::{Data, mod_Data};
 
 use std::collections::VecDeque;
@@ -13,7 +14,6 @@ use ipld_dagpb::{DagPbCodec, PbLink, PbNode};
 use quick_protobuf::MessageWrite;
 use sha2::Sha256;
 use tokio_stream::{Stream, StreamExt};
-
 use crate::{
     multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE},
     Error,
@@ -331,6 +331,154 @@ where
     }
 }
 
+impl TreeNode {
+    fn encode_unixfs_leaf(self) -> Result<((Cid, Bytes), LinkInfo), Error> {
+        match self {
+            //
+            // 1) LEAF => encode as a DAG-PB "File" node carrying the chunk
+            //
+            TreeNode::Leaf(chunk) => {
+                let chunk_len = chunk.len() as u64;
+                // Build UnixFS proto
+                let unixfs_data = Data {
+                    Type: mod_Data::DataType::File,
+                    filesize: Some(chunk_len),
+                    blocksizes: vec![chunk_len],
+                    Data: Some(chunk.to_vec().into()),
+                    hashType: None,
+                    fanout: None,
+                };
+                let mut data_buf = Vec::new();
+                {
+                    let mut w = quick_protobuf::Writer::new(&mut data_buf);
+                    unixfs_data.write_message(&mut w)?;
+                }
+                let pb_node = PbNode {
+                    links: vec![], // leaf => no links
+                    data: Some(data_buf.clone().into()),
+                };
+                let encoded = DagPbCodec::encode_to_vec(&pb_node)?;
+                let mh = generate_multihash::<Sha256>(&encoded);
+                let cid = Cid::new_v1(DAG_PB_CODE, mh);
+
+                let info = LinkInfo {
+                    raw_data_length: data_buf.len() as u64,
+                    encoded_data_length: encoded.len() as u64,
+                };
+                Ok(((cid, encoded.into()), info))
+            }
+
+            TreeNode::Stem(children) => {
+                let total_file_size: u64 = children.iter().map(|c| c.1.raw_data_length).sum();
+                let total_encoded: u64 = children.iter().map(|c| c.1.encoded_data_length).sum();
+                let blocksizes: Vec<_> = children.iter().map(|c| c.1.raw_data_length).collect();
+
+                let unixfs_data = Data {
+                    Type: mod_Data::DataType::File,
+                    filesize: Some(total_file_size),
+                    blocksizes,
+                    Data: None,
+                    hashType: None,
+                    fanout: None,
+                };
+                let mut data_buf = Vec::new();
+                {
+                    let mut w = quick_protobuf::Writer::new(&mut data_buf);
+                    unixfs_data.write_message(&mut w)?;
+                }
+
+                let pb_links = children.into_iter().map(|(child_cid, link_info)| {
+                    PbLink {
+                        cid: child_cid,
+                        name: Some("".to_string()),
+                        size: Some(link_info.encoded_data_length),
+                    }
+                }).collect::<Vec<_>>();
+
+                let pb_node = PbNode {
+                    links: pb_links,
+                    data: Some(data_buf.clone().into()),
+                };
+                let encoded = DagPbCodec::encode_to_vec(&pb_node)?;
+                let mh = generate_multihash::<Sha256>(&encoded);
+                let cid = Cid::new_v1(DAG_PB_CODE, mh);
+
+                let info = LinkInfo {
+                    raw_data_length: data_buf.len() as u64,
+                    encoded_data_length: encoded.len() as u64 + total_encoded,
+                };
+                Ok(((cid, encoded.into()), info))
+            }
+        }
+    }
+}
+
+pub(crate) fn stream_balanced_tree_unixfs<I>(
+    input: I,
+    width: usize,
+) -> impl Stream<Item = Result<(Cid, Vec<u8>), Error>>
+where
+    I: Stream<Item = std::io::Result<Bytes>>,
+{
+    try_stream! {
+        use std::collections::VecDeque;
+
+        let mut levels: VecDeque<Vec<(Cid, LinkInfo)>> = VecDeque::new();
+        levels.push_back(vec![]);
+
+        // Map input from io::Result => Result, then handle
+        let input = input
+            .err_into::<Error>()
+            .map(|data| {
+                let chunk = data?;
+                Ok::<_, Error>(TreeNode::Leaf(chunk))
+            })
+            .err_into::<Error>();
+
+        tokio::pin!(input);
+
+        while let Some(node_res) = input.next().await {
+            let node = node_res?;
+            // Encode as a UnixFS leaf
+            let ((leaf_cid, leaf_bytes), leaf_info) = node.encode_unixfs_leaf()?;
+            // yield it as an output block
+            yield (leaf_cid, leaf_bytes.to_vec());
+
+            // Place into bottom level
+            levels[0].push((leaf_cid, leaf_info));
+
+            // If bottom level is full, bubble up
+            for level in 0..levels.len() {
+                if levels[level].len() < width {
+                    break;
+                }
+                let children = std::mem::replace(&mut levels[level], Vec::with_capacity(width));
+                let ((cid, data), info) = TreeNode::Stem(children).encode_unixfs_leaf()?;
+                yield (cid, data.to_vec());
+
+                if level + 1 == levels.len() {
+                    levels.push_back(vec![]);
+                }
+                levels[level + 1].push((cid, info));
+            }
+        }
+
+        // End of input. Clean up partial layers from bottom to top
+        while let Some(leftover) = levels.pop_front() {
+            if leftover.is_empty() {
+                continue;
+            }
+            let ((cid, data), info) = TreeNode::Stem(leftover).encode_unixfs_leaf()?;
+            yield (cid, data.to_vec());
+
+            // if there's a higher level, push
+            if let Some(up) = levels.front_mut() {
+                up.push((cid, info));
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     //! Tests were taken from [beetle][beetle] too, I did modify them to suit our needs.
diff --git a/mater/lib/src/v2/reader.rs b/mater/lib/src/v2/reader.rs
index caa6d689f..dfb0647bf 100644
--- a/mater/lib/src/v2/reader.rs
+++ b/mater/lib/src/v2/reader.rs
@@ -1,12 +1,17 @@
-use ipld_core::cid::Cid;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
 use std::collections::HashSet;
 
 use super::index::read_index;
+use crate::multicodec::DAG_PB_CODE;
 use crate::{
     v2::{index::Index, Characteristics, Header, PRAGMA},
     Error,
 };
-
+use ipld_core::cid::Cid;
+use ipld_core::codec::Codec;
+use ipld_dagpb::DagPbCodec;
+use ipld_dagpb::PbNode;
+use tokio::io::AsyncSeek;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
 /// Low-level CARv2 reader.
 pub struct Reader<R> {
     reader: R,
@@ -21,7 +26,7 @@ impl Reader {
 
 impl<R> Reader<R>
 where
-    R: AsyncRead + Unpin,
+    R: AsyncRead + Unpin + AsyncSeek,
 {
     /// Takes in a CID and checks that the contents in the reader matches this CID
     pub async fn verify_cid(&mut self, contents_cid: Cid) -> Result<(), Error> {
@@ -62,21 +67,48 @@ where
     {
         self.read_pragma().await?;
         let header = self.read_header().await?;
-        let _v1_header = self.read_v1_header().await?;
+        let v1_header = self.read_v1_header().await?;
         let mut written = 0;
 
-        while let Ok((_cid, contents)) = self.read_block().await {
-            // CAR file contents is empty
-            if contents.len() == 0 {
-                break;
-            }
+        // Keep track of root CID and position
+        let root_cid = v1_header.roots.first().ok_or(Error::EmptyRootsError)?;
+        let data_end = header.data_offset + header.data_size;
+
+        // Track what we've processed and need to process
+        let mut processed: HashSet<Cid> = HashSet::new();
+        let mut to_process = vec![*root_cid];
+
+        while !to_process.is_empty() {
             let position = self.get_inner_mut().stream_position().await?;
-            let data_end = header.data_offset + header.data_size;
-            // Add the `written != 0` clause for files that are less than a single block.
             if position >= data_end && written != 0 {
                 break;
             }
-            written += output_file.write(&contents).await?;
+
+            if let Ok((cid, contents)) = self.read_block().await {
+                if contents.len() == 0 {
+                    break;
+                }
+
+                // Write the block data
+                written += output_file.write(&contents).await?;
+
+                // If it's a DAG-PB node, queue up its children
+                if cid.codec() == DAG_PB_CODE && !processed.contains(&cid) {
+                    let reader = std::io::BufReader::new(&contents[..]);
+                    if let Ok(node) = DagPbCodec::decode(reader) {
+                        let pb_node: PbNode = node;
+                        to_process.extend(
+                            pb_node
+                                .links
+                                .iter()
+                                .map(|link| link.cid)
+                                .filter(|cid| !processed.contains(cid)),
+                        );
+                    }
+                }
+
+                processed.insert(cid);
+            }
         }
 
         Ok(())
@@ -151,9 +183,11 @@ where
 }
 
 /// Function verifies that a given CID matches the CID for the CAR file in the given reader
-pub async fn verify_cid(reader: R, contents_cid: Cid) -> Result<(), Error> {
-    let mut reader = Reader::new(BufReader::new(reader));
-
+pub async fn verify_cid<R>(reader: R, contents_cid: Cid) -> Result<(), Error>
+where
+    R: AsyncRead + AsyncSeek + Unpin,
+{
+    let mut reader = Reader::new(reader);
     reader.verify_cid(contents_cid).await
 }
 
@@ -162,11 +196,13 @@ mod tests {
     use std::{io::Cursor, path::PathBuf, str::FromStr};
 
     use ipld_core::cid::Cid;
+    use ipld_core::codec::Codec;
+    use ipld_dagpb::{DagPbCodec, PbNode};
     use sha2::Sha256;
     use tokio::{fs::File, io::AsyncSeekExt};
 
     use crate::{
-        multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE},
+        multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE, SHA_256_CODE},
         v2::{index::Index, reader::Reader},
         verify_cid, Error,
     };
@@ -425,4 +461,48 @@ mod tests {
             assert_eq!(fst[0].entries.len(), 4);
         }
     }
+
+    #[tokio::test]
+    async fn test_dag_pb_links() {
+        let file = File::open("tests/fixtures/car_v2/spaceglenda.car")
+            .await
+            .unwrap();
+        let mut reader = Reader::new(file);
+
+        reader.read_pragma().await.unwrap();
+        reader.read_header().await.unwrap();
+
+        let mut found_dag_pb = false;
+        let mut total_links = 0;
+
+        while let Ok((cid, data)) = reader.read_block().await {
+            if cid.codec() == DAG_PB_CODE {
+                found_dag_pb = true;
+                let reader = std::io::BufReader::new(&data[..]);
+
+                match DagPbCodec::decode(reader) {
+                    Ok(node) => {
+                        let pb_node: PbNode = node;
+                        if !pb_node.links.is_empty() {
+                            total_links += pb_node.links.len();
+                        }
+
+                        // Verify each link
+                        for link in pb_node.links {
+                            assert!(
+                                !link.cid.to_string().is_empty(),
+                                "Link should have valid CID"
+                            );
+                        }
+                    }
+                    Err(err) => {
+                        panic!("Failed to decode DAG-PB node: {}", err);
+                    }
+                }
+            }
+        }
+
+        assert!(found_dag_pb, "No DAG-PB nodes found in test file");
+        assert!(total_links > 0, "No links found in DAG-PB nodes");
+    }
 }
diff --git a/pallets/proofs/src/porep/mod.rs b/pallets/proofs/src/porep/mod.rs
index de0c82e31..da40d53f5 100644
--- a/pallets/proofs/src/porep/mod.rs
+++ b/pallets/proofs/src/porep/mod.rs
@@ -307,7 +307,7 @@ mod tests {
                 comm_d: comm_d_fr,
                 comm_r: comm_r_fr,
             },
-            seed: seed.clone(),
+            seed: *seed,
         };
 
         proof_scheme
diff --git a/storage-provider/server/src/storage.rs b/storage-provider/server/src/storage.rs
index 359b440f6..89a0324d1 100644
--- a/storage-provider/server/src/storage.rs
+++ b/storage-provider/server/src/storage.rs
@@ -8,8 +8,9 @@ use axum::{
     routing::{get, put},
     Router,
 };
+use cid::Cid;
 use futures::{TryFutureExt, TryStreamExt};
-use mater::Cid;
+use mater::{create_filestore, WrapMode};
 use polka_storage_proofs::ZeroPaddingReader;
 use polka_storage_provider_common::commp::{calculate_piece_commitment, CommPError};
 use primitives::{commitment::piece::PaddedPieceSize, proofs::RegisteredPoStProof};
@@ -117,7 +118,10 @@ fn configure_router(state: Arc) -> Router {
 #[cfg(not(feature = "delia"))]
 fn config_non_delia(state: Arc) -> Router {
     Router::new()
-        .route("/upload/:cid", put(upload))
+        .route(
+            "/upload/:cid",
+            put(upload as fn(State>, Path, Request) -> _)
+        )
         .route("/download/:cid", get(download))
         .with_state(state)
         .layer(
@@ -161,9 +165,9 @@ fn configure_router(state: Arc) -> Router {
 /// ```
 #[tracing::instrument(skip_all, fields(cid))]
 async fn upload(
-    ref s @ State(ref state): State>,
+    State(state): State>,
     Path(cid): Path,
-    request: Request,
+    request: Request,
 ) -> Result {
     let deal_cid = cid::Cid::from_str(&cid).map_err(|err| {
         tracing::error!(cid, "failed to parse cid");
@@ -171,7 +175,7 @@ async fn upload(
     })?;
 
     let deal_db_conn = state.deal_db.clone();
-    // If the deal hasn't been accepted, reject the upload
+    // If the deal hasn't been accepted, reject the upload
     let proposed_deal =
         // Move the fetch to the blocking pool since the RocksDB API is sync
         tokio::task::spawn_blocking(move || match deal_db_conn.get_proposed_deal(deal_cid) {
@@ -194,20 +198,27 @@ async fn upload(
 
     // Branching needed here since the resulting `StreamReader`s don't have the same type
     let file_cid = if request.headers().contains_key("Content-Type") {
-        // Handle multipart forms
-        let mut multipart = Multipart::from_request(request, &s)
+        // Handle the multipart data
+        let mut multipart = Multipart::from_request(request, &state)
             .await
             .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
-        let Some(field) = multipart
+
+        // Get the field and read it entirely into memory
+        let field = multipart
             .next_field()
-            .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))
-            .await?
-        else {
-            return Err((StatusCode::BAD_REQUEST, "empty request".to_string()));
-        };
+            .await
+            .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?
+            .ok_or((StatusCode::BAD_REQUEST, "empty request".to_string()))?;
 
-        let field_reader = StreamReader::new(field.map_err(std::io::Error::other));
-        stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), field_reader)
+        // Read the entire field into memory
+        let bytes = field
+            .bytes()
+            .await
+            .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
+
+        // Create a cursor over the bytes which implements AsyncRead
+        let reader = std::io::Cursor::new(bytes);
+        stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), reader)
             .await
             .map_err(|err| {
                 tracing::error!(%err, "failed to store file into CAR archive");
@@ -373,13 +384,12 @@ fn content_path(folder: &std::path::Path, cid: Cid) -> (String, PathBuf) {
     (name, path)
 }
 
-/// Reads bytes from the source and writes them to a CAR file.
 async fn stream_contents_to_car(
     folder: &std::path::Path,
     source: R,
 ) -> Result>
 where
-    R: AsyncRead + Unpin,
+    R: AsyncRead + Unpin + Send + 'static,
 {
     // Temp file which will be used to store the CAR file content. The temp
     // director has a randomized name and is created in the same folder as the
@@ -391,7 +401,14 @@ where
     // Stream the body from source to the temp file.
     let file = File::create(&temp_file_path).await?;
     let writer = BufWriter::new(file);
-    let cid = mater::create_filestore(source, writer, mater::Config::default()).await?;
+
+    let config = mater::Config::Balanced {
+        chunk_size: 256 * 1024,
+        tree_width: 174,
+        wrap_mode: WrapMode::UnixFS,
+    };
+
+    let cid = create_filestore(source, writer, config).await?;
     tracing::trace!("finished writing the CAR archive");
 
     // If the file is successfully written, we can now move it to the final