Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add chain id columns #3635

Open
wants to merge 5 commits into
base: afo/web-socket-gateway
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/lib/basic_types/src/prover_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ use strum::{Display, EnumString};

use crate::{
basic_fri_types::AggregationRound, protocol_version::ProtocolVersionId, L1BatchNumber,
L2ChainId,
};

#[derive(Debug, Clone, Copy)]
pub struct FriProverJobMetadata {
pub id: u32,
pub block_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub circuit_id: u8,
pub aggregation_round: AggregationRound,
pub sequence_number: usize,
Expand Down Expand Up @@ -74,6 +76,7 @@ impl JobCountStatistics {
#[derive(Debug)]
pub struct StuckJobs {
pub id: u64,
pub chain_id: L2ChainId,
pub status: String,
pub attempts: u64,
pub circuit_id: Option<u32>,
Expand Down Expand Up @@ -107,6 +110,7 @@ impl From<std::net::SocketAddr> for SocketAddress {
/// Metadata for a leaf-aggregation witness job: identifies the job, the L1 batch
/// and chain it belongs to, and the prover jobs whose proofs it aggregates.
pub struct LeafAggregationJobMetadata {
    /// Row id of the leaf-aggregation job.
    pub id: u32,
    /// L1 batch this job aggregates proofs for.
    pub block_number: L1BatchNumber,
    /// Chain the batch originates from (newly added chain-id column).
    pub chain_id: L2ChainId,
    /// Circuit id being aggregated at the leaf level.
    pub circuit_id: u8,
    /// Ids of the prover jobs whose proofs are inputs to this aggregation.
    pub prover_job_ids_for_proofs: Vec<u32>,
}
Expand All @@ -115,6 +119,7 @@ pub struct LeafAggregationJobMetadata {
pub struct NodeAggregationJobMetadata {
pub id: u32,
pub block_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub circuit_id: u8,
pub depth: u16,
pub prover_job_ids_for_proofs: Vec<u32>,
Expand Down Expand Up @@ -212,6 +217,7 @@ pub enum WitnessJobStatus {
#[derive(Debug)]
pub struct WitnessJobInfo {
pub block_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub status: WitnessJobStatus,
Expand All @@ -222,6 +228,7 @@ pub struct WitnessJobInfo {
pub struct ProverJobInfo {
pub id: u32,
pub block_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub circuit_type: String,
pub position: JobPosition,
pub input_length: u64,
Expand Down Expand Up @@ -269,6 +276,7 @@ impl FromStr for GpuProverInstanceStatus {
pub struct ProverJobFriInfo {
pub id: u32,
pub l1_batch_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub circuit_id: u32,
pub circuit_blob_url: String,
pub aggregation_round: AggregationRound,
Expand All @@ -295,6 +303,7 @@ pub trait Stallable {
#[derive(Debug, Clone)]
pub struct BasicWitnessGeneratorJobInfo {
pub l1_batch_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub witness_inputs_blob_url: Option<String>,
pub attempts: u32,
pub status: WitnessJobStatus,
Expand All @@ -321,6 +330,7 @@ impl Stallable for BasicWitnessGeneratorJobInfo {
pub struct LeafWitnessGeneratorJobInfo {
pub id: u32,
pub l1_batch_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub circuit_id: u32,
pub closed_form_inputs_blob_url: Option<String>,
pub attempts: u32,
Expand Down Expand Up @@ -349,6 +359,7 @@ impl Stallable for LeafWitnessGeneratorJobInfo {
pub struct NodeWitnessGeneratorJobInfo {
pub id: u32,
pub l1_batch_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub circuit_id: u32,
pub depth: u32,
pub status: WitnessJobStatus,
Expand Down Expand Up @@ -377,6 +388,7 @@ impl Stallable for NodeWitnessGeneratorJobInfo {
#[derive(Debug, Clone)]
pub struct RecursionTipWitnessGeneratorJobInfo {
pub l1_batch_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub status: WitnessJobStatus,
pub attempts: u32,
pub processing_started_at: Option<NaiveDateTime>,
Expand All @@ -402,6 +414,7 @@ impl Stallable for RecursionTipWitnessGeneratorJobInfo {
#[derive(Debug, Clone)]
pub struct SchedulerWitnessGeneratorJobInfo {
pub l1_batch_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub scheduler_partial_input_blob_url: String,
pub status: WitnessJobStatus,
pub processing_started_at: Option<NaiveDateTime>,
Expand Down Expand Up @@ -443,6 +456,7 @@ pub enum ProofCompressionJobStatus {
#[derive(Debug, Clone)]
pub struct ProofCompressionJobInfo {
pub l1_batch_number: L1BatchNumber,
pub chain_id: L2ChainId,
pub attempts: u32,
pub status: ProofCompressionJobStatus,
pub fri_proof_blob_url: Option<String>,
Expand All @@ -460,6 +474,7 @@ pub struct ProofCompressionJobInfo {
/// Timing record for proof generation of a single batch: how long the proof
/// took (`time_taken`) and when the record was created (`created_at`).
#[derive(Debug, Clone)]
pub struct ProofGenerationTime {
    /// L1 batch the proof was generated for.
    pub l1_batch_number: L1BatchNumber,
    /// Chain the batch originates from (newly added chain-id column).
    pub chain_id: L2ChainId,
    /// Wall-clock duration of proof generation.
    pub time_taken: NaiveTime,
    /// Creation timestamp of this record.
    pub created_at: NaiveDateTime,
}
32 changes: 20 additions & 12 deletions prover/crates/bin/proof_fri_compressor/src/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use zksync_prover_interface::outputs::{
};
use zksync_prover_keystore::keystore::Keystore;
use zksync_queued_job_processor::JobProcessor;
use zksync_types::{protocol_version::ProtocolSemanticVersion, L1BatchNumber};
use zksync_types::{protocol_version::ProtocolSemanticVersion, L1BatchNumber, L2ChainId};

use crate::metrics::METRICS;

Expand Down Expand Up @@ -69,7 +69,7 @@ impl ProofCompressor {
#[async_trait]
impl JobProcessor for ProofCompressor {
type Job = ZkSyncRecursionLayerProof;
type JobId = L1BatchNumber;
type JobId = (L2ChainId, L1BatchNumber);

type JobArtifacts = SnarkWrapperProof;

Expand All @@ -78,7 +78,7 @@ impl JobProcessor for ProofCompressor {
async fn get_next_job(&self) -> anyhow::Result<Option<(Self::JobId, Self::Job)>> {
let mut conn = self.pool.connection().await.unwrap();
let pod_name = get_current_pod_name();
let Some(l1_batch_number) = conn
let Some((chain_id, l1_batch_number)) = conn
.fri_proof_compressor_dal()
.get_next_proof_compression_job(&pod_name, self.protocol_version)
.await
Expand All @@ -87,7 +87,7 @@ impl JobProcessor for ProofCompressor {
};
let Some(fri_proof_id) = conn
.fri_prover_jobs_dal()
.get_scheduler_proof_job_id(l1_batch_number)
.get_scheduler_proof_job_id(l1_batch_number, chain_id)
.await
else {
anyhow::bail!("Scheduler proof is missing from database for batch {l1_batch_number}");
Expand All @@ -99,15 +99,15 @@ impl JobProcessor for ProofCompressor {
let observer = METRICS.blob_fetch_time.start();

let fri_proof: FriProofWrapper = self.blob_store.get(fri_proof_id)
.await.with_context(|| format!("Failed to get fri proof from blob store for {l1_batch_number} with id {fri_proof_id}"))?;
.await.with_context(|| format!("Failed to get fri proof from blob store for batch {l1_batch_number}, chain {} with id {fri_proof_id}", chain_id.as_u64()))?;

observer.observe();

let scheduler_proof = match fri_proof {
FriProofWrapper::Base(_) => anyhow::bail!("Must be a scheduler proof not base layer"),
FriProofWrapper::Recursive(proof) => proof,
};
Ok(Some((l1_batch_number, scheduler_proof)))
Ok(Some(((chain_id, l1_batch_number), scheduler_proof)))
}

async fn save_failure(&self, job_id: Self::JobId, _started_at: Instant, error: String) {
Expand All @@ -116,13 +116,13 @@ impl JobProcessor for ProofCompressor {
.await
.unwrap()
.fri_proof_compressor_dal()
.mark_proof_compression_job_failed(&error, job_id)
.mark_proof_compression_job_failed(&error, job_id.1, job_id.0)
.await;
}

async fn process_job(
&self,
_job_id: &L1BatchNumber,
_job_id: &(L2ChainId, L1BatchNumber),
job: ZkSyncRecursionLayerProof,
_started_at: Instant,
) -> JoinHandle<anyhow::Result<Self::JobArtifacts>> {
Expand Down Expand Up @@ -150,7 +150,9 @@ impl JobProcessor for ProofCompressor {
) -> anyhow::Result<()> {
METRICS.compression_time.observe(started_at.elapsed());
tracing::info!(
"Finished fri proof compression for job: {job_id} took: {:?}",
"Finished fri proof compression for job with id {}, chain {} took: {:?}",
job_id.1,
job_id.0.as_u64(),
started_at.elapsed()
);

Expand Down Expand Up @@ -192,7 +194,12 @@ impl JobProcessor for ProofCompressor {
.await
.unwrap()
.fri_proof_compressor_dal()
.mark_proof_compression_job_successful(job_id, started_at.elapsed(), &blob_url)
.mark_proof_compression_job_successful(
job_id.1,
job_id.0,
started_at.elapsed(),
&blob_url,
)
.await;
Ok(())
}
Expand All @@ -201,15 +208,16 @@ impl JobProcessor for ProofCompressor {
self.max_attempts
}

async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result<u32> {
async fn get_job_attempts(&self, job_id: &(L2ChainId, L1BatchNumber)) -> anyhow::Result<u32> {
let mut prover_storage = self
.pool
.connection()
.await
.context("failed to acquire DB connection for ProofCompressor")?;
let (chain_id, l1_batch_number) = *job_id;
prover_storage
.fri_proof_compressor_dal()
.get_proof_compression_job_attempts(*job_id)
.get_proof_compression_job_attempts(l1_batch_number, chain_id)
.await
.map(|attempts| attempts.unwrap_or(0))
.context("failed to get job attempts for ProofCompressor")
Expand Down
3 changes: 3 additions & 0 deletions prover/crates/bin/witness_generator/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Instant};
use async_trait::async_trait;
use zksync_object_store::ObjectStore;
use zksync_prover_dal::{ConnectionPool, Prover};
use zksync_types::L2ChainId;

#[derive(Debug)]
pub struct AggregationBlobUrls {
Expand All @@ -24,6 +25,7 @@ pub trait ArtifactsManager {

async fn save_to_bucket(
job_id: u32,
chain_id: L2ChainId,
artifacts: Self::OutputArtifacts,
object_store: &dyn ObjectStore,
shall_save_to_public_bucket: bool,
Expand All @@ -33,6 +35,7 @@ pub trait ArtifactsManager {
async fn save_to_database(
connection_pool: &ConnectionPool<Prover>,
job_id: u32,
chain_id: L2ChainId,
started_at: Instant,
blob_urls: Self::BlobUrls,
artifacts: Self::OutputArtifacts,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{sync::Arc, time::Instant};
use std::{iter::chain, sync::Arc, time::Instant};

use async_trait::async_trait;
use zksync_object_store::ObjectStore;
use zksync_prover_dal::{ConnectionPool, Prover, ProverDal};
use zksync_prover_fri_types::AuxOutputWitnessWrapper;
use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer;
use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber};
use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber, L2ChainId};

use crate::{
artifacts::ArtifactsManager,
Expand All @@ -18,7 +18,7 @@ use crate::{

#[async_trait]
impl ArtifactsManager for BasicCircuits {
type InputMetadata = L1BatchNumber;
type InputMetadata = (L2ChainId, L1BatchNumber);
type InputArtifacts = BasicWitnessGeneratorJob;
type OutputArtifacts = BasicCircuitArtifacts;
type BlobUrls = String;
Expand All @@ -27,16 +27,18 @@ impl ArtifactsManager for BasicCircuits {
metadata: &Self::InputMetadata,
object_store: &dyn ObjectStore,
) -> anyhow::Result<Self::InputArtifacts> {
let l1_batch_number = *metadata;
let data = object_store.get(l1_batch_number).await.unwrap();
let (chain_id, l1_batch_number) = *metadata;
let data = object_store.get((chain_id, l1_batch_number)).await.unwrap();
Ok(BasicWitnessGeneratorJob {
chain_id,
block_number: l1_batch_number,
data,
})
}

async fn save_to_bucket(
job_id: u32,
chain_id: L2ChainId,
artifacts: Self::OutputArtifacts,
object_store: &dyn ObjectStore,
shall_save_to_public_bucket: bool,
Expand All @@ -47,18 +49,21 @@ impl ArtifactsManager for BasicCircuits {
if shall_save_to_public_bucket {
public_blob_store.as_deref()
.expect("public_object_store shall not be empty while running with shall_save_to_public_bucket config")
.put(L1BatchNumber(job_id), &aux_output_witness_wrapper)
.put((chain_id, L1BatchNumber(job_id)), &aux_output_witness_wrapper)
.await
.unwrap();
}

object_store
.put(L1BatchNumber(job_id), &aux_output_witness_wrapper)
.put(
(chain_id, L1BatchNumber(job_id)),
&aux_output_witness_wrapper,
)
.await
.unwrap();
let wrapper = SchedulerPartialInputWrapper(artifacts.scheduler_witness);
object_store
.put(L1BatchNumber(job_id), &wrapper)
.put((chain_id, L1BatchNumber(job_id)), &wrapper)
.await
.unwrap()
}
Expand All @@ -67,6 +72,7 @@ impl ArtifactsManager for BasicCircuits {
async fn save_to_database(
connection_pool: &ConnectionPool<Prover>,
job_id: u32,
chain_id: L2ChainId,
started_at: Instant,
blob_urls: String,
artifacts: Self::OutputArtifacts,
Expand All @@ -81,12 +87,13 @@ impl ArtifactsManager for BasicCircuits {
.expect("failed to get database transaction");
let protocol_version_id = transaction
.fri_basic_witness_generator_dal()
.protocol_version_for_l1_batch(L1BatchNumber(job_id))
.protocol_version_for_l1_batch_and_chain(L1BatchNumber(job_id), chain_id)
.await;
transaction
.fri_prover_jobs_dal()
.insert_prover_jobs(
L1BatchNumber(job_id),
chain_id,
artifacts.circuit_urls,
AggregationRound::BasicCircuits,
0,
Expand All @@ -97,6 +104,7 @@ impl ArtifactsManager for BasicCircuits {
create_aggregation_jobs(
&mut transaction,
L1BatchNumber(job_id),
chain_id,
&artifacts.queue_urls,
&blob_urls,
get_recursive_layer_circuit_id_for_base_layer,
Expand All @@ -107,7 +115,7 @@ impl ArtifactsManager for BasicCircuits {

transaction
.fri_basic_witness_generator_dal()
.mark_witness_job_as_successful(L1BatchNumber(job_id), started_at.elapsed())
.mark_witness_job_as_successful(L1BatchNumber(job_id), chain_id, started_at.elapsed())
.await;
transaction
.commit()
Expand Down
Loading
Loading