Skip to content

Commit

Permalink
Merge branch 'master' into fix-pruner-query
Browse files Browse the repository at this point in the history
  • Loading branch information
hal3e committed Jan 7, 2025
2 parents d8c71ca + 87cd95c commit 41cade0
Show file tree
Hide file tree
Showing 15 changed files with 290 additions and 25 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ concurrency:
env:
DASEL_VERSION: https://github.com/TomWright/dasel/releases/download/v1.24.3/dasel_linux_amd64
RUST_VERSION: 1.82
RUSTFLAGS: "-D warnings"
FUEL_CORE_VERSION: 0.40.0
IMAGE_NAME: ${{ github.repository }}
REPO_NAME: ${{ github.event.repository.name }}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 21 additions & 3 deletions committer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,17 @@ async fn metrics(registry: web::Data<Arc<Registry>>) -> impl Responder {
std::result::Result::<_, InternalError<_>>::Ok(text)
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
enum HeightVariant {
Latest,
Specific,
}

#[derive(Deserialize)]
struct CostQueryParams {
from_height: u32,
variant: HeightVariant,
value: Option<u32>,
limit: Option<usize>,
}

Expand All @@ -103,8 +111,18 @@ async fn costs(
) -> impl Responder {
let limit = query.limit.unwrap_or(100);

match data.get_costs(query.from_height, limit).await {
Ok(bundle_costs) => HttpResponse::Ok().json(bundle_costs),
let response = match query.variant {
HeightVariant::Latest => data.get_latest_costs(limit).await,
HeightVariant::Specific => match query.value {
Some(height) => data.get_costs(height, limit).await,
None => Err(services::Error::Other(
"height value is required".to_string(),
)),
},
};

match response {
Ok(costs) => HttpResponse::Ok().json(costs),
Err(services::Error::Other(e)) => {
HttpResponse::from_error(InternalError::new(e, StatusCode::BAD_REQUEST))
}
Expand Down
1 change: 1 addition & 0 deletions committer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ where
})
}

#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize)]
pub struct App {
/// Port used by the started server
Expand Down
18 changes: 11 additions & 7 deletions committer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,22 @@ async fn main() -> Result<()> {
finalization_metric,
);

let state_pruner_handle = setup::state_pruner(
storage.clone(),
cancel_token.clone(),
&metrics_registry,
&config,
);
// Enable pruner once the issue is resolved
//TODO: https://github.com/FuelLabs/fuel-block-committer/issues/173
// let state_pruner_handle = setup::state_pruner(
// storage.clone(),
// cancel_token.clone(),
// &metrics_registry,
// &config,
// );

handles.push(state_committer_handle);
handles.push(state_importer_handle);
handles.push(block_bundler);
handles.push(state_listener_handle);
handles.push(state_pruner_handle);
// Enable pruner once the issue is resolved
//TODO: https://github.com/FuelLabs/fuel-block-committer/issues/173
// handles.push(state_pruner_handle);
}

launch_api_server(
Expand Down
2 changes: 1 addition & 1 deletion committer/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub fn state_listener(
)
}

pub fn state_pruner(
pub fn _state_pruner(
storage: Database,
cancel_token: CancellationToken,
registry: &Registry,
Expand Down
2 changes: 1 addition & 1 deletion e2e/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl CommitterProcess {
limit: usize,
) -> anyhow::Result<Vec<BundleCost>> {
let response = reqwest::get(format!(
"http://localhost:{}/v1/costs?from_height={}&limit={}",
"http://localhost:{}/v1/costs?variant=specific&value={}&limit={}",
self.port, from_height, limit
))
.await?
Expand Down
3 changes: 3 additions & 0 deletions e2e/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ mod tests {
Ok(())
}

// Enable test once the issue is resolved
//TODO: https://github.com/FuelLabs/fuel-block-committer/issues/173
#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn old_state_will_be_pruned() -> Result<()> {
use services::state_pruner::port::Storage;
Expand Down
28 changes: 21 additions & 7 deletions packages/adapters/eth/src/websocket/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ pub struct WsConnection {
metrics: Metrics,
}

const MAX_BLOB_FEE_HORIZON: u32 = 5;

impl WsConnection {
async fn get_next_blob_fee(&self, provider: &WsProvider) -> Result<u128> {
provider
async fn get_next_blob_fee(&self, horizon: u32) -> Result<u128> {
let mut next_block_blob_fee = self
.provider
.get_block_by_number(BlockNumberOrTag::Latest, false)
.await?
.ok_or(Error::Network(
Expand All @@ -93,15 +96,21 @@ impl WsConnection {
.next_block_blob_fee()
.ok_or(Error::Network(
"next_block_blob_fee returned None".to_string(),
))
))?;

for _ in 0..horizon {
// multiply by 1.125 = multiply by 9, then divide by 8
next_block_blob_fee = next_block_blob_fee.saturating_mul(9).saturating_div(8);
}
Ok(next_block_blob_fee)
}

async fn get_bumped_fees(
&self,
previous_tx: &L1Tx,
provider: &WsProvider,
) -> Result<(u128, u128, u128)> {
let next_blob_fee = self.get_next_blob_fee(provider).await?;
let next_blob_fee = self.get_next_blob_fee(MAX_BLOB_FEE_HORIZON).await?;
let max_fee_per_blob_gas = max(next_blob_fee, previous_tx.blob_fee.saturating_mul(2));

let Eip1559Estimation {
Expand Down Expand Up @@ -277,9 +286,14 @@ impl EthApi for WsConnection {
.with_blob_sidecar(sidecar)
.with_to(*blob_signer_address)
}
_ => TransactionRequest::default()
.with_blob_sidecar(sidecar)
.with_to(*blob_signer_address),
_ => {
let blob_fee = self.get_next_blob_fee(MAX_BLOB_FEE_HORIZON).await?;

TransactionRequest::default()
.with_blob_sidecar(sidecar)
.with_max_fee_per_blob_gas(blob_fee)
.with_to(*blob_signer_address)
}
};

let blob_tx = blob_provider.fill(blob_tx).await?;
Expand Down
44 changes: 44 additions & 0 deletions packages/adapters/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ impl services::state_listener::port::Storage for Postgres {
async fn has_pending_txs(&self) -> Result<bool> {
self._has_pending_txs().await.map_err(Into::into)
}

async fn earliest_submission_attempt(&self, nonce: u32) -> Result<Option<DateTime<Utc>>> {
self._earliest_submission_attempt(nonce)
.await
.map_err(Into::into)
}
}

impl services::cost_reporter::port::Storage for Postgres {
Expand All @@ -50,6 +56,10 @@ impl services::cost_reporter::port::Storage for Postgres {
.await
.map_err(Into::into)
}

async fn get_latest_costs(&self, limit: usize) -> Result<Vec<BundleCost>> {
self._get_latest_costs(limit).await.map_err(Into::into)
}
}

impl services::status_reporter::port::Storage for Postgres {
Expand Down Expand Up @@ -1163,4 +1173,38 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn get_latest_finalized_costs() -> Result<()> {
use services::cost_reporter::port::Storage;

// given
let storage = start_db().await;

for i in 0..5 {
let start_height = i * 10 + 1;
let end_height = start_height + 9;
let block_range = start_height..=end_height;

ensure_finalized_fragments_exist_in_the_db(
storage.clone(),
block_range,
1000u128,
5000u64,
)
.await;
}

// when
let finalized_costs = storage.get_latest_costs(1).await?;

// then
assert_eq!(finalized_costs.len(), 1);
let finalized_cost = &finalized_costs[0];

assert_eq!(finalized_cost.start_height, 41);
assert_eq!(finalized_cost.end_height, 50);

Ok(())
}
}
51 changes: 51 additions & 0 deletions packages/adapters/storage/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,27 @@ impl Postgres {
Ok(response)
}

pub(crate) async fn _earliest_submission_attempt(
&self,
nonce: u32,
) -> Result<Option<DateTime<Utc>>> {
let response = sqlx::query!(
r#"SELECT
MIN(l1_blob_transaction.created_at) AS earliest_tx_time
FROM
l1_blob_transaction
WHERE
l1_blob_transaction.nonce = $1;
"#,
nonce as i64
)
.fetch_optional(&self.connection_pool)
.await?
.and_then(|response| response.earliest_tx_time);

Ok(response)
}

pub(crate) async fn _lowest_unbundled_blocks(
&self,
starting_height: u32,
Expand Down Expand Up @@ -861,6 +882,36 @@ impl Postgres {
.collect::<Result<Vec<_>>>()
}

pub(crate) async fn _get_latest_costs(&self, limit: usize) -> Result<Vec<BundleCost>> {
sqlx::query_as!(
tables::BundleCost,
r#"
SELECT
bc.bundle_id,
bc.cost,
bc.size,
bc.da_block_height,
bc.is_finalized,
b.start_height,
b.end_height
FROM
bundle_cost bc
JOIN bundles b ON bc.bundle_id = b.id
WHERE
bc.is_finalized = TRUE
ORDER BY
b.start_height DESC
LIMIT $1
"#,
limit as i64
)
.fetch_all(&self.connection_pool)
.await?
.into_iter()
.map(BundleCost::try_from)
.collect::<Result<Vec<_>>>()
}

pub(crate) async fn _next_bundle_id(&self) -> Result<NonNegative<i32>> {
let next_id = sqlx::query!("SELECT nextval(pg_get_serial_sequence('bundles', 'id'))")
.fetch_one(&self.connection_pool)
Expand Down
Loading

0 comments on commit 41cade0

Please sign in to comment.