Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sector indexing and data retrieval #688

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions mater/lib/src/cid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use tokio::io::{AsyncRead, AsyncReadExt};

use crate::{async_varint::read_varint, IDENTITY_CODE};

/// Extension trait for Cid
pub trait CidExt {
async fn read_bytes_async<R>(r: R) -> Result<(Self, usize), Error>
/// Reads the bytes from a byte stream.
fn read_bytes_async<R>(r: R) -> impl std::future::Future<Output = Result<(Self, usize), Error>>
where
Self: Sized,
R: AsyncRead + Unpin;
Expand All @@ -13,8 +15,10 @@ pub trait CidExt {
fn get_identity_data(&self) -> Option<&[u8]>;
}

/// Extension trait for Multihash
pub trait MultihashExt {
async fn read_async<R>(r: R) -> Result<(Self, usize), Error>
/// Reads the bytes from a byte stream.
fn read_async<R>(r: R) -> impl std::future::Future<Output = Result<(Self, usize), Error>>
where
Self: Sized,
R: AsyncRead + Unpin;
Expand Down Expand Up @@ -62,7 +66,6 @@ impl<const S: usize> MultihashExt for Multihash<S> {
/// https://github.com/multiformats/rust-multihash/blob/90a6c19ec71ced09469eec164a3586aafeddfbbd/src/multihash.rs#L271
async fn read_async<R>(mut r: R) -> Result<(Self, usize), Error>
where
Self: Sized,
R: AsyncRead + Unpin,
{
let (code, code_bytes_read): (u64, usize) = read_varint(&mut r).await?;
Expand Down
3 changes: 2 additions & 1 deletion mater/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ mod v1;
mod v2;

// We need to re-expose this because `read_block` returns `(Cid, Vec<u8>)`.
pub use cid::{CidExt, MultihashExt};
pub use ipld_core::cid::Cid;
pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v1::{BlockMetadata, Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted,
MultihashIndexSorted, Reader as CarV2Reader, SingleWidthIndex, Writer as CarV2Writer,
Expand Down
6 changes: 3 additions & 3 deletions mater/lib/src/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ impl Default for Header {
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct BlockMetadata {
/// Cid of the block
cid: Cid,
pub cid: Cid,
/// Offset of the data section relative to the start of the underlying
/// reader.
data_offset_source: u64,
pub data_offset_source: u64,
/// Size of the data section of the block
data_size: u64,
pub data_size: u64,
}

#[cfg(test)]
Expand Down
51 changes: 36 additions & 15 deletions storage-provider/common/src/sector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ pub struct UnsealedSector {
/// Indexes match with corresponding deals in [`Sector::deals`].
pub piece_infos: Vec<PieceInfo>,

/// Tracks the locations of the actual pieces added to the unsealed sector.
/// This vector does not contain padding pieces; it contains only the actual
/// pieces corresponding to the users' deals.
pub pieces_locations: Vec<(Commitment<CommP>, PathBuf)>,

/// Tracks all of the deals that have been added to the sector.
pub deals: Vec<(DealId, DealProposal)>,

Expand Down Expand Up @@ -94,6 +99,7 @@ impl UnsealedSector {
occupied_sector_space: 0,
piece_infos: vec![],
deals: vec![],
pieces_locations: vec![],
unsealed_path,
})
}
Expand All @@ -108,24 +114,29 @@ impl UnsealedSector {
self.deals.push((deal_id, deal));
let sealer = select_sealer(self.seal_proof);

// would love to use something like scoped spawn blocking
let pieces = self.piece_infos.clone();
let unsealed_path = self.unsealed_path.clone();
let handle: JoinHandle<Result<(PieceInfo, u64), SectorError>> =
tokio::task::spawn_blocking(move || {
let unsealed_sector = std::fs::File::options().append(true).open(unsealed_path)?;

tracing::info!("Preparing piece...");
let (padded_reader, piece_info) = prepare_piece(piece_path, commitment)?;
tracing::info!("Adding piece...");
let occupied_piece_space =
sealer.add_piece(padded_reader, piece_info, &pieces, unsealed_sector)?;

Ok((piece_info, occupied_piece_space))
tokio::task::spawn_blocking({
let pieces = self.piece_infos.clone();
let unsealed_path = self.unsealed_path.clone();
let piece_path = piece_path.clone();

move || {
let unsealed_sector =
std::fs::File::options().append(true).open(unsealed_path)?;

tracing::info!("Preparing piece...");
let (padded_reader, piece_info) = prepare_piece(piece_path, commitment)?;
tracing::info!("Adding piece...");
let occupied_piece_space =
sealer.add_piece(padded_reader, piece_info, &pieces, unsealed_sector)?;

Ok((piece_info, occupied_piece_space))
}
});

let (piece_info, occupied_piece_space) = handle.await??;
self.piece_infos.push(piece_info);
self.pieces_locations.push((commitment, piece_path));
self.occupied_sector_space = self.occupied_sector_space + occupied_piece_space;

Ok(())
Expand Down Expand Up @@ -263,6 +274,11 @@ pub struct PreCommittedSector {
/// Indexes match with corresponding deals in [`Sector::deals`].
pub piece_infos: Vec<PieceInfo>,

/// Tracks the locations of the actual pieces added to the unsealed sector.
/// This vector does not contain padding pieces; it contains only the actual
/// pieces corresponding to the users' deals.
pub pieces_locations: Vec<(Commitment<CommP>, PathBuf)>,

/// Tracks all of the deals that have been added to the sector.
pub deals: Vec<(DealId, DealProposal)>,

Expand Down Expand Up @@ -311,12 +327,11 @@ impl PreCommittedSector {
seal_randomness_height: u64,
precommit_block: u64,
) -> Result<Self, std::io::Error> {
tokio::fs::remove_file(unsealed.unsealed_path).await?;

Ok(Self {
seal_proof: unsealed.seal_proof,
sector_number: unsealed.sector_number,
piece_infos: unsealed.piece_infos,
pieces_locations: unsealed.pieces_locations,
deals: unsealed.deals,
cache_path,
sealed_path,
Expand Down Expand Up @@ -478,6 +493,11 @@ pub struct ProvenSector {
/// Indexes match with corresponding deals in [`Sector::deals`].
pub piece_infos: Vec<PieceInfo>,

/// Tracks the locations of the actual pieces added to the unsealed sector.
/// This vector does not contain padding pieces; it contains only the actual
/// pieces corresponding to the users' deals.
pub pieces_locations: Vec<(Commitment<CommP>, PathBuf)>,

/// Tracks all of the deals that have been added to the sector.
pub deals: Vec<(DealId, DealProposal)>,

Expand All @@ -501,6 +521,7 @@ impl ProvenSector {
Self {
sector_number: sector.sector_number,
piece_infos: sector.piece_infos,
pieces_locations: sector.pieces_locations,
deals: sector.deals,
cache_path: sector.cache_path,
sealed_path: sector.sealed_path,
Expand Down
3 changes: 3 additions & 0 deletions storage-provider/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ delia = []
mater = { workspace = true }
polka-storage-proofs = { workspace = true, features = ["std", "substrate"] }
polka-storage-provider-common = { workspace = true }
polka-storage-retrieval = { workspace = true }
primitives = { workspace = true, features = ["clap", "serde", "std"] }
storagext = { workspace = true, features = ["clap"] }

async-stream.workspace = true
async-trait = { workspace = true }
axum = { workspace = true, features = ["macros", "multipart"] }
base64 = { workspace = true }
blockstore = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
ciborium = { workspace = true }
cid = { workspace = true, features = ["serde", "std"] }
Expand Down
12 changes: 12 additions & 0 deletions storage-provider/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ fn default_node_address() -> Url {
Url::parse(DEFAULT_NODE_ADDRESS).expect("DEFAULT_NODE_ADDRESS must be a valid Url")
}

fn default_retrieval_address() -> Multiaddr {
"/ip4/127.0.0.1/tcp/8002"
.to_string()
.parse()
.expect("multiaddres is correct")
}

#[derive(Debug, Clone, Deserialize, Args)]
#[group(multiple = true, conflicts_with = "config")]
#[serde(deny_unknown_fields)]
Expand All @@ -61,6 +68,11 @@ pub struct ConfigurationArgs {
#[arg(long, default_value_t = default_node_address())]
pub(crate) node_url: Url,

/// Storage provider listen address.
#[serde(default = "default_retrieval_address")]
#[arg(long, default_value_t = default_retrieval_address())]
pub(crate) retrieval_listen_address: Multiaddr,

/// RocksDB storage directory.
/// Defaults to a temporary random directory, like `/tmp/<random>/deals_database`.
#[arg(long)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use cid::{
multihash::{self, Multihash},
Cid,
};
use primitives::{sector::SectorNumber, DealId};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

Expand Down Expand Up @@ -182,16 +183,6 @@ impl Default for PieceInfo {
}
}

/// Identifier for a retrieval deal (unique to a client)
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DealId(u64);

impl From<u64> for DealId {
fn from(value: u64) -> Self {
Self(value)
}
}

// TODO(@jmg-duarte,14/06/2024): validate miner address

/// The storage provider address.
Expand All @@ -217,19 +208,6 @@ impl From<String> for StorageProviderAddress {
}
}

/// Numeric identifier for a sector. It is usually relative to a storage provider.
///
/// For more information on sectors, see:
/// <https://spec.filecoin.io/#section-systems.filecoin_mining.sector>
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct SectorNumber(u64);

impl From<u64> for SectorNumber {
fn from(value: u64) -> Self {
Self(value)
}
}

/// Information about a single *storage* deal for a given piece.
///
/// Source:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl RocksDBLid {
/// * If the column families ([`PIECE_CID_TO_CURSOR_CF`],
/// [`MULTIHASH_TO_PIECE_CID_CF`], [`PIECE_CID_TO_FLAGGED_CF`],
/// [`CURSOR_TO_OFFSET_SIZE_CF`]) do not exist, they will be created.
/// * If the cursor is not initialized, it will be initialized with a default value of 100.
pub fn new(config: RocksDBStateStoreConfig) -> Result<Self, LidError>
where
Self: Sized,
Expand All @@ -123,11 +124,17 @@ impl RocksDBLid {
opts.create_if_missing(true);
opts.create_missing_column_families(true);

Ok(Self {
// Initialize the db
let database = Self {
database: RocksDB::open_cf_descriptors(&opts, config.path, column_families)?,
offset: 0,
checked: HashMap::new(),
})
};

// Initialize the cursor
database.init_cursor()?;

Ok(database)
}

/// Get the column family handle for the given column family name.
Expand Down Expand Up @@ -264,6 +271,18 @@ impl RocksDBLid {
Ok(multihash)
}

/// Initialize cursor with the default value if not set already.
fn init_cursor(&self) -> Result<(), LidError> {
if let Err(err) = self.get_next_cursor() {
if matches!(err, LidError::CursorNotFound) {
self.set_next_cursor(100)?;
} else {
return Err(err);
}
}
Ok(())
}

/// Get the next available cursor.
///
/// Returns [`LidError::CursorNotFound`] if no cursor has been set.
Expand All @@ -274,7 +293,8 @@ impl RocksDBLid {
/// Source:
/// * <https://github.com/filecoin-project/boost/blob/16a4de2af416575f60f88c723d84794f785d2825/extern/boostd-data/ldb/db.go#L109-L118>
fn get_next_cursor(&self) -> Result<(u64, String), LidError> {
let pinned_slice = self.database.get_pinned(NEXT_CURSOR_KEY)?;
let pinned_slice: Option<rocksdb::DBPinnableSlice> =
self.database.get_pinned(NEXT_CURSOR_KEY)?;
let Some(pinned_slice) = pinned_slice else {
// In most places the original source code has some special handling for the missing key,
// however, that does not apply for a missing cursor
Expand Down Expand Up @@ -600,6 +620,12 @@ impl Service for RocksDBLid {
let mut records = vec![];
for it in iterator {
let (key, value) = it?;

// TODO(@cernicc,31/01/2025): The NEXT_CURSOR_KEY is returned as a key. Not sure why.
if key.as_ref() == NEXT_CURSOR_KEY.as_bytes() {
continue;
}

// With some trickery, we can probably get rid of this allocation
let key = key
.to_vec()
Expand Down Expand Up @@ -932,13 +958,13 @@ mod test {
use sha2::{Digest, Sha256};
use tempfile::tempdir;

use super::{key_flag_piece, RocksDBLid, RocksDBStateStoreConfig};
use crate::local_index_directory::{
use super::{RocksDBLid, RocksDBStateStoreConfig};
use crate::indexer::local_index_directory::{
rdb::{
key_cursor_prefix, MULTIHASH_TO_PIECE_CID_CF, PIECE_CID_TO_CURSOR_CF,
key_cursor_prefix, key_flag_piece, MULTIHASH_TO_PIECE_CID_CF, PIECE_CID_TO_CURSOR_CF,
PIECE_CID_TO_FLAGGED_CF, RAW_CODEC,
},
DealId, DealInfo, FlaggedPiece, FlaggedPiecesListFilter, IndexRecord, LidError, OffsetSize,
DealInfo, FlaggedPiece, FlaggedPiecesListFilter, IndexRecord, LidError, OffsetSize,
PieceInfo, Service, StorageProviderAddress,
};

Expand All @@ -947,6 +973,7 @@ mod test {
let config = RocksDBStateStoreConfig {
path: tmp_dir.path().join("rocksdb"),
};

RocksDBLid::new(config).unwrap()
}

Expand All @@ -962,7 +989,7 @@ mod test {
DealInfo {
deal_uuid: uuid::Uuid::new_v4(),
is_legacy: false,
chain_deal_id: 1337.into(),
chain_deal_id: 1337,
storage_provider_address: "address".to_string().into(),
sector_number: 42.into(),
piece_offset: 10,
Expand Down Expand Up @@ -1711,7 +1738,7 @@ mod test {
piece_info.deals.push(DealInfo {
deal_uuid: uuid::Uuid::new_v4(),
is_legacy: false,
chain_deal_id: DealId(i),
chain_deal_id: i,
storage_provider_address: storage_provider_address.clone(),
sector_number: 0.into(),
piece_offset: 0,
Expand Down
Loading
Loading