diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 3e47a114..fa0a4851 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -72,6 +72,7 @@ mod tests { Storage, ToFlatbuffersConverter, }; use serde_json::{json, Value}; + use setup::rocks::RocksTestEnvironment; use solana_program::pubkey::Pubkey; use solana_sdk::signature::Signature; use spl_pod::{ @@ -80,6 +81,7 @@ mod tests { }; use spl_token_2022::extension::interest_bearing_mint::BasisPoints; use sqlx::QueryBuilder; + use tempfile::TempDir; use testcontainers::clients::Cli; use tokio::{sync::Mutex, task::JoinSet}; use usecase::proofs::MaybeProofChecker; @@ -3694,4 +3696,177 @@ mod tests { assert_eq!(idx_fungible_asset_iter.count(), 1); assert_eq!(idx_non_fungible_asset_iter.count(), 1); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_idx_cleaner_does_not_erase_updates() { + let cnt = 1; + let slot = 0; + let cli = Cli::default(); + + let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, slot).await; + let nft_token_mint = generated_assets.pubkeys[0]; + let owner: Pubkey = generated_assets.owners[0].owner.value.unwrap(); + + let mut batch_storage = BatchSaveStorage::new( + env.rocks_env.storage.clone(), + 10, + Arc::new(IngesterMetricsConfig::new()), + ); + let token_accounts_processor = + TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new())); + let token_account_addr = Pubkey::new_unique(); + + // receive 10 updates for the asset + for i in 0..10 { + let token_account = TokenAccount { + pubkey: token_account_addr, + mint: nft_token_mint, + delegate: None, + owner, + extensions: None, + frozen: false, + delegated_amount: 0, + slot_updated: i, + amount: 100 + i, + write_version: i as u64, + }; + token_accounts_processor + .transform_and_save_token_account( + &mut batch_storage, + token_account_addr, + &token_account, + ) + .unwrap(); + batch_storage.flush().unwrap(); + } + + let idx_fungible_asset_iter = env.rocks_env.storage.fungible_assets_update_idx.iter_start(); + let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); + // 10 fungibles because we have 10 updates and created token account is not counted as fungible, then + // it will be a part of both indexes + assert_eq!(idx_fungible_asset_iter.count(), 10); + assert_eq!(idx_non_fungible_asset_iter.count(), 11); + + // no data was syncronized, then no data should be erased + for asset_type in ASSET_TYPES { + clean_syncronized_idxs( + env.pg_env.client.clone(), + env.rocks_env.storage.clone(), + asset_type, + ) + .await + .unwrap(); + } + + // after sync idxs should be cleaned again + let idx_fungible_asset_iter = env.rocks_env.storage.fungible_assets_update_idx.iter_start(); + let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); + // 10 fungibles because we have 10 updates and created token account is not counted as fungible, then + // it will be a part of both indexes + assert_eq!(idx_fungible_asset_iter.count(), 10); + assert_eq!(idx_non_fungible_asset_iter.count(), 11); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_idx_cleaner_erases_updates_partially() { + let rocks_env: RocksTestEnvironment = RocksTestEnvironment::new(&[]); + let number_of_assets = 1; + let slot = 0; + let generated_assets = rocks_env.generate_assets(number_of_assets, slot).await; + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let temp_dir_path = temp_dir.path(); + + let cli: Cli = Cli::default(); + let pg_env = + setup::pg::TestEnvironment::new_with_mount(&cli, temp_dir_path.to_str().unwrap()).await; + let syncronizer = Arc::new(nft_ingester::index_syncronizer::Synchronizer::new( + rocks_env.storage.clone(), + pg_env.client.clone(), + 10, + temp_dir_path.to_str().unwrap().to_string(), + Arc::new(SynchronizerMetricsConfig::new()), + 1, + )); + + let nft_token_mint = generated_assets.pubkeys[0]; + let owner: Pubkey = generated_assets.owners[0].owner.value.unwrap(); + let mut batch_storage = BatchSaveStorage::new( + rocks_env.storage.clone(), + 10, + Arc::new(IngesterMetricsConfig::new()), + ); + let token_accounts_processor = + TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new())); + let token_account_addr = Pubkey::new_unique(); + + // receive 5 updates for the assset and update idxs accordingly + for i in 0..5 { + let token_account = TokenAccount { + pubkey: token_account_addr, + mint: nft_token_mint, + delegate: None, + owner, + extensions: None, + frozen: false, + delegated_amount: 0, + slot_updated: i, + amount: 100 + i, + write_version: i as u64, + }; + token_accounts_processor + .transform_and_save_token_account( + &mut batch_storage, + token_account_addr, + &token_account, + ) + .unwrap(); + batch_storage.flush().unwrap(); + } + let (_tx, rx) = tokio::sync::broadcast::channel::<()>(1); + for asset_type in ASSET_TYPES { + syncronizer.full_syncronize(&rx, asset_type).await.unwrap(); + } + + // receive 5 more updates for the same asset + for i in 0..5 { + let token_account = TokenAccount { + pubkey: token_account_addr, + mint: nft_token_mint, + delegate: None, + owner, + extensions: None, + frozen: false, + delegated_amount: 0, + slot_updated: i, + amount: 100 + i, + write_version: i as u64, + }; + token_accounts_processor + .transform_and_save_token_account( + &mut batch_storage, + token_account_addr, + &token_account, + ) + .unwrap(); + batch_storage.flush().unwrap(); + } + + // full story of idxs is stored + let idx_fungible_asset_iter = rocks_env.storage.fungible_assets_update_idx.iter_start(); + let idx_non_fungible_asset_iter = rocks_env.storage.assets_update_idx.iter_start(); + assert_eq!(idx_fungible_asset_iter.count(), 10); + assert_eq!(idx_non_fungible_asset_iter.count(), 11); + + for asset_type in ASSET_TYPES { + clean_syncronized_idxs(pg_env.client.clone(), rocks_env.storage.clone(), asset_type) + .await + .unwrap(); + } + + // after sync idxs should be half cleaned + let idx_fungible_asset_iter = rocks_env.storage.fungible_assets_update_idx.iter_start(); + let idx_non_fungible_asset_iter = rocks_env.storage.assets_update_idx.iter_start(); + assert_eq!(idx_fungible_asset_iter.count(), 6); + assert_eq!(idx_non_fungible_asset_iter.count(), 6); + } }