From 4ca8b1fcafaec863018a64a515c3d518f3156e36 Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Mon, 6 Jan 2025 22:46:21 +0100 Subject: [PATCH 1/6] MTG-1110 Add request time out for DB query - Added configurable timeout for database queries in the API service. - Added new test cases to verify the behavior of the timeout mechanism. - Added a readme file with a second option for a global query execution time limit. --- integration_tests/src/common.rs | 1 + nft_ingester/readme.md | 16 +++++++ nft_ingester/src/api/api_impl.rs | 24 +++++++--- nft_ingester/src/api/error.rs | 4 +- nft_ingester/src/api/service.rs | 2 + nft_ingester/src/bin/api/main.rs | 1 + nft_ingester/src/bin/ingester/main.rs | 1 + nft_ingester/src/config.rs | 7 +++ nft_ingester/tests/api_tests.rs | 68 ++++++++++++++++++++++++++- nft_ingester/tests/batch_mint_test.rs | 5 ++ nft_ingester/tests/bubblegum_tests.rs | 3 ++ nft_ingester/tests/decompress.rs | 5 ++ nft_ingester/tests/dump_tests.rs | 2 + postgre-client/src/lib.rs | 2 +- 14 files changed, 131 insertions(+), 10 deletions(-) create mode 100644 nft_ingester/readme.md diff --git a/integration_tests/src/common.rs b/integration_tests/src/common.rs index 9b988b777..59fb3fe1f 100644 --- a/integration_tests/src/common.rs +++ b/integration_tests/src/common.rs @@ -179,6 +179,7 @@ impl TestSetup { None, )), "11111111111111111111111111111111".to_string(), + 120 ); let message_parser = MessageParser::new(); diff --git a/nft_ingester/readme.md b/nft_ingester/readme.md new file mode 100644 index 000000000..06bbd7be7 --- /dev/null +++ b/nft_ingester/readme.md @@ -0,0 +1,16 @@ + + + + + + + +## Tips + +To set a global limit on request execution for PG DB, you can use the URL parameter **statement_timeout**. 
+ +Example: +`postgres://user:password@localhost/dbname?statement_timeout=2000` + +To limit only API requests use config option **api_query_max_statement_timeout_sec** + diff --git a/nft_ingester/src/api/api_impl.rs b/nft_ingester/src/api/api_impl.rs index a70a6f098..23dbe0b5c 100644 --- a/nft_ingester/src/api/api_impl.rs +++ b/nft_ingester/src/api/api_impl.rs @@ -5,6 +5,7 @@ use interface::processing_possibility::ProcessingPossibilityChecker; use interface::proofs::ProofChecker; use postgre_client::PgClient; use std::{sync::Arc, time::Instant}; +use std::time::Duration; use tokio::sync::Mutex; use tokio::task::{JoinError, JoinSet}; @@ -32,6 +33,7 @@ use metrics_utils::ApiMetricsConfig; use rocks_db::Storage; use serde_json::{json, Value}; use solana_sdk::pubkey::Pubkey; +use tokio::time::timeout; use usecase::validation::{validate_opt_pubkey, validate_pubkey}; const MAX_ITEMS_IN_BATCH_REQ: usize = 1000; @@ -60,6 +62,7 @@ where storage_service_base_path: Option, token_price_fetcher: Arc, native_mint_pubkey: String, + max_query_statement_timeout_sec: u64, } pub fn not_found() -> DasApiError { @@ -90,6 +93,7 @@ where storage_service_base_path: Option, token_price_fetcher: Arc, native_mint_pubkey: String, + max_query_statement_timeout_sec: u64, ) -> Self { DasApi { pg_client, @@ -105,6 +109,7 @@ where storage_service_base_path, token_price_fetcher, native_mint_pubkey, + max_query_statement_timeout_sec, } } @@ -116,7 +121,7 @@ where self.pg_client .check_health() .await - .map_err(|_| DasApiError::InternalDdError)?; + .map_err(|e| DasApiError::InternalDdError(e.to_string()))?; self.metrics .set_latency(label, latency_timer.elapsed().as_millis() as f64); @@ -714,7 +719,7 @@ where Self::validate_basic_pagination(&pagination, self.max_page_limit)?; Self::validate_options(&options, &query)?; - let res = search_assets( + let res = timeout(Duration::from_secs(self.max_query_statement_timeout_sec), search_assets( pg_client, rocks_db, query, @@ -735,9 +740,16 @@ where 
self.metrics.clone(), &self.tree_gaps_checker, &self.native_mint_pubkey, - ) - .await?; - - Ok(res) + )).await; + + match res { + Ok(Ok(res)) => Ok(res), + Ok(Err(e)) => { + Err(DasApiError::InternalDdError(e.to_string())) + }, + Err(_) => { + Err(DasApiError::QueryTimedOut) + } + } } } diff --git a/nft_ingester/src/api/error.rs b/nft_ingester/src/api/error.rs index 767193d36..54256cfc7 100644 --- a/nft_ingester/src/api/error.rs +++ b/nft_ingester/src/api/error.rs @@ -39,11 +39,13 @@ pub enum DasApiError { #[error("Page number is too big. Up to {0} pages are supported with this kind of pagination. Please use a different pagination(before/after/cursor).")] PageTooBig(usize), #[error("Internal DB error")] - InternalDdError, + InternalDdError(String), #[error("CannotServiceRequest")] CannotServiceRequest, #[error("MissingOwnerAddress")] MissingOwnerAddress, + #[error("QueryTimedOut")] + QueryTimedOut, } impl From for jsonrpc_core::Error { diff --git a/nft_ingester/src/api/service.rs b/nft_ingester/src/api/service.rs index 236423da4..f860aa3b1 100644 --- a/nft_ingester/src/api/service.rs +++ b/nft_ingester/src/api/service.rs @@ -71,6 +71,7 @@ pub async fn start_api( account_balance_getter: Arc, storage_service_base_url: Option, native_mint_pubkey: String, + api_query_max_statement_timeout_sec: u64, ) -> Result<(), DasApiError> { let response_middleware = RpcResponseMiddleware {}; let request_middleware = RpcRequestMiddleware::new(archives_dir); @@ -127,6 +128,7 @@ pub async fn start_api( red_metrics, )), native_mint_pubkey, + api_query_max_statement_timeout_sec ); run_api( diff --git a/nft_ingester/src/bin/api/main.rs b/nft_ingester/src/bin/api/main.rs index 7061c044a..7c6b6c133 100644 --- a/nft_ingester/src/bin/api/main.rs +++ b/nft_ingester/src/bin/api/main.rs @@ -155,6 +155,7 @@ pub async fn main() -> Result<(), IngesterError> { account_balance_getter, args.storage_service_base_url, args.native_mint_pubkey, + args.api_query_max_statement_timeout_sec ) .await { diff 
--git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 6fb854696..d9656f70c 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -306,6 +306,7 @@ pub async fn main() -> Result<(), IngesterError> { account_balance_getter, args.storage_service_base_url, args.native_mint_pubkey, + args.api_max_query_statement_timeout_sec ) .await { diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 6ff2057ef..461c3b69a 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -110,6 +110,7 @@ pub struct IngesterClapArgs { requires = "rocks_backup_archives_dir" )] pub is_restore_rocks_db: bool, + #[clap(long, env, help = "Rocks backup url")] pub rocks_backup_url: Option, #[clap(long, env, help = "Rocks backup archives dir")] @@ -251,6 +252,9 @@ pub struct IngesterClapArgs { #[clap(long, env, default_value = "500", help = "#grpc retry interval millis")] pub rpc_retry_interval_millis: u64, + #[clap(long, env, default_value = "120", help= "Specifies the maximum execution time of a SQL query for API.")] + pub api_max_query_statement_timeout_sec: u64, + #[clap( long, env = "INGESTER_METRICS_PORT", @@ -430,6 +434,9 @@ pub struct ApiClapArgs { #[clap(long, env, default_value = "/usr/src/app/heaps", help = "Heap path")] pub heap_path: String, + #[clap(long, default_value = "120", help= "Specifies the maximum execution time of a SQL query for API.")] + pub api_query_max_statement_timeout_sec: u64, + #[clap( long, env = "API_METRICS_PORT", diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 63dfe31d4..a030d6799 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -12,7 +12,7 @@ mod tests { use std::str::FromStr; use std::{collections::HashMap, sync::Arc}; - + use std::time::Duration; use blockbuster::token_metadata::accounts::Metadata; use entities::api_req_params::{ DisplayOptions, GetAssetProof, 
GetAssetSignatures, GetByMethodsOptions, GetCoreFees, @@ -66,12 +66,16 @@ mod tests { use spl_pod::optional_keys::OptionalNonZeroPubkey; use spl_pod::primitives::{PodU16, PodU64}; use spl_token_2022::extension::interest_bearing_mint::BasisPoints; - use sqlx::QueryBuilder; + use sqlx::{query, QueryBuilder}; use testcontainers::clients::Cli; use tokio::{sync::Mutex, task::JoinSet}; + use tokio::time::timeout; use usecase::proofs::MaybeProofChecker; const SLOT_UPDATED: u64 = 100; + // api_query_max_statement_timeout_sec + const API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC: u64 = 120; + // corresponds to So11111111111111111111111111111111111111112 pub const NATIVE_MINT_PUBKEY: Pubkey = Pubkey::new_from_array([ 6, 155, 136, 87, 254, 171, 129, 132, 251, 104, 127, 99, 70, 24, 192, 53, 218, 196, 57, 220, @@ -104,6 +108,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -505,6 +510,43 @@ mod tests { env.teardown().await; } + #[tokio::test] + async fn test_api_query_timeout_expected() { + let cnt = 20; + let cli = Cli::default(); + let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let pg_pool = env.pg_env.pool.clone(); + + let query_timeout = Duration::from_secs(2); + + let result = timeout(query_timeout, query("SELECT pg_sleep(3)").execute(&pg_pool)).await; + + match result { + Ok(Ok(res)) => assert!(false, "Query should have timed out, but completed successfully: {:?}", res), + Ok(Err(e)) => assert!(false, "Query should have timed out, but failed: {:?}", e), + Err(_) => println!("Query timed out as expected"), + } + } + + #[tokio::test] + async fn test_api_query_timeout_not_expected() { + let cnt = 20; + let cli = Cli::default(); + let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; + let pg_pool = env.pg_env.pool.clone(); + + let query_timeout = 
Duration::from_secs(3); + + let result = timeout(query_timeout, query("SELECT pg_sleep(1)").execute(&pg_pool)).await; + + match result { + Ok(Ok(res)) => println!("Query completed successfully: {:?}", res), + Ok(Err(e)) => assert!(false, "Query should completed successfully, but failed: {:?}", e), + Err(_) => assert!(false, "Query should completed successfully but have timedout",), + } + } + + #[tokio::test] #[tracing_test::traced_test] async fn test_asset_none_grouping_with_token_standard() { @@ -532,6 +574,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -686,6 +729,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -816,6 +860,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -989,6 +1034,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -1164,6 +1210,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -1323,6 +1370,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let first_tree = Pubkey::new_unique(); @@ -1544,6 +1592,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), 
NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let token_updates_processor = @@ -1768,6 +1817,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let token_updates_processor = @@ -2033,6 +2083,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2097,6 +2148,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2158,6 +2210,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2219,6 +2272,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2332,6 +2386,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let pb = Pubkey::new_unique(); @@ -2490,6 +2545,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let asset_id = Pubkey::new_unique(); let tree_id = Pubkey::new_unique(); @@ -2550,6 +2606,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let asset_fees_count = 1000; let mut asset_ids = Vec::with_capacity(asset_fees_count); @@ -2641,6 +2698,7 @@ mod tests { None, 
Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2731,6 +2789,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2862,6 +2921,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -3033,6 +3093,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -3249,6 +3310,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -3543,6 +3605,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -3729,6 +3792,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); diff --git a/nft_ingester/tests/batch_mint_test.rs b/nft_ingester/tests/batch_mint_test.rs index 2f722527d..ca3857247 100644 --- a/nft_ingester/tests/batch_mint_test.rs +++ b/nft_ingester/tests/batch_mint_test.rs @@ -64,6 +64,8 @@ use tokio::sync::broadcast; use usecase::proofs::MaybeProofChecker; use uuid::Uuid; + +pub const 
API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC: u64 = 120; #[test] #[cfg(feature = "batch_mint_tests")] fn test_generate_1_000_batch_mint() { @@ -427,6 +429,7 @@ async fn batch_mint_with_verified_creators_test() { None, Arc::new(RaydiumTokenPriceFetcher::default()), "".to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ); let payload = GetAssetProof { @@ -583,6 +586,7 @@ async fn batch_mint_with_unverified_creators_test() { None, Arc::new(RaydiumTokenPriceFetcher::default()), "".to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ); let payload = GetAssetProof { @@ -682,6 +686,7 @@ async fn batch_mint_persister_test() { None, Arc::new(RaydiumTokenPriceFetcher::default()), "".to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ); let leaf_index = 4u32; diff --git a/nft_ingester/tests/bubblegum_tests.rs b/nft_ingester/tests/bubblegum_tests.rs index 201ad4bba..4f9eabc3e 100644 --- a/nft_ingester/tests/bubblegum_tests.rs +++ b/nft_ingester/tests/bubblegum_tests.rs @@ -31,6 +31,7 @@ mod tests { 6, 155, 136, 87, 254, 171, 129, 132, 251, 104, 127, 99, 70, 24, 192, 53, 218, 196, 57, 220, 26, 235, 59, 85, 152, 160, 240, 0, 0, 0, 0, 1, ]); + pub const API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC: u64 = 120; #[tokio::test] #[tracing_test::traced_test] @@ -91,6 +92,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ); let _buffer = Arc::new(Buffer::new()); @@ -203,6 +205,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ); let _buffer = Arc::new(Buffer::new()); diff --git a/nft_ingester/tests/decompress.rs b/nft_ingester/tests/decompress.rs index c37789245..b77b64e55 100644 --- a/nft_ingester/tests/decompress.rs +++ b/nft_ingester/tests/decompress.rs @@ -38,6 +38,7 @@ mod tests { 6, 155, 136, 87, 254, 171, 129, 132, 251, 104, 127, 99, 70, 24, 192, 53, 218, 196, 57, 220, 26, 235, 59, 85, 152, 
160, 240, 0, 0, 0, 0, 1, ]); + pub const API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC: u64 = 120; // 242856151 slot when decompress happened @@ -236,6 +237,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ); let buffer = Arc::new(Buffer::new()); @@ -331,6 +333,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ); let buffer = Arc::new(Buffer::new()); @@ -426,6 +429,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ); let buffer = Arc::new(Buffer::new()); @@ -521,6 +525,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ); let buffer = Arc::new(Buffer::new()); diff --git a/nft_ingester/tests/dump_tests.rs b/nft_ingester/tests/dump_tests.rs index c57ad472f..4af08d1e7 100644 --- a/nft_ingester/tests/dump_tests.rs +++ b/nft_ingester/tests/dump_tests.rs @@ -153,6 +153,7 @@ mod mtg_441_tests { use crate::tests::NATIVE_MINT_PUBKEY; const SLOT_UPDATED: u64 = 100; + pub const API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC: u64 = 120; fn get_das_api( env: &TestEnvironment, @@ -185,6 +186,7 @@ mod mtg_441_tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC ) } diff --git a/postgre-client/src/lib.rs b/postgre-client/src/lib.rs index 69869f9f0..063228154 100644 --- a/postgre-client/src/lib.rs +++ b/postgre-client/src/lib.rs @@ -60,7 +60,7 @@ impl PgClient { base_dump_path: Option, metrics: Arc, ) -> Result { - let mut options: PgConnectOptions = url.parse().unwrap(); + let mut options: PgConnectOptions = url.parse()?; options.log_statements(LevelFilter::Off); options.log_slow_statements(LevelFilter::Off, Duration::from_secs(100)); From 
d166c8de6b900cd7ccb54d80af1fc683cc2d02bb Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Tue, 14 Jan 2025 15:45:20 +0100 Subject: [PATCH 2/6] MTG-1110 Add request time out for DB query - Fix Lint issues --- integration_tests/src/common.rs | 2 +- nft_ingester/src/api/api_impl.rs | 58 +++++++++++++-------------- nft_ingester/src/api/service.rs | 2 +- nft_ingester/src/bin/api/main.rs | 2 +- nft_ingester/src/bin/ingester/main.rs | 2 +- nft_ingester/src/config.rs | 15 +++++-- nft_ingester/tests/api_tests.rs | 30 +++++++++----- nft_ingester/tests/batch_mint_test.rs | 7 ++-- nft_ingester/tests/bubblegum_tests.rs | 4 +- nft_ingester/tests/decompress.rs | 8 ++-- nft_ingester/tests/dump_tests.rs | 2 +- 11 files changed, 75 insertions(+), 57 deletions(-) diff --git a/integration_tests/src/common.rs b/integration_tests/src/common.rs index 59fb3fe1f..43c4ddf99 100644 --- a/integration_tests/src/common.rs +++ b/integration_tests/src/common.rs @@ -179,7 +179,7 @@ impl TestSetup { None, )), "11111111111111111111111111111111".to_string(), - 120 + 120, ); let message_parser = MessageParser::new(); diff --git a/nft_ingester/src/api/api_impl.rs b/nft_ingester/src/api/api_impl.rs index 23dbe0b5c..8e5deae61 100644 --- a/nft_ingester/src/api/api_impl.rs +++ b/nft_ingester/src/api/api_impl.rs @@ -4,8 +4,8 @@ use interface::json::{JsonDownloader, JsonPersister}; use interface::processing_possibility::ProcessingPossibilityChecker; use interface::proofs::ProofChecker; use postgre_client::PgClient; -use std::{sync::Arc, time::Instant}; use std::time::Duration; +use std::{sync::Arc, time::Instant}; use tokio::sync::Mutex; use tokio::task::{JoinError, JoinSet}; @@ -719,37 +719,37 @@ where Self::validate_basic_pagination(&pagination, self.max_page_limit)?; Self::validate_options(&options, &query)?; - let res = timeout(Duration::from_secs(self.max_query_statement_timeout_sec), search_assets( - pg_client, - rocks_db, - query, - sort_by, - 
pagination.limit.map_or(DEFAULT_LIMIT as u64, |l| l as u64), - pagination.page.map(|p| p as u64), - pagination.before, - pagination.after, - pagination.cursor, - options, - self.json_downloader.clone(), - self.json_persister.clone(), - self.json_middleware_config.max_urls_to_parse, - tasks, - self.account_balance_getter.clone(), - self.storage_service_base_path.clone(), - self.token_price_fetcher.clone(), - self.metrics.clone(), - &self.tree_gaps_checker, - &self.native_mint_pubkey, - )).await; + let res = timeout( + Duration::from_secs(self.max_query_statement_timeout_sec), + search_assets( + pg_client, + rocks_db, + query, + sort_by, + pagination.limit.map_or(DEFAULT_LIMIT as u64, |l| l as u64), + pagination.page.map(|p| p as u64), + pagination.before, + pagination.after, + pagination.cursor, + options, + self.json_downloader.clone(), + self.json_persister.clone(), + self.json_middleware_config.max_urls_to_parse, + tasks, + self.account_balance_getter.clone(), + self.storage_service_base_path.clone(), + self.token_price_fetcher.clone(), + self.metrics.clone(), + &self.tree_gaps_checker, + &self.native_mint_pubkey, + ), + ) + .await; match res { Ok(Ok(res)) => Ok(res), - Ok(Err(e)) => { - Err(DasApiError::InternalDdError(e.to_string())) - }, - Err(_) => { - Err(DasApiError::QueryTimedOut) - } + Ok(Err(e)) => Err(DasApiError::InternalDdError(e.to_string())), + Err(_) => Err(DasApiError::QueryTimedOut), } } } diff --git a/nft_ingester/src/api/service.rs b/nft_ingester/src/api/service.rs index f860aa3b1..3f51d2423 100644 --- a/nft_ingester/src/api/service.rs +++ b/nft_ingester/src/api/service.rs @@ -128,7 +128,7 @@ pub async fn start_api( red_metrics, )), native_mint_pubkey, - api_query_max_statement_timeout_sec + api_query_max_statement_timeout_sec, ); run_api( diff --git a/nft_ingester/src/bin/api/main.rs b/nft_ingester/src/bin/api/main.rs index 7c6b6c133..3e06d9120 100644 --- a/nft_ingester/src/bin/api/main.rs +++ b/nft_ingester/src/bin/api/main.rs @@ -155,7 
+155,7 @@ pub async fn main() -> Result<(), IngesterError> { account_balance_getter, args.storage_service_base_url, args.native_mint_pubkey, - args.api_query_max_statement_timeout_sec + args.api_query_max_statement_timeout_sec, ) .await { diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index d9656f70c..fa2acbdf3 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -306,7 +306,7 @@ pub async fn main() -> Result<(), IngesterError> { account_balance_getter, args.storage_service_base_url, args.native_mint_pubkey, - args.api_max_query_statement_timeout_sec + args.api_max_query_statement_timeout_sec, ) .await { diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 461c3b69a..701a1d91c 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -110,7 +110,7 @@ pub struct IngesterClapArgs { requires = "rocks_backup_archives_dir" )] pub is_restore_rocks_db: bool, - + #[clap(long, env, help = "Rocks backup url")] pub rocks_backup_url: Option, #[clap(long, env, help = "Rocks backup archives dir")] @@ -252,7 +252,12 @@ pub struct IngesterClapArgs { #[clap(long, env, default_value = "500", help = "#grpc retry interval millis")] pub rpc_retry_interval_millis: u64, - #[clap(long, env, default_value = "120", help= "Specifies the maximum execution time of a SQL query for API.")] + #[clap( + long, + env, + default_value = "120", + help = "Specifies the maximum execution time of a SQL query for API." + )] pub api_max_query_statement_timeout_sec: u64, #[clap( @@ -434,7 +439,11 @@ pub struct ApiClapArgs { #[clap(long, env, default_value = "/usr/src/app/heaps", help = "Heap path")] pub heap_path: String, - #[clap(long, default_value = "120", help= "Specifies the maximum execution time of a SQL query for API.")] + #[clap( + long, + default_value = "120", + help = "Specifies the maximum execution time of a SQL query for API." 
+ )] pub api_query_max_statement_timeout_sec: u64, #[clap( diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index a030d6799..a1279aa3c 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -10,9 +10,6 @@ mod tests { use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs; use rocks_db::column::TypedColumn; - use std::str::FromStr; - use std::{collections::HashMap, sync::Arc}; - use std::time::Duration; use blockbuster::token_metadata::accounts::Metadata; use entities::api_req_params::{ DisplayOptions, GetAssetProof, GetAssetSignatures, GetByMethodsOptions, GetCoreFees, @@ -67,15 +64,18 @@ mod tests { use spl_pod::primitives::{PodU16, PodU64}; use spl_token_2022::extension::interest_bearing_mint::BasisPoints; use sqlx::{query, QueryBuilder}; + use std::str::FromStr; + use std::time::Duration; + use std::{collections::HashMap, sync::Arc}; use testcontainers::clients::Cli; - use tokio::{sync::Mutex, task::JoinSet}; use tokio::time::timeout; + use tokio::{sync::Mutex, task::JoinSet}; use usecase::proofs::MaybeProofChecker; const SLOT_UPDATED: u64 = 100; // api_query_max_statement_timeout_sec const API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC: u64 = 120; - + // corresponds to So11111111111111111111111111111111111111112 pub const NATIVE_MINT_PUBKEY: Pubkey = Pubkey::new_from_array([ 6, 155, 136, 87, 254, 171, 129, 132, 251, 104, 127, 99, 70, 24, 192, 53, 218, 196, 57, 220, @@ -517,12 +517,16 @@ mod tests { let (env, _) = setup::TestEnvironment::create(&cli, cnt, SLOT_UPDATED).await; let pg_pool = env.pg_env.pool.clone(); - let query_timeout = Duration::from_secs(2); + let query_timeout = Duration::from_secs(2); let result = timeout(query_timeout, query("SELECT pg_sleep(3)").execute(&pg_pool)).await; match result { - Ok(Ok(res)) => assert!(false, "Query should have timed out, but completed successfully: {:?}", res), + Ok(Ok(res)) => assert!( + false, + "Query should have timed out, but completed 
successfully: {:?}", + res + ), Ok(Err(e)) => assert!(false, "Query should have timed out, but failed: {:?}", e), Err(_) => println!("Query timed out as expected"), } @@ -541,12 +545,18 @@ mod tests { match result { Ok(Ok(res)) => println!("Query completed successfully: {:?}", res), - Ok(Err(e)) => assert!(false, "Query should completed successfully, but failed: {:?}", e), - Err(_) => assert!(false, "Query should completed successfully but have timedout",), + Ok(Err(e)) => assert!( + false, + "Query should completed successfully, but failed: {:?}", + e + ), + Err(_) => assert!( + false, + "Query should completed successfully but have timedout", + ), } } - #[tokio::test] #[tracing_test::traced_test] async fn test_asset_none_grouping_with_token_standard() { diff --git a/nft_ingester/tests/batch_mint_test.rs b/nft_ingester/tests/batch_mint_test.rs index ca3857247..b07259e13 100644 --- a/nft_ingester/tests/batch_mint_test.rs +++ b/nft_ingester/tests/batch_mint_test.rs @@ -64,7 +64,6 @@ use tokio::sync::broadcast; use usecase::proofs::MaybeProofChecker; use uuid::Uuid; - pub const API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC: u64 = 120; #[test] #[cfg(feature = "batch_mint_tests")] @@ -429,7 +428,7 @@ async fn batch_mint_with_verified_creators_test() { None, Arc::new(RaydiumTokenPriceFetcher::default()), "".to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let payload = GetAssetProof { @@ -586,7 +585,7 @@ async fn batch_mint_with_unverified_creators_test() { None, Arc::new(RaydiumTokenPriceFetcher::default()), "".to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let payload = GetAssetProof { @@ -686,7 +685,7 @@ async fn batch_mint_persister_test() { None, Arc::new(RaydiumTokenPriceFetcher::default()), "".to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let leaf_index = 4u32; diff --git a/nft_ingester/tests/bubblegum_tests.rs 
b/nft_ingester/tests/bubblegum_tests.rs index 4f9eabc3e..26b3721fa 100644 --- a/nft_ingester/tests/bubblegum_tests.rs +++ b/nft_ingester/tests/bubblegum_tests.rs @@ -92,7 +92,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let _buffer = Arc::new(Buffer::new()); @@ -205,7 +205,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let _buffer = Arc::new(Buffer::new()); diff --git a/nft_ingester/tests/decompress.rs b/nft_ingester/tests/decompress.rs index b77b64e55..38240da4c 100644 --- a/nft_ingester/tests/decompress.rs +++ b/nft_ingester/tests/decompress.rs @@ -237,7 +237,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let buffer = Arc::new(Buffer::new()); @@ -333,7 +333,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let buffer = Arc::new(Buffer::new()); @@ -429,7 +429,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let buffer = Arc::new(Buffer::new()); @@ -525,7 +525,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let buffer = Arc::new(Buffer::new()); diff --git a/nft_ingester/tests/dump_tests.rs b/nft_ingester/tests/dump_tests.rs index 4af08d1e7..c50c025a3 100644 --- a/nft_ingester/tests/dump_tests.rs +++ 
b/nft_ingester/tests/dump_tests.rs @@ -186,7 +186,7 @@ mod mtg_441_tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ) } From 0695806b8288afcb555f2c33ccfe4786e46048b4 Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Tue, 14 Jan 2025 16:28:54 +0100 Subject: [PATCH 3/6] MTG-1110 Add request timeout for DB query - Implemented suggestions from the PR review. - Added a readme file with some tips. --- README.md | 18 +++ grpc/src/asseturls.rs | 126 ++++++++++------ grpc/src/gapfiller.rs | 206 +++++++++++++++++--------- nft_ingester/readme.md | 2 +- nft_ingester/src/api/api_impl.rs | 4 +- nft_ingester/src/api/error.rs | 2 +- nft_ingester/src/api/service.rs | 4 +- nft_ingester/src/bin/api/main.rs | 2 +- nft_ingester/src/bin/ingester/main.rs | 2 +- nft_ingester/src/config.rs | 4 +- nft_ingester/tests/api_tests.rs | 49 +++--- 11 files changed, 267 insertions(+), 152 deletions(-) diff --git a/README.md b/README.md index fc4a10176..61a8282d8 100644 --- a/README.md +++ b/README.md @@ -60,5 +60,23 @@ The project's structure is a reflection of the following clean architecture prin The API specification is compatible with the standard DAS specification here https://github.com/metaplex-foundation/api-specifications + ### Developing and running + +#### PR/Code requirements +1) CI/CD has a code formatting checker, so format your code before each commit: `cargo fmt` +2) TODO: document any remaining PR/code requirements + +#### Run Integration tests +Integration tests require a Postgres DB URL and devnet/mainnet RPC URLs + +How to run with CLI: +```shell +DATABASE_TEST_URL='postgres://solana:solana@localhost:5432/aura_db' DEVNET_RPC_URL='https://devnet-aura.metaplex.com/{YOUR_TOKEN_ACCESS}' MAINNET_RPC_URL='https://mainnet-aura.metaplex.com/{YOUR_TOKEN_ACCESS}' cargo test --features integration_tests +``` + + + Full documentation and contribution guidelines coming soon… + + diff --git
a/grpc/src/asseturls.rs b/grpc/src/asseturls.rs index 02d0c76ae..ad9079f36 100644 --- a/grpc/src/asseturls.rs +++ b/grpc/src/asseturls.rs @@ -90,8 +90,8 @@ impl DownloadError { /// Generated client implementations. pub mod asset_url_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct AssetUrlServiceClient { inner: tonic::client::Grpc, @@ -135,8 +135,9 @@ pub mod asset_url_service_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { AssetUrlServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -175,22 +176,31 @@ pub mod asset_url_service_client { pub async fn get_asset_urls_to_download( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/asseturls.AssetUrlService/GetAssetUrlsToDownload", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "asseturls.AssetUrlService", - "GetAssetUrlsToDownload", - )); + req.extensions_mut() + .insert( + GrpcMethod::new( + "asseturls.AssetUrlService", + "GetAssetUrlsToDownload", + ), + ); self.inner.unary(req, path, codec).await } /// Used to notify about asset download results @@ -198,21 +208,24 @@ pub mod asset_url_service_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - 
tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/asseturls.AssetUrlService/SubmitDownloadResult", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "asseturls.AssetUrlService", - "SubmitDownloadResult", - )); + req.extensions_mut() + .insert( + GrpcMethod::new("asseturls.AssetUrlService", "SubmitDownloadResult"), + ); self.inner.unary(req, path, codec).await } } @@ -228,7 +241,10 @@ pub mod asset_url_service_server { async fn get_asset_urls_to_download( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Used to notify about asset download results async fn submit_download_result( &self, @@ -258,7 +274,10 @@ pub mod asset_url_service_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -314,18 +333,25 @@ pub mod asset_url_service_server { "/asseturls.AssetUrlService/GetAssetUrlsToDownload" => { #[allow(non_camel_case_types)] struct GetAssetUrlsToDownloadSvc(pub Arc); - impl tonic::server::UnaryService - for GetAssetUrlsToDownloadSvc - { + impl< + T: AssetUrlService, + > tonic::server::UnaryService + for GetAssetUrlsToDownloadSvc { type Response = super::AssetsToDownload; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - 
::get_asset_urls_to_download(&inner, request) + ::get_asset_urls_to_download( + &inner, + request, + ) .await }; Box::pin(fut) @@ -357,19 +383,25 @@ pub mod asset_url_service_server { "/asseturls.AssetUrlService/SubmitDownloadResult" => { #[allow(non_camel_case_types)] struct SubmitDownloadResultSvc(pub Arc); - impl - tonic::server::UnaryService - for SubmitDownloadResultSvc - { + impl< + T: AssetUrlService, + > tonic::server::UnaryService + for SubmitDownloadResultSvc { type Response = (); - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::submit_download_result(&inner, request) + ::submit_download_result( + &inner, + request, + ) .await }; Box::pin(fut) @@ -398,14 +430,18 @@ pub mod asset_url_service_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/grpc/src/gapfiller.rs b/grpc/src/gapfiller.rs index 0d6e0cfac..4aa7e228a 100644 --- a/grpc/src/gapfiller.rs +++ b/grpc/src/gapfiller.rs @@ -449,10 +449,16 @@ impl SpecificationAssetClass { pub fn as_str_name(&self) -> &'static str { match self { SpecificationAssetClass::Unknown => "SPECIFICATION_ASSET_CLASS_UNKNOWN", - SpecificationAssetClass::FungibleToken => "SPECIFICATION_ASSET_CLASS_FUNGIBLE_TOKEN", - SpecificationAssetClass::FungibleAsset => "SPECIFICATION_ASSET_CLASS_FUNGIBLE_ASSET", + SpecificationAssetClass::FungibleToken => { + "SPECIFICATION_ASSET_CLASS_FUNGIBLE_TOKEN" + } + SpecificationAssetClass::FungibleAsset => { + 
"SPECIFICATION_ASSET_CLASS_FUNGIBLE_ASSET" + } SpecificationAssetClass::Nft => "SPECIFICATION_ASSET_CLASS_NFT", - SpecificationAssetClass::PrintableNft => "SPECIFICATION_ASSET_CLASS_PRINTABLE_NFT", + SpecificationAssetClass::PrintableNft => { + "SPECIFICATION_ASSET_CLASS_PRINTABLE_NFT" + } SpecificationAssetClass::ProgrammableNft => { "SPECIFICATION_ASSET_CLASS_PROGRAMMABLE_NFT" } @@ -463,8 +469,12 @@ impl SpecificationAssetClass { SpecificationAssetClass::NonTransferableNft => { "SPECIFICATION_ASSET_CLASS_NON_TRANSFERABLE_NFT" } - SpecificationAssetClass::IdentityNft => "SPECIFICATION_ASSET_CLASS_IDENTITY_NFT", - SpecificationAssetClass::MplCoreAsset => "SPECIFICATION_ASSET_CLASS_MPL_CORE_ASSET", + SpecificationAssetClass::IdentityNft => { + "SPECIFICATION_ASSET_CLASS_IDENTITY_NFT" + } + SpecificationAssetClass::MplCoreAsset => { + "SPECIFICATION_ASSET_CLASS_MPL_CORE_ASSET" + } SpecificationAssetClass::MplCoreCollection => { "SPECIFICATION_ASSET_CLASS_MPL_CORE_COLLECTION" } @@ -483,10 +493,14 @@ impl SpecificationAssetClass { "SPECIFICATION_ASSET_CLASS_TRANSFER_RESTRICTED_NFT" => { Some(Self::TransferRestrictedNft) } - "SPECIFICATION_ASSET_CLASS_NON_TRANSFERABLE_NFT" => Some(Self::NonTransferableNft), + "SPECIFICATION_ASSET_CLASS_NON_TRANSFERABLE_NFT" => { + Some(Self::NonTransferableNft) + } "SPECIFICATION_ASSET_CLASS_IDENTITY_NFT" => Some(Self::IdentityNft), "SPECIFICATION_ASSET_CLASS_MPL_CORE_ASSET" => Some(Self::MplCoreAsset), - "SPECIFICATION_ASSET_CLASS_MPL_CORE_COLLECTION" => Some(Self::MplCoreCollection), + "SPECIFICATION_ASSET_CLASS_MPL_CORE_COLLECTION" => { + Some(Self::MplCoreCollection) + } _ => None, } } @@ -568,7 +582,9 @@ impl TokenStandard { TokenStandard::Fungible => "FUNGIBLE", TokenStandard::NonFungibleEdition => "NON_FUNGIBLE_EDITION", TokenStandard::ProgrammableNonFungible => "PROGRAMMABLE_NON_FUNGIBLE", - TokenStandard::ProgrammableNonFungibleEdition => "PROGRAMMABLE_NON_FUNGIBLE_EDITION", + TokenStandard::ProgrammableNonFungibleEdition => { 
+ "PROGRAMMABLE_NON_FUNGIBLE_EDITION" + } } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -579,7 +595,9 @@ impl TokenStandard { "FUNGIBLE" => Some(Self::Fungible), "NON_FUNGIBLE_EDITION" => Some(Self::NonFungibleEdition), "PROGRAMMABLE_NON_FUNGIBLE" => Some(Self::ProgrammableNonFungible), - "PROGRAMMABLE_NON_FUNGIBLE_EDITION" => Some(Self::ProgrammableNonFungibleEdition), + "PROGRAMMABLE_NON_FUNGIBLE_EDITION" => { + Some(Self::ProgrammableNonFungibleEdition) + } _ => None, } } @@ -642,8 +660,8 @@ impl UpdateVersion { /// Generated client implementations. pub mod gap_filler_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; /// Define the gRPC service #[derive(Debug, Clone)] pub struct GapFillerServiceClient { @@ -688,8 +706,9 @@ pub mod gap_filler_service_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { GapFillerServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -731,21 +750,27 @@ pub mod gap_filler_service_client { tonic::Response>, tonic::Status, > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/gapfiller.GapFillerService/GetAssetsUpdatedWithin", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "gapfiller.GapFillerService", - "GetAssetsUpdatedWithin", - )); + req.extensions_mut() + .insert( + GrpcMethod::new( + "gapfiller.GapFillerService", + "GetAssetsUpdatedWithin", + ), + ); self.inner.server_streaming(req, path, codec).await } pub 
async fn get_raw_blocks_within( @@ -755,36 +780,43 @@ pub mod gap_filler_service_client { tonic::Response>, tonic::Status, > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/gapfiller.GapFillerService/GetRawBlocksWithin", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "gapfiller.GapFillerService", - "GetRawBlocksWithin", - )); + req.extensions_mut() + .insert( + GrpcMethod::new("gapfiller.GapFillerService", "GetRawBlocksWithin"), + ); self.inner.server_streaming(req, path, codec).await } pub async fn get_raw_block( &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = - http::uri::PathAndQuery::from_static("/gapfiller.GapFillerService/GetRawBlock"); + let path = http::uri::PathAndQuery::from_static( + "/gapfiller.GapFillerService/GetRawBlock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("gapfiller.GapFillerService", "GetRawBlock")); @@ -802,21 +834,29 @@ pub mod gap_filler_service_server { /// Server streaming response type for the GetAssetsUpdatedWithin method. 
type GetAssetsUpdatedWithinStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, - > + Send + > + + Send + 'static; async fn get_assets_updated_within( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Server streaming response type for the GetRawBlocksWithin method. type GetRawBlocksWithinStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, - > + Send + > + + Send + 'static; async fn get_raw_blocks_within( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn get_raw_block( &self, request: tonic::Request, @@ -846,7 +886,10 @@ pub mod gap_filler_service_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -902,21 +945,26 @@ pub mod gap_filler_service_server { "/gapfiller.GapFillerService/GetAssetsUpdatedWithin" => { #[allow(non_camel_case_types)] struct GetAssetsUpdatedWithinSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for GetAssetsUpdatedWithinSvc - { + impl< + T: GapFillerService, + > tonic::server::ServerStreamingService + for GetAssetsUpdatedWithinSvc { type Response = super::AssetDetails; type ResponseStream = T::GetAssetsUpdatedWithinStream; - type Future = - BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_assets_updated_within(&inner, request) + ::get_assets_updated_within( + &inner, + request, + ) .await }; Box::pin(fut) @@ -948,21 +996,26 @@ pub mod gap_filler_service_server { "/gapfiller.GapFillerService/GetRawBlocksWithin" => { 
#[allow(non_camel_case_types)] struct GetRawBlocksWithinSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for GetRawBlocksWithinSvc - { + impl< + T: GapFillerService, + > tonic::server::ServerStreamingService + for GetRawBlocksWithinSvc { type Response = super::RawBlock; type ResponseStream = T::GetRawBlocksWithinStream; - type Future = - BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_raw_blocks_within(&inner, request) + ::get_raw_blocks_within( + &inner, + request, + ) .await }; Box::pin(fut) @@ -994,18 +1047,23 @@ pub mod gap_filler_service_server { "/gapfiller.GapFillerService/GetRawBlock" => { #[allow(non_camel_case_types)] struct GetRawBlockSvc(pub Arc); - impl tonic::server::UnaryService - for GetRawBlockSvc - { + impl< + T: GapFillerService, + > tonic::server::UnaryService + for GetRawBlockSvc { type Response = super::RawBlock; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_raw_block(&inner, request).await + ::get_raw_block(&inner, request) + .await }; Box::pin(fut) } @@ -1033,14 +1091,18 @@ pub mod gap_filler_service_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/nft_ingester/readme.md b/nft_ingester/readme.md index 06bbd7be7..d73b2d757 100644 --- a/nft_ingester/readme.md +++ 
b/nft_ingester/readme.md @@ -12,5 +12,5 @@ To set a global limit on request execution for PG DB, you can use the URL parame Example: `postgres://user:password@localhost/dbname?statement_timeout=2000` -To limit only API requests use config option **api_query_max_statement_timeout_sec** +To limit only API requests use config option **api_query_max_statement_timeout_secs** diff --git a/nft_ingester/src/api/api_impl.rs b/nft_ingester/src/api/api_impl.rs index 8e5deae61..bb91625cf 100644 --- a/nft_ingester/src/api/api_impl.rs +++ b/nft_ingester/src/api/api_impl.rs @@ -121,7 +121,7 @@ where self.pg_client .check_health() .await - .map_err(|e| DasApiError::InternalDdError(e.to_string()))?; + .map_err(|e| DasApiError::InternalDbError(e.to_string()))?; self.metrics .set_latency(label, latency_timer.elapsed().as_millis() as f64); @@ -748,7 +748,7 @@ where match res { Ok(Ok(res)) => Ok(res), - Ok(Err(e)) => Err(DasApiError::InternalDdError(e.to_string())), + Ok(Err(e)) => Err(DasApiError::InternalDbError(e.to_string())), Err(_) => Err(DasApiError::QueryTimedOut), } } diff --git a/nft_ingester/src/api/error.rs b/nft_ingester/src/api/error.rs index 54256cfc7..a1627ac27 100644 --- a/nft_ingester/src/api/error.rs +++ b/nft_ingester/src/api/error.rs @@ -39,7 +39,7 @@ pub enum DasApiError { #[error("Page number is too big. Up to {0} pages are supported with this kind of pagination. 
Please use a different pagination(before/after/cursor).")] PageTooBig(usize), #[error("Internal DB error")] - InternalDdError(String), + InternalDbError(String), #[error("CannotServiceRequest")] CannotServiceRequest, #[error("MissingOwnerAddress")] diff --git a/nft_ingester/src/api/service.rs b/nft_ingester/src/api/service.rs index 3f51d2423..c7ae02ed0 100644 --- a/nft_ingester/src/api/service.rs +++ b/nft_ingester/src/api/service.rs @@ -71,7 +71,7 @@ pub async fn start_api( account_balance_getter: Arc, storage_service_base_url: Option, native_mint_pubkey: String, - api_query_max_statement_timeout_sec: u64, + api_query_max_statement_timeout_secs: u64, ) -> Result<(), DasApiError> { let response_middleware = RpcResponseMiddleware {}; let request_middleware = RpcRequestMiddleware::new(archives_dir); @@ -128,7 +128,7 @@ pub async fn start_api( red_metrics, )), native_mint_pubkey, - api_query_max_statement_timeout_sec, + api_query_max_statement_timeout_secs, ); run_api( diff --git a/nft_ingester/src/bin/api/main.rs b/nft_ingester/src/bin/api/main.rs index 3e06d9120..e7cd4f75b 100644 --- a/nft_ingester/src/bin/api/main.rs +++ b/nft_ingester/src/bin/api/main.rs @@ -155,7 +155,7 @@ pub async fn main() -> Result<(), IngesterError> { account_balance_getter, args.storage_service_base_url, args.native_mint_pubkey, - args.api_query_max_statement_timeout_sec, + args.api_query_max_statement_timeout_secs, ) .await { diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index fa2acbdf3..763f770c5 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -306,7 +306,7 @@ pub async fn main() -> Result<(), IngesterError> { account_balance_getter, args.storage_service_base_url, args.native_mint_pubkey, - args.api_max_query_statement_timeout_sec, + args.api_query_max_statement_timeout_secs, ) .await { diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 701a1d91c..7ad458ac7 100644 --- 
a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -258,7 +258,7 @@ pub struct IngesterClapArgs { default_value = "120", help = "Specifies the maximum execution time of a SQL query for API." )] - pub api_max_query_statement_timeout_sec: u64, + pub api_query_max_statement_timeout_secs: u64, #[clap( long, @@ -444,7 +444,7 @@ pub struct ApiClapArgs { default_value = "120", help = "Specifies the maximum execution time of a SQL query for API." )] - pub api_query_max_statement_timeout_sec: u64, + pub api_query_max_statement_timeout_secs: u64, #[clap( long, diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index a1279aa3c..e3a735279 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -73,8 +73,7 @@ mod tests { use usecase::proofs::MaybeProofChecker; const SLOT_UPDATED: u64 = 100; - // api_query_max_statement_timeout_sec - const API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC: u64 = 120; + const API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC: u64 = 120; // corresponds to So11111111111111111111111111111111111111112 pub const NATIVE_MINT_PUBKEY: Pubkey = Pubkey::new_from_array([ @@ -108,7 +107,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -584,7 +583,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -739,7 +738,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ 
-870,7 +869,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -1044,7 +1043,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -1220,7 +1219,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -1380,7 +1379,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let first_tree = Pubkey::new_unique(); @@ -1602,7 +1601,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let token_updates_processor = @@ -1827,7 +1826,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let token_updates_processor = @@ -2093,7 +2092,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2158,7 +2157,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - 
API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2220,7 +2219,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2282,7 +2281,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2396,7 +2395,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let pb = Pubkey::new_unique(); @@ -2555,7 +2554,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let asset_id = Pubkey::new_unique(); let tree_id = Pubkey::new_unique(); @@ -2616,7 +2615,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let asset_fees_count = 1000; let mut asset_ids = Vec::with_capacity(asset_fees_count); @@ -2708,7 +2707,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2799,7 +2798,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + 
API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -2931,7 +2930,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -3103,7 +3102,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -3320,7 +3319,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -3615,7 +3614,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); @@ -3802,7 +3801,7 @@ mod tests { None, Arc::new(RaydiumTokenPriceFetcher::default()), NATIVE_MINT_PUBKEY.to_string(), - API_DEFAULT_QUERY_STATEMENT_TIMEPOU_SEC, + API_DEFAULT_QUERY_STATEMENT_TIMEOUT_SEC, ); let tasks = JoinSet::new(); let mutexed_tasks = Arc::new(Mutex::new(tasks)); From 8c75888ab9c2275de4453116796d2f9c4d661f01 Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Tue, 14 Jan 2025 16:30:45 +0100 Subject: [PATCH 4/6] MTG-1110 Add request time out for DB query - fix code format --- grpc/src/asseturls.rs | 126 +++++++++----------------- grpc/src/gapfiller.rs | 206 +++++++++++++++--------------------------- 2 files changed, 117 insertions(+), 215 deletions(-) diff --git 
a/grpc/src/asseturls.rs b/grpc/src/asseturls.rs index ad9079f36..02d0c76ae 100644 --- a/grpc/src/asseturls.rs +++ b/grpc/src/asseturls.rs @@ -90,8 +90,8 @@ impl DownloadError { /// Generated client implementations. pub mod asset_url_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; #[derive(Debug, Clone)] pub struct AssetUrlServiceClient { inner: tonic::client::Grpc, @@ -135,9 +135,8 @@ pub mod asset_url_service_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { AssetUrlServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -176,31 +175,22 @@ pub mod asset_url_service_client { pub async fn get_asset_urls_to_download( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/asseturls.AssetUrlService/GetAssetUrlsToDownload", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "asseturls.AssetUrlService", - "GetAssetUrlsToDownload", - ), - ); + req.extensions_mut().insert(GrpcMethod::new( + "asseturls.AssetUrlService", + "GetAssetUrlsToDownload", + )); self.inner.unary(req, path, codec).await } /// Used to notify about asset download results @@ -208,24 +198,21 @@ pub mod asset_url_service_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - 
tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/asseturls.AssetUrlService/SubmitDownloadResult", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("asseturls.AssetUrlService", "SubmitDownloadResult"), - ); + req.extensions_mut().insert(GrpcMethod::new( + "asseturls.AssetUrlService", + "SubmitDownloadResult", + )); self.inner.unary(req, path, codec).await } } @@ -241,10 +228,7 @@ pub mod asset_url_service_server { async fn get_asset_urls_to_download( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; /// Used to notify about asset download results async fn submit_download_result( &self, @@ -274,10 +258,7 @@ pub mod asset_url_service_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -333,25 +314,18 @@ pub mod asset_url_service_server { "/asseturls.AssetUrlService/GetAssetUrlsToDownload" => { #[allow(non_camel_case_types)] struct GetAssetUrlsToDownloadSvc(pub Arc); - impl< - T: AssetUrlService, - > tonic::server::UnaryService - for GetAssetUrlsToDownloadSvc { + impl tonic::server::UnaryService + for GetAssetUrlsToDownloadSvc + { type Response = super::AssetsToDownload; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_asset_urls_to_download( - &inner, - 
request, - ) + ::get_asset_urls_to_download(&inner, request) .await }; Box::pin(fut) @@ -383,25 +357,19 @@ pub mod asset_url_service_server { "/asseturls.AssetUrlService/SubmitDownloadResult" => { #[allow(non_camel_case_types)] struct SubmitDownloadResultSvc(pub Arc); - impl< - T: AssetUrlService, - > tonic::server::UnaryService - for SubmitDownloadResultSvc { + impl + tonic::server::UnaryService + for SubmitDownloadResultSvc + { type Response = (); - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::submit_download_result( - &inner, - request, - ) + ::submit_download_result(&inner, request) .await }; Box::pin(fut) @@ -430,18 +398,14 @@ pub mod asset_url_service_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/grpc/src/gapfiller.rs b/grpc/src/gapfiller.rs index 4aa7e228a..0d6e0cfac 100644 --- a/grpc/src/gapfiller.rs +++ b/grpc/src/gapfiller.rs @@ -449,16 +449,10 @@ impl SpecificationAssetClass { pub fn as_str_name(&self) -> &'static str { match self { SpecificationAssetClass::Unknown => "SPECIFICATION_ASSET_CLASS_UNKNOWN", - SpecificationAssetClass::FungibleToken => { - "SPECIFICATION_ASSET_CLASS_FUNGIBLE_TOKEN" - } - SpecificationAssetClass::FungibleAsset => { - "SPECIFICATION_ASSET_CLASS_FUNGIBLE_ASSET" - } + SpecificationAssetClass::FungibleToken => "SPECIFICATION_ASSET_CLASS_FUNGIBLE_TOKEN", + SpecificationAssetClass::FungibleAsset => "SPECIFICATION_ASSET_CLASS_FUNGIBLE_ASSET", 
SpecificationAssetClass::Nft => "SPECIFICATION_ASSET_CLASS_NFT", - SpecificationAssetClass::PrintableNft => { - "SPECIFICATION_ASSET_CLASS_PRINTABLE_NFT" - } + SpecificationAssetClass::PrintableNft => "SPECIFICATION_ASSET_CLASS_PRINTABLE_NFT", SpecificationAssetClass::ProgrammableNft => { "SPECIFICATION_ASSET_CLASS_PROGRAMMABLE_NFT" } @@ -469,12 +463,8 @@ impl SpecificationAssetClass { SpecificationAssetClass::NonTransferableNft => { "SPECIFICATION_ASSET_CLASS_NON_TRANSFERABLE_NFT" } - SpecificationAssetClass::IdentityNft => { - "SPECIFICATION_ASSET_CLASS_IDENTITY_NFT" - } - SpecificationAssetClass::MplCoreAsset => { - "SPECIFICATION_ASSET_CLASS_MPL_CORE_ASSET" - } + SpecificationAssetClass::IdentityNft => "SPECIFICATION_ASSET_CLASS_IDENTITY_NFT", + SpecificationAssetClass::MplCoreAsset => "SPECIFICATION_ASSET_CLASS_MPL_CORE_ASSET", SpecificationAssetClass::MplCoreCollection => { "SPECIFICATION_ASSET_CLASS_MPL_CORE_COLLECTION" } @@ -493,14 +483,10 @@ impl SpecificationAssetClass { "SPECIFICATION_ASSET_CLASS_TRANSFER_RESTRICTED_NFT" => { Some(Self::TransferRestrictedNft) } - "SPECIFICATION_ASSET_CLASS_NON_TRANSFERABLE_NFT" => { - Some(Self::NonTransferableNft) - } + "SPECIFICATION_ASSET_CLASS_NON_TRANSFERABLE_NFT" => Some(Self::NonTransferableNft), "SPECIFICATION_ASSET_CLASS_IDENTITY_NFT" => Some(Self::IdentityNft), "SPECIFICATION_ASSET_CLASS_MPL_CORE_ASSET" => Some(Self::MplCoreAsset), - "SPECIFICATION_ASSET_CLASS_MPL_CORE_COLLECTION" => { - Some(Self::MplCoreCollection) - } + "SPECIFICATION_ASSET_CLASS_MPL_CORE_COLLECTION" => Some(Self::MplCoreCollection), _ => None, } } @@ -582,9 +568,7 @@ impl TokenStandard { TokenStandard::Fungible => "FUNGIBLE", TokenStandard::NonFungibleEdition => "NON_FUNGIBLE_EDITION", TokenStandard::ProgrammableNonFungible => "PROGRAMMABLE_NON_FUNGIBLE", - TokenStandard::ProgrammableNonFungibleEdition => { - "PROGRAMMABLE_NON_FUNGIBLE_EDITION" - } + TokenStandard::ProgrammableNonFungibleEdition => "PROGRAMMABLE_NON_FUNGIBLE_EDITION", } } 
/// Creates an enum from field names used in the ProtoBuf definition. @@ -595,9 +579,7 @@ impl TokenStandard { "FUNGIBLE" => Some(Self::Fungible), "NON_FUNGIBLE_EDITION" => Some(Self::NonFungibleEdition), "PROGRAMMABLE_NON_FUNGIBLE" => Some(Self::ProgrammableNonFungible), - "PROGRAMMABLE_NON_FUNGIBLE_EDITION" => { - Some(Self::ProgrammableNonFungibleEdition) - } + "PROGRAMMABLE_NON_FUNGIBLE_EDITION" => Some(Self::ProgrammableNonFungibleEdition), _ => None, } } @@ -660,8 +642,8 @@ impl UpdateVersion { /// Generated client implementations. pub mod gap_filler_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; /// Define the gRPC service #[derive(Debug, Clone)] pub struct GapFillerServiceClient { @@ -706,9 +688,8 @@ pub mod gap_filler_service_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { GapFillerServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -750,27 +731,21 @@ pub mod gap_filler_service_client { tonic::Response>, tonic::Status, > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/gapfiller.GapFillerService/GetAssetsUpdatedWithin", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "gapfiller.GapFillerService", - "GetAssetsUpdatedWithin", - ), - ); + req.extensions_mut().insert(GrpcMethod::new( + "gapfiller.GapFillerService", + "GetAssetsUpdatedWithin", + )); self.inner.server_streaming(req, path, codec).await } pub async fn get_raw_blocks_within( @@ -780,43 +755,36 @@ pub 
mod gap_filler_service_client { tonic::Response>, tonic::Status, > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/gapfiller.GapFillerService/GetRawBlocksWithin", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("gapfiller.GapFillerService", "GetRawBlocksWithin"), - ); + req.extensions_mut().insert(GrpcMethod::new( + "gapfiller.GapFillerService", + "GetRawBlocksWithin", + )); self.inner.server_streaming(req, path, codec).await } pub async fn get_raw_block( &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/gapfiller.GapFillerService/GetRawBlock", - ); + let path = + http::uri::PathAndQuery::from_static("/gapfiller.GapFillerService/GetRawBlock"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("gapfiller.GapFillerService", "GetRawBlock")); @@ -834,29 +802,21 @@ pub mod gap_filler_service_server { /// Server streaming response type for the GetAssetsUpdatedWithin method. 
type GetAssetsUpdatedWithinStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, - > - + Send + > + Send + 'static; async fn get_assets_updated_within( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; /// Server streaming response type for the GetRawBlocksWithin method. type GetRawBlocksWithinStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, - > - + Send + > + Send + 'static; async fn get_raw_blocks_within( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn get_raw_block( &self, request: tonic::Request, @@ -886,10 +846,7 @@ pub mod gap_filler_service_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -945,26 +902,21 @@ pub mod gap_filler_service_server { "/gapfiller.GapFillerService/GetAssetsUpdatedWithin" => { #[allow(non_camel_case_types)] struct GetAssetsUpdatedWithinSvc(pub Arc); - impl< - T: GapFillerService, - > tonic::server::ServerStreamingService - for GetAssetsUpdatedWithinSvc { + impl + tonic::server::ServerStreamingService + for GetAssetsUpdatedWithinSvc + { type Response = super::AssetDetails; type ResponseStream = T::GetAssetsUpdatedWithinStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = + BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_assets_updated_within( - &inner, - request, - ) + ::get_assets_updated_within(&inner, request) .await }; Box::pin(fut) @@ -996,26 +948,21 @@ pub mod gap_filler_service_server { "/gapfiller.GapFillerService/GetRawBlocksWithin" => { 
#[allow(non_camel_case_types)] struct GetRawBlocksWithinSvc(pub Arc); - impl< - T: GapFillerService, - > tonic::server::ServerStreamingService - for GetRawBlocksWithinSvc { + impl + tonic::server::ServerStreamingService + for GetRawBlocksWithinSvc + { type Response = super::RawBlock; type ResponseStream = T::GetRawBlocksWithinStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = + BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_raw_blocks_within( - &inner, - request, - ) + ::get_raw_blocks_within(&inner, request) .await }; Box::pin(fut) @@ -1047,23 +994,18 @@ pub mod gap_filler_service_server { "/gapfiller.GapFillerService/GetRawBlock" => { #[allow(non_camel_case_types)] struct GetRawBlockSvc(pub Arc); - impl< - T: GapFillerService, - > tonic::server::UnaryService - for GetRawBlockSvc { + impl tonic::server::UnaryService + for GetRawBlockSvc + { type Response = super::RawBlock; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_raw_block(&inner, request) - .await + ::get_raw_block(&inner, request).await }; Box::pin(fut) } @@ -1091,18 +1033,14 @@ pub mod gap_filler_service_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } From 653518e811ecb8e2acddcda037435d3066ebd9a4 Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: 
Tue, 14 Jan 2025 18:34:46 +0100 Subject: [PATCH 5/6] MTG-1110 Add request time out for DB query - minor code style fix --- nft_ingester/tests/api_tests.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index e3a735279..33ac3394f 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -521,12 +521,11 @@ mod tests { let result = timeout(query_timeout, query("SELECT pg_sleep(3)").execute(&pg_pool)).await; match result { - Ok(Ok(res)) => assert!( - false, + Ok(Ok(res)) => panic!( "Query should have timed out, but completed successfully: {:?}", res ), - Ok(Err(e)) => assert!(false, "Query should have timed out, but failed: {:?}", e), + Ok(Err(e)) => panic!("Query should have timed out, but failed: {:?}", e), Err(_) => println!("Query timed out as expected"), } } @@ -544,15 +543,11 @@ mod tests { match result { Ok(Ok(res)) => println!("Query completed successfully: {:?}", res), - Ok(Err(e)) => assert!( - false, - "Query should completed successfully, but failed: {:?}", + Ok(Err(e)) => panic!( + "Query should have completed successfully, but failed: {:?}", e ), - Err(_) => assert!( - false, - "Query should completed successfully but have timedout", - ), + Err(_) => panic!("Query should have completed successfully, but timed out"), } } From 5b7b4546d13a1525ff9b17df86c2c74a8e17cb51 Mon Sep 17 00:00:00 2001 From: Andrii <18900364+andrii-kl@users.noreply.github.com> Date: Wed, 15 Jan 2025 22:02:15 +0100 Subject: [PATCH 6/6] Update README.md Co-authored-by: Oleksandr Mykhailenko <58030797+armyhaylenko@users.noreply.github.com> --- README.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 61a8282d8..f9bcaba82 100644 --- a/README.md +++ b/README.md @@ -64,16 +64,14 @@ The API specification is compatible with the standard DAS specification here htt ### Developing and running #### PR/Code
requirements -1) CI/CD has code formating checker so use FTM before code commit: `cargo fmt` -2) +1) CI/CD has code formatting checker so use fmt before code commit: `cargo fmt` #### Run Integration tests -Integration tests require Postgres db url, devnet and mainnet rpc +Integration tests require a Postgres db url, solana devnet and solana mainnet rpc urls. How to run with CLI: -```cml -DATABASE_TEST_URL='postgres://solana:solana@localhost:5432/aura_db' DEVNET_RPC_URL='https://devnet-aura.metaplex.com/{YOUR_TOKEN_ACCESS}' MAINNET_RPC_URL='https://mainnet-aura.metaplex.com/{YOUR_TOKEN_ACCESS}' cargo test --features integration_tests -``` +```shell
DATABASE_TEST_URL='postgres://solana:solana@localhost:5432/aura_db' DEVNET_RPC_URL='https://api.devnet.solana.com' MAINNET_RPC_URL='https://api.mainnet-beta.solana.com' cargo test --features integration_tests