Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mtg-1036] tests for asset index cleaner #374

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions nft_ingester/tests/api_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ mod tests {
Storage, ToFlatbuffersConverter,
};
use serde_json::{json, Value};
use setup::rocks::RocksTestEnvironment;
use solana_program::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use spl_pod::{
Expand All @@ -80,6 +81,7 @@ mod tests {
};
use spl_token_2022::extension::interest_bearing_mint::BasisPoints;
use sqlx::QueryBuilder;
use tempfile::TempDir;
use testcontainers::clients::Cli;
use tokio::{sync::Mutex, task::JoinSet};
use usecase::proofs::MaybeProofChecker;
Expand Down Expand Up @@ -3694,4 +3696,177 @@ mod tests {
assert_eq!(idx_fungible_asset_iter.count(), 1);
assert_eq!(idx_non_fungible_asset_iter.count(), 1);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_idx_cleaner_does_not_erase_updates() {
let cnt = 1;
let slot = 0;
let cli = Cli::default();

let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, slot).await;
let nft_token_mint = generated_assets.pubkeys[0];
let owner: Pubkey = generated_assets.owners[0].owner.value.unwrap();

let mut batch_storage = BatchSaveStorage::new(
env.rocks_env.storage.clone(),
10,
Arc::new(IngesterMetricsConfig::new()),
);
let token_accounts_processor =
TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new()));
let token_account_addr = Pubkey::new_unique();

// receive 10 updates for the asset
for i in 0..10 {
let token_account = TokenAccount {
pubkey: token_account_addr,
mint: nft_token_mint,
delegate: None,
owner,
extensions: None,
frozen: false,
delegated_amount: 0,
slot_updated: i,
amount: 100 + i,
write_version: i as u64,
};
token_accounts_processor
.transform_and_save_token_account(
&mut batch_storage,
token_account_addr,
&token_account,
)
.unwrap();
batch_storage.flush().unwrap();
}

let idx_fungible_asset_iter = env.rocks_env.storage.fungible_assets_update_idx.iter_start();
let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
// 10 fungibles because we have 10 updates and created token account is not counted as fungible, then
// it will be a part of both indexes
assert_eq!(idx_fungible_asset_iter.count(), 10);
assert_eq!(idx_non_fungible_asset_iter.count(), 11);

// no data was syncronized, then no data should be erased
for asset_type in ASSET_TYPES {
clean_syncronized_idxs(
env.pg_env.client.clone(),
env.rocks_env.storage.clone(),
asset_type,
)
.await
.unwrap();
}

// after sync idxs should be cleaned again
let idx_fungible_asset_iter = env.rocks_env.storage.fungible_assets_update_idx.iter_start();
let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
// 10 fungibles because we have 10 updates and created token account is not counted as fungible, then
// it will be a part of both indexes
assert_eq!(idx_fungible_asset_iter.count(), 10);
assert_eq!(idx_non_fungible_asset_iter.count(), 11);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_idx_cleaner_erases_updates_partially() {
let rocks_env: RocksTestEnvironment = RocksTestEnvironment::new(&[]);
let number_of_assets = 1;
let slot = 0;
let generated_assets = rocks_env.generate_assets(number_of_assets, slot).await;
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
let temp_dir_path = temp_dir.path();

let cli: Cli = Cli::default();
let pg_env =
setup::pg::TestEnvironment::new_with_mount(&cli, temp_dir_path.to_str().unwrap()).await;
let syncronizer = Arc::new(nft_ingester::index_syncronizer::Synchronizer::new(
rocks_env.storage.clone(),
pg_env.client.clone(),
10,
temp_dir_path.to_str().unwrap().to_string(),
Arc::new(SynchronizerMetricsConfig::new()),
1,
));

let nft_token_mint = generated_assets.pubkeys[0];
let owner: Pubkey = generated_assets.owners[0].owner.value.unwrap();
let mut batch_storage = BatchSaveStorage::new(
rocks_env.storage.clone(),
10,
Arc::new(IngesterMetricsConfig::new()),
);
let token_accounts_processor =
TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new()));
let token_account_addr = Pubkey::new_unique();

// receive 5 updates for the assset and update idxs accordingly
for i in 0..5 {
let token_account = TokenAccount {
pubkey: token_account_addr,
mint: nft_token_mint,
delegate: None,
owner,
extensions: None,
frozen: false,
delegated_amount: 0,
slot_updated: i,
amount: 100 + i,
write_version: i as u64,
};
token_accounts_processor
.transform_and_save_token_account(
&mut batch_storage,
token_account_addr,
&token_account,
)
.unwrap();
batch_storage.flush().unwrap();
}
let (_tx, rx) = tokio::sync::broadcast::channel::<()>(1);
for asset_type in ASSET_TYPES {
syncronizer.full_syncronize(&rx, asset_type).await.unwrap();
}

// receive 5 more updates for the same asset
for i in 0..5 {
let token_account = TokenAccount {
pubkey: token_account_addr,
mint: nft_token_mint,
delegate: None,
owner,
extensions: None,
frozen: false,
delegated_amount: 0,
slot_updated: i,
amount: 100 + i,
write_version: i as u64,
};
token_accounts_processor
.transform_and_save_token_account(
&mut batch_storage,
token_account_addr,
&token_account,
)
.unwrap();
batch_storage.flush().unwrap();
}

// full story of idxs is stored
let idx_fungible_asset_iter = rocks_env.storage.fungible_assets_update_idx.iter_start();
let idx_non_fungible_asset_iter = rocks_env.storage.assets_update_idx.iter_start();
assert_eq!(idx_fungible_asset_iter.count(), 10);
assert_eq!(idx_non_fungible_asset_iter.count(), 11);

for asset_type in ASSET_TYPES {
clean_syncronized_idxs(pg_env.client.clone(), rocks_env.storage.clone(), asset_type)
.await
.unwrap();
}

// after sync idxs should be half cleaned
let idx_fungible_asset_iter = rocks_env.storage.fungible_assets_update_idx.iter_start();
let idx_non_fungible_asset_iter = rocks_env.storage.assets_update_idx.iter_start();
assert_eq!(idx_fungible_asset_iter.count(), 6);
assert_eq!(idx_non_fungible_asset_iter.count(), 6);
}
}
Loading