diff --git a/.rustfmt.toml b/.rustfmt.toml index 6380de253..a4527b7fc 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1,3 +1,5 @@ edition = "2021" group_imports = "StdExternalCrate" +hard_tabs = false imports_granularity = "Crate" +tab_spaces = 4 diff --git a/Cargo.lock b/Cargo.lock index dee871c78..45420a625 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -751,6 +751,28 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite 0.2.14", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "async-task" version = "4.7.1" @@ -1220,6 +1242,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "cbor4ii" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.0.97" @@ -4424,6 +4455,16 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "integer-encoding" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924df4f0e24e2e7f9cdd90babb0b96f93b20f3ecfa949ea9e6613756b8c8e1bf" +dependencies = [ + "async-trait", + "tokio", +] + [[package]] name = "integer-sqrt" version = "0.1.5" @@ -4462,6 +4503,29 @@ dependencies = [ "winreg", ] +[[package]] +name = "ipld-core" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ede82a79e134f179f4b29b5fdb1eb92bd1b38c4dfea394c539051150a21b9b" +dependencies = [ + "cid 0.11.1", + "serde", + "serde_bytes", +] + +[[package]] +name = "ipld-dagpb" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5eb3c08f1508fa62ffe1a805aafa116d2eb034b3542c85596db132f279abc47" +dependencies = [ + "bytes", + "ipld-core", + "quick-protobuf", + "thiserror", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -5511,6 +5575,31 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "mater" +version = "0.1.0" +dependencies = [ + "async-stream", + "bitflags 2.5.0", + "byteorder", + "bytes", + "digest 0.10.7", + "futures", + "indexmap 2.2.6", + "integer-encoding 4.0.0", + "ipld-core", + "ipld-dagpb", + "quick-protobuf", + "rand 0.8.5", + "serde", + "serde_ipld_dagcbor", + "sha2 0.10.8", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", +] + [[package]] name = "matrixmultiply" version = "0.3.8" @@ -11705,6 +11794,18 @@ dependencies = [ "syn 2.0.61", ] +[[package]] +name = "serde_ipld_dagcbor" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ded35fbe4ab8fdec1f1d14b4daff2206b1eada4d6e708cb451d464d2d965f493" +dependencies = [ + "cbor4ii", + "ipld-core", + "scopeguard", + "serde", +] + [[package]] name = "serde_json" version = "1.0.117" @@ -13473,7 +13574,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e" dependencies = [ "byteorder", - "integer-encoding", + "integer-encoding 3.0.4", "log", "ordered-float", "threadpool", diff --git a/Cargo.toml b/Cargo.toml index 947942c1f..6ec3cc234 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ license-file = "LICENSE" repository = "https://github.com/eigerco/polka-storage" [workspace] -members = ["node", "runtime", "storage/polka-index"] +members = ["node", "runtime", "storage/mater", "storage/polka-index"] resolver = "2" # FIXME(#@jmg-duarte,#7,14/5/24): remove the patch once something >1.11.0 is released @@ -28,30 +28,47 @@ panic = 'abort' # Use abort on panic to reduce binary size substrate-build-script-utils = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" } substrate-wasm-builder = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" } +async-stream = "0.3.5" +bitflags = "2.5.0" +byteorder = "1.5.0" +bytes = "1.6.0" ciborium = "0.2.2" cid = { version = "0.11.1" } clap = { version = "4.5.3" } codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false } color-print = "0.3.4" +digest = "0.10.7" futures = "0.3.28" hex-literal = { version = "0.4.1" } +indexmap = "2.2.6" +integer-encoding = "4.0.0" +ipld-core = "0.4.1" +ipld-dagpb = "0.2.1" +itertools = "0.13.0" jsonrpsee = { version = "0.22" } log = { version = "0.4.21", default-features = false } polkavm = "0.9.3" polkavm-derive = "0.9.1" polkavm-linker = "0.9.2" +quick-protobuf = "0.8.1" quote = { version = "1.0.33" } +rand = "0.8.5" rocksdb = { version = "0.21" } scale-info = { version = "2.11.1", default-features = false } serde = { version = "1.0.197", default-features = false } serde-big-array = { version = "0.3.2" } serde_derive = { version = "1.0.117" } +serde_ipld_dagcbor = "0.6.1" serde_json = { version = "1.0.114", default-features = false } serde_yaml = { version = "0.9" } +sha2 = "0.10.8" smallvec = "1.11.0" syn = { version = "2.0.53" } tempfile = "3.10.1" thiserror = { version = "1.0.48" } +tokio = "1.37.0" +tokio-stream = "0.1.15" +tokio-util = "0.7.11" tracing-subscriber = { version = "0.3.18" } # Local diff --git a/rustfmt.toml b/rustfmt.toml deleted file mode 100644 index 83f0002f0..000000000 --- a/rustfmt.toml +++ /dev/null @@ -1,2 +0,0 @@ -hard_tabs = false -tab_spaces = 4 diff --git a/storage/mater/Cargo.toml b/storage/mater/Cargo.toml new file mode 100644 index 000000000..741244cd0 --- /dev/null +++ b/storage/mater/Cargo.toml @@ -0,0 +1,35 @@ +[package] +authors.workspace = true +edition.workspace = true +homepage.workspace = true +license-file.workspace = true +name = "mater" # name WIP +repository.workspace = true +version = "0.1.0" + +[dependencies] +async-stream.workspace = true +bitflags.workspace = true +byteorder = { workspace = true, features = ["i128"] } +bytes.workspace = true +digest.workspace = true +futures.workspace = true +indexmap.workspace = true +integer-encoding = { workspace = true, features = ["tokio_async"] } +ipld-core = { workspace = true, features = ["serde"] } +ipld-dagpb.workspace = true +quick-protobuf.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_ipld_dagcbor.workspace = true +sha2.workspace = true +thiserror.workspace = true +tokio = { workspace = true, features = ["fs", "macros", "rt"] } +tokio-stream.workspace = true +tokio-util = { workspace = true, features = ["io"] } + +[dev-dependencies] 
+rand.workspace = true + + +[lints] +workspace = true diff --git a/storage/mater/README.md b/storage/mater/README.md new file mode 100644 index 000000000..588d4cfe7 --- /dev/null +++ b/storage/mater/README.md @@ -0,0 +1,29 @@ +# Mater + +A Rust library to read and write CAR files. + +This library is based on [beetle](https://github.com/n0-computer/beetle). + +## Specifications + +CARv1 specification: https://ipld.io/specs/transport/car/carv1/ +CARv2 specification: https://ipld.io/specs/transport/car/carv2/ +UnixFS specification: https://github.com/ipfs/specs/blob/e4e5754ad4a4bfbb2ebe63f4c27631f573703de0/UNIXFS.md + +## Developing + +### Overview + +This crate is composed of three main modules: + +- `unixfs/` — which covers the main UnixFS abstractions +- `v1/` — which contains the CARv1 implementation and respective abstractions +- `v2/` — which contains the CARv2 implementation and respective abstractions + +### Further notes + +The [`unixfs_pb.rs`](src/unixfs/unixfs_pb.rs) module was automatically generated using +[`pb-rs`](https://github.com/tafia/quick-protobuf/tree/master/pb-rs). +The file was generated and checked in instead of making `pb-rs` part of the build +because the definition file ([`unixfs.proto`](src/unixfs/unixfs.proto)) does not +change frequently; hence, there is no need to add complexity to the build process. diff --git a/storage/mater/src/blockstore.rs b/storage/mater/src/blockstore.rs new file mode 100644 index 000000000..024235ab5 --- /dev/null +++ b/storage/mater/src/blockstore.rs @@ -0,0 +1,379 @@ +// NOTE(@jmg-duarte,28/05/2024): the blockstore can (and should) evolve to support other backends. +// At the time of writing, there is no need to invest more time in it because the current PR (#25) is delayed enough. + +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use indexmap::IndexMap; +use integer_encoding::VarInt; +use ipld_core::cid::Cid; +use sha2::{Digest, Sha256}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_stream::StreamExt; +use tokio_util::io::ReaderStream; + +use crate::{ + multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer, + Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, +}; + +/// The default block size, as defined in +/// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13). +const DEFAULT_BLOCK_SIZE: usize = 1024 * 256; + +/// The default tree width, also called links per block, as defined in +/// [boxo](https://github.com/ipfs/boxo/blob/625ba769263c2beeec934836f54bbd6624db945a/ipld/unixfs/importer/helpers/helpers.go#L16-L30). +const DEFAULT_TREE_WIDTH: usize = 174; + +/// The [`Blockstore`] stores pairs of [`Cid`] and [`Bytes`] in memory. +/// +/// The store will chunk data blocks into `chunk_size` and "gather" nodes in groups with at most `tree_width` children. +/// You can visualize the underlying tree in <https://dag.ipfs.tech/>, using the "Balanced DAG" layout. +/// +/// It is necessary to keep the blocks somewhere before writing them to a file since the CARv2 header +/// has data size, index offset and index fields, all of which require information that only becomes +/// "available" after you process all the blocks. +/// +/// The store keeps track of ([`Cid`], [`Bytes`]) pairs, performing de-duplication based on the [`Cid`].
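+///
+/// A minimal usage sketch (the file path is illustrative):
+/// ```no_run
+/// # async fn example() -> Result<(), mater::Error> {
+/// let file = tokio::fs::File::open("data.bin").await?;
+/// let mut store = mater::Blockstore::new();
+/// store.read(file).await?;
+/// let mut car = vec![];
+/// // consumes the store and writes out a complete CARv2 archive
+/// store.write(&mut car).await?;
+/// # Ok(())
+/// # }
+/// ```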
+/// +/// **Important note: currently, the blockstore only supports a single file!** +pub struct Blockstore { + root: Option<Cid>, + blocks: IndexMap<Cid, Bytes>, + indexed: HashSet<Cid>, + + chunk_size: usize, + tree_width: usize, +} + +impl Blockstore { + /// The size of the [`Header`] when encoded using [`DagCborCodec`]. + /// + /// The formula is: `overhead + 37 * roots.len()`. + /// It is based on reversing the CBOR encoding; see an example: + /// ```text + /// A2 # map(2) + /// 65 # text(5) + /// 726F6F7473 # "roots" + /// 81 # array(1) + /// D8 2A # tag(42) + /// 58 25 # bytes(37) + /// 00015512206D623B17625E25CBDA46D17AC89C26B3DB63544701E2C0592626320DBEFD515B + /// 67 # text(7) + /// 76657273696F6E # "version" + /// 01 # unsigned(1) + /// ``` + /// In this case we're always doing a single root, so we just use the fixed size: 58 + /// + /// Is this cheating? Yes. The alternative is to encode the CARv1 header twice. + /// We can cache it, but for now, this should be better. + const V1_HEADER_OVERHEAD: u64 = 58; + + /// Construct a new [`Blockstore`], using the default parameters. + pub fn new() -> Self { + Default::default() + } + + /// Construct a new [`Blockstore`], using custom parameters. + /// If set to `None`, the corresponding default value will be used. + pub fn with_parameters(chunk_size: Option<usize>, tree_width: Option<usize>) -> Self { + // NOTE(@jmg-duarte,28/05/2024): once the time comes, this method should probably be replaced with a builder + Self { + root: None, + blocks: IndexMap::new(), + indexed: HashSet::new(), + chunk_size: chunk_size.unwrap_or(DEFAULT_BLOCK_SIZE), + tree_width: tree_width.unwrap_or(DEFAULT_TREE_WIDTH), + } + } + + /// Fully read the contents of an arbitrary `reader` into the [`Blockstore`], + /// converting the contents into a CARv2 file. + pub async fn read<R>(&mut self, reader: R) -> Result<(), Error> + where + R: AsyncRead + Unpin + Send, + { + let chunks = ReaderStream::with_capacity(reader, self.chunk_size); + + // The `stream -> pin -> peekable` combo instead of `stream -> peekable -> pin` feels weird + // but it has to do with two things: + // - The fact that the stream can be self-referential: + // https://users.rust-lang.org/t/why-is-pin-mut-needed-for-iteration-of-async-stream/51107 + // - Using a tokio_stream::Peekable instead of futures::Peekable, they differ on who is required to be pinned + // - tokio_stream::Peekable::peek(&mut self) + // https://github.com/tokio-rs/tokio/blob/14c17fc09656a30230177b600bacceb9db33e942/tokio-stream/src/stream_ext/peekable.rs#L26-L37 + // - futures::Peekable::peek(self: Pin<&mut Self>) + // https://github.com/rust-lang/futures-rs/blob/c507ff833728e2979cf5519fc931ea97308ec876/futures-util/src/stream/stream/peek.rs#L38-L40 + let tree = stream_balanced_tree(chunks, self.tree_width); + tokio::pin!(tree); + let mut tree = tree.peekable(); + + while let Some(block) = tree.next().await { + let (cid, bytes) = block?; + self.insert(cid, bytes, true); + + // If the stream is exhausted, we know the current block is the root + if tree.peek().await.is_none() { + // The root should always be indexed, there's no official spec saying it should though, it just makes sense. + // So, if the insert line is changed, the root should be placed in the `indexed` structure here + self.root = Some(cid); + } + } + + Ok(()) + } + + /// Write the contents of the [`Blockstore`] as CARv2 to a writer.
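+    ///
+    /// A sketch of the byte layout this method produces, in write order
+    /// (the pragma is emitted as part of the CARv2 header write):
+    /// ```text
+    /// ┌────────┬──────────────┬──────────────┬─────────────┬───────┐
+    /// │ Pragma │ CARv2 header │ CARv1 header │ Data blocks │ Index │
+    /// └────────┴──────────────┴──────────────┴─────────────┴───────┘
+    /// ```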
+ pub async fn write<W>(mut self, writer: W) -> Result<usize, Error> + where + W: AsyncWrite + Unpin, + { + let mut position = 0; + + let mut writer = CarV2Writer::new(writer); + let header_v2 = self.header_v2(); + + // Writing the CARv1 starts where the CARv2 header ends; + // this value is required for indexing, + // whose offset starts at the beginning of the CARv1 header + let car_v1_start = writer.write_header(&header_v2).await?; + position += car_v1_start; + + // CARv1 files are REQUIRED to have a root + let header_v1 = self + .root + .map(|root| CarV1Header::new(vec![root])) + .ok_or(Error::EmptyRootsError)?; + position += writer.write_v1_header(&header_v1).await?; + + let mut offsets = HashMap::new(); + for (cid, block) in self.blocks.drain(..) { + if self.indexed.contains(&cid) { + offsets.insert(cid, position - car_v1_start); + } + position += writer.write_block(&cid, &block).await?; + } + + let count = offsets.len() as u64; + let entries = offsets + .into_iter() + .map(|(cid, offset)| IndexEntry::new(cid.hash().digest().to_vec(), offset as u64)) + .collect(); + let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width( + SHA_256_CODE, + SingleWidthIndex::new(Sha256::output_size() as u32, count, entries).into(), + )); + position += writer.write_index(&index).await?; + + Ok(position) + } + + /// Get the [`CarV2Header`] that will be written out. + fn header_v2(&self) -> CarV2Header { + let data_offset = CarV2Header::SIZE as u64; + let data_size: u64 = self + .blocks + .iter() + .map(|(cid, bytes)| { + let size = (cid.encoded_len() + bytes.len()) as u64; + let varint_size = size.required_space() as u64; + size + varint_size + }) + .sum(); + + let header_v1_varint = Self::V1_HEADER_OVERHEAD.required_space() as u64; + let car_v1_payload_length = Self::V1_HEADER_OVERHEAD + header_v1_varint + data_size; + + // If there is padding, this does not apply; however, the go-car tool doesn't seem to ever add padding + let index_offset = data_offset + car_v1_payload_length; + + // NOTE(@jmg-duarte,28/05/2024): the `fully_indexed` field is currently set to `false` as the + // go-car tool doesn't seem to ever set it, however, according to the written definition we have from the spec + // we're performing full indexing, as all blocks are inserted with `index: true`. + CarV2Header::new(false, data_offset, car_v1_payload_length, index_offset) + } + + /// Insert a new block into the [`Blockstore`]. + /// + /// If the [`Cid`] has been previously inserted, this function is a no-op.
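+    ///
+    /// When `index` is `true`, the [`Cid`] is also tracked in the `indexed` set,
+    /// which [`Self::write`] later uses to build the CARv2 index entries.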
fn insert(&mut self, cid: Cid, data: Bytes, index: bool) { + if !self.blocks.contains_key(&cid) { + self.blocks.insert_full(cid, data); + if index { + self.indexed.insert(cid); + } + } + } +} + +impl Default for Blockstore { + fn default() -> Self { + Self { + root: None, + blocks: IndexMap::new(), + indexed: HashSet::new(), + chunk_size: DEFAULT_BLOCK_SIZE, + tree_width: DEFAULT_TREE_WIDTH, + } + } +} + +#[cfg(test)] +mod tests { + use std::{io::Cursor, str::FromStr}; + + use ipld_core::{cid::Cid, codec::Codec}; + use ipld_dagpb::{DagPbCodec, PbNode}; + use sha2::{Digest, Sha256}; + use tokio::fs::File; + + use crate::{ + blockstore::Blockstore, + multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE}, + test_utils::assert_buffer_eq, + CarV2Header, CarV2Reader, Index, + }; + + #[tokio::test] + async fn byte_eq_lorem() { + let file = File::open("tests/fixtures/original/lorem.txt") + .await + .unwrap(); + let mut store = Blockstore::new(); + store.read(file).await.unwrap(); + assert_eq!(store.blocks.len(), 1); + + let mut result_buffer = vec![]; + store.write(&mut result_buffer).await.unwrap(); + + let car_contents = tokio::fs::read("tests/fixtures/car_v2/lorem.car") + .await + .unwrap(); + + assert_buffer_eq!(&result_buffer, &car_contents); + } + + #[tokio::test] + async fn byte_eq_spaceglenda() { + let file = File::open("tests/fixtures/original/spaceglenda.jpg") + .await + .unwrap(); + let mut store = Blockstore::new(); + store.read(file).await.unwrap(); + assert_eq!(store.blocks.len(), 4); + + let mut result_buffer = vec![]; + store.write(&mut result_buffer).await.unwrap(); + + let car_contents = tokio::fs::read("tests/fixtures/car_v2/spaceglenda.car") + .await + .unwrap(); + + assert_buffer_eq!(&result_buffer, &car_contents); + } + + #[tokio::test] + async fn dedup_lorem() { + let file = File::open("tests/fixtures/original/lorem_4096_dup.txt") + .await + .unwrap(); + let mut store = Blockstore::with_parameters(Some(1024), None); + store.read(file).await.unwrap(); + // We're expecting there to exist a single data block and a root + assert_eq!(store.blocks.len(), 2); + } + + // We can't fully validate this test using go-car because they don't offer parametrization + #[tokio::test] + async fn dedup_lorem_roundtrip() { + let file = File::open("tests/fixtures/original/lorem_4096_dup.txt") + .await + .unwrap(); + let mut store = Blockstore::with_parameters(Some(1024), None); + store.read(file).await.unwrap(); + // We're expecting there to exist a single data block and a root + assert_eq!(store.blocks.len(), 2); + + let mut result_buffer = vec![]; + store.write(&mut result_buffer).await.unwrap(); + + let mut cursor = Cursor::new(result_buffer); + std::io::Seek::rewind(&mut cursor).unwrap(); + let mut car_reader = CarV2Reader::new(cursor); + + car_reader.read_pragma().await.unwrap(); + + let car_v2_header = car_reader.read_header().await.unwrap(); + assert_eq!(car_v2_header.data_offset, CarV2Header::SIZE as u64); + // Extracted with go-car and validated with a hex viewer + // to extract the values, run the following commands: + // $ car inspect + // The dump is necessary because go-car does not support parametrization + assert_eq!(car_v2_header.data_size, 1358); + assert_eq!( + car_v2_header.index_offset, + (CarV2Header::SIZE as u64) + 1358 + ); + + let car_v1_header = car_reader.read_v1_header().await.unwrap(); + assert_eq!(car_v1_header.roots.len(), 1); + + // Extracted with go-car + let root_cid = + Cid::from_str("bafybeiapxsorxw7yqywquebgmlz37nyjt44vxlskhx6wcgirkurojow7xu").unwrap(); +
assert_eq!(car_v1_header.roots[0], root_cid); + + let original_1024 = tokio::fs::read("tests/fixtures/original/lorem_1024.txt") + .await + .unwrap(); + let (cid, data) = car_reader.read_block().await.unwrap(); + assert_buffer_eq!(&data, &original_1024); + let lorem_cid = Cid::new_v1(RAW_CODE, generate_multihash::<Sha256>(&original_1024)); + assert_eq!(cid, lorem_cid); + + let (cid, data) = car_reader.read_block().await.unwrap(); + let node: PbNode = DagPbCodec::decode_from_slice(&data).unwrap(); + assert_eq!(cid, root_cid); + + // There are 4 blocks of repeated 1024 bytes + assert_eq!(node.links.len(), 4); + + for pb_link in node.links { + assert_eq!(pb_link.cid, lorem_cid); + assert_eq!(pb_link.name, Some("".to_string())); + assert_eq!(pb_link.size, Some(1024)); + } + + let index = car_reader.read_index().await.unwrap(); + + match index { + Index::MultihashIndexSorted(index) => { + // There's only Sha256 + assert_eq!(index.0.len(), 1); + + let index_sorted = &index.0[&SHA_256_CODE]; + // There's only a single length + assert_eq!(index_sorted.0.len(), 1); + + let single_width_index = &index_sorted.0[0]; + assert_eq!(single_width_index.count, 2); + // Sha256 output size (32) + the offset size (8) + assert_eq!(single_width_index.width, Sha256::output_size() as u32 + 8); + assert_eq!(single_width_index.entries.len(), 2); + + // Sorting order is byte-wise, I extracted it manually + assert_eq!(single_width_index.entries[0].offset, 1121); + assert_eq!(single_width_index.entries[1].offset, 59); + assert_eq!( + single_width_index.entries[0].digest, + root_cid.hash().digest() + ); + assert_eq!( + single_width_index.entries[1].digest, + lorem_cid.hash().digest() + ); + } + Index::IndexSorted(_) => panic!("expected index to be MultihashIndexSorted"), + } + } +} diff --git a/storage/mater/src/lib.rs b/storage/mater/src/lib.rs new file mode 100644 index 000000000..ae3e793f7 --- /dev/null +++ b/storage/mater/src/lib.rs @@ -0,0 +1,132 @@ +//! A library to handle CAR files. +//! Both version 1 and version 2 are supported. +//! +//! You can make use of the lower-level utilities such as [`CarV2Reader`] to read a CARv2 file, +//! though these utilities were designed to be used in higher-level abstractions, like the [`Blockstore`]. + +#![warn(unused_crate_dependencies)] +#![warn(missing_docs)] +#![deny(rustdoc::broken_intra_doc_links)] +#![deny(rustdoc::private_intra_doc_links)] +#![deny(unsafe_code)] + +mod blockstore; +mod multicodec; +mod unixfs; +mod v1; +mod v2; + +pub use blockstore::Blockstore; +// We need to expose this because `read_block` returns `(Cid, Vec<u8>)`. +pub use ipld_core::cid::Cid; +pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer}; +pub use v2::{ + Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted, MultihashIndexSorted, + Reader as CarV2Reader, SingleWidthIndex, Writer as CarV2Writer, +}; + +/// CAR handling errors. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Returned when a version was expected, but another was received. + /// + /// For example, when reading CARv1 files, the only valid version is 1, + /// otherwise, this error should be returned. + #[error("expected version {expected}, but received version {received} instead")] + VersionMismatchError { + /// Expected version (usually 1 or 2) + expected: u8, + /// Received version + received: u8, + }, + + /// According to the [specification](https://ipld.io/specs/transport/car/carv1/#constraints) + /// CAR files MUST have **one or more** [`Cid`] roots.
+ #[error("CAR file must have roots")] + EmptyRootsError, + + /// Unknown type of index. Supported indexes are + /// [`IndexSorted`] and [`MultihashIndexSorted`]. + #[error("unknown index type {0}")] + UnknownIndexError(u64), + + /// Digest does not match the expected length. + #[error("digest has length {received}, instead of {expected}")] + NonMatchingDigestError { + /// Expected digest length + expected: usize, + /// Received digest length + received: usize, + }, + + /// Cannot know width or count from an empty vector. + #[error("cannot create an index out of an empty `Vec`")] + EmptyIndexError, + + /// The [specification](https://ipld.io/specs/transport/car/carv2/#characteristics) + /// does not discuss how to handle unknown characteristics + /// — i.e. if we should ignore them, truncate them or return an error — + /// we decided to return an error when there are unknown bits set. + #[error("unknown characteristics were set: {0}")] + UnknownCharacteristicsError(u128), + + /// According to the [specification](https://ipld.io/specs/transport/car/carv2/#pragma) + /// the pragma is composed of a pre-defined list of bytes; + /// if the received pragma is not the same, we return an error. + #[error("received an invalid pragma: {0:?}")] + InvalidPragmaError(Vec<u8>), + + /// See [`CodecError`](serde_ipld_dagcbor::error::CodecError) for more information. + #[error(transparent)] + CodecError(#[from] serde_ipld_dagcbor::error::CodecError), + + /// See [`IoError`](tokio::io::Error) for more information. + #[error(transparent)] + IoError(#[from] tokio::io::Error), + + /// See [`CidError`](ipld_core::cid::Error) for more information. + #[error(transparent)] + CidError(#[from] ipld_core::cid::Error), + + /// See [`MultihashError`](ipld_core::cid::multihash::Error) for more information. + #[error(transparent)] + MultihashError(#[from] ipld_core::cid::multihash::Error), + + /// See [`ProtobufError`](quick_protobuf::Error) for more information. + #[error(transparent)] + ProtobufError(#[from] quick_protobuf::Error), + + /// See [`DagPbError`](ipld_dagpb::Error) for more information. + #[error(transparent)] + DagPbError(#[from] ipld_dagpb::Error), +} + +#[cfg(test)] +pub(crate) mod test_utils { + /// Check if two given slices are equal. + /// + /// First checks if the two slices have the same size, + /// then checks each byte-pair. If the slices differ, + /// it will show an error message with the difference index + /// along with a window showing surrounding elements + /// (instead of spamming your terminal like `assert_eq!` does). + macro_rules! assert_buffer_eq { + ($left:expr, $right:expr $(,)?) => {{ + assert_eq!($left.len(), $right.len()); + for (i, (l, r)) in $left.iter().zip($right).enumerate() { + let before = i.saturating_sub(5); + let after = (i + 5).min($right.len() - 1); + assert_eq!( + l, + r, + "difference at index {}\n left: {:02x?}\n right: {:02x?}", + i, + &$left[before..=after], + &$right[before..=after], + ) + } + }}; + } + + pub(crate) use assert_buffer_eq; +} diff --git a/storage/mater/src/multicodec.rs b/storage/mater/src/multicodec.rs new file mode 100644 index 000000000..34a37f4fc --- /dev/null +++ b/storage/mater/src/multicodec.rs @@ -0,0 +1,37 @@ +//! Multicodec utilities, such as the list of codes, +//! as per the [code table](https://github.com/multiformats/multicodec/blob/c954a787dc6a17d099653e5f90d26fbd177d2074/table.csv).
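+//!
+//! For example, hashing a payload and wrapping it in a `Cid` (a sketch;
+//! `generate_multihash` is crate-private, so the doctest is ignored):
+//! ```ignore
+//! let multihash = generate_multihash::<sha2::Sha256>(b"hello");
+//! let cid = ipld_core::cid::Cid::new_v1(RAW_CODE, multihash);
+//! ```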
+ +use digest::Digest; +use ipld_core::cid::multihash::Multihash; + +pub const SHA_256_CODE: u64 = 0x12; +pub const SHA_512_CODE: u64 = 0x13; +pub const RAW_CODE: u64 = 0x55; +pub const DAG_PB_CODE: u64 = 0x70; + +/// Trait to ease implementing generic multihash generation. +pub(crate) trait MultihashCode { + /// Multihash code as defined in the [specification](https://github.com/multiformats/multicodec/blob/c954a787dc6a17d099653e5f90d26fbd177d2074/table.csv). + const CODE: u64; +} + +impl MultihashCode for sha2::Sha256 { + const CODE: u64 = SHA_256_CODE; +} + +impl MultihashCode for sha2::Sha512 { + const CODE: u64 = SHA_512_CODE; +} + +/// Generate a multihash for a byte slice. +pub(crate) fn generate_multihash<H, B>(bytes: B) -> Multihash<64> +where + H: Digest + MultihashCode, + B: AsRef<[u8]>, +{ + let mut hasher = H::new(); + hasher.update(bytes.as_ref()); + let hashed_bytes = hasher.finalize(); + Multihash::wrap(H::CODE, &hashed_bytes) + .expect("the digest should be valid (enforced by the type system)") +} diff --git a/storage/mater/src/unixfs/mod.rs b/storage/mater/src/unixfs/mod.rs new file mode 100644 index 000000000..963701c02 --- /dev/null +++ b/storage/mater/src/unixfs/mod.rs @@ -0,0 +1,599 @@ +//! The original implementation of this module is located at +//! <https://github.com/n0-computer/beetle/blob/3e137cb2bc18e1d458c3f72d5e817b03d9537d5d/iroh-unixfs/src/balanced_tree.rs>. + +mod unixfs_pb; + +use std::collections::VecDeque; + +use async_stream::try_stream; +use bytes::Bytes; +use futures::TryStreamExt; +use ipld_core::{cid::Cid, codec::Codec}; +use ipld_dagpb::{DagPbCodec, PbLink, PbNode}; +use quick_protobuf::MessageWrite; +use sha2::Sha256; +use tokio_stream::{Stream, StreamExt}; + +use crate::{ + multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE}, + Error, +}; + +#[derive(Debug, Clone, Copy)] +pub(crate) struct LinkInfo { + raw_data_length: u64, + encoded_data_length: u64, +} + +impl LinkInfo { + fn new(raw_data_length: u64, encoded_data_length: u64) -> Self { + Self { + raw_data_length, + encoded_data_length, + } + } +} + +#[derive(Debug)] +enum TreeNode { + Leaf(Bytes), + Stem(Vec<(Cid, LinkInfo)>), +} + +impl TreeNode { + fn encode(self) -> Result<((Cid, Bytes), LinkInfo), Error> { + match self { + TreeNode::Leaf(bytes) => { + let data_length = bytes.len() as u64; + let multihash = generate_multihash::<Sha256>(&bytes); + // Storing the block as RAW as go-car does + // https://github.com/ipfs/go-unixfsnode/blob/c41f115d06cff90e0cbc634da5073b4c1447af09/data/builder/file.go#L54-L63 + let cid = Cid::new_v1(RAW_CODE, multihash); + let block = (cid, bytes); + // The data is raw, so the raw length == encoded length + let link_info = LinkInfo::new(data_length, data_length); + Ok((block, link_info)) + } + TreeNode::Stem(links) => { + let mut encoded_length: u64 = + links.iter().map(|(_, l)| l.encoded_data_length).sum(); + let blocksizes: Vec<_> = links.iter().map(|(_, l)| l.raw_data_length).collect(); + let filesize: u64 = blocksizes.iter().sum(); + let pb_links: Vec<_> = links + .into_iter() + .map(|(cid, link)| PbLink { + cid, + // Having an empty name makes it compliant with go-car + name: Some("".to_string()), + size: Some(link.encoded_data_length), + }) + .collect(); + + let pb_node_data = unixfs_pb::Data { + Type: unixfs_pb::mod_Data::DataType::File, + filesize: Some(filesize), + blocksizes, + ..Default::default() + }; + let mut pb_node_data_bytes = vec![]; + let mut pb_node_data_writer = quick_protobuf::Writer::new(&mut pb_node_data_bytes); + pb_node_data.write_message(&mut pb_node_data_writer)?; + let pb_node_data_length = pb_node_data_bytes.len() as u64; + + let pb_node = PbNode { links:
pb_links, + data: Some(pb_node_data_bytes.into()), + }; + + let outer = DagPbCodec::encode_to_vec(&pb_node)?; + let cid = Cid::new_v1(DAG_PB_CODE, generate_multihash::<Sha256>(&outer)); + encoded_length += outer.len() as u64; + + Ok(( + // NOTE(@jmg-duarte,28/05/2024): In the original implementation + // they have a `Block` structure that contains the child links, + // we're not currently using them and as such I didn't include them + (cid, outer.into()), + LinkInfo { + raw_data_length: pb_node_data_length, + encoded_data_length: encoded_length, + }, + )) + } + } + } +} + +/// This function takes a stream of chunks of bytes and returns a stream of blocks, i.e. ([`Cid`], [`Bytes`]) pairs. +/// +/// It works by accumulating `width` blocks and lazily creating stems. +/// The tree grows upwards and does not keep previously completed `width` blocks. +/// +/// As a demonstration, consider a `width` of 2 and an `input` stream that will yield 7 blocks. +/// ```text +/// Input stream <- Block 1, Block 2, Block 3, Block 4, Block 5, Block 6, Block 7 +/// ``` +/// +/// Each time a block is taken out of the stream, it is stored in the lower level of the tree, +/// but it is also yielded as output: +/// ```text +/// Input stream <- Block 2, Block 3, Block 4, Block 5, Block 6, Block 7 +/// Tree: [ +/// [Block 1] +/// ] +/// Output stream -> Block 1 +/// ``` +/// +/// Once the first `width` blocks (in this case, 2) are taken from the stream: +/// * A new stem is added, linking back to the two blocks +/// ```text +/// Input stream <- | Block 3 | Block 4 | Block 5 | Block 6 | Block 7 | +/// Tree: [ +/// [Block 1, Block 2], +/// [Stem (B1, B2)] +/// ] +/// ``` +/// * The previous level to the stem is evicted +/// ```text +/// Input stream <- | Block 3 | Block 4 | Block 5 | Block 6 | Block 7 | +/// Tree: [ +/// [], +/// [Stem 1 (B1, B2)] +/// ] +/// ``` +/// * The new stem is yielded +/// ```text +/// Input stream <- Block 3, Block 4, Block 5, Block 6, Block 7 +/// Tree: [ +/// [], +/// [Stem 1 (B1, B2)] +/// ] +/// Output stream -> Stem (B1, B2) +/// ``` +/// +/// This process happens recursively, so when the stem level is full, like so: +/// ```text +/// Input stream <- Block 5, Block 6, Block 7 +/// Tree: [ +/// [], +/// [Stem 1 (B1, B2), Stem 2 (B3, B4)] +/// ] +/// ``` +/// +/// A new stem is built upwards: +/// ```text +/// Input stream <- Block 5, Block 6, Block 7 +/// Tree: [ +/// [], +/// [], +/// [Stem 3 (S1, S2)] +/// ] +/// Output stream -> Stem 3 (S1, S2) +/// ``` +/// +/// Once the stream is exhausted, we need to clean up any remaining state: +/// ```text +/// Input stream <- +/// Tree: [ +/// [Block 7], +/// [Stem 4 (B5, B6)], +/// [Stem 3 (S1, S2)], +/// ] +/// ``` +/// +/// In this case, the yielded tree looks like: +/// ```text +/// S3 +/// / \ +/// S1 S2 S4 +/// / \ / \ / \ +/// B1 B2 B3 B4 B5 B6 B7 +/// ``` +/// +/// We work bottom-up, removing the levels one by one, creating new stems from them and returning the stems: +/// ```text +/// Tree: [ +/// [], # popped +/// [Stem 4 (B5, B6), Stem 5 (B7)], +/// [Stem 3 (S1, S2)] +/// ] +/// Output stream -> Stem 5 (B7) +/// ``` +/// +/// The previous tree now looks like: +/// ```text +/// S3 +/// / \ +/// S1 S2 S4 S5 +/// / \ / \ / \ | +/// B1 B2 B3 B4 B5 B6 B7 +/// ``` +/// +/// If we repeat the process again: +/// ```text +/// Tree: [ +/// [Stem 4 (B5, B6), Stem 5 (B7)], # popped +/// [Stem 3 (S1, S2), Stem 6 (S4, S5)] +/// ] +/// Output stream -> Stem 6 (S4, S5) +/// ``` +/// +/// The tree becomes: +/// ```text +/// S3 S6 +/// / \ / \ +/// S1 S2 S4 S5 +/// / \ / \ / \ | +/// B1
B2 B3 B4 B5 B6 B7 +/// ``` +/// +/// And finally, we build the last stem, yielding it: +/// ```text +/// Tree: [ +/// [Stem 3 (S1, S2), Stem 6 (S4, S5)] # popped +/// ] +/// Output stream -> Stem 7 (S3, S6) +/// ``` +/// +/// Making the final tree: +/// ```text +/// S7 +/// / \ +/// S3 S6 +/// / \ / \ +/// S1 S2 S4 S5 +/// / \ / \ / \ | +/// B1 B2 B3 B4 B5 B6 B7 +/// ``` +/// +/// The original implementation is in +/// <https://github.com/n0-computer/beetle/blob/3e137cb2bc18e1d458c3f72d5e817b03d9537d5d/iroh-unixfs/src/balanced_tree.rs>. +pub(crate) fn stream_balanced_tree<I>( + input: I, + width: usize, +) -> impl Stream<Item = Result<(Cid, Bytes), Error>> +where + I: Stream<Item = std::io::Result<Bytes>> + Send, +{ + try_stream! { + let mut tree: VecDeque<Vec<(Cid, LinkInfo)>> = VecDeque::new(); + tree.push_back(vec![]); + + let input = input + .err_into::<Error>() + // The TreeNode::Leaf(data).encode() just wraps it with a Cid marking the payload as Raw + // we may be able to move this responsibility to the caller for more efficient memory usage + .map(|data| data.and_then(|data| TreeNode::Leaf(data).encode())) + .err_into::<Error>(); + tokio::pin!(input); + + while let Some(data) = input.next().await { + let (block @ (cid, _), link_info) = data?; + let tree_height = tree.len(); + + // Check if the leaf node is full + // i.e. we can build a new stem + if tree[0].len() == width { + // Go up the tree, as adding a new stem + // may complete another level and so on + for level in 0..tree_height { + // If a node is not full, stop there + // no more "stem-ing" to be done + if tree[level].len() < width { + break; + } + + // If we're at the top of the tree, we're going to need another level. + if level == tree_height - 1 { + tree.push_back(Vec::with_capacity(width)); + } + + // Replace the previous level elements with a new empty vector + // while `tree[level].drain().collect<Vec<_>>` is much more readable + // it's most likely less performant (I didn't measure) + // due to the different nature of the approaches (batch vs iterator) + let links = std::mem::replace(&mut tree[level], Vec::with_capacity(width)); + let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode()?; + yield block; + + tree[level + 1].push((cid, link_info)); + } + // Once we're done "trimming" the tree + // it's good to receive new elements + } + + // If the tree level is empty, we can push, + // if the tree level was not empty, the `for` took care of it + tree[0].push((cid, link_info)); + yield block; + } + + // If `input` yielded a single block, + // the tree has height 1 and the lower level has a single element + if tree.len() == 1 && tree[0].len() == 1 { + return; + } + + // Once `input` is exhausted, we need to perform cleanup of any leftovers, + // to do so, we start by popping levels from the front and building stems over them. + while let Some(links) = tree.pop_front() { + let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode()?; + yield block; + + // If there's still a level in the front, it means the stem we just built will have a parent + // we push the stem into the front level so we can build the parent on the next iteration + if let Some(front) = tree.front_mut() { + front.push((cid, link_info)); + } + // Once there's nothing else in the front, that means we just yielded the root + // and the current `while` will stop in the next iteration + } + } +} + +#[cfg(test)] +mod tests { + //! Tests were taken from [beetle][beetle] too; I modified them to suit our needs. + //! In certain places, I made them check for byte equality as it's way simpler + //! and there's enough tests around the repo to ensure that if the underlying + //! bytes are equal, the expected block sizes are as well. + //!
We also didn't write our own chunker, relying on [`tokio_util::io::ReaderStream`] instead. + //! + //! [beetle]: https://github.com/n0-computer/beetle/blob/3e137cb2bc18e1d458c3f72d5e817b03d9537d5d/iroh-unixfs/src/balanced_tree.rs#L234-L507 + + use bytes::BytesMut; + use futures::StreamExt; + + use super::*; + + fn test_chunk_stream(num_chunks: usize) -> impl Stream<Item = std::io::Result<Bytes>> { + futures::stream::iter((0..num_chunks).map(|n| Ok(n.to_be_bytes().to_vec().into()))) + } + + async fn build_expect_tree(num_chunks: usize, degree: usize) -> Vec<Vec<(Cid, Bytes)>> { + let chunks = test_chunk_stream(num_chunks); + tokio::pin!(chunks); + let mut tree = vec![vec![]]; + let mut links = vec![vec![]]; + + if num_chunks / degree == 0 { + let chunk = chunks.next().await.unwrap().unwrap(); + let leaf = TreeNode::Leaf(chunk); + let (block, _) = leaf.encode().unwrap(); + tree[0].push(block); + return tree; + } + + while let Some(chunk) = chunks.next().await { + let chunk = chunk.unwrap(); + let leaf = TreeNode::Leaf(chunk); + let (block @ (cid, _), link_info) = leaf.encode().unwrap(); + links[0].push((cid, link_info)); + tree[0].push(block); + } + + while tree.last().unwrap().len() > 1 { + let prev_layer = links.last().unwrap(); + let count = prev_layer.len() / degree; + let mut tree_layer = Vec::with_capacity(count); + let mut links_layer = Vec::with_capacity(count); + for links in prev_layer.chunks(degree) { + let stem = TreeNode::Stem(links.to_vec()); + let (block @ (cid, _), link_info) = stem.encode().unwrap(); + links_layer.push((cid, link_info)); + tree_layer.push(block); + } + tree.push(tree_layer); + links.push(links_layer); + } + tree + } + + async fn build_expect_vec_from_tree( + tree: Vec<Vec<(Cid, Bytes)>>, + num_chunks: usize, + degree: usize, + ) -> Vec<(Cid, Bytes)> { + let mut out = vec![]; + + if num_chunks == 1 { + out.push(tree[0][0].clone()); + return out; + } + + let mut counts = vec![0; tree.len()]; + + for leaf in tree[0].iter() { + out.push(leaf.clone()); + counts[0] += 1; + let mut push = counts[0] % degree == 0; + for (num_layer, count) in counts.iter_mut().enumerate() { + if num_layer == 0 { + continue; + } + if !push { + break; + } + out.push(tree[num_layer][*count].clone()); + *count += 1; + if *count % degree != 0 { + push = false; + } + } + } + + for (num_layer, count) in counts.into_iter().enumerate() { + if num_layer == 0 { + continue; + } + let layer = tree[num_layer].clone(); + for node in layer.into_iter().skip(count) { + out.push(node); + } + } + + out + } + + async fn build_expect(num_chunks: usize, degree: usize) -> Vec<(Cid, Bytes)> { + let tree = build_expect_tree(num_chunks, degree).await; + println!("{tree:?}"); + build_expect_vec_from_tree(tree, num_chunks, degree).await + } + + fn make_leaf(data: usize) -> ((Cid, Bytes), LinkInfo) { + TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze()) + .encode() + .unwrap() + } + + fn make_stem(links: Vec<(Cid, LinkInfo)>) -> ((Cid, Bytes), LinkInfo) { + TreeNode::Stem(links).encode().unwrap() + } + + #[tokio::test] + async fn test_build_expect() { + // manually build tree made of 7 chunks (11 total nodes) + let (leaf_0, len_0) = make_leaf(0); + let (leaf_1, len_1) = make_leaf(1); + let (leaf_2, len_2) = make_leaf(2); + let (stem_0, stem_len_0) = make_stem(vec![ + (leaf_0.0, len_0), + (leaf_1.0, len_1), + (leaf_2.0, len_2), + ]); + let (leaf_3, len_3) = make_leaf(3); + let (leaf_4, len_4) = make_leaf(4); + let (leaf_5, len_5) = make_leaf(5); + let (stem_1, stem_len_1) = make_stem(vec![ + (leaf_3.0, len_3), + (leaf_4.0, len_4), + (leaf_5.0, len_5), + ]); + let
(leaf_6, len_6) = make_leaf(6); + let (stem_2, stem_len_2) = make_stem(vec![(leaf_6.0, len_6)]); + let (root, _root_len) = make_stem(vec![ + (stem_0.0, stem_len_0), + (stem_1.0, stem_len_1), + (stem_2.0, stem_len_2), + ]); + + let expect_tree = vec![ + vec![ + leaf_0.clone(), + leaf_1.clone(), + leaf_2.clone(), + leaf_3.clone(), + leaf_4.clone(), + leaf_5.clone(), + leaf_6.clone(), + ], + vec![stem_0.clone(), stem_1.clone(), stem_2.clone()], + vec![root.clone()], + ]; + let got_tree = build_expect_tree(7, 3).await; + assert_eq!(expect_tree, got_tree); + + let expect_vec = vec![ + leaf_0, leaf_1, leaf_2, stem_0, leaf_3, leaf_4, leaf_5, stem_1, leaf_6, stem_2, root, + ]; + let got_vec = build_expect_vec_from_tree(got_tree, 7, 3).await; + assert_eq!(expect_vec, got_vec); + } + + async fn ensure_equal( + expect: Vec<(Cid, Bytes)>, + got: impl Stream<Item = Result<(Cid, Bytes), Error>>, + ) { + let mut i = 0; + tokio::pin!(got); + while let Some(node) = got.next().await { + let (expect_cid, expect_bytes) = expect + .get(i) + .expect("too many nodes in balanced tree stream") + .clone(); + let (got_cid, got_bytes) = node.expect("unexpected error in balanced tree stream"); + println!("node index {i}"); + assert_eq!(expect_cid, got_cid); + assert_eq!(expect_bytes, got_bytes); + i += 1; + } + if expect.len() != i { + panic!( + "expected {} nodes in the stream, got {}", + expect.len(), + i + ); + } + } + + #[tokio::test] + async fn balanced_tree_test_leaf() { + let num_chunks = 1; + let expect = build_expect(num_chunks, 3).await; + let got = stream_balanced_tree(test_chunk_stream(1), 3); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + + #[tokio::test] + async fn balanced_tree_test_height_one() { + let num_chunks = 3; + let degrees = 3; + let expect = build_expect(num_chunks, degrees).await; + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + + #[tokio::test] + async fn balanced_tree_test_height_two_full() { + let degrees = 3; + let num_chunks = 9; + let expect = build_expect(num_chunks, degrees).await; + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + + #[tokio::test] + async fn balanced_tree_test_height_two_not_full() { + let degrees = 3; + let num_chunks = 10; + let expect = build_expect(num_chunks, degrees).await; + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + + #[tokio::test] + async fn balanced_tree_test_height_three() { + let num_chunks = 125; + let degrees = 5; + let expect = build_expect(num_chunks, degrees).await; + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + + #[tokio::test] + async fn balanced_tree_test_large() { + let num_chunks = 780; + let degrees = 11; + let expect = build_expect(num_chunks, degrees).await; + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; + } + + #[tokio::test] + async fn balanced_tree_test_lar() { + let num_chunks = 7; + let degrees = 2; + let expect = build_expect(num_chunks, degrees).await; + let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); + tokio::pin!(got); + ensure_equal(expect, got).await; + } +} diff --git a/storage/mater/src/unixfs/unixfs.proto b/storage/mater/src/unixfs/unixfs.proto new file mode 100644 index 000000000..fa1df2770 ---
/dev/null +++ b/storage/mater/src/unixfs/unixfs.proto @@ -0,0 +1,30 @@ +// Taken from ipfs/boxo +// https://github.com/ipfs/boxo/blob/1bcd5451413c7ce3b1f647191109bcc2d307e584/ipld/unixfs/pb/unixfs.proto +syntax = "proto2"; + +package unixfs.v1.pb; + +option go_package = "unixfs_pb"; + +message Data { + enum DataType { + Raw = 0; + Directory = 1; + File = 2; + Metadata = 3; + Symlink = 4; + HAMTShard = 5; + } + + required DataType Type = 1; + optional bytes Data = 2; + optional uint64 filesize = 3; + repeated uint64 blocksizes = 4; + + optional uint64 hashType = 5; + optional uint64 fanout = 6; +} + +message Metadata { + optional string MimeType = 1; +} diff --git a/storage/mater/src/unixfs/unixfs_pb.rs b/storage/mater/src/unixfs/unixfs_pb.rs new file mode 100644 index 000000000..3667026a4 --- /dev/null +++ b/storage/mater/src/unixfs/unixfs_pb.rs @@ -0,0 +1,149 @@ +// Automatically generated rust module for 'unixfs.proto' file + +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(unused_imports)] +#![allow(unknown_lints)] +#![allow(clippy::all)] +#![cfg_attr(rustfmt, rustfmt_skip)] + + +use std::borrow::Cow; +use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result}; +use quick_protobuf::sizeofs::*; +use super::super::*; + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Data<'a> { + pub Type: crate::unixfs::unixfs_pb::mod_Data::DataType, + pub Data: Option<Cow<'a, [u8]>>, + pub filesize: Option<u64>, + pub blocksizes: Vec<u64>, + pub hashType: Option<u64>, + pub fanout: Option<u64>, +} + +impl<'a> MessageRead<'a> for Data<'a> { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(8) => msg.Type = r.read_enum(bytes)?, + Ok(18) => msg.Data = Some(r.read_bytes(bytes).map(Cow::Borrowed)?), + Ok(24) => msg.filesize = Some(r.read_uint64(bytes)?), + Ok(32) => msg.blocksizes.push(r.read_uint64(bytes)?), + Ok(40) => msg.hashType = Some(r.read_uint64(bytes)?), + Ok(48) => msg.fanout = Some(r.read_uint64(bytes)?), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl<'a> MessageWrite for Data<'a> { + fn get_size(&self) -> usize { + 0 + + 1 + sizeof_varint(*(&self.Type) as u64) + + self.Data.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.filesize.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + + self.blocksizes.iter().map(|s| 1 + sizeof_varint(*(s) as u64)).sum::<usize>() + + self.hashType.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + + self.fanout.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + } + + fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> { + w.write_with_tag(8, |w| w.write_enum(*&self.Type as i32))?; + if let Some(ref s) = self.Data { w.write_with_tag(18, |w| w.write_bytes(&**s))?; } + if let Some(ref s) = self.filesize { w.write_with_tag(24, |w| w.write_uint64(*s))?; } + for s in &self.blocksizes { w.write_with_tag(32, |w| w.write_uint64(*s))?; } + if let Some(ref s) = self.hashType { w.write_with_tag(40, |w| w.write_uint64(*s))?; } + if let Some(ref s) = self.fanout { w.write_with_tag(48, |w| w.write_uint64(*s))?; } + Ok(()) + } +} + +pub mod mod_Data { + + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum DataType { + Raw = 0, + Directory = 1, + File = 2, + Metadata = 3, + Symlink = 4, + HAMTShard = 5, +} + +impl Default for DataType { + fn default() -> Self { + DataType::Raw
+ } +} + +impl From<i32> for DataType { + fn from(i: i32) -> Self { + match i { + 0 => DataType::Raw, + 1 => DataType::Directory, + 2 => DataType::File, + 3 => DataType::Metadata, + 4 => DataType::Symlink, + 5 => DataType::HAMTShard, + _ => Self::default(), + } + } +} + +impl<'a> From<&'a str> for DataType { + fn from(s: &'a str) -> Self { + match s { + "Raw" => DataType::Raw, + "Directory" => DataType::Directory, + "File" => DataType::File, + "Metadata" => DataType::Metadata, + "Symlink" => DataType::Symlink, + "HAMTShard" => DataType::HAMTShard, + _ => Self::default(), + } + } +} + +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Metadata<'a> { + pub MimeType: Option<Cow<'a, str>>, +} + +impl<'a> MessageRead<'a> for Metadata<'a> { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.MimeType = Some(r.read_string(bytes).map(Cow::Borrowed)?), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl<'a> MessageWrite for Metadata<'a> { + fn get_size(&self) -> usize { + 0 + + self.MimeType.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + } + + fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> { + if let Some(ref s) = self.MimeType { w.write_with_tag(10, |w| w.write_string(&**s))?; } + Ok(()) + } +} + diff --git a/storage/mater/src/v1/mod.rs b/storage/mater/src/v1/mod.rs new file mode 100644 index 000000000..a5ab66157 --- /dev/null +++ b/storage/mater/src/v1/mod.rs @@ -0,0 +1,86 @@ +mod reader; +mod writer; + +use ipld_core::cid::Cid; +use serde::{Deserialize, Serialize}; + +pub use crate::v1::{reader::Reader, writer::Writer}; +pub(crate) use crate::v1::{ + reader::{read_block, read_header}, + writer::{write_block, write_header}, +}; + +/// Low-level CARv1 header. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct Header { + /// CAR file version. + /// + /// It is always 1, as defined in the + /// [specification](https://ipld.io/specs/transport/car/carv1/#constraints). + version: u8, + + /// Root [`Cid`]s for the contained data. + pub roots: Vec<Cid>, +} + +impl Header { + /// Construct a new [`Header`]. + /// + /// The version will always be 1, as defined in the + /// [specification](https://ipld.io/specs/transport/car/carv1/#constraints).
+ pub fn new(roots: Vec<Cid>) -> Self { + Self { version: 1, roots } + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use ipld_core::cid::Cid; + use sha2::Sha256; + use tokio::io::BufWriter; + + use crate::{ + multicodec::{generate_multihash, RAW_CODE}, + v1::{Header, Reader, Writer}, + }; + + impl Writer<BufWriter<Vec<u8>>> { + pub fn test_writer() -> Self { + let buffer = Vec::new(); + let buf_writer = BufWriter::new(buffer); + Writer::new(buf_writer) + } + } + + #[tokio::test] + async fn roundtrip_lorem() { + let file_contents = tokio::fs::read("tests/fixtures/original/lorem.txt") + .await + .unwrap(); + let contents_multihash = generate_multihash::<Sha256>(&file_contents); + let root_cid = Cid::new_v1(RAW_CODE, contents_multihash); + + let written_header = Header::new(vec![root_cid]); + let mut writer = crate::v1::Writer::test_writer(); + writer.write_header(&written_header).await.unwrap(); + + // There's only one block + writer.write_block(&root_cid, &file_contents).await.unwrap(); + let buf_writer = writer.finish().await.unwrap(); + let expected_header = tokio::fs::read("tests/fixtures/car_v1/lorem.car") + .await + .unwrap(); + assert_eq!(&expected_header, buf_writer.get_ref()); + + let buffer = buf_writer.into_inner(); + let mut reader = Reader::new(Cursor::new(buffer)); + let read_header = reader.read_header().await.unwrap(); + assert_eq!(read_header, written_header); + + let (read_cid, read_block) = reader.read_block().await.unwrap(); + assert_eq!(read_cid, root_cid); + assert_eq!(read_block, file_contents); + } +} diff --git a/storage/mater/src/v1/reader.rs b/storage/mater/src/v1/reader.rs new file mode 100644 index 000000000..cffa3ff96 --- /dev/null +++ b/storage/mater/src/v1/reader.rs @@ -0,0 +1,176 @@ +use std::io::Cursor; + +use integer_encoding::VarIntAsyncReader; +use ipld_core::{cid::Cid, codec::Codec}; +use serde_ipld_dagcbor::codec::DagCborCodec; +use tokio::io::{AsyncRead, AsyncReadExt}; + +use crate::{v1::Header, v2::PRAGMA, Error}; + +pub(crate) async fn read_header<R>(mut reader: R) -> Result<Header, Error> +where + R: AsyncRead + Unpin, +{ + let header_length: usize = reader.read_varint_async().await?; + let mut header_buffer = vec![0; header_length]; + reader.read_exact(&mut header_buffer).await?; + + // From the V2 specification: + // > This 11 byte string remains fixed and may be matched using a + // > simple byte comparison and does not require a varint or CBOR + // > decode since it does not vary for the CARv2 format.
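+    // For reference, the expected pragma is the fixed 11-byte sequence
+    //   0x0a 0xa1 0x67 0x76 0x65 0x72 0x73 0x69 0x6f 0x6e 0x02
+    // i.e. varint(10) followed by the DAG-CBOR map {"version": 2}.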
+ // We're skipping the first byte because we already read the length + if header_buffer.starts_with(&PRAGMA[1..]) { + return Err(Error::VersionMismatchError { + expected: 1, + received: 2, + }); + } + + let header: Header = DagCborCodec::decode_from_slice(&header_buffer)?; + // NOTE(@jmg-duarte,23/05/2024): implementing a custom Deserialize for Header + // would make this shorter and overall handling more reliable + if header.version != 1 { + return Err(Error::VersionMismatchError { + expected: 1, + received: header.version, + }); + } + if header.roots.is_empty() { + return Err(Error::EmptyRootsError); + } + Ok(header) +} + +pub(crate) async fn read_block<R>(mut reader: R) -> Result<(Cid, Vec<u8>), Error> +where + R: AsyncRead + Unpin, +{ + let full_block_length: usize = reader.read_varint_async().await?; + let mut full_block_buffer = vec![0; full_block_length]; + reader.read_exact(&mut full_block_buffer).await?; + + // We're cheating to get Seek + let mut full_block_cursor = Cursor::new(full_block_buffer); + let cid = Cid::read_bytes(&mut full_block_cursor)?; + + let data_start_position = full_block_cursor.position() as usize; + let mut full_block_buffer = full_block_cursor.into_inner(); + + // NOTE(@jmg-duarte,19/05/2024): could we avoid getting a new vector here and just drop the beginning? + Ok((cid, full_block_buffer.split_off(data_start_position))) +} + +/// Low-level CARv1 reader. +pub struct Reader<R> { + reader: R, +} + +impl<R> Reader<R> { + /// Constructs a new [`Reader`]. + pub fn new(reader: R) -> Self { + Self { reader } + } +} + +impl<R> Reader<R> +where + R: AsyncRead + Unpin, +{ + /// Read a [`Header`]. + /// + /// As defined in the [specification constraints](https://ipld.io/specs/transport/car/carv1/#constraints), + /// this function will return an error if: + /// * The read header does not have version 1. + /// * The read header does not have roots. + /// + /// For more information, check the [header specification](https://ipld.io/specs/transport/car/carv1/#header). + pub async fn read_header(&mut self) -> Result<Header, Error> { + read_header(&mut self.reader).await + } + + /// Reads a [`Cid`] and a data block. + /// + /// A block is composed of a CID (either version 0 or 1) and data, it is prefixed with the data length. + /// ```text + /// ┌──────────────────────┬─────┬────────────────────────┐ + /// │ Data length (varint) │ CID │ Data block (raw bytes) │ + /// └──────────────────────┴─────┴────────────────────────┘ + /// ``` + /// *The data block is returned AS IS, callers should use the codec field of the [`Cid`] to parse it.* + /// + /// For more information, check the [block specification](https://ipld.io/specs/transport/car/carv1/#data).
+ pub async fn read_block(&mut self) -> Result<(Cid, Vec<u8>), Error> { + read_block(&mut self.reader).await + } +} + +#[cfg(test)] +mod tests { + use ipld_core::cid::Cid; + use sha2::Sha256; + use tokio::{fs::File, io::BufReader}; + + use crate::{ + multicodec::{generate_multihash, RAW_CODE}, + v1::reader::Reader, + Error, + }; + + #[tokio::test] + async fn header_reader() { + let contents = tokio::fs::read("tests/fixtures/original/lorem.txt") + .await + .unwrap(); + let contents_multihash = generate_multihash::<Sha256>(&contents); + let contents_cid = Cid::new_v1(RAW_CODE, contents_multihash); + + let file = File::open("tests/fixtures/car_v1/lorem_header.car") + .await + .unwrap(); + let reader = BufReader::new(file); + let mut reader = Reader::new(reader); + let header = reader.read_header().await.unwrap(); + + assert_eq!(header.version, 1); + assert_eq!(header.roots.len(), 1); + assert_eq!(header.roots[0], contents_cid); + } + + #[tokio::test] + async fn full_reader() { + let contents = tokio::fs::read("tests/fixtures/original/lorem.txt") + .await + .unwrap(); + let contents_multihash = generate_multihash::<Sha256>(&contents); + let contents_cid = Cid::new_v1(RAW_CODE, contents_multihash); + + let file = File::open("tests/fixtures/car_v1/lorem.car").await.unwrap(); + let reader = BufReader::new(file); + let mut reader = Reader::new(reader); + let header = reader.read_header().await.unwrap(); + + assert_eq!(header.version, 1); + assert_eq!(header.roots.len(), 1); + assert_eq!(header.roots[0], contents_cid); + + let (cid, block) = reader.read_block().await.unwrap(); + assert_eq!(cid, contents_cid); + assert_eq!(block, contents); + } + + #[tokio::test] + async fn v2_header() { + let file = File::open("tests/fixtures/car_v2/lorem.car").await.unwrap(); + let mut reader = Reader::new(file); + let header = reader.read_header().await; + println!("{:?}", header); + assert!(matches!( + header, + Err(Error::VersionMismatchError { + expected: 1, + received: 2 + }) + )); + } +} diff --git a/storage/mater/src/v1/writer.rs b/storage/mater/src/v1/writer.rs new file mode 100644 index 000000000..0a06429f2 --- /dev/null +++ b/storage/mater/src/v1/writer.rs @@ -0,0 +1,135 @@ +use integer_encoding::VarIntAsyncWriter; +use ipld_core::{cid::Cid, codec::Codec}; +use serde_ipld_dagcbor::codec::DagCborCodec; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +pub use crate::v1::Header; +use crate::Error; + +/// Write [`crate::v1::Header`] to the provided writer. +pub(crate) async fn write_header<W>(writer: &mut W, header: &Header) -> Result<usize, Error> +where + W: AsyncWrite + Unpin, +{ + let encoded_header = DagCborCodec::encode_to_vec(header)?; + let varint_len = writer.write_varint_async(encoded_header.len()).await?; + writer.write_all(&encoded_header).await?; + Ok(varint_len + encoded_header.len()) +} + +/// Write a [`Cid`] and data block to the given writer. +/// +/// This is a low-level function to be used in the implementation of CAR writers. +pub(crate) async fn write_block<W, Block>( + writer: &mut W, + cid: &Cid, + block: Block, +) -> Result<usize, Error> +where + W: AsyncWrite + Unpin, + Block: AsRef<[u8]>, +{ + let data = block.as_ref(); + let len = cid.encoded_len() + data.len(); + + let varint_len = writer.write_varint_async(len).await?; + // This allocation can probably be spared + writer.write_all(&cid.to_bytes()).await?; + writer.write_all(block.as_ref()).await?; + Ok(varint_len + cid.encoded_len() + block.as_ref().len()) +} + +/// Low-level CARv1 writer. +pub struct Writer<W> { + writer: W, +} + +impl<W> Writer<W> { + /// Construct a new [`crate::v1::Writer`].
+    ///
+    /// Takes a writer into which the data will be written.
+    pub fn new(writer: W) -> Self {
+        Self { writer }
+    }
+}
+
+impl<W> Writer<W>
+where
+    W: AsyncWrite + Unpin,
+{
+    /// Write a [`crate::v1::Header`].
+    pub async fn write_header(&mut self, header: &Header) -> Result<usize, Error> {
+        write_header(&mut self.writer, header).await
+    }
+
+    /// Write a [`Cid`] and the respective data block.
+    pub async fn write_block<D>(&mut self, cid: &Cid, data: &D) -> Result<usize, Error>
+    where
+        D: AsRef<[u8]>,
+    {
+        write_block(&mut self.writer, cid, data).await
+    }
+
+    /// Flushes and returns the inner writer.
+    pub async fn finish(mut self) -> Result<W, Error> {
+        self.writer.flush().await?;
+        Ok(self.writer)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use ipld_core::cid::Cid;
+    use sha2::Sha256;
+    use tokio::io::BufWriter;
+
+    use super::Writer;
+    use crate::{
+        multicodec::{generate_multihash, RAW_CODE},
+        v1::Header,
+    };
+
+    // Test helper: a Writer over an in-memory buffer (as in the CARv2 writer tests).
+    impl Writer<BufWriter<Vec<u8>>> {
+        fn test_writer() -> Self {
+            Writer::new(BufWriter::new(Vec::new()))
+        }
+    }
+
+    #[tokio::test]
+    async fn header_writer() {
+        let file_contents = tokio::fs::read("tests/fixtures/original/lorem.txt")
+            .await
+            .unwrap();
+        let contents_multihash = generate_multihash::<Sha256>(&file_contents);
+        let root_cid = Cid::new_v1(RAW_CODE, contents_multihash);
+
+        let mut writer = Writer::test_writer();
+        writer
+            .write_header(&Header::new(vec![root_cid]))
+            .await
+            .unwrap();
+        let buf_writer = writer.finish().await.unwrap();
+
+        let expected_header = tokio::fs::read("tests/fixtures/car_v1/lorem_header.car")
+            .await
+            .unwrap();
+
+        assert_eq!(&expected_header, buf_writer.get_ref());
+    }
+
+    #[tokio::test]
+    async fn full_writer() {
+        let file_contents = tokio::fs::read("tests/fixtures/original/lorem.txt")
+            .await
+            .unwrap();
+        let contents_multihash = generate_multihash::<Sha256>(&file_contents);
+        let root_cid = Cid::new_v1(RAW_CODE, contents_multihash);
+
+        let mut writer = Writer::test_writer();
+        writer
+            .write_header(&Header::new(vec![root_cid]))
+            .await
+            .unwrap();
+        // There's only one block
+        writer.write_block(&root_cid, &file_contents).await.unwrap();
+        let buf_writer = writer.finish().await.unwrap();
+
+        let expected_contents = tokio::fs::read("tests/fixtures/car_v1/lorem.car")
+            .await
+            .unwrap();
+        assert_eq!(&expected_contents, buf_writer.get_ref());
+    }
+}
diff --git a/storage/mater/src/v2/index.rs b/storage/mater/src/v2/index.rs
new file mode 100644
index 000000000..000d82458
--- /dev/null
+++ b/storage/mater/src/v2/index.rs
@@ -0,0 +1,598 @@
+use std::{collections::BTreeMap, mem::size_of};
+
+use integer_encoding::{VarIntAsyncReader, VarIntAsyncWriter};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+
+use crate::Error;
+
+/// `IndexSorted` code format value, as defined in the
+/// [specification](https://ipld.io/specs/transport/car/carv2/#format-0x0400-indexsorted).
+pub const INDEX_SORTED_CODE: u64 = 0x0400;
+
+/// `MultihashIndexSorted` code format value, as defined in the
+/// [specification](https://ipld.io/specs/transport/car/carv2/#format-0x0401-multihashindexsorted).
+pub const MULTIHASH_INDEX_SORTED_CODE: u64 = 0x0401;
+
+// Basically, everything that does not have explicit endianness in the specification
+// is little-endian, as made evident by the go-car source code:
+// https://github.com/ipld/go-car/blob/45b81c1cc5117b3340dfdb025afeca90bfbe8d86/v2/index/mhindexsorted.go#L45-L53
+
+/// An index entry for a data block inside the CARv1.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub struct IndexEntry {
+    /// Hash digest of the data.
+    pub digest: Vec<u8>,
+
+    /// Offset to the first byte of the varint that prefixes the CID:Bytes pair within the CARv1 payload.
+    ///
+    /// See the [data section in the CARv1 specification](https://ipld.io/specs/transport/car/carv1/#data)
+    /// for details on block encoding.
+    pub offset: u64,
+}
+
+impl IndexEntry {
+    /// Construct a new [`IndexEntry`](crate::v2::IndexEntry).
+    pub fn new(digest: Vec<u8>, offset: u64) -> Self {
+        Self { digest, offset }
+    }
+}
+
+/// An index containing a single digest length.
+#[derive(Debug, PartialEq, Eq)]
+pub struct SingleWidthIndex {
+    /// The hash digest and the respective offset length.
+    pub width: u32,
+
+    /// The number of index entries.
+    /// It is serialized as the length of all entries in bytes
+    /// (i.e. `self.count * self.width`).
+    ///
+    /// See `go-car`'s source code for more information:
+    /// <https://github.com/ipld/go-car/blob/45b81c1cc5117b3340dfdb025afeca90bfbe8d86/v2/index/indexsorted.go#L29>
+    pub count: u64,
+
+    /// The index entries.
+    pub entries: Vec<IndexEntry>,
+}
+
+impl SingleWidthIndex {
+    /// Construct a new [`SingleWidthIndex`](crate::v2::SingleWidthIndex).
+    ///
+    /// Notes:
+    /// * The `digest_width` should not account for the offset length.
+    /// * This function sorts the `entries`.
+    pub fn new(digest_width: u32, count: u64, mut entries: Vec<IndexEntry>) -> Self {
+        entries.sort_by(|fst, snd| fst.digest.cmp(&snd.digest));
+        Self {
+            width: digest_width + 8, // digest_width + offset len
+            count,
+            entries,
+        }
+    }
+}
+
+impl From<IndexEntry> for SingleWidthIndex {
+    fn from(value: IndexEntry) -> Self {
+        SingleWidthIndex::new(value.digest.len() as u32, 1, vec![value])
+    }
+}
+
+impl TryFrom<Vec<IndexEntry>> for SingleWidthIndex {
+    type Error = Error;
+
+    /// Performs the conversion, validating that all the [`IndexEntry`] have the same width.
+    fn try_from(value: Vec<IndexEntry>) -> Result<Self, Self::Error> {
+        if value.is_empty() {
+            return Err(Error::EmptyIndexError);
+        }
+        let width = value[0].digest.len();
+        let count = value.len();
+        for entry in &value[1..] {
+            if entry.digest.len() != width {
+                return Err(Error::NonMatchingDigestError {
+                    expected: width,
+                    received: entry.digest.len(),
+                });
+            }
+        }
+        Ok(Self::new(width as u32, count as u64, value))
+    }
+}
+
+/// An index containing hash digests of multiple lengths.
+///
+/// To find a given index entry, first find the right index width,
+/// and then find the hash to the data block.
+///
+/// For more details, read the [`Format 0x0400: IndexSorted`](https://ipld.io/specs/transport/car/carv2/#format-0x0400-indexsorted) section in the CARv2 specification.
+#[derive(Debug, PartialEq, Eq)]
+pub struct IndexSorted(pub Vec<SingleWidthIndex>);
+
+impl From<IndexEntry> for IndexSorted {
+    fn from(value: IndexEntry) -> Self {
+        Self(vec![SingleWidthIndex::from(value)])
+    }
+}
+
+impl From<SingleWidthIndex> for IndexSorted {
+    fn from(value: SingleWidthIndex) -> Self {
+        Self(vec![value])
+    }
+}
+
+impl From<Vec<SingleWidthIndex>> for IndexSorted {
+    fn from(value: Vec<SingleWidthIndex>) -> Self {
+        Self(value)
+    }
+}
+
+/// An index mapping Multihash codes to [`IndexSorted`].
+///
+/// For more details, read the [`Format 0x0401: MultihashIndexSorted`](https://ipld.io/specs/transport/car/carv2/#format-0x0401-multihashindexsorted) section in the CARv2 specification.
+#[derive(Debug, PartialEq, Eq)]
+pub struct MultihashIndexSorted(
+    // NOTE(@jmg-duarte,21/05/2024): maybe we should implement Deref where Deref::Target = BTreeMap<u64, IndexSorted>?
+    pub BTreeMap<u64, IndexSorted>,
+);
+
+impl MultihashIndexSorted {
+    /// Create a [`MultihashIndexSorted`] from a [digest code](https://github.com/multiformats/multicodec/blob/c954a787dc6a17d099653e5f90d26fbd177d2074/table.csv) and an [`IndexSorted`].
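+    ///
+    /// A small sketch (the SHA-256 multihash code `0x12` and the zeroed digest
+    /// are illustrative only):
+    /// ```ignore
+    /// let entry = IndexEntry::new(vec![0u8; 32], 0);
+    /// let index = MultihashIndexSorted::from_single_width(0x12, IndexSorted::from(entry));
+    /// ```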
+    pub fn from_single_width(code: u64, index: IndexSorted) -> Self {
+        let mut map = BTreeMap::new();
+        map.insert(code, index);
+        Self(map)
+    }
+}
+
+impl From<BTreeMap<u64, IndexSorted>> for MultihashIndexSorted {
+    fn from(value: BTreeMap<u64, IndexSorted>) -> Self {
+        Self(value)
+    }
+}
+
+/// CARv2 index.
+///
+/// For more information, check the [specification](https://ipld.io/specs/transport/car/carv2/#index-payload).
+#[derive(Debug, PartialEq, Eq)]
+pub enum Index {
+    /// An index sorting by digest length, from smallest to largest.
+    ///
+    /// Check [`IndexSorted`] for more information.
+    IndexSorted(IndexSorted),
+
+    /// An index sorting by [Multihash code](https://github.com/multiformats/multicodec/blob/master/table.csv).
+    ///
+    /// Check [`MultihashIndexSorted`] for more information.
+    MultihashIndexSorted(MultihashIndexSorted),
+}
+
+impl Index {
+    /// Construct a new [`Index::MultihashIndexSorted`].
+    ///
+    /// Check [`MultihashIndexSorted`] for more information.
+    pub fn multihash(index: BTreeMap<u64, IndexSorted>) -> Self {
+        Self::MultihashIndexSorted(index.into())
+    }
+}
+
+pub(crate) async fn write_index<W>(mut writer: W, index: &Index) -> Result<usize, Error>
+where
+    W: AsyncWrite + Unpin,
+{
+    let mut written_bytes = 0;
+    match index {
+        Index::IndexSorted(index) => {
+            written_bytes += writer.write_varint_async(INDEX_SORTED_CODE).await?;
+            written_bytes += write_index_sorted(&mut writer, index).await?;
+        }
+        Index::MultihashIndexSorted(index) => {
+            written_bytes += writer
+                .write_varint_async(MULTIHASH_INDEX_SORTED_CODE)
+                .await?;
+            written_bytes += write_multihash_index_sorted(&mut writer, index).await?;
+        }
+    }
+    Ok(written_bytes)
+}
+
+pub(crate) async fn write_multihash_index_sorted<W>(
+    mut writer: W,
+    index: &MultihashIndexSorted,
+) -> Result<usize, Error>
+where
+    W: AsyncWrite + Unpin,
+{
+    let mut written_bytes = 0;
+    writer.write_i32_le(index.0.len() as i32).await?;
+    written_bytes += size_of::<i32>();
+    for (hash_code, index) in index.0.iter() {
+        writer.write_u64_le(*hash_code).await?;
+        written_bytes += size_of::<u64>();
+        written_bytes += write_index_sorted(&mut writer, index).await?;
+    }
+    Ok(written_bytes)
+}
+
+pub(crate) async fn write_index_sorted<W>(
+    mut writer: W,
+    index: &IndexSorted,
+) -> Result<usize, Error>
+where
+    W: AsyncWrite + Unpin,
+{
+    let mut written_bytes = 0;
+    writer.write_i32_le(index.0.len() as i32).await?;
+    written_bytes += size_of::<i32>();
+    for idx in &index.0 {
+        written_bytes += write_single_width_index(&mut writer, idx).await?;
+    }
+    Ok(written_bytes)
+}
+
+pub(crate) async fn write_single_width_index<W>(
+    mut writer: W,
+    index: &SingleWidthIndex,
+) -> Result<usize, Error>
+where
+    W: AsyncWrite + Unpin,
+{
+    let mut written_bytes = 0;
+    writer.write_u32_le(index.width).await?;
+    written_bytes += size_of::<u32>();
+    writer
+        .write_u64_le(index.count * (index.width as u64))
+        .await?;
+    written_bytes += size_of::<u64>();
+    for entry in &index.entries {
+        written_bytes += write_index_entry(&mut writer, entry).await?;
+    }
+    Ok(written_bytes)
+}
+
+pub(crate) async fn write_index_entry<W>(mut writer: W, entry: &IndexEntry) -> Result<usize, Error>
+where
+    W: AsyncWrite + Unpin,
+{
+    writer.write_all(&entry.digest).await?;
+    writer.write_u64_le(entry.offset).await?;
+    Ok(entry.digest.len() + size_of::<u64>())
+}
+
+pub(crate) async fn read_index<R>(mut reader: R) -> Result<Index, Error>
+where
+    R: AsyncRead + Unpin,
+{
+    let index_type: u64 = reader.read_varint_async().await?;
+    match index_type {
+        INDEX_SORTED_CODE => Ok(Index::IndexSorted(read_index_sorted(&mut reader).await?)),
+        MULTIHASH_INDEX_SORTED_CODE => Ok(Index::MultihashIndexSorted(
+            read_multihash_index_sorted(&mut reader).await?,
+        )),
+        other => Err(Error::UnknownIndexError(other)),
+    }
+}
+
+pub(crate) async fn read_multihash_index_sorted<R>(
+    mut reader: R,
+) -> Result<MultihashIndexSorted, Error>
+where
+    R: AsyncRead + Unpin,
+{
+    let n_indexes = reader.read_i32_le().await?;
+    let mut indexes = BTreeMap::new();
+    for _ in 0..n_indexes {
+        let multihash_code = reader.read_u64_le().await?;
+        let index = read_index_sorted(&mut reader).await?;
+        indexes.insert(multihash_code, index);
+    }
+    Ok(MultihashIndexSorted(indexes))
+}
+
+pub(crate) async fn read_index_sorted<R>(mut reader: R) -> Result<IndexSorted, Error>
+where
+    R: AsyncRead + Unpin,
+{
+    let n_buckets = reader.read_i32_le().await?;
+    let mut buckets = Vec::with_capacity(n_buckets as usize);
+    for _ in 0..n_buckets {
+        let index = read_single_width_index(&mut reader).await?;
+        buckets.push(index);
+    }
+    Ok(IndexSorted(buckets))
+}
+
+pub(crate) async fn read_single_width_index<R>(mut reader: R) -> Result<SingleWidthIndex, Error>
+where
+    R: AsyncRead + Unpin,
+{
+    let width = reader.read_u32_le().await?;
+    // Because someone decided that "total number of hash digests" means their length in bytes...
+    // https://github.com/ipld/go-car/blob/45b81c1cc5117b3340dfdb025afeca90bfbe8d86/v2/index/indexsorted.go#L29
+    let count = reader.read_u64_le().await? / (width as u64);
+    let mut entries = Vec::with_capacity(count as usize);
+    for _ in 0..count {
+        // The offset is always 8 bytes
+        // https://github.com/ipld/go-car/blob/45b81c1cc5117b3340dfdb025afeca90bfbe8d86/v2/index/indexsorted.go#L176
+        let entry = read_index_entry(&mut reader, width - 8).await?;
+        entries.push(entry);
+    }
+
+    // Sorting by the digest only because it should be enough (famous last words)
+    // > ... and finally within those buckets ordered by a simple byte-wise sorting.
+    // — https://ipld.io/specs/transport/car/carv2/#format-0x0401-multihashindexsorted
+    entries.sort_by(|fst, snd| fst.digest.cmp(&snd.digest));
+
+    Ok(SingleWidthIndex {
+        width,
+        count,
+        entries,
+    })
+}
+
+pub(crate) async fn read_index_entry<R>(mut reader: R, length: u32) -> Result<IndexEntry, Error>
+where
+    R: AsyncRead + Unpin,
+{
+    let mut digest = vec![0; length as usize];
+    reader.read_exact(&mut digest).await?;
+    let offset = reader.read_u64_le().await?;
+    Ok(IndexEntry { digest, offset })
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        collections::{BTreeMap, HashMap},
+        io::Cursor,
+    };
+
+    use rand::{random, Rng};
+    use sha2::{Digest, Sha256, Sha512};
+    use tokio::{fs::File, io::AsyncSeekExt};
+
+    use crate::{
+        multicodec::{generate_multihash, MultihashCode, DAG_PB_CODE, RAW_CODE, SHA_256_CODE},
+        v1::read_block,
+        v2::index::{
+            read_index, read_index_entry, read_index_sorted, read_multihash_index_sorted,
+            read_single_width_index, write_index, write_index_entry, write_index_sorted,
+            write_multihash_index_sorted, write_single_width_index, Index, IndexEntry, IndexSorted,
+            MultihashIndexSorted, SingleWidthIndex,
+        },
+    };
+
+    fn generate_single_width_index<H>(count: u64) -> SingleWidthIndex
+    where
+        H: Digest,
+    {
+        let mut entries = vec![];
+        let mut data = vec![0u8; <H as Digest>::output_size()];
+        data.fill_with(random);
+        for idx in 0..count {
+            let digest = H::digest(&data).to_vec();
+            entries.push(IndexEntry::new(digest, idx));
+        }
+        SingleWidthIndex::try_from(entries).unwrap()
+    }
+
+    #[tokio::test]
+    async fn multihash_index_sorted_lorem() {
+        let contents = tokio::fs::read("tests/fixtures/original/lorem.txt")
+            .await
+            .unwrap();
+        let digest = Sha256::digest(&contents);
+
+        let mut file = File::open("tests/fixtures/car_v2/lorem.car").await.unwrap();
+        // We're skipping 2 bytes from the actual offset because we're not decoding the index type
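+        // (index_offset is 7712 and the `0x0401` varint code takes 2 bytes, hence 7714)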
+        file.seek(std::io::SeekFrom::Start(7714)).await.unwrap();
+        let index = read_multihash_index_sorted(file).await.unwrap();
+        assert_eq!(index.0.len(), 1);
+        assert!(index.0.contains_key(&Sha256::CODE));
+
+        let multi_width_index = &index.0[&Sha256::CODE];
+        assert_eq!(multi_width_index.0.len(), 1);
+
+        let single_width_index = &multi_width_index.0[0];
+        assert_eq!(single_width_index.width, 40);
+        assert_eq!(single_width_index.count, 1);
+        assert_eq!(single_width_index.entries.len(), 1);
+
+        let entry = &single_width_index.entries[0];
+        // Data offset: 51 & Hash length: 8
+        assert_eq!(entry.offset, 51 + 8);
+        assert_eq!(entry.digest, *digest);
+    }
+
+    /// `tests/fixtures/original/spaceglenda.jpg` generates a CARv2 file
+    /// with multiple blocks, but not an insane amount, perfect for testing.
+    #[tokio::test]
+    async fn multihash_index_sorted_spaceglenda() {
+        let mut file = File::open("tests/fixtures/car_v2/spaceglenda.car")
+            .await
+            .unwrap();
+        // We're skipping 2 bytes from the actual offset because we're not decoding the index type
+        file.seek(std::io::SeekFrom::Start(654455)).await.unwrap();
+        let index = read_multihash_index_sorted(&mut file).await.unwrap();
+        assert_eq!(index.0.len(), 1);
+        assert!(index.0.contains_key(&Sha256::CODE));
+
+        let multi_width_index = &index.0[&Sha256::CODE];
+        assert_eq!(multi_width_index.0.len(), 1);
+
+        let single_width_index = &multi_width_index.0[0];
+        assert_eq!(single_width_index.width, 40);
+        assert_eq!(single_width_index.count, 4);
+        assert_eq!(single_width_index.entries.len(), 4);
+
+        let mut codec_frequencies = HashMap::new();
+        for entry in &single_width_index.entries {
+            file.seek(std::io::SeekFrom::Start(
+                51 + // Cheating a bit using the start data offset
+                entry.offset,
+            ))
+            .await
+            .unwrap();
+
+            let (cid, block) = read_block(&mut file).await.unwrap();
+            assert_eq!(cid.hash().code(), SHA_256_CODE);
+
+            // Sorting at this level is made byte-wise, so there's no short way
+            // to compare the expected codecs...
+            assert!(cid.codec() == DAG_PB_CODE || cid.codec() == RAW_CODE);
+            // instead we build a frequency table and check against that later!
+            if let Some(frequency) = codec_frequencies.get_mut(&cid.codec()) {
+                *frequency += 1;
+            } else {
+                codec_frequencies.insert(cid.codec(), 1);
+            }
+
+            let multihash = generate_multihash::<Sha256>(&block);
+            assert_eq!(cid.hash(), &multihash);
+        }
+
+        assert!(matches!(codec_frequencies.get(&DAG_PB_CODE), Some(1)));
+        assert!(matches!(codec_frequencies.get(&RAW_CODE), Some(3)));
+    }
+
+    #[tokio::test]
+    async fn multihash_index_sorted_from_read_index() {
+        let contents = tokio::fs::read("tests/fixtures/original/lorem.txt")
+            .await
+            .unwrap();
+        let digest = Sha256::digest(&contents);
+
+        let mut file = File::open("tests/fixtures/car_v2/lorem.car").await.unwrap();
+
+        file.seek(std::io::SeekFrom::Start(7712)).await.unwrap();
+        let index = read_index(file).await.unwrap();
+        assert!(matches!(index, Index::MultihashIndexSorted(_)));
+
+        if let Index::MultihashIndexSorted(index) = index {
+            assert_eq!(index.0.len(), 1);
+            assert!(index.0.contains_key(&Sha256::CODE));
+
+            let multi_width_index = &index.0[&Sha256::CODE];
+            assert_eq!(multi_width_index.0.len(), 1);
+
+            let single_width_index = &multi_width_index.0[0];
+            assert_eq!(single_width_index.width, 40);
+            assert_eq!(single_width_index.count, 1);
+            assert_eq!(single_width_index.entries.len(), 1);
+
+            let entry = &single_width_index.entries[0];
+            // Data offset: 51 & Hash length: 8
+            assert_eq!(entry.offset, 51 + 8);
+            assert_eq!(entry.digest, *digest);
+        }
+    }
+
+    #[tokio::test]
+    async fn roundtrip_index_entry() {
+        let mut data = [0u8; 32];
+        rand::thread_rng().fill(&mut data);
+        let digest = Sha256::digest(data).to_vec();
+        let entry = IndexEntry {
+            digest: digest.clone(),
+            offset: 42,
+        };
+
+        let mut buffer = vec![];
+        write_index_entry(&mut buffer, &entry).await.unwrap();
+
+        let mut reader = Cursor::new(buffer);
+        let result = read_index_entry(&mut reader, 32).await.unwrap();
+        assert_eq!(entry.digest, result.digest);
+        assert_eq!(entry.offset, result.offset);
+    }
+
+    #[tokio::test]
+    async fn roundtrip_single_width_index() {
+        let single_width = generate_single_width_index::<Sha256>(5);
+
+        let mut buffer = vec![];
+        write_single_width_index(&mut buffer, &single_width)
+            .await
+            .unwrap();
+        let mut reader = Cursor::new(buffer);
+        let index = read_single_width_index(&mut reader).await.unwrap();
+        assert_eq!(single_width, index);
+    }
+
+    #[tokio::test]
+    async fn roundtrip_multiwidth_index() {
+        let index = IndexSorted(vec![
+            generate_single_width_index::<Sha256>(5),
+            generate_single_width_index::<Sha512>(5),
+        ]);
+
+        let mut buffer = vec![];
+        write_index_sorted(&mut buffer, &index).await.unwrap();
+
+        let mut reader = Cursor::new(buffer);
+        let result = read_index_sorted(&mut reader).await.unwrap();
+
+        assert_eq!(index, result);
+    }
+
+    #[tokio::test]
+    async fn roundtrip_multihash_index() {
+        let mut mapping = BTreeMap::new();
+        mapping.insert(
+            Sha256::CODE,
+            generate_single_width_index::<Sha256>(5).into(),
+        );
+        mapping.insert(
+            Sha512::CODE,
+            generate_single_width_index::<Sha512>(5).into(),
+        );
+        let index = MultihashIndexSorted(mapping);
+
+        let mut buffer = vec![];
+        write_multihash_index_sorted(&mut buffer, &index)
+            .await
+            .unwrap();
+
+        let mut reader = Cursor::new(buffer);
+        let result = read_multihash_index_sorted(&mut reader).await.unwrap();
+
+        assert_eq!(index, result);
+    }
+
+    #[tokio::test]
+    async fn roundtrip_index_multihash() {
+        let mut mapping = BTreeMap::new();
+        mapping.insert(
+            Sha256::CODE,
+            generate_single_width_index::<Sha256>(5).into(),
+        );
+        mapping.insert(
+            Sha512::CODE,
+            generate_single_width_index::<Sha512>(5).into(),
+        );
+        let index = Index::MultihashIndexSorted(MultihashIndexSorted(mapping));
+
+        let mut buffer = vec![];
+        write_index(&mut buffer, &index).await.unwrap();
+
+        let mut reader = Cursor::new(buffer);
+        let result = read_index(&mut reader).await.unwrap();
+
+        assert_eq!(index, result);
+    }
+
+    #[tokio::test]
+    async fn roundtrip_index_sorted() {
+        let index = Index::IndexSorted(IndexSorted(vec![
+            generate_single_width_index::<Sha256>(5),
+            generate_single_width_index::<Sha512>(5),
+        ]));
+
+        let mut buffer = vec![];
+        write_index(&mut buffer, &index).await.unwrap();
+
+        let mut reader = Cursor::new(buffer);
+        let result = read_index(&mut reader).await.unwrap();
+
+        assert_eq!(index, result);
+    }
+}
diff --git a/storage/mater/src/v2/mod.rs b/storage/mater/src/v2/mod.rs
new file mode 100644
index 000000000..38dc6df7f
--- /dev/null
+++ b/storage/mater/src/v2/mod.rs
@@ -0,0 +1,181 @@
+mod index;
+mod reader;
+mod writer;
+
+use bitflags::bitflags;
+pub use index::{Index, IndexEntry, IndexSorted, MultihashIndexSorted, SingleWidthIndex};
+pub use reader::Reader;
+pub use writer::Writer;
+
+/// The pragma for a CARv2. This is also a valid CARv1 header, with version 2 and no root CIDs.
+///
+/// For more information, check the specification: <https://ipld.io/specs/transport/car/carv2/#pragma>
+pub const PRAGMA: [u8; 11] = [
+    0x0a, // unit(10)
+    0xa1, // map(1)
+    0x67, // string(7)
+    0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, // "version"
+    0x02, // uint(2)
+];
+
+bitflags! {
+    /// Characteristics of the enclosed data.
+    #[derive(Debug, PartialEq, Eq)]
+    pub struct Characteristics: u128 {
+        /// No characteristics were set.
+        const EMPTY = 0;
+        /// When this characteristic is set (1), the index includes
+        /// a complete catalog of the section CIDs regardless of
+        /// whether they are identity CIDs or not.
+        const FULLY_INDEXED = 1 << 127;
+    }
+}
+
+impl Characteristics {
+    /// Create a new [`Characteristics`].
+    pub fn new(fully_indexed: bool) -> Self {
+        if fully_indexed {
+            Self::FULLY_INDEXED
+        } else {
+            Self::EMPTY
+        }
+    }
+
+    /// Check whether the `fully-indexed` characteristic is set.
+    #[inline]
+    pub const fn is_fully_indexed(&self) -> bool {
+        self.intersects(Self::FULLY_INDEXED)
+    }
+}
+
+impl Default for Characteristics {
+    fn default() -> Self {
+        Self::EMPTY
+    }
+}
+
+/// Low-level CARv2 header.
+#[derive(Debug, PartialEq, Eq)]
+pub struct Header {
+    /// Describes certain features of the enclosed data.
+    pub characteristics: Characteristics,
+    /// Byte-offset from the beginning of the CARv2 pragma to the first byte of the CARv1 data payload.
+    pub data_offset: u64,
+    /// Byte-length of the CARv1 data payload.
+    pub data_size: u64,
+    /// Byte-offset from the beginning of the CARv2 pragma to the first byte of the index payload.
+    /// This value may be 0 to indicate the absence of index data.
+    pub index_offset: u64,
+}
+
+impl Header {
+    /// Construct a new [`Header`].
+    pub fn new(fully_indexed: bool, data_offset: u64, data_size: u64, index_offset: u64) -> Self {
+        Self {
+            characteristics: Characteristics::new(fully_indexed),
+            data_offset,
+            data_size,
+            index_offset,
+        }
+    }
+
+    /// The [`Header`] size in bytes (includes the pragma).
+    ///
+    /// As defined in the [specification](https://ipld.io/specs/transport/car/carv2/#header).
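+    ///
+    /// The pragma takes 11 bytes and the header proper takes 40
+    /// (a 16-byte characteristics bitfield plus three 8-byte `u64` fields),
+    /// for a total of 51 bytes.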
+    pub const SIZE: usize = PRAGMA.len() + 40;
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::BTreeMap, io::Cursor};
+
+    use ipld_core::cid::Cid;
+    use sha2::Sha256;
+    use tokio::io::{AsyncSeekExt, BufWriter};
+
+    use crate::{
+        multicodec::{generate_multihash, MultihashCode, RAW_CODE},
+        test_utils::assert_buffer_eq,
+        v2::{
+            index::{Index, IndexEntry, IndexSorted},
+            Header, Reader, Writer,
+        },
+    };
+
+    #[tokio::test]
+    async fn roundtrip_lorem() {
+        let cursor = Cursor::new(vec![]);
+        let buf_writer = BufWriter::new(cursor);
+        let mut writer = Writer::new(buf_writer);
+
+        let file_contents = tokio::fs::read("tests/fixtures/original/lorem.txt")
+            .await
+            .unwrap();
+        let contents_multihash = generate_multihash::<Sha256>(&file_contents);
+        let root_cid = Cid::new_v1(RAW_CODE, contents_multihash);
+
+        // To simplify testing, the values were extracted using `car inspect`
+        let written_header = Header::new(false, 51, 7661, 7712);
+        writer.write_header(&written_header).await.unwrap();
+
+        // We start writing the CARv1 here and keep the stream positions
+        // so that we can properly index the blocks later
+        let start_car_v1 = {
+            let inner = writer.get_inner_mut();
+            inner.stream_position().await.unwrap()
+        };
+
+        let written_header_v1 = crate::v1::Header::new(vec![root_cid]);
+        writer.write_v1_header(&written_header_v1).await.unwrap();
+
+        let start_car_v1_data = {
+            let inner = writer.get_inner_mut();
+            inner.stream_position().await.unwrap()
+        };
+
+        // There's only one block
+        writer.write_block(&root_cid, &file_contents).await.unwrap();
+
+        let written = {
+            let inner = writer.get_inner_mut();
+            inner.stream_position().await.unwrap()
+        };
+        assert_eq!(written, 7712);
+
+        let mut mapping = BTreeMap::new();
+        mapping.insert(
+            Sha256::CODE,
+            IndexSorted::from(IndexEntry::new(
+                root_cid.hash().digest().to_vec(),
+                // This detail is "hidden" in the spec even though it's SO IMPORTANT
+                // See: https://ipld.io/specs/transport/car/carv2/#format-0x0400-indexsorted
+                // > Individual index entries are the concatenation of the hash digest
+                // > and an additional 64-bit unsigned little-endian integer indicating
+                // > the offset of the block from the beginning of the CARv1 data payload.
+                start_car_v1_data - start_car_v1,
+            )),
+        );
+        let written_index = Index::multihash(mapping);
+        writer.write_index(&written_index).await.unwrap();
+
+        let mut buffer = writer.finish().await.unwrap().into_inner();
+        buffer.rewind().await.unwrap();
+        let expected_contents = tokio::fs::read("tests/fixtures/car_v2/lorem.car")
+            .await
+            .unwrap();
+
+        assert_buffer_eq!(&expected_contents, buffer.get_ref());
+
+        let mut reader = Reader::new(buffer);
+        reader.read_pragma().await.unwrap();
+        let read_header = reader.read_header().await.unwrap();
+        assert_eq!(read_header, written_header);
+        let read_header_v1 = reader.read_v1_header().await.unwrap();
+        assert_eq!(read_header_v1, written_header_v1);
+        let (read_cid, read_block) = reader.read_block().await.unwrap();
+        assert_eq!(read_cid, root_cid);
+        assert_eq!(read_block, file_contents);
+        let read_index = reader.read_index().await.unwrap();
+        assert_eq!(read_index, written_index);
+    }
+}
diff --git a/storage/mater/src/v2/reader.rs b/storage/mater/src/v2/reader.rs
new file mode 100644
index 000000000..642f4db5a
--- /dev/null
+++ b/storage/mater/src/v2/reader.rs
@@ -0,0 +1,319 @@
+use ipld_core::cid::Cid;
+use tokio::io::{AsyncRead, AsyncReadExt};
+
+use super::index::read_index;
+use crate::{
+    v2::{index::Index, Characteristics, Header, PRAGMA},
+    Error,
+};
+
+/// Low-level CARv2 reader.
+pub struct Reader<R> {
+    reader: R,
+}
+
+impl<R> Reader<R> {
+    /// Constructs a new [`Reader`].
+    pub fn new(reader: R) -> Self {
+        Self { reader }
+    }
+}
+
+impl<R> Reader<R>
+where
+    R: AsyncRead + Unpin,
+{
+    /// Read the CARv2 pragma.
+    ///
+    /// This function fails if the pragma does not match the one defined in the
+    /// [specification](https://ipld.io/specs/transport/car/carv2/#pragma).
+    pub async fn read_pragma(&mut self) -> Result<(), Error> {
+        let mut pragma_buffer = vec![0; PRAGMA.len()];
+        self.reader.read_exact(&mut pragma_buffer).await?;
+        if pragma_buffer != PRAGMA {
+            return Err(Error::InvalidPragmaError(pragma_buffer));
+        }
+        // Since we validate the pragma, there's no point in returning it.
+        Ok(())
+    }
+
+    /// Read the [`Header`].
+    ///
+    /// This function fails if there are set bits that are not covered in the
+    /// [characteristics specification](https://ipld.io/specs/transport/car/carv2/#characteristics).
+    ///
+    /// For more information check the [header specification](https://ipld.io/specs/transport/car/carv2/#header).
+    pub async fn read_header(&mut self) -> Result<Header, Error> {
+        // Even though the standard doesn't explicitly state endianness, go-car does
+        // https://github.com/ipld/go-car/blob/45b81c1cc5117b3340dfdb025afeca90bfbe8d86/v2/car.go#L51-L69
+        let characteristics_bitfield = self.reader.read_u128_le().await?;
+
+        let characteristics = Characteristics::from_bits(characteristics_bitfield)
+            .ok_or(Error::UnknownCharacteristicsError(characteristics_bitfield))?;
+
+        let data_offset = self.reader.read_u64_le().await?;
+        let data_size = self.reader.read_u64_le().await?;
+        let index_offset = self.reader.read_u64_le().await?;
+
+        Ok(Header {
+            characteristics,
+            data_offset,
+            data_size,
+            index_offset,
+        })
+    }
+
+    /// Read the inner CARv1 [`crate::v1::Header`].
+    ///
+    /// See [`crate::v1::Reader`] for more information.
+    pub async fn read_v1_header(&mut self) -> Result<crate::v1::Header, Error> {
+        crate::v1::read_header(&mut self.reader).await
+    }
+
+    /// Read a [`Cid`] and data block.
+    ///
+    /// See [`crate::v1::Reader`] for more information.
+    pub async fn read_block(&mut self) -> Result<(Cid, Vec<u8>), Error> {
+        crate::v1::read_block(&mut self.reader).await
+    }
+
+    /// Read an [`Index`].
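+    ///
+    /// A rough sketch of reading a CARv2 file front to back (error handling and
+    /// the seek to `index_offset` omitted; see the tests below for the full flow):
+    /// ```ignore
+    /// reader.read_pragma().await?;
+    /// let header = reader.read_header().await?;
+    /// let v1_header = reader.read_v1_header().await?;
+    /// let (cid, block) = reader.read_block().await?;
+    /// let index = reader.read_index().await?;
+    /// ```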
+    pub async fn read_index(&mut self) -> Result<Index, Error> {
+        read_index(&mut self.reader).await
+    }
+
+    /// Get a mutable reference to the inner reader.
+    ///
+    /// This is useful to skip padding or perform other operations the
+    /// [`Reader`] does not natively support.
+    pub fn get_inner_mut(&mut self) -> &mut R {
+        &mut self.reader
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Cursor;
+
+    use ipld_core::cid::Cid;
+    use sha2::Sha256;
+    use tokio::{fs::File, io::AsyncSeekExt};
+
+    use crate::{
+        multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE},
+        v2::{index::Index, reader::Reader},
+        Error,
+    };
+
+    #[tokio::test]
+    async fn pragma() {
+        let file = File::open("tests/fixtures/car_v2/lorem.car").await.unwrap();
+        let mut reader = Reader::new(file);
+        let pragma = reader.read_pragma().await;
+        assert!(matches!(pragma, Ok(())));
+    }
+
+    #[tokio::test]
+    async fn bad_pragma() {
+        let mut bad_pragma = vec![0u8; 11];
+        bad_pragma.fill_with(rand::random);
+        let mut reader = Reader::new(Cursor::new(bad_pragma));
+        let pragma = reader.read_pragma().await;
+        assert!(matches!(pragma, Err(Error::InvalidPragmaError(_))));
+    }
+
+    #[tokio::test]
+    async fn header() {
+        let file = File::open("tests/fixtures/car_v2/lorem.car").await.unwrap();
+        let mut reader = Reader::new(file);
+        let _ = reader.read_pragma().await.unwrap();
+        let header = reader.read_header().await.unwrap();
+
+        // `car inspect tests/fixtures/car_v2/lorem.car` to get the values
+        assert_eq!(header.characteristics.bits(), 0);
+        assert_eq!(header.data_offset, 51);
+        assert_eq!(header.data_size, 7661);
+        assert_eq!(header.index_offset, 7712);
+    }
+
+    #[tokio::test]
+    async fn inner_car() {
+        // Read the original file to get the multihash
+        let file_contents = tokio::fs::read("tests/fixtures/original/lorem.txt")
+            .await
+            .unwrap();
+        let contents_multihash = generate_multihash::<Sha256>(&file_contents);
+        let contents_cid = Cid::new_v1(RAW_CODE, contents_multihash);
+
+        let file = File::open("tests/fixtures/car_v2/lorem.car").await.unwrap();
+        let mut reader = Reader::new(file);
+        let _ = reader.read_pragma().await.unwrap();
+        let header = reader.read_header().await.unwrap();
+
+        let inner = reader.get_inner_mut();
+        inner
+            .seek(std::io::SeekFrom::Start(header.data_offset))
+            .await
+            .unwrap();
+
+        let v1_header = reader.read_v1_header().await.unwrap();
+        assert_eq!(v1_header.roots, vec![contents_cid]);
+
+        loop {
+            match reader.read_block().await {
+                Ok((cid, _)) => println!("{:?}", cid),
+                else_ => {
+                    assert!(matches!(else_, Err(Error::IoError(_))));
+                    break;
+                }
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn indexes() {
+        // Read the original file to get the multihash
+        let file_contents = tokio::fs::read("tests/fixtures/original/lorem.txt")
+            .await
+            .unwrap();
+        let contents_multihash = generate_multihash::<Sha256>(&file_contents);
+
+        let file = File::open("tests/fixtures/car_v2/lorem.car").await.unwrap();
+        let mut reader = Reader::new(file);
+        let _ = reader.read_pragma().await.unwrap();
+        let header = reader.read_header().await.unwrap();
+
+        let inner = reader.get_inner_mut();
+        inner
+            .seek(std::io::SeekFrom::Start(header.index_offset))
+            .await
+            .unwrap();
+
+        let index = reader.read_index().await.unwrap();
+        assert!(matches!(index, Index::MultihashIndexSorted(_)));
+        if let Index::MultihashIndexSorted(mh) = index {
+            assert_eq!(mh.0.len(), 1);
+            assert!(mh.0.contains_key(&SHA_256_CODE));
+            let fst = &mh.0[&SHA_256_CODE].0;
+            assert_eq!(fst.len(), 1);
+            assert_eq!(fst[0].count, 1);
+            assert_eq!(fst[0].width, 40);
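+            // width 40 = 32-byte SHA-256 digest + 8-byte little-endian offset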
+            assert_eq!(fst[0].entries.len(), 1);
+            assert_eq!(fst[0].entries[0].offset, 59);
+            assert_eq!(fst[0].entries[0].digest, contents_multihash.digest());
+        }
+    }
+
+    #[tokio::test]
+    async fn full_file_lorem() {
+        // Read the original file to get the multihash
+        let file_contents = tokio::fs::read("tests/fixtures/original/lorem.txt")
+            .await
+            .unwrap();
+        let contents_multihash = generate_multihash::<Sha256>(&file_contents);
+        let contents_cid = Cid::new_v1(RAW_CODE, contents_multihash);
+
+        let file = File::open("tests/fixtures/car_v2/lorem.car").await.unwrap();
+        let mut reader = Reader::new(file);
+        reader.read_pragma().await.unwrap();
+
+        let header = reader.read_header().await.unwrap();
+        // `car inspect tests/fixtures/car_v2/lorem.car` to get the values
+        assert_eq!(header.characteristics.bits(), 0);
+        assert_eq!(header.data_offset, 51);
+        assert_eq!(header.data_size, 7661);
+        assert_eq!(header.index_offset, 7712);
+
+        let v1_header = reader.read_v1_header().await.unwrap();
+        assert_eq!(v1_header.roots, vec![contents_cid]);
+
+        loop {
+            match reader.read_block().await {
+                Ok((cid, _)) => {
+                    // Kinda hacky, but better than doing a seek later on
+                    let position = reader.get_inner_mut().stream_position().await.unwrap();
+                    let data_end = header.data_offset + header.data_size;
+                    if position >= data_end {
+                        break;
+                    }
+                    println!("{:?}", cid);
+                }
+                else_ => {
+                    assert!(matches!(else_, Err(Error::IoError(_))));
+                    break;
+                }
+            }
+        }
+
+        let index = reader.read_index().await.unwrap();
+        assert!(matches!(index, Index::MultihashIndexSorted(_)));
+        if let Index::MultihashIndexSorted(mh) = index {
+            assert_eq!(mh.0.len(), 1);
+            assert!(mh.0.contains_key(&SHA_256_CODE));
+            let fst = &mh.0[&SHA_256_CODE].0;
+            assert_eq!(fst.len(), 1);
+            assert_eq!(fst[0].count, 1);
+            assert_eq!(fst[0].width, 40);
+            assert_eq!(fst[0].entries.len(), 1);
+            assert_eq!(fst[0].entries[0].offset, 59);
+            assert_eq!(fst[0].entries[0].digest, contents_multihash.digest());
+        }
+    }
+
+    #[tokio::test]
+    async fn full_file_glenda() {
+        let file = File::open("tests/fixtures/car_v2/spaceglenda.car")
+            .await
+            .unwrap();
+        let mut reader = Reader::new(file);
+        reader.read_pragma().await.unwrap();
+
+        let header = reader.read_header().await.unwrap();
+        // `car inspect tests/fixtures/car_v2/spaceglenda.car` to get the values
+        assert_eq!(header.characteristics.bits(), 0);
+        assert_eq!(header.data_offset, 51);
+        assert_eq!(header.data_size, 654402);
+        assert_eq!(header.index_offset, 654453);
+
+        let v1_header = reader.read_v1_header().await.unwrap();
+        assert_eq!(v1_header.roots.len(), 1);
+        assert_eq!(
+            v1_header.roots[0]
+                .to_string_of_base(ipld_core::cid::multibase::Base::Base32Lower)
+                .unwrap(),
+            // Taken from `car inspect tests/fixtures/car_v2/spaceglenda.car`
+            "bafybeiefli7iugocosgirzpny4t6yxw5zehy6khtao3d252pbf352xzx5q"
+        );
+
+        loop {
+            // NOTE(@jmg-duarte,22/05/2024): review this
+            match reader.read_block().await {
+                Ok((_, _)) => {
+                    // Kinda hacky, but better than doing a seek later on
+                    let position = reader.get_inner_mut().stream_position().await.unwrap();
+                    let data_end = header.data_offset + header.data_size;
+                    if position >= data_end {
+                        break;
+                    }
+                }
+                else_ => {
+                    // With the length check above this branch should actually be unreachable
+                    assert!(matches!(else_, Err(Error::IoError(_))));
+                    break;
+                }
+            }
+        }
+
+        let index = reader.read_index().await.unwrap();
+        assert!(matches!(index, Index::MultihashIndexSorted(_)));
+        if let Index::MultihashIndexSorted(mh) = index {
+            assert_eq!(mh.0.len(), 1);
+            assert!(mh.0.contains_key(&SHA_256_CODE));
+            let fst = &mh.0[&SHA_256_CODE].0;
+            assert_eq!(fst.len(), 1);
+            assert_eq!(fst[0].count, 4);
+            assert_eq!(fst[0].width, 40);
+            assert_eq!(fst[0].entries.len(), 4);
+        }
+    }
+}
diff --git a/storage/mater/src/v2/writer.rs b/storage/mater/src/v2/writer.rs
new file mode 100644
index 000000000..3622e7a6c
--- /dev/null
+++ b/storage/mater/src/v2/writer.rs
@@ -0,0 +1,304 @@
+use byteorder::{LittleEndian, WriteBytesExt};
+use ipld_core::cid::Cid;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+use super::{Header, PRAGMA};
+use crate::{v2::index::Index, Error};
+
+/// Low-level CARv2 writer.
+pub struct Writer<W> {
+    writer: W,
+}
+
+impl<W> Writer<W> {
+    /// Construct a new [`Writer`].
+    ///
+    /// Takes a writer into which the data will be written.
+    pub fn new(writer: W) -> Self {
+        Self { writer }
+    }
+}
+
+impl<W> Writer<W>
+where
+    W: AsyncWrite + Unpin,
+{
+    /// Write a [`Header`].
+    ///
+    /// Returns the number of bytes written.
+    pub async fn write_header(&mut self, header: &Header) -> Result<usize, Error> {
+        self.writer.write_all(&PRAGMA).await?;
+
+        let mut buffer = [0; 40];
+        let mut handle = &mut buffer[..];
+        WriteBytesExt::write_u128::<LittleEndian>(&mut handle, header.characteristics.bits())?;
+        WriteBytesExt::write_u64::<LittleEndian>(&mut handle, header.data_offset)?;
+        WriteBytesExt::write_u64::<LittleEndian>(&mut handle, header.data_size)?;
+        WriteBytesExt::write_u64::<LittleEndian>(&mut handle, header.index_offset)?;
+
+        self.writer.write_all(&buffer).await?;
+        Ok(PRAGMA.len() + buffer.len())
+    }
+
+    /// Write a [`crate::v1::Header`].
+    ///
+    /// Returns the number of bytes written.
+    pub async fn write_v1_header(&mut self, v1_header: &crate::v1::Header) -> Result<usize, Error> {
+        crate::v1::write_header(&mut self.writer, v1_header).await
+    }
+
+    /// Write a [`Cid`] and the respective data block.
+    ///
+    /// Returns the number of bytes written.
+    pub async fn write_block<Block>(&mut self, cid: &Cid, block: &Block) -> Result<usize, Error>
+    where
+        Block: AsRef<[u8]>,
+    {
+        crate::v1::write_block(&mut self.writer, cid, block).await
+    }
+
+    /// Write an [`Index`].
+    ///
+    /// Returns the number of bytes written.
+    pub async fn write_index(&mut self, index: &Index) -> Result<usize, Error> {
+        crate::v2::index::write_index(&mut self.writer, index).await
+    }
+
+    /// Write padding (`0x0` bytes).
+    ///
+    /// Returns the number of bytes written.
+    pub async fn write_padding(&mut self, length: usize) -> Result<usize, Error> {
+        for _ in 0..length {
+            self.writer.write_u8(0).await?;
+        }
+        Ok(length)
+    }
+
+    /// Flushes and returns the inner writer.
+    pub async fn finish(mut self) -> Result<W, Error> {
+        self.writer.flush().await?;
+        Ok(self.writer)
+    }
+
+    /// Get a mutable reference to the inner writer.
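+    ///
+    /// This is useful to track the stream position while writing blocks,
+    /// which the index offsets are derived from (see the tests below).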
+    pub fn get_inner_mut(&mut self) -> &mut W {
+        &mut self.writer
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::BTreeMap, io::Cursor};
+
+    use ipld_core::cid::Cid;
+    use sha2::Sha256;
+    use tokio::{
+        fs::File,
+        io::{AsyncSeekExt, BufWriter},
+    };
+    use tokio_stream::StreamExt;
+    use tokio_util::io::ReaderStream;
+
+    use crate::{
+        multicodec::{generate_multihash, MultihashCode, RAW_CODE},
+        test_utils::assert_buffer_eq,
+        unixfs::stream_balanced_tree,
+        v2::{
+            index::{IndexEntry, IndexSorted, SingleWidthIndex},
+            Header, Writer,
+        },
+    };
+
+    impl Writer<BufWriter<Vec<u8>>> {
+        fn test_writer() -> Self {
+            let buffer = Vec::new();
+            let buf_writer = BufWriter::new(buffer);
+            Writer::new(buf_writer)
+        }
+    }
+
+    #[tokio::test]
+    async fn header_lorem() {
+        let file_contents = tokio::fs::read("tests/fixtures/car_v2/lorem.car")
+            .await
+            .unwrap();
+
+        let mut writer = Writer::test_writer();
+        // To simplify testing, the values were extracted using `car inspect`
+        writer
+            .write_header(&Header::new(false, 51, 7661, 7712))
+            .await
+            .unwrap();
+
+        let inner = writer.finish().await.unwrap().into_inner();
+        assert_eq!(inner.len(), 51);
+        assert_eq!(inner, file_contents[..51]);
+    }
+
+    #[tokio::test]
+    async fn header_spaceglenda() {
+        let file_contents = tokio::fs::read("tests/fixtures/car_v2/spaceglenda.car")
+            .await
+            .unwrap();
+
+        let mut writer = Writer::test_writer();
+        // To simplify testing, the values were extracted using `car inspect`
+        writer
+            .write_header(&Header::new(false, 51, 654402, 654453))
+            .await
+            .unwrap();
+
+        let inner = writer.finish().await.unwrap().into_inner();
+        assert_eq!(inner.len(), 51);
+        assert_eq!(inner, file_contents[..51]);
+    }
+
+    // Byte to byte comparison to the lorem.car file
+    #[tokio::test]
+    async fn full_lorem() {
+        let cursor = Cursor::new(vec![]);
+        let buf_writer = BufWriter::new(cursor);
+        let mut writer = Writer::new(buf_writer);
+
+        let file_contents = tokio::fs::read("tests/fixtures/original/lorem.txt")
+            .await
+            .unwrap();
+        let contents_multihash = generate_multihash::<Sha256>(&file_contents);
+        let root_cid = Cid::new_v1(RAW_CODE, contents_multihash);
+
+        // To simplify testing, the values were extracted using `car inspect`
+        writer
+            .write_header(&Header::new(false, 51, 7661, 7712))
+            .await
+            .unwrap();
+
+        // We start writing the CARv1 here and keep the stream positions
+        // so that we can properly index the blocks later
+        let start_car_v1 = {
+            let inner = writer.get_inner_mut();
+            inner.stream_position().await.unwrap()
+        };
+
+        writer
+            .write_v1_header(&crate::v1::Header::new(vec![root_cid]))
+            .await
+            .unwrap();
+
+        let start_car_v1_data = {
+            let inner = writer.get_inner_mut();
+            inner.stream_position().await.unwrap()
+        };
+
+        // There's only one block
+        writer.write_block(&root_cid, &file_contents).await.unwrap();
+
+        let inner = writer.get_inner_mut();
+        let written = inner.stream_position().await.unwrap();
+        assert_eq!(written, 7712);
+
+        let mut mapping = BTreeMap::new();
+        mapping.insert(
+            Sha256::CODE,
+            IndexSorted::from(IndexEntry::new(
+                root_cid.hash().digest().to_vec(),
+                // This detail is "hidden" in the spec even though it's SO IMPORTANT
+                // See: https://ipld.io/specs/transport/car/carv2/#format-0x0400-indexsorted
+                // > Individual index entries are the concatenation of the hash digest
+                // > and an additional 64-bit unsigned little-endian integer indicating
+                // > the offset of the block from the beginning of the CARv1 data payload.
+                start_car_v1_data - start_car_v1,
+            )),
+        );
+        let index = crate::v2::index::Index::multihash(mapping);
+        writer.write_index(&index).await.unwrap();
+
+        let mut buf_writer = writer.finish().await.unwrap();
+        buf_writer.rewind().await.unwrap();
+
+        let expected_contents = tokio::fs::read("tests/fixtures/car_v2/lorem.car")
+            .await
+            .unwrap();
+
+        assert_buffer_eq!(&expected_contents, buf_writer.get_ref().get_ref())
+    }
+
+    // Byte to byte comparison to the spaceglenda.car file
+    // This test also covers the nitty-gritty details of how to write a CARv2 file with indexes.
+    #[tokio::test]
+    async fn full_spaceglenda() {
+        let cursor = Cursor::new(vec![]);
+        let buf_writer = BufWriter::new(cursor);
+        let mut writer = Writer::new(buf_writer);
+
+        let file = File::open("tests/fixtures/original/spaceglenda.jpg")
+            .await
+            .unwrap();
+        // https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13
+        let file_chunker = ReaderStream::with_capacity(file, 1024 * 256);
+        let nodes = stream_balanced_tree(file_chunker, 11)
+            .collect::<Result<Vec<_>, _>>()
+            .await
+            .unwrap();
+
+        // To simplify testing, the values were extracted using `car inspect`
+        writer
+            .write_header(&Header::new(false, 51, 654402, 654453))
+            .await
+            .unwrap();
+
+        // We start writing the CARv1 here and keep the stream positions
+        // so that we can properly index the blocks later
+        let start_car_v1 = {
+            let inner = writer.get_inner_mut();
+            inner.stream_position().await.unwrap()
+        };
+
+        writer
+            .write_v1_header(&crate::v1::Header::new(vec![nodes.last().unwrap().0]))
+            .await
+            .unwrap();
+
+        let mut offsets = vec![];
+        for (cid, block) in &nodes {
+            // write the blocks, saving their positions for the index
+            offsets.push({
+                let inner = writer.get_inner_mut();
+                inner.stream_position().await.unwrap() - start_car_v1
+            });
+            writer.write_block(cid, block).await.unwrap();
+        }
+
+        let inner = writer.get_inner_mut();
+        let written = inner.stream_position().await.unwrap();
+        assert_eq!(written, 654453);
+
+        let mut mapping = BTreeMap::new();
+        mapping.insert(
+            Sha256::CODE,
+            IndexSorted::from(
+                SingleWidthIndex::try_from(
+                    nodes
+                        .iter()
+                        .zip(&offsets)
+                        .map(|((cid, _), offset)| {
+                            IndexEntry::new(cid.hash().digest().to_vec(), *offset)
+                        })
+                        .collect::<Vec<_>>(),
+                )
+                .unwrap(),
+            ),
+        );
+
+        let index = crate::v2::index::Index::multihash(mapping);
+        writer.write_index(&index).await.unwrap();
+
+        let mut buf_writer = writer.finish().await.unwrap();
+        buf_writer.rewind().await.unwrap();
+
+        let expected_contents = tokio::fs::read("tests/fixtures/car_v2/spaceglenda.car")
+            .await
+            .unwrap();
+
+        assert_buffer_eq!(&expected_contents, buf_writer.get_ref().get_ref());
+    }
+}
diff --git a/storage/mater/tests/fixtures/car_v1/lorem.car b/storage/mater/tests/fixtures/car_v1/lorem.car
new file mode 100644
index 000000000..1b38092df
Binary files /dev/null and b/storage/mater/tests/fixtures/car_v1/lorem.car differ
diff --git a/storage/mater/tests/fixtures/car_v1/lorem_header.car b/storage/mater/tests/fixtures/car_v1/lorem_header.car
new file mode 100644
index 000000000..7d2bdf984
Binary files /dev/null and b/storage/mater/tests/fixtures/car_v1/lorem_header.car differ
diff --git a/storage/mater/tests/fixtures/car_v1/spaceglenda.car b/storage/mater/tests/fixtures/car_v1/spaceglenda.car
new file mode 100644
index 000000000..f1005c79e
Binary files /dev/null and b/storage/mater/tests/fixtures/car_v1/spaceglenda.car differ
diff --git a/storage/mater/tests/fixtures/car_v2/lorem.car
b/storage/mater/tests/fixtures/car_v2/lorem.car new file mode 100644 index 000000000..044aae994 Binary files /dev/null and b/storage/mater/tests/fixtures/car_v2/lorem.car differ diff --git a/storage/mater/tests/fixtures/car_v2/spaceglenda.car b/storage/mater/tests/fixtures/car_v2/spaceglenda.car new file mode 100644 index 000000000..4e2f1cf7e Binary files /dev/null and b/storage/mater/tests/fixtures/car_v2/spaceglenda.car differ diff --git a/storage/mater/tests/fixtures/original/lorem.txt b/storage/mater/tests/fixtures/original/lorem.txt new file mode 100644 index 000000000..8565ec8ba --- /dev/null +++ b/storage/mater/tests/fixtures/original/lorem.txt @@ -0,0 +1,10 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. +Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas ultricies mi eget mauris pharetra et ultrices neque ornare. Eu mi bibendum neque egestas. Etiam erat velit scelerisque in dictum non consectetur a. Nulla facilisi cras fermentum odio eu feugiat pretium nibh ipsum. Sagittis purus sit amet volutpat consequat mauris nunc. Turpis egestas pretium aenean pharetra magna ac placerat vestibulum. Nibh tellus molestie nunc non blandit massa enim. Ut tortor pretium viverra suspendisse potenti. +Facilisis magna etiam tempor orci eu lobortis elementum nibh. Volutpat sed cras ornare arcu dui vivamus arcu. Cursus risus at ultrices mi. Nunc eget lorem dolor sed viverra. Sit amet nisl suscipit adipiscing bibendum est ultricies integer. Id nibh tortor id aliquet lectus proin nibh. Adipiscing elit pellentesque habitant morbi tristique senectus et. Enim nec dui nunc mattis enim ut tellus elementum. Dui nunc mattis enim ut tellus. Adipiscing vitae proin sagittis nisl rhoncus mattis rhoncus urna neque. Molestie a iaculis at erat pellentesque adipiscing commodo. +Fames ac turpis egestas sed tempus urna et pharetra. Sagittis purus sit amet volutpat. Consectetur adipiscing elit duis tristique. Sit amet nisl purus in mollis nunc sed id semper. Erat pellentesque adipiscing commodo elit at imperdiet dui accumsan sit. Aliquam vestibulum morbi blandit cursus risus at ultrices mi tempus. Pellentesque dignissim enim sit amet venenatis urna cursus. Amet justo donec enim diam vulputate. Ut placerat orci nulla pellentesque dignissim. Praesent semper feugiat nibh sed pulvinar proin gravida hendrerit. Auctor neque vitae tempus quam pellentesque nec nam aliquam sem. Lectus urna duis convallis convallis tellus id interdum velit laoreet. Hendrerit dolor magna eget est lorem ipsum dolor sit. Ut sem viverra aliquet eget sit amet tellus cras. Cursus metus aliquam eleifend mi in nulla posuere sollicitudin. 
Dolor purus non enim praesent elementum facilisis. Dignissim sodales ut eu sem integer. Tempor orci eu lobortis elementum nibh tellus molestie nunc non. Ut enim blandit volutpat maecenas volutpat. +Tortor consequat id porta nibh venenatis cras. Sit amet purus gravida quis. Convallis convallis tellus id interdum velit laoreet. Vitae suscipit tellus mauris a diam maecenas. Phasellus egestas tellus rutrum tellus pellentesque eu tincidunt tortor aliquam. Commodo sed egestas egestas fringilla phasellus faucibus scelerisque eleifend. Turpis massa sed elementum tempus egestas sed. Sodales neque sodales ut etiam sit amet. Leo vel fringilla est ullamcorper. Pellentesque pulvinar pellentesque habitant morbi tristique senectus et netus. Dignissim convallis aenean et tortor at risus viverra. Nibh tellus molestie nunc non blandit massa enim nec. Phasellus vestibulum lorem sed risus ultricies tristique nulla aliquet. +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. +Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas ultricies mi eget mauris pharetra et ultrices neque ornare. Eu mi bibendum neque egestas. Etiam erat velit scelerisque in dictum non consectetur a. Nulla facilisi cras fermentum odio eu feugiat pretium nibh ipsum. Sagittis purus sit amet volutpat consequat mauris nunc. Turpis egestas pretium aenean pharetra magna ac placerat vestibulum. Nibh tellus molestie nunc non blandit massa enim. Ut tortor pretium viverra suspendisse potenti. +Facilisis magna etiam tempor orci eu lobortis elementum nibh. Volutpat sed cras ornare arcu dui vivamus arcu. Cursus risus at ultrices mi. Nunc eget lorem dolor sed viverra. Sit amet nisl suscipit adipiscing bibendum est ultricies integer. Id nibh tortor id aliquet lectus proin nibh. Adipiscing elit pellentesque habitant morbi tristique senectus et. Enim nec dui nunc mattis enim ut tellus elementum. Dui nunc mattis enim ut tellus. Adipiscing vitae proin sagittis nisl rhoncus mattis rhoncus urna neque. Molestie a iaculis at erat pellentesque adipiscing commodo. +Fames ac turpis egestas sed tempus urna et pharetra. Sagittis purus sit amet volutpat. Consectetur adipiscing elit duis tristique. Sit amet nisl purus in mollis nunc sed id semper. Erat pellentesque adipiscing commodo elit at imperdiet dui accumsan sit. Aliquam vestibulum morbi blandit cursus risus at ultrices mi tempus. Pellentesque dignissim enim sit amet venenatis urna cursus. Amet justo donec enim diam vulputate. Ut placerat orci nulla pellentesque dignissim. Praesent semper feugiat nibh sed pulvinar proin gravida hendrerit. Auctor neque vitae tempus quam pellentesque nec nam aliquam sem. 
Lectus urna duis convallis convallis tellus id interdum velit laoreet. Hendrerit dolor magna eget est lorem ipsum dolor sit. Ut sem viverra aliquet eget sit amet tellus cras. Cursus metus aliquam eleifend mi in nulla posuere sollicitudin. Dolor purus non enim praesent elementum facilisis. Dignissim sodales ut eu sem integer. Tempor orci eu lobortis elementum nibh tellus molestie nunc non. Ut enim blandit volutpat maecenas volutpat. +Tortor consequat id porta nibh venenatis cras. Sit amet purus gravida quis. Convallis convallis tellus id interdum velit laoreet. Vitae suscipit tellus mauris a diam maecenas. Phasellus egestas tellus rutrum tellus pellentesque eu tincidunt tortor aliquam. Commodo sed egestas egestas fringilla phasellus faucibus scelerisque eleifend. Turpis massa sed elementum tempus egestas sed. Sodales neque sodales ut etiam sit amet. Leo vel fringilla est ullamcorper. Pellentesque pulvinar pellentesque habitant morbi tristique senectus et netus. Dignissim convallis aenean et tortor at risus viverra. Nibh tellus molestie nunc non blandit massa enim nec. Phasellus vestibulum lorem sed risus ultricies tristique nulla aliquet. diff --git a/storage/mater/tests/fixtures/original/lorem_1024.txt b/storage/mater/tests/fixtures/original/lorem_1024.txt new file mode 100644 index 000000000..eb4ec57a2 --- /dev/null +++ b/storage/mater/tests/fixtures/original/lorem_1024.txt @@ -0,0 +1,2 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. +Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas \ No newline at end of file diff --git a/storage/mater/tests/fixtures/original/lorem_4096_dup.txt b/storage/mater/tests/fixtures/original/lorem_4096_dup.txt new file mode 100644 index 000000000..cc2bf9e95 --- /dev/null +++ b/storage/mater/tests/fixtures/original/lorem_4096_dup.txt @@ -0,0 +1,5 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. +Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. 
Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. +Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. +Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. Maecenas Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Sagittis orci a scelerisque purus semper. Euismod nisi porta lorem mollis aliquam. Nisi vitae suscipit tellus mauris a diam. Purus ut faucibus pulvinar elementum integer enim. Massa sed elementum tempus egestas sed sed. Sagittis id consectetur purus ut faucibus pulvinar elementum integer enim. Eget nunc scelerisque viverra mauris in aliquam sem fringilla ut. Malesuada fames ac turpis egestas. Mi proin sed libero enim sed faucibus turpis in. +Pretium nibh ipsum consequat nisl vel pretium. Viverra suspendisse potenti nullam ac tortor vitae. Aliquet eget sit amet tellus cras adipiscing enim eu. Rhoncus mattis rhoncus urna neque. Est lorem ipsum dolor sit amet consectetur. Nisi quis eleifend quam adipiscing vitae proin sagittis. Risus sed vulputate odio ut. Ultrices eros in cursus turpis massa tincidunt dui ut. Condimentum vitae sapien pellentesque habitant morbi tristique senectus. 
Maecenas \ No newline at end of file diff --git a/storage/mater/tests/fixtures/original/spaceglenda.jpg b/storage/mater/tests/fixtures/original/spaceglenda.jpg new file mode 100644 index 000000000..d2bab4d66 Binary files /dev/null and b/storage/mater/tests/fixtures/original/spaceglenda.jpg differ