diff --git a/.sqlx/query-14a630268f676e961c35d4804724aa35ac1d5ae9763f9383506cd05b42bb0213.json b/.sqlx/query-14a630268f676e961c35d4804724aa35ac1d5ae9763f9383506cd05b42bb0213.json
new file mode 100644
index 00000000..79c91f94
--- /dev/null
+++ b/.sqlx/query-14a630268f676e961c35d4804724aa35ac1d5ae9763f9383506cd05b42bb0213.json
@@ -0,0 +1,40 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n SELECT\n f.bundle_id,\n SUM(f.total_bytes)::BIGINT AS total_bytes,\n SUM(f.unused_bytes)::BIGINT AS unused_bytes,\n COUNT(*)::BIGINT AS fragment_count\n FROM l1_blob_transaction t\n JOIN l1_transaction_fragments tf ON t.id = tf.transaction_id\n JOIN l1_fragments f ON tf.fragment_id = f.id\n WHERE t.hash = $1\n GROUP BY f.bundle_id\n ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "bundle_id",
+        "type_info": "Int4"
+      },
+      {
+        "ordinal": 1,
+        "name": "total_bytes",
+        "type_info": "Int8"
+      },
+      {
+        "ordinal": 2,
+        "name": "unused_bytes",
+        "type_info": "Int8"
+      },
+      {
+        "ordinal": 3,
+        "name": "fragment_count",
+        "type_info": "Int8"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Bytea"
+      ]
+    },
+    "nullable": [
+      false,
+      null,
+      null,
+      null
+    ]
+  },
+  "hash": "14a630268f676e961c35d4804724aa35ac1d5ae9763f9383506cd05b42bb0213"
+}
diff --git a/.sqlx/query-417e5df74ff8190faec540e78ecad735e226634f70e12df55a43df5c10b1b4da.json b/.sqlx/query-417e5df74ff8190faec540e78ecad735e226634f70e12df55a43df5c10b1b4da.json
deleted file mode 100644
index 20e6491f..00000000
--- a/.sqlx/query-417e5df74ff8190faec540e78ecad735e226634f70e12df55a43df5c10b1b4da.json
+++ /dev/null
@@ -1,34 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n SELECT\n f.bundle_id,\n SUM(f.total_bytes)::BIGINT AS total_bytes,\n SUM(f.unused_bytes)::BIGINT AS unused_bytes\n FROM\n l1_blob_transaction t\n JOIN l1_transaction_fragments tf ON t.id = tf.transaction_id\n JOIN l1_fragments f ON tf.fragment_id = f.id\n WHERE\n t.hash = $1\n GROUP BY\n f.bundle_id\n ",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "bundle_id",
-        "type_info": "Int4"
-      },
-      {
-        "ordinal": 1,
-        "name": "total_bytes",
-        "type_info": "Int8"
-      },
-      {
-        "ordinal": 2,
-        "name": "unused_bytes",
-        "type_info": "Int8"
-      }
-    ],
-    "parameters": {
-      "Left": [
-        "Bytea"
-      ]
-    },
-    "nullable": [
-      false,
-      null,
-      null
-    ]
-  },
-  "hash": "417e5df74ff8190faec540e78ecad735e226634f70e12df55a43df5c10b1b4da"
-}
diff --git a/packages/adapters/storage/src/lib.rs b/packages/adapters/storage/src/lib.rs
index e523e82d..a8ec7adb 100644
--- a/packages/adapters/storage/src/lib.rs
+++ b/packages/adapters/storage/src/lib.rs
@@ -199,7 +199,14 @@ mod tests {
     use clock::TestClock;
     use itertools::Itertools;
     use rand::{thread_rng, Rng};
-    use services::types::{nonempty, CollectNonEmpty};
+    use services::{
+        block_bundler::port::Storage as BundlerStorage,
+        block_importer::port::Storage,
+        cost_reporter::port::Storage as CostStorage,
+        state_committer::port::Storage as CommitterStorage,
+        state_listener::port::Storage as ListenerStorage,
+        types::{nonempty, CollectNonEmpty, L1Tx, TransactionCostUpdate, TransactionState},
+    };

     use super::*;

@@ -457,9 +464,6 @@

     #[tokio::test]
     async fn can_get_last_time_a_fragment_was_finalized() {
-        use services::state_committer::port::Storage;
-        use services::state_listener::port::Storage as ListenerStorage;
-
         // given
         let storage = start_db().await;

@@ -667,10 +671,6 @@

     #[tokio::test]
     async fn excludes_fragments_from_bundles_ending_before_starting_height() {
-        use services::{
-            block_bundler::port::Storage, state_committer::port::Storage as CommitterStorage,
-        };
-
         // given
         let storage = start_db().await;
         let starting_height = 10;
@@ -720,10 +720,6 @@

     #[tokio::test]
     async fn includes_fragments_from_bundles_ending_at_starting_height() {
-        use services::{
-            block_bundler::port::Storage, state_committer::port::Storage as CommitterStorage,
-        };
-
         // given
         let storage = start_db().await;
         let starting_height = 10;
@@ -757,10 +753,6 @@

     #[tokio::test]
     async fn can_get_next_bundle_id() {
-        use services::{
-            block_bundler::port::Storage, state_committer::port::Storage as CommitterStorage,
-        };
-
         // given
         let storage = start_db().await;
         let starting_height = 10;
@@ -794,8 +786,6 @@

     #[tokio::test]
     async fn empty_db_reports_missing_heights() -> Result<()> {
-        use services::block_importer::port::Storage;
-
         // given
         let current_height = 10;
         let storage = start_db().await;
@@ -811,8 +801,6 @@

     #[tokio::test]
     async fn missing_blocks_no_holes() -> Result<()> {
-        use services::block_importer::port::Storage;
-
         // given
         let current_height = 10;
         let storage = start_db().await;
@@ -830,8 +818,6 @@

     #[tokio::test]
     async fn reports_holes_in_blocks() -> Result<()> {
-        use services::block_importer::port::Storage;
-
         // given
         let current_height = 15;
         let storage = start_db().await;
@@ -850,8 +836,6 @@

     #[tokio::test]
     async fn can_retrieve_fragments_submitted_by_tx() -> Result<()> {
-        use services::state_committer::port::Storage;
-
         // given
         let storage = start_db().await;

@@ -876,8 +860,6 @@

     #[tokio::test]
     async fn can_get_latest_pending_txs() -> Result<()> {
-        use services::state_committer::port::Storage;
-
         // given
         let storage = start_db().await;

@@ -921,10 +903,6 @@

     #[tokio::test]
     async fn can_update_costs() -> Result<()> {
-        use services::cost_reporter::port::Storage;
-        use services::state_committer::port::Storage as StateStorage;
-        use services::state_listener::port::Storage as ListenerStorage;
-
         // given
         let storage = start_db().await;

@@ -1020,8 +998,6 @@

     #[tokio::test]
     async fn costs_returned_only_for_finalized_bundles() {
-        use services::cost_reporter::port::Storage;
-
         // given
         let storage = start_db().await;
         let cost = 1000u128;
@@ -1061,8 +1037,6 @@

     #[tokio::test]
     async fn costs_returned_only_for_finalized_with_replacement_txs() {
-        use services::cost_reporter::port::Storage;
-
         // given
         let storage = start_db().await;
         let cost = 1000u128;
@@ -1100,8 +1074,6 @@

     #[tokio::test]
     async fn respects_from_block_height_and_limit_in_get_finalized_costs() -> Result<()> {
-        use services::cost_reporter::port::Storage;
-
         // given
         let storage = start_db().await;

@@ -1138,8 +1110,6 @@

     #[tokio::test]
     async fn get_finalized_costs_from_middle_of_range() -> Result<()> {
-        use services::cost_reporter::port::Storage;
-
         // given
         let storage = start_db().await;

@@ -1176,8 +1146,6 @@

     #[tokio::test]
     async fn get_latest_finalized_costs() -> Result<()> {
-        use services::cost_reporter::port::Storage;
-
         // given
         let storage = start_db().await;

@@ -1207,4 +1175,122 @@

         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_fee_split_across_multiple_bundles() {
+        let storage = start_db().await;
+
+        let bundle_a_id = storage.next_bundle_id().await.unwrap();
+        let fragment_a = Fragment {
+            data: nonempty![0xaa],
+            unused_bytes: 0,
+            total_bytes: 10.try_into().unwrap(),
+        };
+        storage
+            .insert_bundle_and_fragments(bundle_a_id, 1..=5, nonempty!(fragment_a.clone()))
+            .await
+            .unwrap();
+
+        let fragment_a_id = storage
+            .oldest_nonfinalized_fragments(0, 10)
+            .await
+            .unwrap()
+            .into_iter()
+            .find(|bf| bf.fragment.data == fragment_a.data)
+            .expect("Should have inserted fragment into BUNDLE A")
+            .id;
+
+        let bundle_b_id = storage.next_bundle_id().await.unwrap();
+
+        let random_frag = || {
+            let data: [u8; 2] = thread_rng().gen();
+            Fragment {
+                data: nonempty![data[0], data[1]],
+                unused_bytes: 0,
+                total_bytes: 20.try_into().unwrap(),
+            }
+        };
+
+        let b_fragments = std::iter::repeat_with(random_frag).take(3).collect_vec();
+
+        storage
+            .insert_bundle_and_fragments(
+                bundle_b_id,
+                6..=10, // Another arbitrary range
+                NonEmpty::from_vec(b_fragments.clone()).unwrap(),
+            )
+            .await
+            .unwrap();
+
+        let all_b_fragments = storage.oldest_nonfinalized_fragments(0, 10).await.unwrap();
+        let find_id = |data: &NonEmpty<u8>| {
+            all_b_fragments
+                .iter()
+                .find(|bf| bf.fragment.data == *data)
+                .expect("Should have inserted fragment into BUNDLE B")
+                .id
+        };
+
+        let fragment_b1_id = find_id(&b_fragments[0].data);
+        let fragment_b2_id = find_id(&b_fragments[1].data);
+        let fragment_b3_id = find_id(&b_fragments[2].data);
+
+        let tx_hash = [0; 32];
+        let tx = L1Tx {
+            hash: tx_hash,
+            ..Default::default()
+        };
+
+        let all_frag_ids = nonempty![
+            fragment_a_id,
+            fragment_b1_id,
+            fragment_b2_id,
+            fragment_b3_id
+        ];
+        storage
+            .record_pending_tx(tx.clone(), all_frag_ids, Utc::now())
+            .await
+            .unwrap();
+
+        let total_fee = 1000u128;
+        let changes = vec![(tx.hash, tx.nonce, TransactionState::Finalized(Utc::now()))];
+        let cost_update = TransactionCostUpdate {
+            tx_hash,
+            total_fee,
+            da_block_height: 9999,
+        };
+        storage
+            .update_tx_states_and_costs(vec![], changes, vec![cost_update.clone()])
+            .await
+            .unwrap();
+
+        let all_costs = storage.get_finalized_costs(0, 10).await.unwrap();
+
+        let cost_a = all_costs
+            .iter()
+            .find(|bc| bc.id == bundle_a_id.get() as u64);
+        let cost_b = all_costs
+            .iter()
+            .find(|bc| bc.id == bundle_b_id.get() as u64);
+
+        assert!(
+            cost_a.is_some(),
+            "Should have cost info for first bundle (A)"
+        );
+        assert!(
+            cost_b.is_some(),
+            "Should have cost info for second bundle (B)"
+        );
+
+        let cost_a = cost_a.unwrap().cost;
+        let cost_b = cost_b.unwrap().cost;
+
+        // - A has 1 fragment
+        // - B has 3 fragments
+        // => total 4 fragments, so we expect 1/4 of the fee for A (250) and 3/4 (750) for B.
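+        //    (PPM arithmetic: A's share is 1 * 1_000_000 / 4 = 250_000 ppm,
+        //    and 250_000 * 1000 / 1_000_000 = 250; B gets the remaining 750.)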
+        assert_eq!(cost_a, 250, "Bundle A should get 25% of the 1000 fee");
+        assert_eq!(cost_b, 750, "Bundle B should get 75% of the 1000 fee");
+    }
 }
diff --git a/packages/adapters/storage/src/postgres.rs b/packages/adapters/storage/src/postgres.rs
index bef23139..014e599f 100644
--- a/packages/adapters/storage/src/postgres.rs
+++ b/packages/adapters/storage/src/postgres.rs
@@ -1,6 +1,5 @@
 use std::{collections::HashMap, ops::RangeInclusive};

-use crate::postgres::tables::u128_to_bigdecimal;
 use itertools::Itertools;
 use metrics::{prometheus::IntGauge, RegistersMetrics};
 use services::types::{
@@ -14,7 +13,10 @@ use sqlx::{
 };

 use super::error::{Error, Result};
-use crate::mappings::tables::{self, L1TxState};
+use crate::{
+    mappings::tables::{self, L1TxState},
+    postgres::tables::u128_to_bigdecimal,
+};

 #[derive(Debug, Clone)]
 struct Metrics {
@@ -758,41 +760,60 @@ impl Postgres {
             da_block_height,
         } in cost_per_tx
         {
-            let row = sqlx::query!(
+            let rows = sqlx::query!(
                 r#"
-                SELECT
-                    f.bundle_id,
-                    SUM(f.total_bytes)::BIGINT AS total_bytes,
-                    SUM(f.unused_bytes)::BIGINT AS unused_bytes
-                FROM
-                    l1_blob_transaction t
-                    JOIN l1_transaction_fragments tf ON t.id = tf.transaction_id
-                    JOIN l1_fragments f ON tf.fragment_id = f.id
-                WHERE
-                    t.hash = $1
-                GROUP BY
-                    f.bundle_id
-                "#,
+                SELECT
+                    f.bundle_id,
+                    SUM(f.total_bytes)::BIGINT AS total_bytes,
+                    SUM(f.unused_bytes)::BIGINT AS unused_bytes,
+                    COUNT(*)::BIGINT AS fragment_count
+                FROM l1_blob_transaction t
+                JOIN l1_transaction_fragments tf ON t.id = tf.transaction_id
+                JOIN l1_fragments f ON tf.fragment_id = f.id
+                WHERE t.hash = $1
+                GROUP BY f.bundle_id
+                "#,
                 tx_hash.as_slice()
             )
-            .fetch_one(&mut *tx)
+            .fetch_all(&mut *tx)
             .await?;

-            let bundle_id = row.bundle_id;
-            let total_bytes: i64 = row.total_bytes.unwrap_or(0);
-            let unused_bytes: i64 = row.unused_bytes.unwrap_or(0);
-            let size_contribution = total_bytes.saturating_sub(unused_bytes) as u64;
-
-            let entry = bundle_updates.entry(bundle_id).or_insert(BundleCostUpdate {
-                cost_contribution: 0,
-                size_contribution: 0,
-                latest_da_block_height: 0,
-            });
-
-            entry.cost_contribution = entry.cost_contribution.saturating_add(*total_fee);
-            entry.size_contribution = entry.size_contribution.saturating_add(size_contribution);
-            // Update with the latest da_block_height
-            entry.latest_da_block_height = *da_block_height;
+            let total_fragments_in_tx = rows
+                .iter()
+                .map(|r| r.fragment_count.unwrap_or(0) as u64)
+                .sum::<u64>();
+
+            for row in rows {
+                let bundle_id = row.bundle_id;
+
+                let frag_count_in_bundle = row.fragment_count.unwrap_or(0) as u64;
+                let total_bytes = row.total_bytes.unwrap_or(0).max(0) as u64;
+                let unused_bytes = row.unused_bytes.unwrap_or(0).max(0) as u64;
+                let used_bytes = total_bytes.saturating_sub(unused_bytes);
+
+                const PPM: u128 = 1_000_000;
+                let fraction_in_ppm = if total_fragments_in_tx == 0 {
+                    0u128
+                } else {
+                    u128::from(frag_count_in_bundle)
+                        .saturating_mul(PPM)
+                        .saturating_div(u128::from(total_fragments_in_tx))
+                };
+
+                let cost_contribution = fraction_in_ppm
+                    .saturating_mul(*total_fee)
+                    .saturating_div(PPM);
+
+                let entry = bundle_updates.entry(bundle_id).or_insert(BundleCostUpdate {
+                    cost_contribution: 0,
+                    size_contribution: 0,
+                    latest_da_block_height: 0,
+                });
+
+                entry.cost_contribution = entry.cost_contribution.saturating_add(cost_contribution);
+                entry.size_contribution = entry.size_contribution.saturating_add(used_bytes);
+                entry.latest_da_block_height = entry.latest_da_block_height.max(*da_block_height);
+            }
         }

         Ok(bundle_updates)
@@ -1169,15 +1190,13 @@ fn create_ranges(heights: Vec<u32>) -> Vec<RangeInclusive<u32>> {
 mod tests {
     use std::{env, fs, path::Path};

+    use rand::Rng;
+    use services::types::{CollectNonEmpty, Fragment, L1Tx, TransactionState};
     use sqlx::{Executor, PgPool, Row};
     use tokio::time::Instant;

-    use crate::test_instance;
-
     use super::*;
-
-    use rand::Rng;
-    use services::types::{CollectNonEmpty, Fragment, L1Tx, TransactionState};
+    use crate::test_instance;

     #[tokio::test]
     async fn test_second_migration_applies_successfully() {
@@ -1440,9 +1459,10 @@

     #[tokio::test]
     async fn stress_test_update_costs() -> Result<()> {
-        use services::block_bundler::port::Storage;
-        use services::state_committer::port::Storage as CommitterStorage;
-        use services::state_listener::port::Storage as ListenerStorage;
+        use services::{
+            block_bundler::port::Storage, state_committer::port::Storage as CommitterStorage,
+            state_listener::port::Storage as ListenerStorage,
+        };

         let mut rng = rand::thread_rng();

diff --git a/packages/adapters/storage/src/test_instance.rs b/packages/adapters/storage/src/test_instance.rs
index 4056593e..6d9d70d7 100644
--- a/packages/adapters/storage/src/test_instance.rs
+++ b/packages/adapters/storage/src/test_instance.rs
@@ -1,3 +1,9 @@
+use std::{
+    borrow::Cow,
+    ops::RangeInclusive,
+    sync::{Arc, Weak},
+};
+
 use delegate::delegate;
 use services::{
     block_bundler, block_committer, block_importer,
@@ -8,11 +14,6 @@ use services::{
     },
 };
 use sqlx::Executor;
-use std::{
-    borrow::Cow,
-    ops::RangeInclusive,
-    sync::{Arc, Weak},
-};
 use testcontainers::{
     core::{ContainerPort, WaitFor},
     runners::AsyncRunner,
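For reference, the proportional fee split that `update_costs` now performs can be reduced to a standalone sketch. Everything below is illustrative (the `split_fee` helper and its `(bundle_id, fragment_count)` input shape are not part of this change); only the PPM constant and the zero-fragment guard mirror the diff above:

```rust
const PPM: u128 = 1_000_000;

/// Splits `total_fee` across bundles in proportion to how many of the
/// transaction's fragments each bundle contributed, using integer
/// parts-per-million math (no floats).
fn split_fee(total_fee: u128, fragments_per_bundle: &[(i32, u64)]) -> Vec<(i32, u128)> {
    let total_fragments: u64 = fragments_per_bundle.iter().map(|&(_, n)| n).sum();
    fragments_per_bundle
        .iter()
        .map(|&(bundle_id, count)| {
            // Guard against a tx with no fragments, matching the
            // `total_fragments_in_tx == 0` branch in the diff.
            let fraction_ppm = if total_fragments == 0 {
                0
            } else {
                u128::from(count).saturating_mul(PPM) / u128::from(total_fragments)
            };
            (bundle_id, fraction_ppm.saturating_mul(total_fee) / PPM)
        })
        .collect()
}

fn main() {
    // Mirrors the new test: bundle A holds 1 of 4 fragments, bundle B holds 3.
    assert_eq!(split_fee(1000, &[(1, 1), (2, 3)]), vec![(1, 250), (2, 750)]);
}
```

Note that integer truncation can leave a small remainder unassigned (a fee of 1000 split over three equal bundles yields 333 each); the change accepts this dust rather than over-crediting any bundle.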