diff --git a/.rustfmt.toml b/.rustfmt.toml
index 6380de253..a4527b7fc 100644
--- a/.rustfmt.toml
+++ b/.rustfmt.toml
@@ -1,3 +1,5 @@
edition = "2021"
group_imports = "StdExternalCrate"
+hard_tabs = false
imports_granularity = "Crate"
+tab_spaces = 4
diff --git a/Cargo.lock b/Cargo.lock
index dee871c78..45420a625 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -751,6 +751,28 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "async-stream"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite 0.2.14",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.61",
+]
+
[[package]]
name = "async-task"
version = "4.7.1"
@@ -1220,6 +1242,15 @@ dependencies = [
"thiserror",
]
+[[package]]
+name = "cbor4ii"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4"
+dependencies = [
+ "serde",
+]
+
[[package]]
name = "cc"
version = "1.0.97"
@@ -4424,6 +4455,16 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
+[[package]]
+name = "integer-encoding"
+version = "4.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "924df4f0e24e2e7f9cdd90babb0b96f93b20f3ecfa949ea9e6613756b8c8e1bf"
+dependencies = [
+ "async-trait",
+ "tokio",
+]
+
[[package]]
name = "integer-sqrt"
version = "0.1.5"
@@ -4462,6 +4503,29 @@ dependencies = [
"winreg",
]
+[[package]]
+name = "ipld-core"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4ede82a79e134f179f4b29b5fdb1eb92bd1b38c4dfea394c539051150a21b9b"
+dependencies = [
+ "cid 0.11.1",
+ "serde",
+ "serde_bytes",
+]
+
+[[package]]
+name = "ipld-dagpb"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c5eb3c08f1508fa62ffe1a805aafa116d2eb034b3542c85596db132f279abc47"
+dependencies = [
+ "bytes",
+ "ipld-core",
+ "quick-protobuf",
+ "thiserror",
+]
+
[[package]]
name = "ipnet"
version = "2.9.0"
@@ -5511,6 +5575,31 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5"
+[[package]]
+name = "mater"
+version = "0.1.0"
+dependencies = [
+ "async-stream",
+ "bitflags 2.5.0",
+ "byteorder",
+ "bytes",
+ "digest 0.10.7",
+ "futures",
+ "indexmap 2.2.6",
+ "integer-encoding 4.0.0",
+ "ipld-core",
+ "ipld-dagpb",
+ "quick-protobuf",
+ "rand 0.8.5",
+ "serde",
+ "serde_ipld_dagcbor",
+ "sha2 0.10.8",
+ "thiserror",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+]
+
[[package]]
name = "matrixmultiply"
version = "0.3.8"
@@ -11705,6 +11794,18 @@ dependencies = [
"syn 2.0.61",
]
+[[package]]
+name = "serde_ipld_dagcbor"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ded35fbe4ab8fdec1f1d14b4daff2206b1eada4d6e708cb451d464d2d965f493"
+dependencies = [
+ "cbor4ii",
+ "ipld-core",
+ "scopeguard",
+ "serde",
+]
+
[[package]]
name = "serde_json"
version = "1.0.117"
@@ -13473,7 +13574,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e"
dependencies = [
"byteorder",
- "integer-encoding",
+ "integer-encoding 3.0.4",
"log",
"ordered-float",
"threadpool",
diff --git a/Cargo.toml b/Cargo.toml
index 947942c1f..6ec3cc234 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,7 +6,7 @@ license-file = "LICENSE"
repository = "https://github.com/eigerco/polka-storage"
[workspace]
-members = ["node", "runtime", "storage/polka-index"]
+members = ["node", "runtime", "storage/mater", "storage/polka-index"]
resolver = "2"
# FIXME(#@jmg-duarte,#7,14/5/24): remove the patch once something >1.11.0 is released
@@ -28,30 +28,47 @@ panic = 'abort' # Use abort on panic to reduce binary size
substrate-build-script-utils = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" }
substrate-wasm-builder = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" }
+async-stream = "0.3.5"
+bitflags = "2.5.0"
+byteorder = "1.5.0"
+bytes = "1.6.0"
ciborium = "0.2.2"
cid = { version = "0.11.1" }
clap = { version = "4.5.3" }
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false }
color-print = "0.3.4"
+digest = "0.10.7"
futures = "0.3.28"
hex-literal = { version = "0.4.1" }
+indexmap = "2.2.6"
+integer-encoding = "4.0.0"
+ipld-core = "0.4.1"
+ipld-dagpb = "0.2.1"
+itertools = "0.13.0"
jsonrpsee = { version = "0.22" }
log = { version = "0.4.21", default-features = false }
polkavm = "0.9.3"
polkavm-derive = "0.9.1"
polkavm-linker = "0.9.2"
+quick-protobuf = "0.8.1"
quote = { version = "1.0.33" }
+rand = "0.8.5"
rocksdb = { version = "0.21" }
scale-info = { version = "2.11.1", default-features = false }
serde = { version = "1.0.197", default-features = false }
serde-big-array = { version = "0.3.2" }
serde_derive = { version = "1.0.117" }
+serde_ipld_dagcbor = "0.6.1"
serde_json = { version = "1.0.114", default-features = false }
serde_yaml = { version = "0.9" }
+sha2 = "0.10.8"
smallvec = "1.11.0"
syn = { version = "2.0.53" }
tempfile = "3.10.1"
thiserror = { version = "1.0.48" }
+tokio = "1.37.0"
+tokio-stream = "0.1.15"
+tokio-util = "0.7.11"
tracing-subscriber = { version = "0.3.18" }
# Local
diff --git a/rustfmt.toml b/rustfmt.toml
deleted file mode 100644
index 83f0002f0..000000000
--- a/rustfmt.toml
+++ /dev/null
@@ -1,2 +0,0 @@
-hard_tabs = false
-tab_spaces = 4
diff --git a/storage/mater/Cargo.toml b/storage/mater/Cargo.toml
new file mode 100644
index 000000000..741244cd0
--- /dev/null
+++ b/storage/mater/Cargo.toml
@@ -0,0 +1,35 @@
+[package]
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license-file.workspace = true
+name = "mater" # name WIP
+repository.workspace = true
+version = "0.1.0"
+
+[dependencies]
+async-stream.workspace = true
+bitflags.workspace = true
+byteorder = { workspace = true, features = ["i128"] }
+bytes.workspace = true
+digest.workspace = true
+futures.workspace = true
+indexmap.workspace = true
+integer-encoding = { workspace = true, features = ["tokio_async"] }
+ipld-core = { workspace = true, features = ["serde"] }
+ipld-dagpb.workspace = true
+quick-protobuf.workspace = true
+serde = { workspace = true, features = ["derive"] }
+serde_ipld_dagcbor.workspace = true
+sha2.workspace = true
+thiserror.workspace = true
+tokio = { workspace = true, features = ["fs", "macros", "rt"] }
+tokio-stream.workspace = true
+tokio-util = { workspace = true, features = ["io"] }
+
+[dev-dependencies]
+rand.workspace = true
+
+[lints]
+workspace = true
diff --git a/storage/mater/README.md b/storage/mater/README.md
new file mode 100644
index 000000000..588d4cfe7
--- /dev/null
+++ b/storage/mater/README.md
@@ -0,0 +1,29 @@
+# Mater
+
+A Rust library to read and write CAR files.
+
+This library is based on [beetle](https://github.com/n0-computer/beetle).
+
+## Specifications
+
+- CARv1 specification: https://ipld.io/specs/transport/car/carv1/
+- CARv2 specification: https://ipld.io/specs/transport/car/carv2/
+- UnixFS specification: https://github.com/ipfs/specs/blob/e4e5754ad4a4bfbb2ebe63f4c27631f573703de0/UNIXFS.md
+
+## Developing
+
+### Overview
+
+This crate is composed of three main modules:
+
+- `unixfs/` — which covers the main UnixFS abstractions
+- `v1/` — which contains the CARv1 implementation and respective abstractions
+- `v2/` — which contains the CARv2 implementation and respective abstractions
+
+### Further notes
+
+The [`unixfs_pb.rs`](src/unixfs/unixfs_pb.rs) file was automatically generated using
+[`pb-rs`](https://github.com/tafia/quick-protobuf/tree/master/pb-rs).
+The file was generated and checked in instead of making `pb-rs` part of the build
+because the definition file ([`unixfs.proto`](src/unixfs/unixfs.proto)) rarely changes,
+so there is no need to add that complexity to the build process.
diff --git a/storage/mater/src/blockstore.rs b/storage/mater/src/blockstore.rs
new file mode 100644
index 000000000..024235ab5
--- /dev/null
+++ b/storage/mater/src/blockstore.rs
@@ -0,0 +1,379 @@
+// NOTE(@jmg-duarte,28/05/2024): the blockstore can (and should) evolve to support other backends.
+// At the time of writing, there is no need to invest more time in it because the current PR (#25) is delayed enough.
+
+use std::collections::{HashMap, HashSet};
+
+use bytes::Bytes;
+use indexmap::IndexMap;
+use integer_encoding::VarInt;
+use ipld_core::cid::Cid;
+use sha2::{Digest, Sha256};
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio_stream::StreamExt;
+use tokio_util::io::ReaderStream;
+
+use crate::{
+ multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer,
+ Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
+};
+
+/// The default block size, as defined in
+/// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13).
+const DEFAULT_BLOCK_SIZE: usize = 1024 * 256;
+
+/// The default tree width, also called links per block, as defined in
+/// [boxo](https://github.com/ipfs/boxo/blob/625ba769263c2beeec934836f54bbd6624db945a/ipld/unixfs/importer/helpers/helpers.go#L16-L30).
+const DEFAULT_TREE_WIDTH: usize = 174;
+
+/// The [`Blockstore`] stores pairs of [`Cid`] and [`Bytes`] in memory.
+///
+/// The store will chunk data blocks into chunks of `chunk_size` bytes and "gather" nodes in groups with at most `tree_width` children.
+/// You can visualize the underlying tree in an online IPLD DAG explorer, using the "Balanced DAG" layout.
+///
+/// It is necessary to keep the blocks somewhere before writing them to a file since the CARv2 header
+/// contains the data size, index offset and index fields, all of which require information that only
+/// becomes available after all the blocks have been processed.
+///
+/// The store keeps track of ([`Cid`], [`Bytes`]) pairs, performing de-duplication based on the [`Cid`].
+///
+/// **Important note: currently, the blockstore only supports a single file!**
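+///
+/// # Example
+///
+/// A minimal sketch mirroring the tests below; it assumes `Blockstore` and `Error` are
+/// re-exported at the crate root and uses a placeholder file path.
+/// ```no_run
+/// # async fn example() -> Result<(), mater::Error> {
+/// let file = tokio::fs::File::open("example.txt").await.unwrap();
+/// let mut store = mater::Blockstore::new();
+/// // Chunk the file into blocks and build the balanced tree in memory.
+/// store.read(file).await?;
+/// // Write the blocks out as a CARv2 archive.
+/// let mut output: Vec<u8> = vec![];
+/// store.write(&mut output).await?;
+/// # Ok(())
+/// # }
+/// ```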
+pub struct Blockstore {
+ root: Option<Cid>,
+ blocks: IndexMap<Cid, Bytes>,
+ indexed: HashSet<Cid>,
+
+ chunk_size: usize,
+ tree_width: usize,
+}
+
+impl Blockstore {
+ /// The size of the [`CarV1Header`] when encoded using [`DagCborCodec`].
+ ///
+ /// The formula is: `overhead + 37 * roots.len()`.
+ /// It is based on reversing the CBOR encoding, see an example:
+ /// ```text
+ /// A2 # map(2)
+ /// 65 # text(5)
+ /// 726F6F7473 # "roots"
+ /// 81 # array(1)
+ /// D8 2A # tag(42)
+ /// 58 25 # bytes(37)
+ /// 00015512206D623B17625E25CBDA46D17AC89C26B3DB63544701E2C0592626320DBEFD515B
+ /// 67 # text(7)
+ /// 76657273696F6E # "version"
+ /// 01 # unsigned(1)
+ /// ```
+ /// In this case we're always doing a single root, so we just use the fixed size: 58
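+ /// Summing the annotated bytes: 1 + 1 + 5 + 1 + 2 + 2 + 37 + 1 + 7 + 1 = 58.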
+ ///
+ /// Is this cheating? Yes. The alternative is to encode the CARv1 header twice.
+ /// We can cache it, but for now, this should be better.
+ const V1_HEADER_OVERHEAD: u64 = 58;
+
+ /// Construct a new [`Blockstore`], using the default parameters.
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Construct a new [`Blockstore`], using custom parameters.
+ /// If set to `None`, the corresponding default value will be used.
+ pub fn with_parameters(chunk_size: Option<usize>, tree_width: Option<usize>) -> Self {
+ // NOTE(@jmg-duarte,28/05/2024): once the time comes, this method should probably be replaced with a builder
+ Self {
+ root: None,
+ blocks: IndexMap::new(),
+ indexed: HashSet::new(),
+ chunk_size: chunk_size.unwrap_or(DEFAULT_BLOCK_SIZE),
+ tree_width: tree_width.unwrap_or(DEFAULT_TREE_WIDTH),
+ }
+ }
+
+ /// Fully read the contents of an arbitrary `reader` into the [`Blockstore`],
+ /// converting the contents into a CARv2 file.
+ pub async fn read<R>(&mut self, reader: R) -> Result<(), Error>
+ where
+ R: AsyncRead + Unpin + Send,
+ {
+ let chunks = ReaderStream::with_capacity(reader, self.chunk_size);
+
+ // The `stream -> pin -> peekable` combo instead of `stream -> peekable -> pin` feels weird
+ // but it has to do with two things:
+ // - The fact that the stream can be self-referential:
+ // https://users.rust-lang.org/t/why-is-pin-mut-needed-for-iteration-of-async-stream/51107
+ // - Using a tokio_stream::Peekable instead of futures::Peekable, they differ on who is required to be pinned
+ // - tokio_stream::Peekable::peek(&mut self)
+ // https://github.com/tokio-rs/tokio/blob/14c17fc09656a30230177b600bacceb9db33e942/tokio-stream/src/stream_ext/peekable.rs#L26-L37
+ // - futures::Peekable::peek(self: Pin<&mut Self>)
+ // https://github.com/rust-lang/futures-rs/blob/c507ff833728e2979cf5519fc931ea97308ec876/futures-util/src/stream/stream/peek.rs#L38-L40
+ let tree = stream_balanced_tree(chunks, self.tree_width);
+ tokio::pin!(tree);
+ let mut tree = tree.peekable();
+
+ while let Some(block) = tree.next().await {
+ let (cid, bytes) = block?;
+ self.insert(cid, bytes, true);
+
+ // If the stream is exhausted, we know the current block is the root
+ if tree.peek().await.is_none() {
+ // The root should always be indexed; no official spec says it must be, but it just makes sense.
+ // So, if the insert call above changes, the root should be added to the `indexed` set here
+ self.root = Some(cid);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Write the contents of the [`Blockstore`] as CARv2 to a writer.
+ pub async fn write<W>(mut self, writer: W) -> Result<usize, Error>
+ where
+ W: AsyncWrite + Unpin,
+ {
+ let mut position = 0;
+
+ let mut writer = CarV2Writer::new(writer);
+ let header_v2 = self.header_v2();
+
+ // The CARv1 payload starts where the CARv2 header ends;
+ // this value is required for indexing,
+ // since index offsets are relative to the start of the CARv1 header
+ let car_v1_start = writer.write_header(&header_v2).await?;
+ position += car_v1_start;
+
+ // CARv1 files are REQUIRED to have a root
+ let header_v1 = self
+ .root
+ .map(|root| CarV1Header::new(vec![root]))
+ .ok_or(Error::EmptyRootsError)?;
+ position += writer.write_v1_header(&header_v1).await?;
+
+ let mut offsets = HashMap::new();
+ for (cid, block) in self.blocks.drain(..) {
+ if self.indexed.contains(&cid) {
+ offsets.insert(cid, position - car_v1_start);
+ }
+ position += writer.write_block(&cid, &block).await?;
+ }
+
+ let count = offsets.len() as u64;
+ let entries = offsets
+ .into_iter()
+ .map(|(cid, offset)| IndexEntry::new(cid.hash().digest().to_vec(), offset as u64))
+ .collect();
+ let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width(
+ SHA_256_CODE,
+ SingleWidthIndex::new(Sha256::output_size() as u32, count, entries).into(),
+ ));
+ position += writer.write_index(&index).await?;
+
+ Ok(position)
+ }
+
+ /// Get the [`CarV2Header`] that will be written out.
+ fn header_v2(&self) -> CarV2Header {
+ let data_offset = CarV2Header::SIZE as u64;
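+ // Each block in the CARv1 data section is framed as `varint(|CID| + |data|) || CID || data`,
+ // so the payload size sums the varint prefix, the encoded CID and the raw block bytes.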
+ let data_size: u64 = self
+ .blocks
+ .iter()
+ .map(|(cid, bytes)| {
+ let size = (cid.encoded_len() + bytes.len()) as u64;
+ let varint_size = size.required_space() as u64;
+ size + varint_size
+ })
+ .sum();
+
+ let header_v1_varint = Self::V1_HEADER_OVERHEAD.required_space() as u64;
+ let car_v1_payload_length = Self::V1_HEADER_OVERHEAD + header_v1_varint + data_size;
+
+ // If there is padding, this does not apply; however, the go-car tool doesn't seem to ever add padding
+ let index_offset = data_offset + car_v1_payload_length;
+
+ // NOTE(@jmg-duarte,28/05/2024): the `fully_indexed` field is currently set to `false` because the
+ // go-car tool doesn't seem to ever set it; however, according to the written definition we have from the spec,
+ // we're performing full indexing, as all blocks are inserted with `index: true`.
+ CarV2Header::new(false, data_offset, car_v1_payload_length, index_offset)
+ }
+
+ /// Insert a new block into the [`Blockstore`].
+ ///
+ /// If the [`Cid`] has been previously inserted, this function is a no-op.
+ fn insert(&mut self, cid: Cid, data: Bytes, index: bool) {
+ if !self.blocks.contains_key(&cid) {
+ self.blocks.insert_full(cid, data);
+ if index {
+ self.indexed.insert(cid);
+ }
+ }
+ }
+}
+
+impl Default for Blockstore {
+ fn default() -> Self {
+ Self {
+ root: None,
+ blocks: IndexMap::new(),
+ indexed: HashSet::new(),
+ chunk_size: DEFAULT_BLOCK_SIZE,
+ tree_width: DEFAULT_TREE_WIDTH,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::{io::Cursor, str::FromStr};
+
+ use ipld_core::{cid::Cid, codec::Codec};
+ use ipld_dagpb::{DagPbCodec, PbNode};
+ use sha2::{Digest, Sha256};
+ use tokio::fs::File;
+
+ use crate::{
+ blockstore::Blockstore,
+ multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE},
+ test_utils::assert_buffer_eq,
+ CarV2Header, CarV2Reader, Index,
+ };
+
+ #[tokio::test]
+ async fn byte_eq_lorem() {
+ let file = File::open("tests/fixtures/original/lorem.txt")
+ .await
+ .unwrap();
+ let mut store = Blockstore::new();
+ store.read(file).await.unwrap();
+ assert_eq!(store.blocks.len(), 1);
+
+ let mut result_buffer = vec![];
+ store.write(&mut result_buffer).await.unwrap();
+
+ let car_contents = tokio::fs::read("tests/fixtures/car_v2/lorem.car")
+ .await
+ .unwrap();
+
+ assert_buffer_eq!(&result_buffer, &car_contents);
+ }
+
+ #[tokio::test]
+ async fn byte_eq_spaceglenda() {
+ let file = File::open("tests/fixtures/original/spaceglenda.jpg")
+ .await
+ .unwrap();
+ let mut store = Blockstore::new();
+ store.read(file).await.unwrap();
+ assert_eq!(store.blocks.len(), 4);
+
+ let mut result_buffer = vec![];
+ store.write(&mut result_buffer).await.unwrap();
+
+ let car_contents = tokio::fs::read("tests/fixtures/car_v2/spaceglenda.car")
+ .await
+ .unwrap();
+
+ assert_buffer_eq!(&result_buffer, &car_contents);
+ }
+
+ #[tokio::test]
+ async fn dedup_lorem() {
+ let file = File::open("tests/fixtures/original/lorem_4096_dup.txt")
+ .await
+ .unwrap();
+ let mut store = Blockstore::with_parameters(Some(1024), None);
+ store.read(file).await.unwrap();
+ // We're expecting there to exist a single data block and a root
+ assert_eq!(store.blocks.len(), 2);
+ }
+
+ // We can't fully validate this test using go-car because it doesn't offer parametrization
+ #[tokio::test]
+ async fn dedup_lorem_roundtrip() {
+ let file = File::open("tests/fixtures/original/lorem_4096_dup.txt")
+ .await
+ .unwrap();
+ let mut store = Blockstore::with_parameters(Some(1024), None);
+ store.read(file).await.unwrap();
+ // We're expecting there to exist a single data block and a root
+ assert_eq!(store.blocks.len(), 2);
+
+ let mut result_buffer = vec![];
+ store.write(&mut result_buffer).await.unwrap();
+
+ let mut cursor = Cursor::new(result_buffer);
+ std::io::Seek::rewind(&mut cursor).unwrap();
+ let mut car_reader = CarV2Reader::new(cursor);
+
+ car_reader.read_pragma().await.unwrap();
+
+ let car_v2_header = car_reader.read_header().await.unwrap();
+ assert_eq!(car_v2_header.data_offset, CarV2Header::SIZE as u64);
+ // Extracted with go-car and validated with a hex viewer.
+ // To extract the values, run the following commands:
+ // $ car inspect