diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a588f38b..d4d648e2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,6 +15,7 @@ concurrency: env: DASEL_VERSION: https://github.com/TomWright/dasel/releases/download/v1.24.3/dasel_linux_amd64 RUST_VERSION: 1.82 + RUSTFLAGS: "-D warnings" FUEL_CORE_VERSION: 0.40.0 IMAGE_NAME: ${{ github.repository }} REPO_NAME: ${{ github.event.repository.name }} diff --git a/.sqlx/query-39d3fae6fdd67a2324fae4d5e828f69f2298cd5b0f7eb1609ed189269c6f677c.json b/.sqlx/query-39d3fae6fdd67a2324fae4d5e828f69f2298cd5b0f7eb1609ed189269c6f677c.json new file mode 100644 index 00000000..abd5fc53 --- /dev/null +++ b/.sqlx/query-39d3fae6fdd67a2324fae4d5e828f69f2298cd5b0f7eb1609ed189269c6f677c.json @@ -0,0 +1,58 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n bc.bundle_id,\n bc.cost,\n bc.size,\n bc.da_block_height,\n bc.is_finalized,\n b.start_height,\n b.end_height\n FROM\n bundle_cost bc\n JOIN bundles b ON bc.bundle_id = b.id\n WHERE\n bc.is_finalized = TRUE\n ORDER BY\n b.start_height DESC\n LIMIT $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bundle_id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "cost", + "type_info": "Numeric" + }, + { + "ordinal": 2, + "name": "size", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "da_block_height", + "type_info": "Int8" + }, + { + "ordinal": 4, + "name": "is_finalized", + "type_info": "Bool" + }, + { + "ordinal": 5, + "name": "start_height", + "type_info": "Int8" + }, + { + "ordinal": 6, + "name": "end_height", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "39d3fae6fdd67a2324fae4d5e828f69f2298cd5b0f7eb1609ed189269c6f677c" +} diff --git a/.sqlx/query-851e5744c6d1c35341d1314e9485b4bdd6bf19af580170da626ab9740a4f4c02.json b/.sqlx/query-851e5744c6d1c35341d1314e9485b4bdd6bf19af580170da626ab9740a4f4c02.json new file mode 100644 index 00000000..941169a4 --- /dev/null +++ b/.sqlx/query-851e5744c6d1c35341d1314e9485b4bdd6bf19af580170da626ab9740a4f4c02.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n MIN(l1_blob_transaction.created_at) AS earliest_tx_time\n FROM\n l1_blob_transaction\n WHERE\n l1_blob_transaction.nonce = $1;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "earliest_tx_time", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "851e5744c6d1c35341d1314e9485b4bdd6bf19af580170da626ab9740a4f4c02" +} diff --git a/committer/src/api.rs b/committer/src/api.rs index cd97a59c..721a7742 100644 --- a/committer/src/api.rs +++ b/committer/src/api.rs @@ -90,9 +90,17 @@ async fn metrics(registry: web::Data>) -> impl Responder { std::result::Result::<_, InternalError<_>>::Ok(text) } +#[derive(Debug, Deserialize)] +#[serde(rename_all = "lowercase")] +enum HeightVariant { + Latest, + Specific, +} + #[derive(Deserialize)] struct CostQueryParams { - from_height: u32, + variant: HeightVariant, + value: Option, limit: Option, } @@ -103,8 +111,18 @@ async fn costs( ) -> impl Responder { let limit = query.limit.unwrap_or(100); - match data.get_costs(query.from_height, limit).await { - Ok(bundle_costs) => HttpResponse::Ok().json(bundle_costs), + let response = match query.variant { + HeightVariant::Latest => data.get_latest_costs(limit).await, + HeightVariant::Specific => match query.value { + Some(height) => data.get_costs(height, limit).await, + None => Err(services::Error::Other( + "height value is required".to_string(), + )), + }, + }; + + match response { + Ok(costs) => HttpResponse::Ok().json(costs), Err(services::Error::Other(e)) => { HttpResponse::from_error(InternalError::new(e, StatusCode::BAD_REQUEST)) } diff --git a/committer/src/config.rs b/committer/src/config.rs index fbc6cf6b..54de1294 100644 --- a/committer/src/config.rs +++ b/committer/src/config.rs @@ -79,6 +79,7 @@ where }) } +#[allow(dead_code)] #[derive(Debug, Clone, Deserialize)] pub struct App { /// Port used by the started server diff --git a/committer/src/main.rs b/committer/src/main.rs index 0473d0c7..9b9bf2d6 100644 --- a/committer/src/main.rs +++ b/committer/src/main.rs @@ -92,18 +92,22 @@ async fn main() -> Result<()> { finalization_metric, ); - let state_pruner_handle = setup::state_pruner( - storage.clone(), - cancel_token.clone(), - &metrics_registry, - &config, - ); + // Enable pruner once the issue is resolved + //TODO: https://github.com/FuelLabs/fuel-block-committer/issues/173 + // let state_pruner_handle = setup::state_pruner( + // storage.clone(), + // cancel_token.clone(), + // &metrics_registry, + // &config, + // ); handles.push(state_committer_handle); handles.push(state_importer_handle); handles.push(block_bundler); handles.push(state_listener_handle); - handles.push(state_pruner_handle); + // Enable pruner once the issue is resolved + //TODO: https://github.com/FuelLabs/fuel-block-committer/issues/173 + // handles.push(state_pruner_handle); } launch_api_server( diff --git a/committer/src/setup.rs b/committer/src/setup.rs index 3101e8fe..db771797 100644 --- a/committer/src/setup.rs +++ b/committer/src/setup.rs @@ -194,7 +194,7 @@ pub fn state_listener( ) } -pub fn state_pruner( +pub fn _state_pruner( storage: Database, cancel_token: CancellationToken, registry: &Registry, diff --git a/e2e/src/committer.rs b/e2e/src/committer.rs index ada77606..f2323a83 100644 --- a/e2e/src/committer.rs +++ b/e2e/src/committer.rs @@ -295,7 +295,7 @@ impl CommitterProcess { limit: usize, ) -> anyhow::Result> { let response = reqwest::get(format!( - "http://localhost:{}/v1/costs?from_height={}&limit={}", + "http://localhost:{}/v1/costs?variant=specific&value={}&limit={}", self.port, from_height, limit )) .await? diff --git a/e2e/src/lib.rs b/e2e/src/lib.rs index 2394a1e7..94ee8bd9 100644 --- a/e2e/src/lib.rs +++ b/e2e/src/lib.rs @@ -84,6 +84,9 @@ mod tests { Ok(()) } + // Enable test once the issue is resolved + //TODO: https://github.com/FuelLabs/fuel-block-committer/issues/173 + #[ignore] #[tokio::test(flavor = "multi_thread")] async fn old_state_will_be_pruned() -> Result<()> { use services::state_pruner::port::Storage; diff --git a/packages/adapters/eth/src/websocket/connection.rs b/packages/adapters/eth/src/websocket/connection.rs index 8a718b9f..3461b6f6 100644 --- a/packages/adapters/eth/src/websocket/connection.rs +++ b/packages/adapters/eth/src/websocket/connection.rs @@ -81,9 +81,12 @@ pub struct WsConnection { metrics: Metrics, } +const MAX_BLOB_FEE_HORIZON: u32 = 5; + impl WsConnection { - async fn get_next_blob_fee(&self, provider: &WsProvider) -> Result { - provider + async fn get_next_blob_fee(&self, horizon: u32) -> Result { + let mut next_block_blob_fee = self + .provider .get_block_by_number(BlockNumberOrTag::Latest, false) .await? .ok_or(Error::Network( @@ -93,7 +96,13 @@ impl WsConnection { .next_block_blob_fee() .ok_or(Error::Network( "next_block_blob_fee returned None".to_string(), - )) + ))?; + + for _ in 0..horizon { + // multiply by 1.125 = multiply by 9, then divide by 8 + next_block_blob_fee = next_block_blob_fee.saturating_mul(9).saturating_div(8); + } + Ok(next_block_blob_fee) } async fn get_bumped_fees( @@ -101,7 +110,7 @@ impl WsConnection { previous_tx: &L1Tx, provider: &WsProvider, ) -> Result<(u128, u128, u128)> { - let next_blob_fee = self.get_next_blob_fee(provider).await?; + let next_blob_fee = self.get_next_blob_fee(MAX_BLOB_FEE_HORIZON).await?; let max_fee_per_blob_gas = max(next_blob_fee, previous_tx.blob_fee.saturating_mul(2)); let Eip1559Estimation { @@ -277,9 +286,14 @@ impl EthApi for WsConnection { .with_blob_sidecar(sidecar) .with_to(*blob_signer_address) } - _ => TransactionRequest::default() - .with_blob_sidecar(sidecar) - .with_to(*blob_signer_address), + _ => { + let blob_fee = self.get_next_blob_fee(MAX_BLOB_FEE_HORIZON).await?; + + TransactionRequest::default() + .with_blob_sidecar(sidecar) + .with_max_fee_per_blob_gas(blob_fee) + .with_to(*blob_signer_address) + } }; let blob_tx = blob_provider.fill(blob_tx).await?; diff --git a/packages/adapters/storage/src/lib.rs b/packages/adapters/storage/src/lib.rs index 8ea34bbc..e523e82d 100644 --- a/packages/adapters/storage/src/lib.rs +++ b/packages/adapters/storage/src/lib.rs @@ -38,6 +38,12 @@ impl services::state_listener::port::Storage for Postgres { async fn has_pending_txs(&self) -> Result { self._has_pending_txs().await.map_err(Into::into) } + + async fn earliest_submission_attempt(&self, nonce: u32) -> Result>> { + self._earliest_submission_attempt(nonce) + .await + .map_err(Into::into) + } } impl services::cost_reporter::port::Storage for Postgres { @@ -50,6 +56,10 @@ impl services::cost_reporter::port::Storage for Postgres { .await .map_err(Into::into) } + + async fn get_latest_costs(&self, limit: usize) -> Result> { + self._get_latest_costs(limit).await.map_err(Into::into) + } } impl services::status_reporter::port::Storage for Postgres { @@ -1163,4 +1173,38 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn get_latest_finalized_costs() -> Result<()> { + use services::cost_reporter::port::Storage; + + // given + let storage = start_db().await; + + for i in 0..5 { + let start_height = i * 10 + 1; + let end_height = start_height + 9; + let block_range = start_height..=end_height; + + ensure_finalized_fragments_exist_in_the_db( + storage.clone(), + block_range, + 1000u128, + 5000u64, + ) + .await; + } + + // when + let finalized_costs = storage.get_latest_costs(1).await?; + + // then + assert_eq!(finalized_costs.len(), 1); + let finalized_cost = &finalized_costs[0]; + + assert_eq!(finalized_cost.start_height, 41); + assert_eq!(finalized_cost.end_height, 50); + + Ok(()) + } } diff --git a/packages/adapters/storage/src/postgres.rs b/packages/adapters/storage/src/postgres.rs index af6b9dad..8b705571 100644 --- a/packages/adapters/storage/src/postgres.rs +++ b/packages/adapters/storage/src/postgres.rs @@ -448,6 +448,27 @@ impl Postgres { Ok(response) } + pub(crate) async fn _earliest_submission_attempt( + &self, + nonce: u32, + ) -> Result>> { + let response = sqlx::query!( + r#"SELECT + MIN(l1_blob_transaction.created_at) AS earliest_tx_time + FROM + l1_blob_transaction + WHERE + l1_blob_transaction.nonce = $1; + "#, + nonce as i64 + ) + .fetch_optional(&self.connection_pool) + .await? + .and_then(|response| response.earliest_tx_time); + + Ok(response) + } + pub(crate) async fn _lowest_unbundled_blocks( &self, starting_height: u32, @@ -861,6 +882,36 @@ impl Postgres { .collect::>>() } + pub(crate) async fn _get_latest_costs(&self, limit: usize) -> Result> { + sqlx::query_as!( + tables::BundleCost, + r#" + SELECT + bc.bundle_id, + bc.cost, + bc.size, + bc.da_block_height, + bc.is_finalized, + b.start_height, + b.end_height + FROM + bundle_cost bc + JOIN bundles b ON bc.bundle_id = b.id + WHERE + bc.is_finalized = TRUE + ORDER BY + b.start_height DESC + LIMIT $1 + "#, + limit as i64 + ) + .fetch_all(&self.connection_pool) + .await? + .into_iter() + .map(BundleCost::try_from) + .collect::>>() + } + pub(crate) async fn _next_bundle_id(&self) -> Result> { let next_id = sqlx::query!("SELECT nextval(pg_get_serial_sequence('bundles', 'id'))") .fetch_one(&self.connection_pool) diff --git a/packages/adapters/storage/src/test_instance.rs b/packages/adapters/storage/src/test_instance.rs index b4baa4ea..4056593e 100644 --- a/packages/adapters/storage/src/test_instance.rs +++ b/packages/adapters/storage/src/test_instance.rs @@ -1,9 +1,3 @@ -use std::{ - borrow::Cow, - ops::RangeInclusive, - sync::{Arc, Weak}, -}; - use delegate::delegate; use services::{ block_bundler, block_committer, block_importer, @@ -14,6 +8,11 @@ use services::{ }, }; use sqlx::Executor; +use std::{ + borrow::Cow, + ops::RangeInclusive, + sync::{Arc, Weak}, +}; use testcontainers::{ core::{ContainerPort, WaitFor}, runners::AsyncRunner, @@ -201,6 +200,16 @@ impl services::state_listener::port::Storage for DbWithProcess { async fn has_pending_txs(&self) -> services::Result { self.db._has_pending_txs().await.map_err(Into::into) } + + async fn earliest_submission_attempt( + &self, + nonce: u32, + ) -> services::Result>> { + self.db + ._earliest_submission_attempt(nonce) + .await + .map_err(Into::into) + } } impl block_importer::port::Storage for DbWithProcess { @@ -351,4 +360,8 @@ impl services::cost_reporter::port::Storage for DbWithProcess { .await .map_err(Into::into) } + + async fn get_latest_costs(&self, limit: usize) -> services::Result> { + self.db._get_latest_costs(limit).await.map_err(Into::into) + } } diff --git a/packages/services/src/cost_reporter.rs b/packages/services/src/cost_reporter.rs index b564c879..600b699e 100644 --- a/packages/services/src/cost_reporter.rs +++ b/packages/services/src/cost_reporter.rs @@ -36,6 +36,17 @@ pub mod service { .get_finalized_costs(from_block_height, limit) .await } + + pub async fn get_latest_costs(&self, limit: usize) -> Result> { + if limit > self.request_limit { + return Err(Error::Other(format!( + "requested: {} items, but limit is: {}", + limit, self.request_limit + ))); + } + + self.storage.get_latest_costs(limit).await + } } } @@ -50,5 +61,7 @@ pub mod port { from_block_height: u32, limit: usize, ) -> Result>; + + async fn get_latest_costs(&self, limit: usize) -> Result>; } } diff --git a/packages/services/src/state_listener.rs b/packages/services/src/state_listener.rs index 86a16cbf..b7aa6ae4 100644 --- a/packages/services/src/state_listener.rs +++ b/packages/services/src/state_listener.rs @@ -135,6 +135,17 @@ pub mod service { info!("blob tx {} finalized", hex::encode(tx.hash)); + let earliest_submission_attempt = + self.storage.earliest_submission_attempt(tx.nonce).await?; + + self.metrics.last_finalization_interval.set( + earliest_submission_attempt + .map(|earliest_submission_attempt| { + (now - earliest_submission_attempt).num_seconds() + }) + .unwrap_or(0), + ); + self.metrics .last_eth_block_w_blob .set(i64::try_from(tx_response.block_number()).unwrap_or(i64::MAX)) @@ -177,6 +188,7 @@ pub mod service { struct Metrics { last_eth_block_w_blob: IntGauge, last_finalization_time: IntGauge, + last_finalization_interval: IntGauge, } impl RegistersMetrics for StateListener { @@ -184,6 +196,7 @@ pub mod service { vec![ Box::new(self.metrics.last_eth_block_w_blob.clone()), Box::new(self.metrics.last_finalization_time.clone()), + Box::new(self.metrics.last_finalization_interval.clone()), ] } } @@ -196,9 +209,18 @@ pub mod service { )) .expect("last_eth_block_w_blob metric to be correctly configured"); + let last_finalization_interval = IntGauge::new( + "seconds_from_earliest_submission_to_finalization", + "The number of seconds from the earliest submission to finalization", + ) + .expect( + "seconds_from_earliest_submission_to_finalization gauge to be correctly configured", + ); + Self { last_eth_block_w_blob, last_finalization_time, + last_finalization_interval, } } } @@ -240,6 +262,7 @@ pub mod port { cost_per_tx: Vec, ) -> Result<()>; async fn has_pending_txs(&self) -> Result; + async fn earliest_submission_attempt(&self, nonce: u32) -> Result>>; } pub trait Clock {