MTG-1242 Fix build search query for Fungible tokens. MTG-1031 Fix test for fungible tokens. (#379)

* MTG-1242 Fix build search query for Fungible tokens. MTG-1031 Fix test for fungible tokens.

* MTG-1242_MTG-1031 Fix issue

* MTG-1245 Fixed integration api test for Fungible tokens, fixed related bugs.

- Fixed integration test_token_type test.
- Fixed show_fungible option for SearchAssets requests.
- Removed switching TokenType depending on the show_fungible option.

* MTG-1245 Rollback some changes

* MTG-1245 Rollback some changes

* MTG-1245 fix ftm
andrii-kl authored Jan 24, 2025
1 parent 4ee5691 commit 0e29a10
Showing 9 changed files with 191 additions and 77 deletions.
2 changes: 2 additions & 0 deletions nft_ingester/src/api/dapi/asset.rs
@@ -102,6 +102,7 @@ fn convert_rocks_asset_model(
token_account: asset_selected_maps.token_accounts.get(asset_pubkey).cloned(),
inscription,
spl_mint: asset_selected_maps.spl_mints.get(asset_pubkey).cloned(),

token_symbol: token_symbols.get(&asset_pubkey.to_string()).cloned(),
token_price: token_prices.get(&asset_pubkey.to_string()).cloned(),
})
@@ -193,6 +194,7 @@ pub async fn get_by_ids<

let unique_asset_ids: Vec<Pubkey> = unique_asset_ids_map.keys().cloned().collect();
let asset_ids_string = asset_ids.clone().into_iter().map(|id| id.to_string()).collect_vec();

let (token_prices, token_symbols) = if options.show_fungible {
let token_prices_fut = token_price_fetcher.fetch_token_prices(asset_ids_string.as_slice());
let token_symbols_fut =
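The hunk above gates token price and symbol lookups on `options.show_fungible`. Below is a minimal, self-contained sketch of that pattern; the fetcher type and both method signatures are stand-ins, not the repository's actual `token_price_fetcher` API:

```rust
// Self-contained sketch: gate fungible-only lookups behind `show_fungible`.
// `TokenPriceFetcher` and its methods are stand-ins for the repo's real trait.
use std::collections::HashMap;

struct Options {
    show_fungible: bool,
}

struct TokenPriceFetcher;

impl TokenPriceFetcher {
    // Stub: a real implementation would query an external price source.
    async fn fetch_token_prices(&self, ids: &[String]) -> HashMap<String, f64> {
        ids.iter().map(|id| (id.clone(), 1.0)).collect()
    }

    // Stub: a real implementation would resolve token metadata.
    async fn fetch_token_symbols(&self, ids: &[String]) -> HashMap<String, String> {
        ids.iter().map(|id| (id.clone(), "SYM".to_string())).collect()
    }
}

#[tokio::main]
async fn main() {
    let options = Options { show_fungible: true };
    let fetcher = TokenPriceFetcher;
    let asset_ids_string =
        vec!["So11111111111111111111111111111111111111112".to_string()];

    // Both lookups run concurrently, and only when fungible data is requested;
    // otherwise the maps stay empty and no extra work is done.
    let (token_prices, token_symbols) = if options.show_fungible {
        tokio::join!(
            fetcher.fetch_token_prices(&asset_ids_string),
            fetcher.fetch_token_symbols(&asset_ids_string),
        )
    } else {
        (HashMap::new(), HashMap::new())
    };

    println!("{token_prices:?} {token_symbols:?}");
}
```

Joining the two futures keeps the added latency to a single concurrent round trip, and requests that do not ask for fungible data skip the lookups entirely.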
19 changes: 10 additions & 9 deletions nft_ingester/src/api/dapi/search_assets.rs
@@ -108,7 +108,7 @@ async fn fetch_assets<
JP: JsonPersister + Sync + Send + 'static,
PPC: ProcessingPossibilityChecker + Sync + Send + 'static,
>(
-index_client: Arc<impl postgre_client::storage_traits::AssetPubkeyFilteredFetcher>,
+pg_index_client: Arc<impl postgre_client::storage_traits::AssetPubkeyFilteredFetcher>,
rocks_db: Arc<Storage>,
filter: SearchAssetsQuery,
sort_by: AssetSorting,
@@ -145,7 +145,7 @@
}
};

-let keys = index_client
+let keys = pg_index_client
.get_asset_pubkeys_filtered(filter, &sort_by.into(), limit, page, before, after, &options)
.await
.map_err(|e| {
@@ -206,13 +206,14 @@
};
let mut grand_total = None;
if options.show_grand_total {
-grand_total = Some(index_client.get_grand_total(filter, &options).await.map_err(|e| {
-if e.to_string().contains("statement timeout") {
-StorageError::QueryTimedOut
-} else {
-StorageError::Common(e.to_string())
-}
-})?)
+grand_total =
+Some(pg_index_client.get_grand_total(filter, &options).await.map_err(|e| {
+if e.to_string().contains("statement timeout") {
+StorageError::QueryTimedOut
+} else {
+StorageError::Common(e.to_string())
+}
+})?)
}

let resp = AssetList {
4 changes: 2 additions & 2 deletions nft_ingester/src/bin/synchronizer/main.rs
@@ -43,7 +43,7 @@ pub async fn main() -> Result<(), IngesterError> {
red_metrics.register(&mut registry);
metrics_utils::utils::start_metrics(registry, args.metrics_port).await;

-let index_storage = Arc::new(
+let pg_index_storage = Arc::new(
init_index_storage_with_migration(
&args.pg_database_url,
args.pg_max_db_connections,
@@ -91,7 +91,7 @@

let synchronizer = Arc::new(Synchronizer::new(
rocks_storage.clone(),
-index_storage.clone(),
+pg_index_storage.clone(),
args.dump_synchronizer_batch_size,
args.rocks_dump_path.clone(),
metrics.clone(),
39 changes: 39 additions & 0 deletions nft_ingester/src/bin/synchronizer/readme.md
@@ -0,0 +1,39 @@
## Building the Project

Clone the repository and navigate to the project directory:

```bash
git clone https://github.com/metaplex-foundation/aura.git
cd aura
```

Build the project using Cargo:

```bash
cargo build --bin synchronizer
```
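
For longer test runs, an optimized release build may be preferable (standard Cargo profiles assumed):

```bash
cargo build --release --bin synchronizer
./target/release/synchronizer -h
```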

## Running the Service

Run the following to see the full list of available arguments:

```bash
./target/debug/synchronizer -h
```

Run the Synchronizer with the minimum required configuration:

```bash
./target/debug/synchronizer \
--pg-database-url postgres://solana:solana@localhost:5432/aura_db
```


## Tips for local debugging/testing

To increase log verbosity, set the log level to debug:
` --log-level debug`
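
Combined with the minimal run above, that looks like:

```bash
./target/debug/synchronizer \
  --pg-database-url postgres://solana:solana@localhost:5432/aura_db \
  --log-level debug
```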

To fill your local Redis with messages, you can copy them from any other Redis instance that has them. The script `nft_ingester/scripts/transfer_redis_messages.py` copies existing messages from one Redis and forwards the copies to another.
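
A usage sketch; the flag names below are assumptions rather than the script's confirmed interface, so check its help output first:

```bash
# Print the script's actual arguments first:
python3 nft_ingester/scripts/transfer_redis_messages.py --help

# Assumed usage: copy messages from a populated Redis into the local one.
# `--source`/`--target` are illustrative flag names, not confirmed ones.
python3 nft_ingester/scripts/transfer_redis_messages.py \
  --source redis://remote-host:6379 \
  --target redis://localhost:6379
```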
78 changes: 45 additions & 33 deletions nft_ingester/src/index_syncronizer.rs
@@ -34,8 +34,8 @@ where
T: AssetIndexSourceStorage,
U: AssetIndexStorage,
{
-primary_storage: Arc<T>,
-index_storage: Arc<U>,
+rocks_primary_storage: Arc<T>,
+pg_index_storage: Arc<U>,
dump_synchronizer_batch_size: usize,
dump_path: String,
metrics: Arc<SynchronizerMetricsConfig>,
@@ -49,16 +49,16 @@
{
#[allow(clippy::too_many_arguments)]
pub fn new(
-primary_storage: Arc<T>,
-index_storage: Arc<U>,
+rocks_primary_storage: Arc<T>,
+pg_index_storage: Arc<U>,
dump_synchronizer_batch_size: usize,
dump_path: String,
metrics: Arc<SynchronizerMetricsConfig>,
parallel_tasks: usize,
) -> Self {
Synchronizer {
-primary_storage,
-index_storage,
+rocks_primary_storage,
+pg_index_storage,
dump_synchronizer_batch_size,
dump_path,
metrics,
@@ -109,16 +109,16 @@
run_full_sync_threshold: i64,
asset_type: AssetType,
) -> Result<SyncStatus, IngesterError> {
-let last_indexed_key = self.index_storage.fetch_last_synced_id(asset_type).await?;
+let last_indexed_key = self.pg_index_storage.fetch_last_synced_id(asset_type).await?;
let last_indexed_key = last_indexed_key.map(decode_u64x2_pubkey).transpose()?;

// Fetch the last known key from the primary storage
let (last_key, prefix) = match asset_type {
AssetType::NonFungible => {
-(self.primary_storage.last_known_nft_asset_updated_key()?, "nft")
+(self.rocks_primary_storage.last_known_nft_asset_updated_key()?, "nft")
},
AssetType::Fungible => {
-(self.primary_storage.last_known_fungible_asset_updated_key()?, "fungible")
+(self.rocks_primary_storage.last_known_fungible_asset_updated_key()?, "fungible")
},
};
let Some(last_key) = last_key else {
@@ -195,13 +195,17 @@
let state = self.get_sync_state(run_full_sync_threshold, asset_type).await?;
match state {
SyncStatus::FullSyncRequired(state) => {
-tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq);
+tracing::warn!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq);
self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await
},
SyncStatus::RegularSyncRequired(state) => {
tracing::debug!("Regular sync required for nft asset");
self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await
},
-SyncStatus::NoSyncRequired => Ok(()),
+SyncStatus::NoSyncRequired => {
+tracing::debug!("No sync required for nft asset");
+Ok(())
+},
}
}

@@ -216,15 +220,19 @@

match state {
SyncStatus::FullSyncRequired(state) => {
-tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq);
+tracing::warn!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq);
self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key)
.await
},
SyncStatus::RegularSyncRequired(state) => {
tracing::debug!("Regular sync required for fungible asset");
self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key)
.await
},
-SyncStatus::NoSyncRequired => Ok(()),
+SyncStatus::NoSyncRequired => {
+tracing::debug!("No sync required for fungible asset");
+Ok(())
+},
}
}

@@ -234,8 +242,12 @@
asset_type: AssetType,
) -> Result<(), IngesterError> {
let last_known_key = match asset_type {
-AssetType::NonFungible => self.primary_storage.last_known_nft_asset_updated_key()?,
-AssetType::Fungible => self.primary_storage.last_known_fungible_asset_updated_key()?,
+AssetType::NonFungible => {
+self.rocks_primary_storage.last_known_nft_asset_updated_key()?
+},
+AssetType::Fungible => {
+self.rocks_primary_storage.last_known_fungible_asset_updated_key()?
+},
};
let Some(last_known_key) = last_known_key else {
return Ok(());
@@ -273,7 +285,7 @@
num_shards: u64,
) -> Result<(), IngesterError> {
let base_path = std::path::Path::new(self.dump_path.as_str());
-self.index_storage.destructive_prep_to_batch_nft_load().await?;
+self.pg_index_storage.destructive_prep_to_batch_nft_load().await?;

let shards = shard_pubkeys(num_shards);
type ResultWithPaths = Result<(usize, String, String, String, String), String>;
@@ -322,7 +334,7 @@
let end = *end;
let shutdown_rx = rx.resubscribe();
let metrics = self.metrics.clone();
-let rocks_storage = self.primary_storage.clone();
+let rocks_storage = self.rocks_primary_storage.clone();
tasks.spawn_blocking(move || {
let res = rocks_storage.dump_nft_csv(
assets_file,
@@ -350,7 +362,7 @@
while let Some(task) = tasks.join_next().await {
let (_cnt, assets_path, creators_path, authorities_path, metadata_path) =
task.map_err(|e| e.to_string())??;
-let index_storage = self.index_storage.clone();
+let index_storage = self.pg_index_storage.clone();
let semaphore = semaphore.clone();
index_tasks.spawn(async move {
index_storage
@@ -368,9 +380,9 @@
task.map_err(|e| e.to_string())?.map_err(|e| e.to_string())?;
}
tracing::info!("All NFT assets loads complete. Finalizing the batch load");
-self.index_storage.finalize_batch_nft_load().await?;
+self.pg_index_storage.finalize_batch_nft_load().await?;
tracing::info!("Batch load finalized for NFTs");
-self.index_storage
+self.pg_index_storage
.update_last_synced_key(last_included_rocks_key, AssetType::NonFungible)
.await?;
Ok(())
@@ -383,7 +395,7 @@
num_shards: u64,
) -> Result<(), IngesterError> {
let base_path = std::path::Path::new(self.dump_path.as_str());
-self.index_storage.destructive_prep_to_batch_fungible_load().await?;
+self.pg_index_storage.destructive_prep_to_batch_fungible_load().await?;

let shards = shard_pubkeys(num_shards);
let mut tasks: JoinSet<Result<(usize, String), String>> = JoinSet::new();
@@ -405,7 +417,7 @@
let end = *end;
let shutdown_rx = rx.resubscribe();
let metrics = self.metrics.clone();
-let rocks_storage = self.primary_storage.clone();
+let rocks_storage = self.rocks_primary_storage.clone();

tasks.spawn_blocking(move || {
let res = rocks_storage.dump_fungible_csv(
@@ -423,7 +435,7 @@
let semaphore = Arc::new(tokio::sync::Semaphore::new(1));
while let Some(task) = tasks.join_next().await {
let (_cnt, fungible_tokens_path) = task.map_err(|e| e.to_string())??;
-let index_storage = self.index_storage.clone();
+let index_storage = self.pg_index_storage.clone();
let semaphore = semaphore.clone();
index_tasks.spawn(async move {
index_storage
@@ -435,9 +447,9 @@
task.map_err(|e| e.to_string())?.map_err(|e| e.to_string())?;
}
tracing::info!("All token accounts/fungibles loads complete. Finalizing the batch load");
-self.index_storage.finalize_batch_fungible_load().await?;
+self.pg_index_storage.finalize_batch_fungible_load().await?;
tracing::info!("Batch load finalized for fungibles");
-self.index_storage
+self.pg_index_storage
.update_last_synced_key(last_included_rocks_key, AssetType::Fungible)
.await?;
Ok(())
@@ -461,7 +473,7 @@
break;
}
let (updated_keys, last_included_key) =
-self.primary_storage.fetch_fungible_asset_updated_keys(
+self.rocks_primary_storage.fetch_fungible_asset_updated_keys(
starting_key.clone(),
Some(last_key.clone()),
self.dump_synchronizer_batch_size,
@@ -482,8 +494,8 @@
// Update the asset indexes in the index storage
// let last_included_key = AssetsUpdateIdx::encode_key(last_included_key);
last_included_rocks_key = Some(last_included_key);
-let primary_storage = self.primary_storage.clone();
-let index_storage = self.index_storage.clone();
+let primary_storage = self.rocks_primary_storage.clone();
+let index_storage = self.pg_index_storage.clone();
let metrics = self.metrics.clone();
tasks.spawn(async move {
Self::syncronize_fungible_batch(
@@ -518,7 +530,7 @@
last_included_rocks_key.slot,
last_included_rocks_key.pubkey,
);
-self.index_storage
+self.pg_index_storage
.update_last_synced_key(&last_included_rocks_key, AssetType::Fungible)
.await?;
} else {
@@ -550,7 +562,7 @@
break;
}
let (updated_keys, last_included_key) =
-self.primary_storage.fetch_nft_asset_updated_keys(
+self.rocks_primary_storage.fetch_nft_asset_updated_keys(
starting_key.clone(),
Some(last_key.clone()),
self.dump_synchronizer_batch_size,
@@ -571,8 +583,8 @@
// Update the asset indexes in the index storage
// let last_included_key = AssetsUpdateIdx::encode_key(last_included_key);
last_included_rocks_key = Some(last_included_key);
-let primary_storage = self.primary_storage.clone();
-let index_storage = self.index_storage.clone();
+let primary_storage = self.rocks_primary_storage.clone();
+let index_storage = self.pg_index_storage.clone();
let metrics = self.metrics.clone();
tasks.spawn(async move {
Self::syncronize_nft_batch(
@@ -607,7 +619,7 @@
last_included_rocks_key.slot,
last_included_rocks_key.pubkey,
);
-self.index_storage
+self.pg_index_storage
.update_last_synced_key(&last_included_rocks_key, AssetType::NonFungible)
.await?;
} else {
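The hunks above also show the three-way sync dispatch (`FullSyncRequired`, `RegularSyncRequired`, `NoSyncRequired`). Below is a compressed, self-contained sketch of that decision; the threshold rule is inferred from the log messages, and the real `SyncState` carries full update keys rather than bare sequence numbers:

```rust
// Compressed sketch of the synchronizer's sync-state decision.
// Real code compares full update keys; here only sequence numbers are kept.
#[derive(Debug)]
struct SyncState {
    last_indexed_seq: Option<u64>,
    last_known_seq: u64,
}

#[derive(Debug)]
enum SyncStatus {
    FullSyncRequired(SyncState),
    RegularSyncRequired(SyncState),
    NoSyncRequired,
}

fn get_sync_state(
    last_indexed_seq: Option<u64>,
    last_known_seq: u64,
    full_sync_threshold: u64,
) -> SyncStatus {
    let state = SyncState { last_indexed_seq, last_known_seq };
    match last_indexed_seq {
        // Nothing indexed yet: only a dump-based full sync can catch up.
        None => SyncStatus::FullSyncRequired(state),
        // Gap above the threshold: full sync, as in the warn! message above.
        Some(seq) if last_known_seq.saturating_sub(seq) > full_sync_threshold => {
            SyncStatus::FullSyncRequired(state)
        },
        // Behind, but within the threshold: incremental (regular) sync.
        Some(seq) if seq < last_known_seq => SyncStatus::RegularSyncRequired(state),
        // Already caught up.
        _ => SyncStatus::NoSyncRequired,
    }
}

fn main() {
    println!("{:?}", get_sync_state(Some(90), 100, 5)); // FullSyncRequired
    println!("{:?}", get_sync_state(Some(98), 100, 5)); // RegularSyncRequired
    println!("{:?}", get_sync_state(Some(100), 100, 5)); // NoSyncRequired
}
```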
@@ -143,6 +143,7 @@ impl TokenAccountsProcessor
.map_err(|e| StorageError::Common(e.to_string()))
})
.transpose()?;

let asset_dynamic_details = AssetDynamicDetails {
pubkey: mint.pubkey,
supply: Some(Updated::new(