Skip to content

Commit

Permalink
refactor: precommit
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder committed Jan 20, 2025
1 parent 9a772f1 commit 009bf53
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 118 deletions.
121 changes: 10 additions & 111 deletions storage-provider/server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use polka_storage_proofs::{
};
use polka_storage_provider_common::rpc::ServerInfo;
use primitives::{
commitment::{CommD, CommP, CommR, Commitment},
commitment::{CommP, Commitment},
proofs::derive_prover_id,
randomness::{draw_randomness, DomainSeparationTag},
sector::SectorNumber,
Expand All @@ -20,7 +20,7 @@ use storagext::{
types::{
market::DealProposal,
storage_provider::{
PartitionState, PoStProof, ProveCommitSector, SectorPreCommitInfo,
PartitionState, PoStProof, ProveCommitSector,
SubmitWindowedPoStParams,
},
},
Expand All @@ -41,8 +41,6 @@ use types::{

use crate::db::{DBError, DealDB};

// TODO(@th7nder,#622,02/12/2024): query it from the chain.
const SECTOR_EXPIRATION_MARGIN: u64 = 20;

#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
Expand Down Expand Up @@ -337,122 +335,23 @@ async fn precommit(
) -> Result<(), PipelineError> {
tracing::info!("Starting pre-commit");

let sealer = Sealer::new(state.server_info.seal_proof);
let Some(mut sector) = state.db.get_sector::<UnsealedSector>(sector_number)? else {
let Some(sector) = state.db.get_sector::<UnsealedSector>(sector_number)? else {
tracing::error!("Tried to precommit non-existing sector");
return Err(PipelineError::SectorNotFound);
};
// Pad sector so CommD can be properly calculated.
sector.piece_infos = sealer.pad_sector(&sector.piece_infos, sector.occupied_sector_space)?;
tracing::debug!("piece_infos: {:?}", sector.piece_infos);

tracing::info!("Padded sector, commencing pre-commit and getting last finalized block");

let current_block = state.xt_client.height(true).await?;
tracing::info!("Current block: {current_block}");

let digest = state
.xt_client
.get_randomness(current_block)
.await?
.expect("randomness to be available as we wait for it");

let entropy = state.xt_keypair.account_id().encode();
// Must match pallet's logic or otherwise proof won't be verified:
// https://github.com/eigerco/polka-storage/blob/af51a9b121c9b02e0bf6f02f5e835091ab46af76/pallets/storage-provider/src/lib.rs#L1539
let ticket = draw_randomness(
&digest,
DomainSeparationTag::SealRandomness,
current_block,
&entropy,
);

let cache_path = state.sealing_cache_dir.join(sector_number.to_string());
let cache_dir_path = state.sealing_cache_dir.join(sector_number.to_string());
let sealed_path = state.sealed_sectors_dir.join(sector_number.to_string());
tokio::fs::create_dir_all(&cache_path).await?;
tokio::fs::File::create_new(&sealed_path).await?;

// TODO(@th7nder,31/10/2024): what happens if some of the process fails? SP will be slashed, and there is no error reporting? what about retries?
let sealing_handle: JoinHandle<Result<PreCommitOutput, _>> = {
let prover_id = derive_prover_id(state.xt_keypair.account_id());
let cache_dir = cache_path.clone();
let unsealed_path = sector.unsealed_path.clone();
let sealed_path = sealed_path.clone();

let piece_infos = sector.piece_infos.clone();
tokio::task::spawn_blocking(move || {
sealer.precommit_sector(
cache_dir,
unsealed_path,
sealed_path,
prover_id,
sector_number,
ticket,
&piece_infos,
)
})
};
let sealing_output = sealing_handle.await??;
tracing::info!(
"Created sector's replica, CommD: {}, CommR: {}",
sealing_output.comm_d.cid(),
sealing_output.comm_r.cid()
);

let sealing_output_commr = Commitment::<CommR>::from(sealing_output.comm_r);
let sealing_output_commd = Commitment::<CommD>::from(sealing_output.comm_d);
let sector = sector.pre_commit(
state.xt_client.clone(),
&state.xt_keypair,
cache_dir_path,
sealed_path
).await?;

tracing::debug!("Precommiting at block: {}", current_block);
let result = state
.xt_client
.pre_commit_sectors(
&state.xt_keypair,
vec![SectorPreCommitInfo {
deal_ids: sector.deals.iter().map(|(id, _)| *id).collect(),
expiration: sector
.deals
.iter()
.map(|(_, deal)| deal.end_block)
.max()
.expect("always at least 1 deal in a sector")
+ SECTOR_EXPIRATION_MARGIN,
sector_number: sector_number,
seal_proof: state.server_info.seal_proof,
sealed_cid: sealing_output_commr.cid(),
unsealed_cid: sealing_output_commd.cid(),
seal_randomness_height: current_block,
}],
true,
)
.await?
.expect("we're waiting for the result");

let precommited_sectors = result
.events
.find::<storagext::runtime::storage_provider::events::SectorsPreCommitted>()
// `.find` returns subxt_core::Error which while it is convertible to subxt::Error as shown
// it can't be converted by a single ? on the collect, so the type system tries instead
// subxt_core::Error -> PipelineError
.map(|result| result.map_err(|err| subxt::Error::from(err)))
.collect::<Result<Vec<_>, _>>()?;

let sector = PreCommittedSector::create(
sector,
cache_path,
sealed_path,
sealing_output_commr,
sealing_output_commd,
current_block,
precommited_sectors[0].block,
)
.await?;
state.db.save_sector(sector.sector_number, &sector)?;

tracing::info!(
"Successfully pre-commited sectors on-chain: {:?}",
precommited_sectors
);

state
.pipeline_sender
.send(PipelineMessage::ProveCommit(ProveCommitMessage {
Expand Down
134 changes: 127 additions & 7 deletions storage-provider/server/src/pipeline/types.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};

use polka_storage_proofs::porep::{sealer::{prepare_piece, Sealer}, PoRepError};
use polka_storage_proofs::porep::{sealer::{prepare_piece, PreCommitOutput, Sealer}, PoRepError};
use primitives::{
commitment::{piece::PieceInfo, CommD, CommP, CommR, Commitment},
proofs::RegisteredSealProof,
sector::SectorNumber,
DealId,
commitment::{piece::PieceInfo, CommD, CommP, CommR, Commitment}, proofs::{derive_prover_id, RegisteredSealProof}, randomness::{draw_randomness, DomainSeparationTag}, sector::SectorNumber, DealId
};
use serde::{Deserialize, Serialize};
use storagext::types::market::DealProposal;
use storagext::{types::{market::DealProposal, storage_provider::SectorPreCommitInfo}, RandomnessClientExt, StorageProviderClientExt, SystemClientExt};
use tokio::task::{JoinError, JoinHandle};
use subxt::{tx::Signer, ext::codec::Encode};

/// Represents a task to be executed on the Storage Provider Pipeline
#[derive(Debug)]
Expand Down Expand Up @@ -65,6 +63,8 @@ pub enum SectorError {
Io(#[from] std::io::Error),
#[error(transparent)]
Join(#[from] JoinError),
#[error(transparent)]
Subxt(#[from] subxt::Error),
}

/// Unsealed Sector which still accepts deals and pieces.
Expand Down Expand Up @@ -145,6 +145,10 @@ pub struct PreCommittedSector {
pub precommit_block: u64,
}


// TODO(@th7nder,#622,02/12/2024): query it from the chain.
const SECTOR_EXPIRATION_MARGIN: u64 = 20;

impl UnsealedSector {
/// Creates a new sector and empty file at the provided path.
///
Expand Down Expand Up @@ -199,6 +203,122 @@ impl UnsealedSector {

Ok(())
}

pub async fn pre_commit(mut self,
xt_client: Arc<storagext::Client>,
xt_keypair: &storagext::multipair::MultiPairSigner,
cache_dir_path: PathBuf,
sealed_path: PathBuf,
) -> Result<PreCommittedSector, SectorError> {
let sealer: Sealer = Sealer::new(self.seal_proof);

tokio::fs::create_dir_all(&cache_dir_path).await?;
tokio::fs::File::create_new(&sealed_path).await?;

// Pad sector so CommD can be properly calculated.
self.piece_infos = sealer.pad_sector(&self.piece_infos, self.occupied_sector_space)?;
tracing::debug!("piece_infos: {:?}", self.piece_infos);
tracing::info!("Padded sector, commencing pre-commit and getting last finalized block");

let current_block = xt_client.height(true).await?;
tracing::info!("Current block: {current_block}");

let digest = xt_client
.get_randomness(current_block)
.await?
.expect("randomness to be available as we wait for it");

let entropy = xt_keypair.account_id().encode();
// Must match pallet's logic or otherwise proof won't be verified:
// https://github.com/eigerco/polka-storage/blob/af51a9b121c9b02e0bf6f02f5e835091ab46af76/pallets/storage-provider/src/lib.rs#L1539
let ticket = draw_randomness(
&digest,
DomainSeparationTag::SealRandomness,
current_block,
&entropy,
);

let sealing_handle: JoinHandle<Result<PreCommitOutput, _>> = {
let prover_id = derive_prover_id(xt_keypair.account_id());
let cache_dir = cache_dir_path.clone();
let unsealed_path = self.unsealed_path.clone();
let sealed_path = sealed_path.clone();
let sector_number = self.sector_number;

let piece_infos = self.piece_infos.clone();
tokio::task::spawn_blocking(move || {
sealer.precommit_sector(
cache_dir,
unsealed_path,
sealed_path,
prover_id,
sector_number,
ticket,
&piece_infos,
)
})
};
let sealing_output = sealing_handle.await??;

tracing::info!(
"Created sector's replica, CommD: {}, CommR: {}",
sealing_output.comm_d.cid(),
sealing_output.comm_r.cid()
);

let sealing_output_commr = Commitment::<CommR>::from(sealing_output.comm_r);
let sealing_output_commd = Commitment::<CommD>::from(sealing_output.comm_d);


tracing::debug!("Precommiting at block: {}", current_block);
let result = xt_client
.pre_commit_sectors(
xt_keypair,
vec![SectorPreCommitInfo {
deal_ids: self.deals.iter().map(|(id, _)| *id).collect(),
expiration: self
.deals
.iter()
.map(|(_, deal)| deal.end_block)
.max()
.expect("always at least 1 deal in a sector")
+ SECTOR_EXPIRATION_MARGIN,
sector_number: self.sector_number,
seal_proof: self.seal_proof,
sealed_cid: sealing_output_commr.cid(),
unsealed_cid: sealing_output_commd.cid(),
seal_randomness_height: current_block,
}],
true,
)
.await?
.expect("we're waiting for the result");

let precommited_sectors = result
.events
.find::<storagext::runtime::storage_provider::events::SectorsPreCommitted>()
// `.find` returns subxt_core::Error which while it is convertible to subxt::Error as shown
// it can't be converted by a single ? on the collect, so the type system tries instead
// subxt_core::Error -> PipelineError
.map(|result| result.map_err(|err| subxt::Error::from(err)))
.collect::<Result<Vec<_>, _>>()?;

tracing::info!(
"Successfully pre-commited sectors on-chain: {:?}",
precommited_sectors
);

Ok(PreCommittedSector::create(
self,
cache_dir_path,
sealed_path,
sealing_output_commr,
sealing_output_commd,
current_block,
precommited_sectors[0].block,
)
.await?)
}
}

impl PreCommittedSector {
Expand Down

0 comments on commit 009bf53

Please sign in to comment.