Implement Filestore #57

Merged
merged 8 commits into from
Jun 4, 2024
593 changes: 293 additions & 300 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion storage/mater/Cargo.toml
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ tokio-util = { workspace = true, features = ["io"] }

[dev-dependencies]
rand.workspace = true

tempfile.workspace = true

[lints]
workspace = true
19 changes: 17 additions & 2 deletions storage/mater/README.md
@@ -2,8 +2,6 @@

A Rust library to read and write CAR files.

This library is based on [beetle](https://github.com/n0-computer/beetle).

## Specifications

CARv1 specification: https://ipld.io/specs/transport/car/carv1/
@@ -27,3 +25,20 @@ The [`unixfs_pb.rs`](src/unixfs/unixfs_pb.rs) was automatically generated using
The file was generated and checked-in instead of making `pb-rs` part of the build
because the definition file ([`unixfs.proto`](src/unixfs/unixfs.proto)) does not
change frequently, hence, there is no need to add complexity to the build process.

## Acknowledgements

We'd like to thank everyone who contributed to the projects mentioned in this section.
In one way or another, they were all instrumental in the implementation of this library.

* [go-car](https://github.com/ipld/go-car) — the original implementation.
* [beetle](https://github.com/n0-computer/beetle) — the library `mater` is based on.
We've extracted the parts that matter for this project, but without it, this work would've been much harder.
* [ImHex](https://github.com/WerWolv/ImHex) — for saving hours when comparing binary files.

### Similar libraries/sources

* [Forest](https://github.com/ChainSafe/forest/blob/62e55df27a091ba7993a60cc1e72622ad8e25151/src/utils/db/car_stream.rs#L155)
* [rust-car](https://github.com/jaeaster/rust-car)
* [rs-car](https://github.com/dapplion/rs-car)
* [car-utils](https://github.com/blocklessnetwork/car-utils)
21 changes: 19 additions & 2 deletions storage/mater/src/lib.rs
@@ -10,15 +10,15 @@
#![deny(rustdoc::private_intra_doc_links)]
#![deny(unsafe_code)]

mod blockstore;
mod multicodec;
mod stores;
mod unixfs;
mod v1;
mod v2;

pub use blockstore::Blockstore;
// We need to expose this because `read_block` returns `(Cid, Vec<u8>)`.
pub use ipld_core::cid::Cid;
pub use stores::{Blockstore, Filestore};
pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted, MultihashIndexSorted,
@@ -127,6 +127,23 @@ pub(crate) mod test_utils {
}
}};
}
use std::path::Path;

pub(crate) use assert_buffer_eq;
use tokio::{fs::File, io::AsyncWriteExt};

/// Dump a byte slice into a file.
///
/// * If *anything* goes wrong, the function will panic.
/// * If the file doesn't exist, it will be created.
/// * If the file exists, it will be overwritten and truncated.
#[allow(dead_code)] // This function is supposed to be a debugging helper
pub(crate) async fn dump<P, B>(path: P, bytes: B)
where
P: AsRef<Path>,
B: AsRef<[u8]>,
{
let mut file = File::create(path).await.unwrap();
file.write_all(bytes.as_ref()).await.unwrap();
}
}
Original file line number Diff line number Diff line change
@@ -12,19 +12,12 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;

use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH};
use crate::{
multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer,
Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
};

/// The default block size, as defined in
/// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13).
const DEFAULT_BLOCK_SIZE: usize = 1024 * 256;

/// The default tree width, also called links per block, as defined in
/// [boxo](https://github.com/ipfs/boxo/blob/625ba769263c2beeec934836f54bbd6624db945a/ipld/unixfs/importer/helpers/helpers.go#L16-L30).
const DEFAULT_TREE_WIDTH: usize = 174;

/// The [`Blockstore`] stores pairs of [`Cid`] and [`Bytes`] in memory.
///
/// The store splits the input into chunks of `chunk_size` bytes and groups nodes under parents with at most `tree_width` children each.
@@ -228,8 +221,8 @@ mod tests {
use tokio::fs::File;

use crate::{
blockstore::Blockstore,
multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE},
stores::blockstore::Blockstore,
test_utils::assert_buffer_eq,
CarV2Header, CarV2Reader, Index,
};
148 changes: 148 additions & 0 deletions storage/mater/src/stores/filestore.rs
@@ -0,0 +1,148 @@
use std::path::PathBuf;

use sha2::{Digest, Sha256};
use tokio::{fs::File, io::AsyncSeekExt};
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;

use super::Config;
use crate::{
multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer,
Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
};

/// A file-backed CAR store.
pub struct Filestore {
source_path: PathBuf,
output_path: PathBuf,

config: Config,
}

impl Filestore {
/// Create a new [`Filestore`] with the passed [`Config`].
pub fn new<Src, Out>(source: Src, output: Out, config: Config) -> Self
where
Src: Into<PathBuf>,
Out: Into<PathBuf>,
{
Self {
source_path: source.into(),
output_path: output.into(),
config,
}
}

async fn balanced_import(&self, chunk_size: usize, tree_width: usize) -> Result<(), Error> {
let mut source = File::open(&self.source_path).await?;
let chunker = ReaderStream::with_capacity(&mut source, chunk_size);
let nodes = stream_balanced_tree(chunker, tree_width);
tokio::pin!(nodes);
let mut nodes = nodes.peekable();

let mut output = File::create(&self.output_path).await?;
let mut writer = CarV2Writer::new(&mut output);
let mut position = 0;

let placeholder_header = CarV2Header::default();
position += writer.write_header(&placeholder_header).await?;
let car_v1_start = position;

let placeholder_header_v1 = CarV1Header::default();
position += writer.write_v1_header(&placeholder_header_v1).await?;

let mut root = None;
let mut entries = vec![];
while let Some(node) = nodes.next().await {
let (node_cid, node_bytes) = node?;
let digest = node_cid.hash().digest().to_owned();
let entry = IndexEntry::new(digest, (position - car_v1_start) as u64);
entries.push(entry);
position += writer.write_block(&node_cid, &node_bytes).await?;

// The last node produced by the stream is taken as the root of the tree.
if nodes.peek().await.is_none() {
root = Some(node_cid);
}
}

let index_offset = position;
let single_width_index =
SingleWidthIndex::new(Sha256::output_size() as u32, entries.len() as u64, entries);
let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width(
SHA_256_CODE,
single_width_index.into(),
));
writer.write_index(&index).await?;

// Go back to the beginning of the file
writer.get_inner_mut().rewind().await?;
let header = CarV2Header::new(
false,
(car_v1_start) as u64,
(index_offset - car_v1_start) as u64,
(index_offset) as u64,
);
writer.write_header(&header).await?;

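// The final CARv1 header must encode to the same length as the placeholder written earlier;
// otherwise this write WILL OVERWRITE the start of the first block or leave stale placeholder bytes behind.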
let header_v1 = CarV1Header::new(vec![root.expect("root should have been set")]);
writer.write_v1_header(&header_v1).await?;

Ok(())
}

/// Read the input file, creating a new CARv2 file
// NOTE(@jmg-duarte,31/05/2024): Looking for a better name
pub async fn convert(&self) -> Result<(), Error> {
match self.config {
Config::Balanced {
chunk_size,
tree_width,
} => self.balanced_import(chunk_size, tree_width).await,
}
}
}

#[cfg(test)]
mod test {
use std::path::Path;

use tempfile::tempdir;

use crate::{stores::Config, test_utils::assert_buffer_eq, Filestore};

async fn test_filestore_roundtrip<P1, P2>(original: P1, expected: P2)
where
P1: AsRef<Path>,
P2: AsRef<Path>,
{
let temp_dir = tempdir().unwrap();
let temp_path = temp_dir.path().join("lorem.car");

let filestore = Filestore::new(original.as_ref(), &temp_path, Config::default());
filestore.convert().await.unwrap();

let expected = tokio::fs::read(expected.as_ref()).await.unwrap();
let result = tokio::fs::read(temp_path).await.unwrap();

assert_buffer_eq!(&expected, &result);
}

#[tokio::test]
async fn test_filestore_lorem() {
test_filestore_roundtrip(
"tests/fixtures/original/lorem.txt",
"tests/fixtures/car_v2/lorem.car",
)
.await
}

#[tokio::test]
async fn test_filestore_spaceglenda() {
test_filestore_roundtrip(
"tests/fixtures/original/spaceglenda.jpg",
"tests/fixtures/car_v2/spaceglenda.car",
)
.await
}
}
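
The write order used by `balanced_import` (placeholder header, blocks, index, then rewind and rewrite the header) can be sketched with the standard library alone. This is a minimal illustration and not the `mater` implementation: `write_with_placeholder`, `demo`, the 16-byte toy header, and the `INDEX` trailer are hypothetical stand-ins, and an in-memory `Cursor` takes the place of the output file.

```rust
use std::io::{self, Cursor, Seek, SeekFrom, Write};

/// Size in bytes of the toy fixed-width header (hypothetical layout):
/// payload size (u64, little-endian) followed by trailer offset (u64, little-endian).
const HEADER_SIZE: u64 = 16;

/// Writes `header | blocks | trailer`, filling in the header last.
///
/// Returns the offset of each block relative to the end of the header,
/// in the same spirit as `position - car_v1_start` in the diff above.
fn write_with_placeholder<W: Write + Seek>(out: &mut W, blocks: &[&[u8]]) -> io::Result<Vec<u64>> {
    // 1. Reserve space with a zeroed header that already has its final size.
    out.write_all(&[0u8; HEADER_SIZE as usize])?;
    let mut position = HEADER_SIZE;

    // 2. Stream the blocks, recording where each one starts.
    let mut offsets = Vec::with_capacity(blocks.len());
    for block in blocks {
        offsets.push(position - HEADER_SIZE);
        out.write_all(block)?;
        position += block.len() as u64;
    }

    // 3. Append the trailer (a stand-in for the index).
    let trailer_offset = position;
    out.write_all(b"INDEX")?;

    // 4. Rewind and overwrite the placeholder with the real values.
    //    Both headers are exactly HEADER_SIZE bytes, so no block is touched.
    out.seek(SeekFrom::Start(0))?;
    out.write_all(&(trailer_offset - HEADER_SIZE).to_le_bytes())?;
    out.write_all(&trailer_offset.to_le_bytes())?;

    Ok(offsets)
}

/// Runs the sketch against an in-memory buffer and returns the bytes and offsets.
fn demo() -> io::Result<(Vec<u8>, Vec<u64>)> {
    let mut buffer = Cursor::new(Vec::new());
    let blocks: [&[u8]; 2] = ["hello".as_bytes(), "car".as_bytes()];
    let offsets = write_with_placeholder(&mut buffer, &blocks)?;
    Ok((buffer.into_inner(), offsets))
}
```

The pattern only works because the placeholder and the final header occupy the same number of bytes, which is the constraint the in-code warning about overwriting refers to.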
43 changes: 43 additions & 0 deletions storage/mater/src/stores/mod.rs
@@ -0,0 +1,43 @@
mod blockstore;
mod filestore;

pub use blockstore::Blockstore;
pub use filestore::Filestore;

/// The default block size, as defined in
/// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13).
pub(crate) const DEFAULT_BLOCK_SIZE: usize = 1024 * 256;

/// The default tree width, also called links per block, as defined in
/// [boxo](https://github.com/ipfs/boxo/blob/625ba769263c2beeec934836f54bbd6624db945a/ipld/unixfs/importer/helpers/helpers.go#L16-L30).
pub(crate) const DEFAULT_TREE_WIDTH: usize = 174;

/// Store configuration options.
pub enum Config {
/// The store should use the balanced tree layout,
/// generating byte chunks of `chunk_size` and
/// generating parent nodes every `tree_width` nodes.
Balanced {
/// The size, in bytes, of each data chunk.
chunk_size: usize,
/// The maximum number of children per parent node.
tree_width: usize,
},
}

impl Config {
/// Create a new [`Config::Balanced`].
pub fn balanced(chunk_size: usize, tree_width: usize) -> Self {
Self::Balanced {
chunk_size,
tree_width,
}
}
}

impl Default for Config {
fn default() -> Self {
Self::Balanced {
chunk_size: DEFAULT_BLOCK_SIZE,
tree_width: DEFAULT_TREE_WIDTH,
}
}
}
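
To get a feel for what `chunk_size` and `tree_width` imply, the following sketch estimates how many nodes each layer of a balanced tree would hold for a given input size. It is an idealised count under stated assumptions, not a description of what `stream_balanced_tree` actually emits: `layer_sizes` and `div_ceil` are hypothetical helpers, the constants are copied from the diff above, and treating an empty input as a single leaf is an assumption of this sketch.

```rust
/// Defaults copied from the diff above.
const DEFAULT_BLOCK_SIZE: usize = 1024 * 256;
const DEFAULT_TREE_WIDTH: usize = 174;

/// Integer division rounding up (hypothetical helper).
fn div_ceil(a: usize, b: usize) -> usize {
    (a + b - 1) / b
}

/// Estimated number of nodes per layer, leaves first and root last.
///
/// Assumption: an empty input still counts as one leaf.
fn layer_sizes(file_size: usize, chunk_size: usize, tree_width: usize) -> Vec<usize> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    assert!(tree_width > 1, "tree_width must be at least 2");

    // Leaves: one per chunk of the input.
    let leaves = div_ceil(file_size, chunk_size).max(1);
    let mut layers = vec![leaves];

    // Each parent layer groups up to `tree_width` nodes of the layer below,
    // until a single root remains.
    let mut current = leaves;
    while current > 1 {
        current = div_ceil(current, tree_width);
        layers.push(current);
    }
    layers
}
```

With the defaults, a single parent can link up to 174 chunks of 256 KiB, so under this model inputs up to 43.5 MiB need only one layer above the leaves.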
31 changes: 30 additions & 1 deletion storage/mater/src/v1/mod.rs
@@ -1,15 +1,22 @@
mod reader;
mod writer;

use ipld_core::cid::Cid;
use ipld_core::cid::{multihash::Multihash, Cid};
use serde::{Deserialize, Serialize};

use crate::multicodec::{RAW_CODE, SHA_256_CODE};
pub use crate::v1::{reader::Reader, writer::Writer};
pub(crate) use crate::v1::{
reader::{read_block, read_header},
writer::{write_block, write_header},
};

/// The SHA256 hash over a 32-byte array filled with zeroes.
const DEFAULT_HASH: [u8; 32] = [
0x66, 0x68, 0x7a, 0xad, 0xf8, 0x62, 0xbd, 0x77, 0x6c, 0x8f, 0xc1, 0x8b, 0x8e, 0x9f, 0x8e, 0x20,
0x08, 0x97, 0x14, 0x85, 0x6e, 0xe2, 0x33, 0xb3, 0x90, 0x2a, 0x59, 0x1d, 0x0d, 0x5f, 0x29, 0x25,
];

/// Low-level CARv1 header.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Header {
@@ -33,6 +40,28 @@ impl Header {
}
}

impl Default for Header {
/// Creates a "placeholder" [`Header`].
///
/// This is useful when converting a regular file
/// to a CARv1 file, where you don't know the root beforehand.
///
/// If you need more than one root, please use [`Self::new`] instead.
// NOTE(@jmg-duarte,29/05/2024): the intra-doc link above does not resolve; the cause is still unknown.
fn default() -> Self {
// `Multihash::default` does not return a multihash with the length usually expected
// of a SHA-256 multihash, so we wrap a fixed SHA-256 digest instead.
// Without this, writing placeholder headers fails.
let default_multihash =
Multihash::wrap(SHA_256_CODE, &DEFAULT_HASH).expect("default hash to be valid");
let default_cid = Cid::new_v1(RAW_CODE, default_multihash);
Self {
version: 1,
roots: vec![default_cid],
}
}
}
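
The reason a placeholder built from a full 32-byte digest can later be replaced in place is that a binary CIDv1 wrapping a SHA-256 digest has a fixed length regardless of the digest's value. The sketch below encodes such a CID by hand to make that visible. It assumes the final root uses a SHA-256 multihash and a codec below `0x80` (so every varint fits in one byte); `cid_v1_bytes` is a hypothetical helper, the `u8` constants are simplified stand-ins for the crate's multicodec constants, and `DAG_PB_CODE` does not appear in the diff.

```rust
/// Multicodec codes; all are below 0x80, so each is a single-byte varint.
const RAW_CODE: u8 = 0x55;
const DAG_PB_CODE: u8 = 0x70; // assumption: not part of the diff above
const SHA_256_CODE: u8 = 0x12;

/// The placeholder digest from the diff above.
const DEFAULT_HASH: [u8; 32] = [
    0x66, 0x68, 0x7a, 0xad, 0xf8, 0x62, 0xbd, 0x77, 0x6c, 0x8f, 0xc1, 0x8b, 0x8e, 0x9f, 0x8e, 0x20,
    0x08, 0x97, 0x14, 0x85, 0x6e, 0xe2, 0x33, 0xb3, 0x90, 0x2a, 0x59, 0x1d, 0x0d, 0x5f, 0x29, 0x25,
];

/// Encode a binary CIDv1 wrapping a 32-byte SHA-256 digest.
fn cid_v1_bytes(codec: u8, digest: &[u8; 32]) -> Vec<u8> {
    assert!(codec < 0x80, "this sketch only handles single-byte varints");
    let mut bytes = Vec::with_capacity(36);
    bytes.push(0x01); // CID version 1
    bytes.push(codec); // content codec
    bytes.push(SHA_256_CODE); // multihash function code
    bytes.push(32); // multihash digest length
    bytes.extend_from_slice(digest);
    bytes
}
```

Under these assumptions the placeholder CID and any real root CID both encode to 36 bytes, so a header with exactly one root keeps its size when the root is swapped.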

#[cfg(test)]
mod tests {
use std::io::Cursor;
16 changes: 16 additions & 0 deletions storage/mater/src/v2/mod.rs
@@ -85,6 +85,22 @@ impl Header {
pub const SIZE: usize = PRAGMA.len() + 40;
}

impl Default for Header {
/// Creates a "placeholder" [`Header`].
///
/// This is useful when converting from an arbitrary file
/// to a CARv2 file, where the header contains information
/// that is only available after processing the whole input.
fn default() -> Self {
Self {
characteristics: Characteristics::empty(),
data_offset: 0,
data_size: 0,
index_offset: 0,
}
}
}
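
Because every field of the CARv2 header is fixed-width, the all-zero placeholder and the final header always serialize to the same `PRAGMA.len() + 40` bytes. The sketch below shows this with a standalone encoder that follows the CARv2 specification's layout (pragma, 16-byte characteristics, then three little-endian `u64` values). The `Header` struct and `encode` function here are hypothetical mirrors written for illustration, not the crate's own types, and the characteristics are kept as raw bytes to avoid assuming how `mater` represents them.

```rust
/// The 11-byte CARv2 pragma from the CARv2 specification.
const PRAGMA: [u8; 11] = [
    0x0a, 0xa1, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02,
];

/// Pragma plus 40 bytes of fixed-width fields, as in `Header::SIZE` above.
const HEADER_SIZE: usize = PRAGMA.len() + 40;

/// Hypothetical mirror of the CARv2 header fields.
struct Header {
    characteristics: [u8; 16],
    data_offset: u64,
    data_size: u64,
    index_offset: u64,
}

/// Serialize the pragma and header into a fixed-size buffer.
fn encode(header: &Header) -> [u8; HEADER_SIZE] {
    let mut out = [0u8; HEADER_SIZE];
    out[..11].copy_from_slice(&PRAGMA);
    out[11..27].copy_from_slice(&header.characteristics);
    out[27..35].copy_from_slice(&header.data_offset.to_le_bytes());
    out[35..43].copy_from_slice(&header.data_size.to_le_bytes());
    out[43..51].copy_from_slice(&header.index_offset.to_le_bytes());
    out
}
```

Since the encoded size never depends on the field values, rewinding to the start of the file and writing the final header cannot spill into the CARv1 payload that follows it.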

#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, io::Cursor};