Skip to content

Commit

Permalink
refactor: extract add piece
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder committed Jan 20, 2025
1 parent 60b42db commit 9a772f1
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 30 deletions.
39 changes: 9 additions & 30 deletions storage-provider/server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{collections::BTreeSet, path::PathBuf, sync::Arc};

use polka_storage_proofs::{
porep::{
sealer::{prepare_piece, BlstrsProof, PreCommitOutput, Sealer, SubstrateProof},
sealer::{BlstrsProof, PreCommitOutput, Sealer, SubstrateProof},
PoRepError, PoRepParameters,
},
post::{self, PoStError, PoStParameters, ReplicaInfo},
Expand Down Expand Up @@ -36,8 +36,7 @@ use tokio::{
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use types::{
AddPieceMessage, PipelineMessage, PreCommitMessage, PreCommittedSector, ProveCommitMessage,
ProvenSector, SubmitWindowedPoStMessage, UnsealedSector,
AddPieceMessage, PipelineMessage, PreCommitMessage, PreCommittedSector, ProveCommitMessage, ProvenSector, SectorError, SubmitWindowedPoStMessage, UnsealedSector
};

use crate::db::{DBError, DealDB};
Expand All @@ -50,6 +49,8 @@ pub enum PipelineError {
#[error(transparent)]
PoRepError(#[from] PoRepError),
#[error(transparent)]
SectorError(#[from] SectorError),
#[error(transparent)]
PoStError(#[from] PoStError),
#[error(transparent)]
Join(#[from] JoinError),
Expand Down Expand Up @@ -285,7 +286,7 @@ async fn find_sector_for_piece(
.next_sector_number()
.map_err(|err| PipelineError::CustomError(err.to_string()))?;
let unsealed_path = state.unsealed_sectors_dir.join(sector_number.to_string());
let sector = UnsealedSector::create(sector_number, unsealed_path).await?;
let sector = UnsealedSector::create(state.server_info.seal_proof, sector_number, unsealed_path).await?;

Ok(sector)
}
Expand All @@ -303,35 +304,13 @@ async fn add_piece(
deal_id: u64,
) -> Result<(), PipelineError> {
let mut sector = find_sector_for_piece(&state).await?;
sector.deals.push((deal_id, deal));

tracing::info!("Adding a piece...");

let sealer = Sealer::new(state.server_info.seal_proof);
let handle: JoinHandle<Result<UnsealedSector, PipelineError>> =
tokio::task::spawn_blocking(move || {
let unsealed_sector = std::fs::File::options()
.append(true)
.open(&sector.unsealed_path)?;

tracing::info!("Preparing piece...");
let (padded_reader, piece_info) = prepare_piece(piece_path, commitment)?;
tracing::info!("Adding piece...");
let occupied_piece_space = sealer.add_piece(
padded_reader,
piece_info,
&sector.piece_infos,
unsealed_sector,
)?;

sector.piece_infos.push(piece_info);
sector.occupied_sector_space = sector.occupied_sector_space + occupied_piece_space;

Ok(sector)
});
let sector: UnsealedSector = handle.await??;

sector
.add_piece(deal_id, deal, piece_path, commitment)
.await?;
tracing::info!("Finished adding a piece");

state.db.save_sector(sector.sector_number, &sector)?;

// TODO(@th7nder,30/10/2024): simplification, as we're always scheduling a precommit just after adding a piece and creating a new sector.
Expand Down
50 changes: 50 additions & 0 deletions storage-provider/server/src/pipeline/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::path::PathBuf;

use polka_storage_proofs::porep::{sealer::{prepare_piece, Sealer}, PoRepError};
use primitives::{
commitment::{piece::PieceInfo, CommD, CommP, CommR, Commitment},
proofs::RegisteredSealProof,
sector::SectorNumber,
DealId,
};
use serde::{Deserialize, Serialize};
use storagext::types::market::DealProposal;
use tokio::task::{JoinError, JoinHandle};

/// Represents a task to be executed on the Storage Provider Pipeline
#[derive(Debug)]
Expand Down Expand Up @@ -54,10 +57,22 @@ pub struct SubmitWindowedPoStMessage {
pub deadline_index: u64,
}

#[derive(Debug, thiserror::Error)]
pub enum SectorError {
#[error(transparent)]
PoRepError(#[from] PoRepError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Join(#[from] JoinError),
}

/// Unsealed Sector which still accepts deals and pieces.
/// When sealed it's converted into [`PreCommittedSector`].
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsealedSector {
seal_proof: RegisteredSealProof,

/// [`SectorNumber`] which identifies a sector in the Storage Provider.
///
/// It *should be centrally generated* by the Storage Provider, currently by [`crate::db::DealDB::next_sector_number`].
Expand Down Expand Up @@ -136,19 +151,54 @@ impl UnsealedSector {
/// Sector Number must be unique - generated by [`crate::db::DealDB::next_sector_number`]
/// otherwise the data will be overwritten.
pub async fn create(
seal_proof: RegisteredSealProof,
sector_number: SectorNumber,
unsealed_path: std::path::PathBuf,
) -> Result<UnsealedSector, std::io::Error> {
tokio::fs::File::create_new(&unsealed_path).await?;

Ok(Self {
seal_proof,
sector_number,
occupied_sector_space: 0,
piece_infos: vec![],
deals: vec![],
unsealed_path,
})
}

pub async fn add_piece(
&mut self,
deal_id: u64,
deal: DealProposal,
piece_path: PathBuf,
commitment: Commitment<CommP>,
) -> Result<(), SectorError> {
self.deals.push((deal_id, deal));
let sealer = Sealer::new(self.seal_proof);

// would love to use something like scoped spawn blocking
let pieces = self.piece_infos.clone();
let unsealed_path = self.unsealed_path.clone();
let handle: JoinHandle<Result<(PieceInfo, u64), SectorError>> =
tokio::task::spawn_blocking(move || {
let unsealed_sector = std::fs::File::options().append(true).open(unsealed_path)?;

tracing::info!("Preparing piece...");
let (padded_reader, piece_info) = prepare_piece(piece_path, commitment)?;
tracing::info!("Adding piece...");
let occupied_piece_space =
sealer.add_piece(padded_reader, piece_info, &pieces, unsealed_sector)?;

Ok((piece_info, occupied_piece_space))
});

let (piece_info, occupied_piece_space) = handle.await??;
self.piece_infos.push(piece_info);
self.occupied_sector_space = self.occupied_sector_space + occupied_piece_space;

Ok(())
}
}

impl PreCommittedSector {
Expand Down

0 comments on commit 9a772f1

Please sign in to comment.