
Commit

Merge branch 'develop' into feat/32/polka-storage-component-implementation
jmg-duarte authored Jun 19, 2024
2 parents a0711b5 + d5afa03 commit 319c872
Showing 20 changed files with 2,698 additions and 590 deletions.
198 changes: 157 additions & 41 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
@@ -28,7 +28,9 @@ panic = 'abort' # Use abort on panic to reduce binary size
substrate-build-script-utils = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" }
substrate-wasm-builder = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-v1.11.0" }

async-channel = "2.3.1"
async-stream = "0.3.5"
base64 = "0.22.1"
bitflags = "2.5.0"
byteorder = "1.5.0"
bytes = "1.6.0"
@@ -38,6 +40,7 @@ cid = { version = "0.11.1" }
clap = { version = "4.5.3" }
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false }
color-print = "0.3.4"
criterion = "0.5.1"
digest = "0.10.7"
env_logger = "0.11.2"
futures = "0.3.28"
@@ -74,8 +77,9 @@ tokio = "1.37.0"
tokio-stream = "0.1.15"
tokio-util = "0.7.11"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18" }
tracing-subscriber = "0.3.18"
url = "2.5.0"
uuid = "1.8.0"

# Local
cli-primitives = { path = "primitives/cli" }
40 changes: 40 additions & 0 deletions storage/mater/BENCHMARK.md
@@ -0,0 +1,40 @@
### How to run

Execute the benchmarks with `cargo bench`.
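To run only this suite from the workspace root, something like `cargo bench -p mater --bench benchmark` should work, assuming the crate is named `mater` (the bench target is registered as `benchmark` in `storage/mater/Cargo.toml`).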

### Results

The benchmarks below report median times over 100 runs. The duplication
percentage is the proportion of duplicated content in the file. The
benchmarks were performed on a machine with a `Ryzen 9 5950X` processor and
`64GB DDR4` memory.

#### read

Measures the time needed to fully read a content buffer into the `Blockstore`.

| Size / Duplication | 0% | 10% | 20% | 40% | 80% |
| ------------------ | --------- | --------- | --------- | --------- | --------- |
| 10 MB | 4.6776 ms | 4.5806 ms | 4.6977 ms | 4.5534 ms | 4.5038 ms |
| 100 MB | 62.419 ms | 60.895 ms | 59.461 ms | 55.355 ms | 46.792 ms |
| 1 GB | 632.34 ms | 650.01 ms | 631.49 ms | 600.01 ms | 505.58 ms |
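
For reference, a hedged sketch of the operation being timed, reconstructed from the benchmark source later in this commit (the 256 KiB block size mirrors the benchmark's `BLOCK_SIZE`; it is not necessarily the library default):

```rust
use std::io::Cursor;

use mater::Blockstore;

#[tokio::main]
async fn main() {
    // Any in-memory content buffer stands in for the generated test data.
    let content = vec![0u8; 10 * 1024 * 1024];

    // 256 KiB blocks, mirroring the benchmark's BLOCK_SIZE.
    let mut store = Blockstore::with_parameters(Some(1024 * 256), None);

    // The timed step: chunk the buffer and index the resulting blocks.
    store.read(Cursor::new(content)).await.unwrap();
}
```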

#### write

Measures the time needed to write the CARv2 archive from the `Blockstore` to a buffer.

| Size / Duplication | 0% | 10% | 20% | 40% | 80% |
| ------------------ | --------- | --------- | --------- | --------- | --------- |
| 10 MB | 1.6516 ms | 1.0342 ms | 875.68 µs | 772.26 µs | 354.77 µs |
| 100 MB | 12.689 ms | 10.707 ms | 9.4533 ms | 6.7805 ms | 1.7487 ms |
| 1 GB | 123.34 ms | 102.39 ms | 91.712 ms | 69.273 ms | 23.140 ms |
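
A matching sketch for the write path, again following the benchmark code below; the store is filled first as setup, since the timed step serializes whatever blocks the store already holds:

```rust
use std::io::Cursor;

use mater::Blockstore;

#[tokio::main]
async fn main() {
    // Setup (not timed): load some content into the store.
    let mut store = Blockstore::with_parameters(Some(1024 * 256), None);
    store.read(Cursor::new(vec![1u8; 4096])).await.unwrap();

    // The timed step: serialize the stored blocks as a CARv2 archive.
    // The benchmark hands `write` an owned Vec<u8> as the output buffer.
    let buffer: Vec<u8> = Vec::new();
    store.write(buffer).await.unwrap();
}
```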

#### filestore

Measures the time needed to convert a source file to a CARv2 archive and write it to an output file.

| Size / Duplication | 0% | 10% | 20% | 40% | 80% |
| ------------------ | --------- | --------- | --------- | --------- | --------- |
| 10 MB | 15.145 ms | 15.179 ms | 15.162 ms | 14.501 ms | 14.836 ms |
| 100 MB | 203.85 ms | 210.14 ms | 220.38 ms | 216.34 ms | 211.12 ms |
| 1 GB | 1.7674 s | 1.8174 s | 1.8396 s | 1.8496 s | 1.8774 s |
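
And a sketch of the filestore conversion, which goes disk to disk via `create_filestore` (the paths are placeholders; the benchmark uses `tempfile`-backed files):

```rust
use std::path::Path;

use mater::{create_filestore, Config};

#[tokio::main]
async fn main() {
    // Placeholder paths: the timed step converts the source file into a
    // CARv2 archive written at the target path.
    let source = Path::new("source_file");
    let target = Path::new("output.car");
    create_filestore(source, target, Config::default()).await.unwrap();
}
```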
7 changes: 6 additions & 1 deletion storage/mater/Cargo.toml
@@ -23,13 +23,18 @@ serde = { workspace = true, features = ["derive"] }
serde_ipld_dagcbor.workspace = true
sha2.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "rt"] }
tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] }
tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["io"] }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio", "html_reports"] }
rand.workspace = true
tempfile.workspace = true

[lints]
workspace = true

[[bench]]
harness = false
name = "benchmark"
24 changes: 14 additions & 10 deletions storage/mater/README.md
@@ -4,9 +4,9 @@ A Rust library to read and write CAR files.

## Specifications

CARv1 specification: https://ipld.io/specs/transport/car/carv1/
CARv2 specification: https://ipld.io/specs/transport/car/carv2/
UnixFS specification: https://github.com/ipfs/specs/blob/e4e5754ad4a4bfbb2ebe63f4c27631f573703de0/UNIXFS.md
- CARv1 specification: https://ipld.io/specs/transport/car/carv1/
- CARv2 specification: https://ipld.io/specs/transport/car/carv2/
- UnixFS specification: https://github.com/ipfs/specs/blob/e4e5754ad4a4bfbb2ebe63f4c27631f573703de0/UNIXFS.md

## Developing

@@ -26,19 +26,23 @@ The file was generated and checked in instead of making `pb-rs` part of the build
because the definition file ([`unixfs.proto`](src/unixfs/unixfs.proto)) does not
change frequently; hence, there is no need to add complexity to the build process.

### Benchmarks

[Read more](BENCHMARK.md)

## Acknowledgements

We'd like to thank all the people that participated in the projects mentioned in this section.
In one way or another, they were all instrumental in the implementation of the present library.

* [go-car](https://github.com/ipld/go-car) — the original implementation.
* [beetle](https://github.com/n0-computer/beetle) — the library `mater` is based on.
- [go-car](https://github.com/ipld/go-car) — the original implementation.
- [beetle](https://github.com/n0-computer/beetle) — the library `mater` is based on.
We've gutted out the important bits for this project, but without it, this work would've been much harder.
* [ImHex](https://github.com/WerWolv/ImHex) — for saving hours when comparing binary files.
- [ImHex](https://github.com/WerWolv/ImHex) — for saving hours when comparing binary files.

### Similar libraries/sources

* [Forest](https://github.com/ChainSafe/forest/blob/62e55df27a091ba7993a60cc1e72622ad8e25151/src/utils/db/car_stream.rs#L155)
* [rust-car](https://github.com/jaeaster/rust-car)
* [rs-car](https://github.com/dapplion/rs-car)
* [car-utils](https://github.com/blocklessnetwork/car-utils)
- [Forest](https://github.com/ChainSafe/forest/blob/62e55df27a091ba7993a60cc1e72622ad8e25151/src/utils/db/car_stream.rs#L155)
- [rust-car](https://github.com/jaeaster/rust-car)
- [rs-car](https://github.com/dapplion/rs-car)
- [car-utils](https://github.com/blocklessnetwork/car-utils)
219 changes: 219 additions & 0 deletions storage/mater/benches/benchmark.rs
@@ -0,0 +1,219 @@
use std::{
fmt::Display,
io::Cursor,
path::{Path, PathBuf},
sync::OnceLock,
};

use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use mater::{create_filestore, Blockstore, Config};
use rand::{prelude::SliceRandom, rngs::ThreadRng, Rng};
use tempfile::{tempdir, TempDir};
use tokio::{fs::File, runtime::Runtime as TokioExecutor};

static FILES: OnceLock<Vec<(Params, PathBuf, TempDir)>> = OnceLock::new();
fn get_source_files() -> &'static Vec<(Params, PathBuf, TempDir)> {
FILES.get_or_init(|| {
let params = get_params();
let mut contents = vec![];

for param in params {
// Prepare temporary files
let content = generate_content(&param);
let (temp_dir, source_file) = prepare_source_file(&content);

contents.push((param, source_file, temp_dir));
}

contents
})
}

/// Get content sizes for the benchmarks.
const SIZES: [usize; 3] = [
1024 * 10000, // 10 MB
1024 * 100000, // 100 MB
1024 * 1000000, // 1 GB
];

/// The percentage of duplicated content in the file. e.g. 0.8 means 80% of the file is duplicated.
const DUPLICATIONS: [f64; 5] = [0.0, 0.1, 0.2, 0.4, 0.8];

/// The default block size
const BLOCK_SIZE: usize = 1024 * 256;

/// A chunk of data
#[derive(Debug, Clone, Copy)]
struct Chunk([u8; BLOCK_SIZE]);

impl Chunk {
fn new_random(rng: &mut ThreadRng) -> Self {
Self([0; BLOCK_SIZE].map(|_| rng.gen()))
}

fn new_zeroed() -> Self {
Self([0; BLOCK_SIZE])
}
}

impl IntoIterator for Chunk {
type Item = u8;
type IntoIter = std::array::IntoIter<u8, BLOCK_SIZE>;

fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}

#[derive(Debug, Clone, Copy)]
struct Params {
/// The size of the content in bytes.
size: usize,
/// The percentage of duplicated content in the file.
duplication: f64,
}

impl Display for Params {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"content_size: {} bytes, percent_duplicated: {}",
self.size, self.duplication
)
}
}

/// Get all combinations of parameters for the benchmarks.
fn get_params() -> Vec<Params> {
SIZES
.iter()
.flat_map(|&size| {
DUPLICATIONS
.iter()
.map(move |&duplication| Params { size, duplication })
})
.collect()
}

/// Generate content for the benchmarks. The duplicated (zeroed) chunks are
/// shuffled in among the random chunks.
fn generate_content(params: &Params) -> Vec<u8> {
let num_chunks = params.size / BLOCK_SIZE;
let mut chunks = Vec::with_capacity(num_chunks);
let mut rng = rand::thread_rng();

// Generate zeroed chunks for the specified percentage of the content. The
// rest is filled with random chunks.
for index in 1..=num_chunks {
let percentage_processed = index as f64 / num_chunks as f64;
if percentage_processed < params.duplication {
chunks.push(Chunk::new_zeroed());
} else {
chunks.push(Chunk::new_random(&mut rng));
}
}

// Shuffle the chunks
chunks.shuffle(&mut rng);

// Flatten the chunks into a single byte array
let mut bytes = chunks.into_iter().flatten().collect::<Vec<u8>>();

// Some bytes can be missing because the data is generated in whole chunks,
// so random bytes are appended at the end.
let missing_bytes_len = params.size - bytes.len();
bytes.extend(
(0..missing_bytes_len)
.map(|_| rng.gen())
.collect::<Vec<u8>>(),
);

bytes
}

/// Read content into a Blockstore. This function is benchmarked.
async fn read_content_benched(content: &[u8], mut store: Blockstore) {
let cursor = Cursor::new(content);
store.read(cursor).await.unwrap()
}

fn read(c: &mut Criterion) {
let files = get_source_files();

for (params, source_file, _) in files {
let content = std::fs::read(&source_file).unwrap();

c.bench_with_input(BenchmarkId::new("read", params), params, |b, _params| {
b.to_async(TokioExecutor::new().unwrap()).iter(|| {
read_content_benched(
&content,
Blockstore::with_parameters(Some(BLOCK_SIZE), None),
)
});
});
}
}

/// Write content from a Blockstore. This function is benchmarked.
async fn write_contents_benched(buffer: Vec<u8>, store: Blockstore) {
store.write(buffer).await.unwrap();
}

fn write(c: &mut Criterion) {
let runtime = TokioExecutor::new().unwrap();
let files = get_source_files();

for (params, source_file, _) in files {
let mut blockstore = Blockstore::with_parameters(Some(BLOCK_SIZE), None);

// Read file contents to the blockstore
runtime.block_on(async {
let file = File::open(&source_file).await.unwrap();
blockstore.read(file).await.unwrap()
});

c.bench_with_input(BenchmarkId::new("write", params), &(), |b, _: &()| {
b.to_async(TokioExecutor::new().unwrap()).iter_batched(
|| (blockstore.clone(), Vec::with_capacity(params.size)),
|(blockstore, buffer)| write_contents_benched(buffer, blockstore),
BatchSize::SmallInput,
);
});
}
}

/// Prepare a temporary source file.
fn prepare_source_file(content: &[u8]) -> (TempDir, PathBuf) {
let temp_dir = tempdir().unwrap();
let file = temp_dir.path().join("source_file");

// Write content to the file
std::fs::write(&file, &content).unwrap();

(temp_dir, file)
}

/// Create a filestore. This function is benchmarked.
async fn create_filestore_benched(source: &Path, target: &Path) {
create_filestore(source, target, Config::default())
.await
.unwrap();
}

fn filestore(c: &mut Criterion) {
let files = get_source_files();

for (params, source_file, temp_dir) in files {
let target_file = temp_dir.path().join("target");

c.bench_with_input(BenchmarkId::new("filestore", params), &(), |b, _: &()| {
b.to_async(TokioExecutor::new().unwrap())
.iter(|| create_filestore_benched(&source_file, &target_file));
});
}
}

criterion_group!(bench_reading, read);
criterion_group!(bench_writing, write);
criterion_group!(bench_filestore, filestore);
criterion_main!(bench_reading, bench_writing, bench_filestore);
7 changes: 5 additions & 2 deletions storage/mater/src/lib.rs
@@ -2,7 +2,7 @@
//! Both version 1 and version 2 are supported.
//!
//! You can make use of the lower-level utilities such as [`CarV2Reader`] to read a CARv2 file,
//! though these utilies were designed to be used in higher-level abstractions, like the [`Blockstore`].
//! though these utilities were designed to be used in higher-level abstractions, like the [`Blockstore`].
#![warn(unused_crate_dependencies)]
#![warn(missing_docs)]
@@ -18,7 +18,7 @@ mod v2;

// We need to expose this because `read_block` returns `(Cid, Vec<u8>)`.
pub use ipld_core::cid::Cid;
pub use stores::{create_filestore, Blockstore};
pub use stores::{create_filestore, Blockstore, Config};
pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted, MultihashIndexSorted,
@@ -131,6 +131,9 @@ pub(crate) mod test_utils {
use std::path::Path;

pub(crate) use assert_buffer_eq;
/// This is here so that our build doesn't fail. The `unused_crate_dependencies`
/// lint thinks `criterion` is unused, but it is used by the benchmarks.
use criterion as _;
use tokio::{fs::File, io::AsyncWriteExt};

/// Dump a byte slice into a file.
1 change: 1 addition & 0 deletions storage/mater/src/stores/blockstore.rs
@@ -30,6 +30,7 @@ use crate::{
/// The store keeps track of ([`Cid`], [`Bytes`]) pairs, performing de-duplication based on the [`Cid`].
///
/// **Important note: currently, the blockstore only supports a single file!**
#[derive(Debug, Clone)]
pub struct Blockstore {
root: Option<Cid>,
blocks: IndexMap<Cid, Bytes>,
2 changes: 2 additions & 0 deletions storage/mater/src/stores/mod.rs
@@ -18,7 +18,9 @@ pub enum Config {
/// generating byte chunks of `chunk_size` and
/// generating parent nodes every `tree_width` nodes.
Balanced {
/// The size of the byte chunks.
chunk_size: usize,
/// The number of children per parent node.
tree_width: usize,
},
}
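
For illustration, a `Balanced` configuration might be constructed as below; the field values are hypothetical (256 KiB chunks and a width of 174, a common UnixFS layout), not necessarily this crate's defaults:

```rust
use mater::Config;

fn main() {
    // Hypothetical values: 256 KiB chunks, 174 children per parent node.
    let config = Config::Balanced {
        chunk_size: 256 * 1024,
        tree_width: 174,
    };
    let _ = config;
}
```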