Skip to content

Commit

Permalink
refactor: prove commit
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder committed Jan 20, 2025
1 parent 009bf53 commit d90d0e5
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 150 deletions.
163 changes: 21 additions & 142 deletions storage-provider/server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{collections::BTreeSet, path::PathBuf, sync::Arc};

use polka_storage_proofs::{
porep::{
sealer::{BlstrsProof, PreCommitOutput, Sealer, SubstrateProof},
sealer::{BlstrsProof, SubstrateProof},
PoRepError, PoRepParameters,
},
post::{self, PoStError, PoStParameters, ReplicaInfo},
Expand All @@ -19,10 +19,7 @@ use primitives::{
use storagext::{
types::{
market::DealProposal,
storage_provider::{
PartitionState, PoStProof, ProveCommitSector,
SubmitWindowedPoStParams,
},
storage_provider::{PartitionState, PoStProof, SubmitWindowedPoStParams},
},
RandomnessClientExt, StorageProviderClientExt, SystemClientExt,
};
Expand All @@ -36,12 +33,12 @@ use tokio::{
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use types::{
AddPieceMessage, PipelineMessage, PreCommitMessage, PreCommittedSector, ProveCommitMessage, ProvenSector, SectorError, SubmitWindowedPoStMessage, UnsealedSector
AddPieceMessage, PipelineMessage, PreCommitMessage, PreCommittedSector, ProveCommitMessage,
ProvenSector, SectorError, SubmitWindowedPoStMessage, UnsealedSector,
};

use crate::db::{DBError, DealDB};


#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
#[error(transparent)]
Expand Down Expand Up @@ -70,8 +67,6 @@ pub enum PipelineError {
SendError(#[from] SendError<PipelineMessage>),
#[error("failed to schedule windowed PoSt")]
SchedulingError,
#[error("Proving cancelled")]
ProvingCancelled,
#[error("Custom error: {0}")]
CustomError(String),
}
Expand Down Expand Up @@ -284,7 +279,8 @@ async fn find_sector_for_piece(
.next_sector_number()
.map_err(|err| PipelineError::CustomError(err.to_string()))?;
let unsealed_path = state.unsealed_sectors_dir.join(sector_number.to_string());
let sector = UnsealedSector::create(state.server_info.seal_proof, sector_number, unsealed_path).await?;
let sector =
UnsealedSector::create(state.server_info.seal_proof, sector_number, unsealed_path).await?;

Ok(sector)
}
Expand Down Expand Up @@ -343,12 +339,14 @@ async fn precommit(
let cache_dir_path = state.sealing_cache_dir.join(sector_number.to_string());
let sealed_path = state.sealed_sectors_dir.join(sector_number.to_string());

let sector = sector.pre_commit(
state.xt_client.clone(),
&state.xt_keypair,
cache_dir_path,
sealed_path
).await?;
let sector = sector
.pre_commit(
state.xt_client.clone(),
&state.xt_keypair,
cache_dir_path,
sealed_path,
)
.await?;

state.db.save_sector(sector.sector_number, &sector)?;

Expand All @@ -368,140 +366,21 @@ async fn prove_commit(
token: CancellationToken,
) -> Result<(), PipelineError> {
tracing::info!("Starting prove commit");

let sealer = Sealer::new(state.server_info.seal_proof);
let Some(sector) = state.db.get_sector::<PreCommittedSector>(sector_number)? else {
tracing::error!("Tried to precommit non-existing sector");
return Err(PipelineError::SectorNotFound);
};

let seal_randomness_height = sector.seal_randomness_height;
let Some(digest) = state
.xt_client
.get_randomness(seal_randomness_height)
.await?
else {
tracing::error!("Out-of-the-state transition, this SHOULD NOT happen");
return Err(PipelineError::RandomnessNotAvailable);
};
let entropy = state.xt_keypair.account_id().encode();
// Must match pallet's logic or otherwise proof won't be verified:
// https://github.com/eigerco/polka-storage/blob/af51a9b121c9b02e0bf6f02f5e835091ab46af76/pallets/storage-provider/src/lib.rs#L1539
let ticket = draw_randomness(
&digest,
DomainSeparationTag::SealRandomness,
seal_randomness_height,
&entropy,
);

// TODO(@th7nder,04/11/2024):
// https://github.com/eigerco/polka-storage/blob/5edd4194f08f29d769c277577ccbb70bb6ff63bc/runtime/src/configs/mod.rs#L360
// 10 blocks = 1 minute, only testnet
const PRECOMMIT_CHALLENGE_DELAY: u64 = 10;
let prove_commit_block = sector.precommit_block + PRECOMMIT_CHALLENGE_DELAY;

tracing::info!("Wait for block {} to get randomness", prove_commit_block);
tokio::select! {
res = state.xt_client.wait_for_height(prove_commit_block, true) => {
res?;
},
() = token.cancelled() => {
tracing::warn!("Cancelled while waiting to get randomness at block {}", prove_commit_block);
return Err(PipelineError::ProvingCancelled);
}
};

let Some(digest) = state.xt_client.get_randomness(prove_commit_block).await? else {
tracing::error!("Randomness for the block not available.");
return Err(PipelineError::RandomnessNotAvailable);
};
let seed = draw_randomness(
&digest,
DomainSeparationTag::InteractiveSealChallengeSeed,
prove_commit_block,
&entropy,
);

let prover_id = derive_prover_id(state.xt_keypair.account_id());
tracing::debug!("Performing prove commit for, seal_randomness_height {}, pre_commit_block: {}, prove_commit_block: {}, entropy: {}, ticket: {}, seed: {}, prover id: {}, sector_number: {}",
seal_randomness_height, sector.precommit_block, prove_commit_block, hex::encode(entropy), hex::encode(ticket), hex::encode(seed), hex::encode(prover_id), sector_number);

tracing::debug!("Acquiring sempahore...");
let proofs = {
let _permit = state
.prove_commit_throttle
.acquire()
.await
.expect("semaphore to not be closed");
tracing::debug!("Acquired sempahore.");

let sealing_handle: JoinHandle<Result<Vec<BlstrsProof>, _>> = {
let porep_params = state.porep_parameters.clone();
let cache_dir = sector.cache_path.clone();
let sealed_path = sector.sealed_path.clone();
let piece_infos = sector.piece_infos.clone();

tokio::task::spawn_blocking(move || {
sealer.prove_sector(
porep_params.as_ref(),
cache_dir,
sealed_path,
prover_id,
sector_number,
ticket,
Some(seed),
PreCommitOutput {
comm_r: sector.comm_r,
comm_d: sector.comm_d,
},
&piece_infos,
)
})
};

tokio::select! {
// Up to this point everything is retryable.
// Pipeline ends up being in an inconsistent state if we prove commit to the chain, and don't wait for it, so the sector's not persisted in the DB.
res = sealing_handle => {
res??
},
() = token.cancelled() => {
return Err(PipelineError::ProvingCancelled);
}
}
};

// We use sector size 2KiB only at this point, which guarantees to have 1 proof, because it has 1 partition in the config.
// That's why `prove_commit` will always generate a 1 proof.
let proof: SubstrateProof = proofs[0]
.clone()
.try_into()
.expect("converstion between rust-fil-proofs and polka-storage-proofs to work");
let proof = codec::Encode::encode(&proof);
tracing::info!("Proven sector: {}", sector_number);

let result = state
.xt_client
.prove_commit_sectors(
let sector = sector
.prove_commit(
state.xt_client.clone(),
&state.xt_keypair,
vec![ProveCommitSector {
sector_number,
proof,
}],
true,
state.porep_parameters.clone(),
state.prove_commit_throttle.clone(),
token,
)
.await?
.expect("waiting for finalization should always give results");

let proven_sectors = result
.events
.find::<storagext::runtime::storage_provider::events::SectorsProven>()
.map(|result| result.map_err(|err| subxt::Error::from(err)))
.collect::<Result<Vec<_>, _>>()?;

tracing::info!("Successfully proven sectors on-chain: {:?}", proven_sectors);
.await?;

let sector = ProvenSector::create(sector);
state.db.save_sector(sector.sector_number, &sector)?;

Ok(())
Expand Down
Loading

0 comments on commit d90d0e5

Please sign in to comment.