diff --git a/storage-provider/server/src/pipeline/mod.rs b/storage-provider/server/src/pipeline/mod.rs index 47a092c67..c51629417 100644 --- a/storage-provider/server/src/pipeline/mod.rs +++ b/storage-provider/server/src/pipeline/mod.rs @@ -11,7 +11,7 @@ use polka_storage_proofs::{ }; use polka_storage_provider_common::rpc::ServerInfo; use primitives::{ - commitment::{CommD, CommP, CommR, Commitment}, + commitment::{CommP, Commitment}, proofs::derive_prover_id, randomness::{draw_randomness, DomainSeparationTag}, sector::SectorNumber, @@ -20,7 +20,7 @@ use storagext::{ types::{ market::DealProposal, storage_provider::{ - PartitionState, PoStProof, ProveCommitSector, SectorPreCommitInfo, + PartitionState, PoStProof, ProveCommitSector, SubmitWindowedPoStParams, }, }, @@ -41,8 +41,6 @@ use types::{ use crate::db::{DBError, DealDB}; -// TODO(@th7nder,#622,02/12/2024): query it from the chain. -const SECTOR_EXPIRATION_MARGIN: u64 = 20; #[derive(Debug, thiserror::Error)] pub enum PipelineError { @@ -337,122 +335,23 @@ async fn precommit( ) -> Result<(), PipelineError> { tracing::info!("Starting pre-commit"); - let sealer = Sealer::new(state.server_info.seal_proof); - let Some(mut sector) = state.db.get_sector::(sector_number)? else { + let Some(sector) = state.db.get_sector::(sector_number)? else { tracing::error!("Tried to precommit non-existing sector"); return Err(PipelineError::SectorNotFound); }; - // Pad sector so CommD can be properly calculated. - sector.piece_infos = sealer.pad_sector(§or.piece_infos, sector.occupied_sector_space)?; - tracing::debug!("piece_infos: {:?}", sector.piece_infos); - - tracing::info!("Padded sector, commencing pre-commit and getting last finalized block"); - - let current_block = state.xt_client.height(true).await?; - tracing::info!("Current block: {current_block}"); - - let digest = state - .xt_client - .get_randomness(current_block) - .await? - .expect("randomness to be available as we wait for it"); - let entropy = state.xt_keypair.account_id().encode(); - // Must match pallet's logic or otherwise proof won't be verified: - // https://github.com/eigerco/polka-storage/blob/af51a9b121c9b02e0bf6f02f5e835091ab46af76/pallets/storage-provider/src/lib.rs#L1539 - let ticket = draw_randomness( - &digest, - DomainSeparationTag::SealRandomness, - current_block, - &entropy, - ); - - let cache_path = state.sealing_cache_dir.join(sector_number.to_string()); + let cache_dir_path = state.sealing_cache_dir.join(sector_number.to_string()); let sealed_path = state.sealed_sectors_dir.join(sector_number.to_string()); - tokio::fs::create_dir_all(&cache_path).await?; - tokio::fs::File::create_new(&sealed_path).await?; - - // TODO(@th7nder,31/10/2024): what happens if some of the process fails? SP will be slashed, and there is no error reporting? what about retries? - let sealing_handle: JoinHandle> = { - let prover_id = derive_prover_id(state.xt_keypair.account_id()); - let cache_dir = cache_path.clone(); - let unsealed_path = sector.unsealed_path.clone(); - let sealed_path = sealed_path.clone(); - - let piece_infos = sector.piece_infos.clone(); - tokio::task::spawn_blocking(move || { - sealer.precommit_sector( - cache_dir, - unsealed_path, - sealed_path, - prover_id, - sector_number, - ticket, - &piece_infos, - ) - }) - }; - let sealing_output = sealing_handle.await??; - tracing::info!( - "Created sector's replica, CommD: {}, CommR: {}", - sealing_output.comm_d.cid(), - sealing_output.comm_r.cid() - ); - let sealing_output_commr = Commitment::::from(sealing_output.comm_r); - let sealing_output_commd = Commitment::::from(sealing_output.comm_d); + let sector = sector.pre_commit( + state.xt_client.clone(), + &state.xt_keypair, + cache_dir_path, + sealed_path + ).await?; - tracing::debug!("Precommiting at block: {}", current_block); - let result = state - .xt_client - .pre_commit_sectors( - &state.xt_keypair, - vec![SectorPreCommitInfo { - deal_ids: sector.deals.iter().map(|(id, _)| *id).collect(), - expiration: sector - .deals - .iter() - .map(|(_, deal)| deal.end_block) - .max() - .expect("always at least 1 deal in a sector") - + SECTOR_EXPIRATION_MARGIN, - sector_number: sector_number, - seal_proof: state.server_info.seal_proof, - sealed_cid: sealing_output_commr.cid(), - unsealed_cid: sealing_output_commd.cid(), - seal_randomness_height: current_block, - }], - true, - ) - .await? - .expect("we're waiting for the result"); - - let precommited_sectors = result - .events - .find::() - // `.find` returns subxt_core::Error which while it is convertible to subxt::Error as shown - // it can't be converted by a single ? on the collect, so the type system tries instead - // subxt_core::Error -> PipelineError - .map(|result| result.map_err(|err| subxt::Error::from(err))) - .collect::, _>>()?; - - let sector = PreCommittedSector::create( - sector, - cache_path, - sealed_path, - sealing_output_commr, - sealing_output_commd, - current_block, - precommited_sectors[0].block, - ) - .await?; state.db.save_sector(sector.sector_number, §or)?; - tracing::info!( - "Successfully pre-commited sectors on-chain: {:?}", - precommited_sectors - ); - state .pipeline_sender .send(PipelineMessage::ProveCommit(ProveCommitMessage { diff --git a/storage-provider/server/src/pipeline/types.rs b/storage-provider/server/src/pipeline/types.rs index 7b6c33399..00be46c6d 100644 --- a/storage-provider/server/src/pipeline/types.rs +++ b/storage-provider/server/src/pipeline/types.rs @@ -1,15 +1,13 @@ -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; -use polka_storage_proofs::porep::{sealer::{prepare_piece, Sealer}, PoRepError}; +use polka_storage_proofs::porep::{sealer::{prepare_piece, PreCommitOutput, Sealer}, PoRepError}; use primitives::{ - commitment::{piece::PieceInfo, CommD, CommP, CommR, Commitment}, - proofs::RegisteredSealProof, - sector::SectorNumber, - DealId, + commitment::{piece::PieceInfo, CommD, CommP, CommR, Commitment}, proofs::{derive_prover_id, RegisteredSealProof}, randomness::{draw_randomness, DomainSeparationTag}, sector::SectorNumber, DealId }; use serde::{Deserialize, Serialize}; -use storagext::types::market::DealProposal; +use storagext::{types::{market::DealProposal, storage_provider::SectorPreCommitInfo}, RandomnessClientExt, StorageProviderClientExt, SystemClientExt}; use tokio::task::{JoinError, JoinHandle}; +use subxt::{tx::Signer, ext::codec::Encode}; /// Represents a task to be executed on the Storage Provider Pipeline #[derive(Debug)] @@ -65,6 +63,8 @@ pub enum SectorError { Io(#[from] std::io::Error), #[error(transparent)] Join(#[from] JoinError), + #[error(transparent)] + Subxt(#[from] subxt::Error), } /// Unsealed Sector which still accepts deals and pieces. @@ -145,6 +145,10 @@ pub struct PreCommittedSector { pub precommit_block: u64, } + +// TODO(@th7nder,#622,02/12/2024): query it from the chain. +const SECTOR_EXPIRATION_MARGIN: u64 = 20; + impl UnsealedSector { /// Creates a new sector and empty file at the provided path. /// @@ -199,6 +203,122 @@ impl UnsealedSector { Ok(()) } + + pub async fn pre_commit(mut self, + xt_client: Arc, + xt_keypair: &storagext::multipair::MultiPairSigner, + cache_dir_path: PathBuf, + sealed_path: PathBuf, + ) -> Result { + let sealer: Sealer = Sealer::new(self.seal_proof); + + tokio::fs::create_dir_all(&cache_dir_path).await?; + tokio::fs::File::create_new(&sealed_path).await?; + + // Pad sector so CommD can be properly calculated. + self.piece_infos = sealer.pad_sector(&self.piece_infos, self.occupied_sector_space)?; + tracing::debug!("piece_infos: {:?}", self.piece_infos); + tracing::info!("Padded sector, commencing pre-commit and getting last finalized block"); + + let current_block = xt_client.height(true).await?; + tracing::info!("Current block: {current_block}"); + + let digest = xt_client + .get_randomness(current_block) + .await? + .expect("randomness to be available as we wait for it"); + + let entropy = xt_keypair.account_id().encode(); + // Must match pallet's logic or otherwise proof won't be verified: + // https://github.com/eigerco/polka-storage/blob/af51a9b121c9b02e0bf6f02f5e835091ab46af76/pallets/storage-provider/src/lib.rs#L1539 + let ticket = draw_randomness( + &digest, + DomainSeparationTag::SealRandomness, + current_block, + &entropy, + ); + + let sealing_handle: JoinHandle> = { + let prover_id = derive_prover_id(xt_keypair.account_id()); + let cache_dir = cache_dir_path.clone(); + let unsealed_path = self.unsealed_path.clone(); + let sealed_path = sealed_path.clone(); + let sector_number = self.sector_number; + + let piece_infos = self.piece_infos.clone(); + tokio::task::spawn_blocking(move || { + sealer.precommit_sector( + cache_dir, + unsealed_path, + sealed_path, + prover_id, + sector_number, + ticket, + &piece_infos, + ) + }) + }; + let sealing_output = sealing_handle.await??; + + tracing::info!( + "Created sector's replica, CommD: {}, CommR: {}", + sealing_output.comm_d.cid(), + sealing_output.comm_r.cid() + ); + + let sealing_output_commr = Commitment::::from(sealing_output.comm_r); + let sealing_output_commd = Commitment::::from(sealing_output.comm_d); + + + tracing::debug!("Precommiting at block: {}", current_block); + let result = xt_client + .pre_commit_sectors( + xt_keypair, + vec![SectorPreCommitInfo { + deal_ids: self.deals.iter().map(|(id, _)| *id).collect(), + expiration: self + .deals + .iter() + .map(|(_, deal)| deal.end_block) + .max() + .expect("always at least 1 deal in a sector") + + SECTOR_EXPIRATION_MARGIN, + sector_number: self.sector_number, + seal_proof: self.seal_proof, + sealed_cid: sealing_output_commr.cid(), + unsealed_cid: sealing_output_commd.cid(), + seal_randomness_height: current_block, + }], + true, + ) + .await? + .expect("we're waiting for the result"); + + let precommited_sectors = result + .events + .find::() + // `.find` returns subxt_core::Error which while it is convertible to subxt::Error as shown + // it can't be converted by a single ? on the collect, so the type system tries instead + // subxt_core::Error -> PipelineError + .map(|result| result.map_err(|err| subxt::Error::from(err))) + .collect::, _>>()?; + + tracing::info!( + "Successfully pre-commited sectors on-chain: {:?}", + precommited_sectors + ); + + Ok(PreCommittedSector::create( + self, + cache_dir_path, + sealed_path, + sealing_output_commr, + sealing_output_commd, + current_block, + precommited_sectors[0].block, + ) + .await?) + } } impl PreCommittedSector {