diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs
index 3e7f1c5332..815849184d 100644
--- a/crates/torii/indexer/src/engine.rs
+++ b/crates/torii/indexer/src/engine.rs
@@ -1,15 +1,14 @@
 use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
 use std::fmt::Debug;
-use std::hash::{DefaultHasher, Hash, Hasher};
+use std::hash::Hash;
 use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::Result;
 use bitflags::bitflags;
-use cainome::cairo_serde::CairoSerde;
 use dojo_utils::provider as provider_utils;
 use dojo_world::contracts::world::WorldContractReader;
-use futures_util::future::{join_all, try_join_all};
+use futures_util::future::join_all;
 use hashlink::LinkedHashMap;
 use starknet::core::types::{
     BlockHashAndNumber, BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage,
@@ -18,7 +17,7 @@ use starknet::core::types::{
 };
 use starknet::core::utils::get_selector_from_name;
 use starknet::providers::Provider;
-use starknet_crypto::{poseidon_hash_many, Felt};
+use starknet_crypto::Felt;
 use tokio::sync::broadcast::Sender;
 use tokio::sync::mpsc::Sender as BoundedSender;
 use tokio::sync::Semaphore;
@@ -47,6 +46,7 @@ use crate::processors::upgrade_model::UpgradeModelProcessor;
 use crate::processors::{
     BlockProcessor, EventProcessor, EventProcessorConfig, TransactionProcessor,
 };
+use crate::task_manager::{self, ParallelizedEvent, TaskManager};
 
 type EventProcessorMap<P> = HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>>;
 
@@ -200,17 +200,6 @@ pub struct FetchPendingResult {
     pub block_number: u64,
 }
 
-#[derive(Debug)]
-pub struct ParallelizedEvent {
-    pub block_number: u64,
-    pub block_timestamp: u64,
-    pub event_id: String,
-    pub event: Event,
-}
-
-type TaskPriority = usize;
-type TaskId = u64;
-
 #[allow(missing_debug_implementations)]
 pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
     world: Arc<WorldContractReader<P>>,
@@ -220,7 +209,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
     config: EngineConfig,
     shutdown_tx: Sender<()>,
     block_tx: Option<BoundedSender<u64>>,
-    tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<(ContractType, ParallelizedEvent)>>>,
+    task_manager: TaskManager<P>,
     contracts: Arc<HashMap<Felt, ContractType>>,
 }
 
@@ -244,17 +233,27 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
         let contracts = Arc::new(
             contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(),
         );
+        let world = Arc::new(world);
+        let processors = Arc::new(processors);
+        let max_concurrent_tasks = config.max_concurrent_tasks;
+        let event_processor_config = config.event_processor_config.clone();
 
         Self {
-            world: Arc::new(world),
-            db,
+            world: world.clone(),
+            db: db.clone(),
             provider: Arc::new(provider),
-            processors: Arc::new(processors),
+            processors: processors.clone(),
             config,
             shutdown_tx,
             block_tx,
             contracts,
-            tasks: BTreeMap::new(),
+            task_manager: TaskManager::new(
+                db,
+                world,
+                processors,
+                max_concurrent_tasks,
+                event_processor_config,
+            ),
         }
     }
 
@@ -542,7 +541,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
         }
 
         // Process parallelized events
-        self.process_tasks().await?;
+        self.task_manager.process_tasks().await?;
 
         self.db.update_cursors(
             data.block_number - 1,
@@ -589,7 +588,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
         }
 
         // Process parallelized events
-        self.process_tasks().await?;
+        self.task_manager.process_tasks().await?;
 
         let last_block_timestamp =
             get_block_timestamp(&self.provider, data.latest_block_number).await?;
@@ -599,77 +598,6 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
         Ok(())
     }
 
-    async fn process_tasks(&mut self) -> Result<()> {
-        let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));
-
-        // Process each priority level sequentially
-        for (priority, task_group) in std::mem::take(&mut self.tasks) {
-            let mut handles = Vec::new();
-
-            // Process all tasks within this priority level concurrently
-            for (task_id, events) in task_group {
-                let db = self.db.clone();
-                let world = self.world.clone();
-                let semaphore = semaphore.clone();
-                let processors = self.processors.clone();
-                let event_processor_config = self.config.event_processor_config.clone();
-
-                handles.push(tokio::spawn(async move {
-                    let _permit = semaphore.acquire().await?;
-                    let mut local_db = db.clone();
-
-                    // Process all events for this task sequentially
-                    for (contract_type, event) in events {
-                        let contract_processors = processors.get_event_processor(contract_type);
-                        if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
-                            let processor = processors
-                                .iter()
-                                .find(|p| p.validate(&event.event))
-                                .expect("Must find at least one processor for the event");
-
-                            debug!(
-                                target: LOG_TARGET,
-                                event_name = processor.event_key(),
-                                task_id = %task_id,
-                                priority = %priority,
-                                "Processing parallelized event."
-                            );
-
-                            if let Err(e) = processor
-                                .process(
-                                    &world,
-                                    &mut local_db,
-                                    event.block_number,
-                                    event.block_timestamp,
-                                    &event.event_id,
-                                    &event.event,
-                                    &event_processor_config,
-                                )
-                                .await
-                            {
-                                error!(
-                                    target: LOG_TARGET,
-                                    event_name = processor.event_key(),
-                                    error = %e,
-                                    task_id = %task_id,
-                                    priority = %priority,
-                                    "Processing parallelized event."
-                                );
-                            }
-                        }
-                    }
-
-                    Ok::<_, anyhow::Error>(())
-                }));
-            }
-
-            // Wait for all tasks in this priority level to complete before moving to next priority
-            try_join_all(handles).await?;
-        }
-
-        Ok(())
-    }
-
     async fn process_transaction_with_events(
         &mut self,
         transaction_hash: Felt,
@@ -881,50 +809,21 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
             .find(|p| p.validate(event))
             .expect("Must find atleast one processor for the event");
 
-        let (task_priority, task_identifier) = match processor.event_key().as_str() {
-            "ModelRegistered" | "EventRegistered" => {
-                let mut hasher = DefaultHasher::new();
-                event.keys.iter().for_each(|k| k.hash(&mut hasher));
-                let hash = hasher.finish();
-                (0usize, hash) // Priority 0 (highest) for model/event registration
-            }
-            "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
-                let mut hasher = DefaultHasher::new();
-                event.keys[1].hash(&mut hasher);
-                event.keys[2].hash(&mut hasher);
-                let hash = hasher.finish();
-                (2usize, hash) // Priority 2 (lower) for store operations
-            }
-            "EventEmitted" => {
-                let mut hasher = DefaultHasher::new();
-
-                let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
-                    panic!("Expected EventEmitted keys to be well formed: {:?}", e);
-                });
-
-                // selector
-                event.keys[1].hash(&mut hasher);
-                // entity id
-                let entity_id = poseidon_hash_many(&keys);
-                entity_id.hash(&mut hasher);
-
-                let hash = hasher.finish();
-                (2usize, hash) // Priority 2 for event messages
-            }
-            _ => (0, 0), // No parallelization for other events
-        };
-
-        if task_identifier != 0 {
-            self.tasks.entry(task_priority).or_default().entry(task_identifier).or_default().push(
-                (
+        let (task_priority, task_identifier) =
+            (processor.task_priority(), processor.task_identifier(event));
+
+        // if our event can be parallelized, we add it to the task manager
+        if task_identifier != task_manager::TASK_ID_SEQUENTIAL {
+            self.task_manager.add_parallelized_event(
+                task_priority,
+                task_identifier,
+                ParallelizedEvent {
                     contract_type,
-                    ParallelizedEvent {
-                        event_id: event_id.to_string(),
-                        event: event.clone(),
-                        block_number,
-                        block_timestamp,
-                    },
-                ),
+                    event_id: event_id.to_string(),
+                    event: event.clone(),
+                    block_number,
+                    block_timestamp,
+                },
             );
         } else {
             // Process non-parallelized events immediately
diff --git a/crates/torii/indexer/src/lib.rs b/crates/torii/indexer/src/lib.rs
index 7191c5480f..dcb8787a1e 100644
--- a/crates/torii/indexer/src/lib.rs
+++ b/crates/torii/indexer/src/lib.rs
@@ -6,5 +6,6 @@ mod test;
 
 pub mod engine;
 pub mod processors;
+mod task_manager;
 
 pub use engine::Engine;
diff --git a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs
index 3b207d466b..6783162419 100644
--- a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs
+++ b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::Error;
 use async_trait::async_trait;
 use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -8,6 +10,7 @@ use torii_sqlite::Sql;
 use tracing::debug;
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_legacy_transfer";
 
@@ -34,6 +37,24 @@ where
         false
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        1
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        // Hash the event key (Transfer)
+        event.keys[0].hash(&mut hasher);
+
+        // Take the max of from/to addresses to get a canonical representation
+        // This ensures transfers between the same pair of addresses are grouped together
+        // regardless of direction (A->B or B->A)
+        let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
+        canonical_pair.hash(&mut hasher);
+
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/erc20_transfer.rs b/crates/torii/indexer/src/processors/erc20_transfer.rs
index a0643abd41..98fb907bb0 100644
--- a/crates/torii/indexer/src/processors/erc20_transfer.rs
+++ b/crates/torii/indexer/src/processors/erc20_transfer.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::Error;
 use async_trait::async_trait;
 use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -8,6 +10,7 @@ use torii_sqlite::Sql;
 use tracing::debug;
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer";
 
@@ -34,6 +37,24 @@ where
         false
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        1
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        // Hash the event key (Transfer)
+        event.keys[0].hash(&mut hasher);
+
+        // Take the max of from/to addresses to get a canonical representation
+        // This ensures transfers between the same pair of addresses are grouped together
+        // regardless of direction (A->B or B->A)
+        let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
+        canonical_pair.hash(&mut hasher);
+
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs
index df6b2a88de..e02304ed9c 100644
--- a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs
+++ b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::Error;
 use async_trait::async_trait;
 use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -8,6 +10,7 @@ use torii_sqlite::Sql;
 use tracing::debug;
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer";
 
@@ -34,6 +37,32 @@ where
         false
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        1
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        // Hash the event key (Transfer)
+        event.keys[0].hash(&mut hasher);
+
+        // Take the max of from/to addresses to get a canonical representation
+        // This ensures transfers between the same pair of addresses are grouped together
+        // regardless of direction (A->B or B->A)
+        let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
+        canonical_pair.hash(&mut hasher);
+
+        // For ERC721, we can safely parallelize by token ID since each token is unique
+        // and can only be owned by one address at a time. This means:
+        // 1. Transfers of different tokens can happen in parallel
+        // 2. Multiple transfers of the same token must be sequential
+        // 3. The canonical address pair ensures related transfers stay together
+        event.data[2].hash(&mut hasher);
+        event.data[3].hash(&mut hasher);
+
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         _world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/erc721_transfer.rs b/crates/torii/indexer/src/processors/erc721_transfer.rs
index faf124360b..4a0383a4f1 100644
--- a/crates/torii/indexer/src/processors/erc721_transfer.rs
+++ b/crates/torii/indexer/src/processors/erc721_transfer.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::Error;
 use async_trait::async_trait;
 use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -8,6 +10,7 @@ use torii_sqlite::Sql;
 use tracing::debug;
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_transfer";
 
@@ -34,6 +37,32 @@ where
         false
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        1
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        // Hash the event key (Transfer)
+        event.keys[0].hash(&mut hasher);
+
+        // Take the max of from/to addresses to get a canonical representation
+        // This ensures transfers between the same pair of addresses are grouped together
+        // regardless of direction (A->B or B->A)
+        let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
+        canonical_pair.hash(&mut hasher);
+
+        // For ERC721, we can safely parallelize by token ID since each token is unique
+        // and can only be owned by one address at a time. This means:
+        // 1. Transfers of different tokens can happen in parallel
+        // 2. Multiple transfers of the same token must be sequential
+        // 3. The canonical address pair ensures related transfers stay together
+        event.keys[3].hash(&mut hasher);
+        event.keys[4].hash(&mut hasher);
+
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         _world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/event_message.rs b/crates/torii/indexer/src/processors/event_message.rs
index 4495665bed..f8c69f471e 100644
--- a/crates/torii/indexer/src/processors/event_message.rs
+++ b/crates/torii/indexer/src/processors/event_message.rs
@@ -1,14 +1,19 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Error, Result};
 use async_trait::async_trait;
+use cainome::cairo_serde::CairoSerde;
 use dojo_world::contracts::abigen::world::Event as WorldEvent;
 use dojo_world::contracts::naming::get_tag;
 use dojo_world::contracts::world::WorldContractReader;
 use starknet::core::types::{Event, Felt};
 use starknet::providers::Provider;
+use starknet_crypto::poseidon_hash_many;
 use torii_sqlite::Sql;
 use tracing::info;
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::event_message";
 
@@ -28,6 +33,23 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        1
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
+            panic!("Expected EventEmitted keys to be well formed: {:?}", e);
+        });
+        // selector
+        event.keys[1].hash(&mut hasher);
+        // entity id
+        let entity_id = poseidon_hash_many(&keys);
+        entity_id.hash(&mut hasher);
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         _world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/metadata_update.rs b/crates/torii/indexer/src/processors/metadata_update.rs
index c4b71c4a52..a3d40eb5cb 100644
--- a/crates/torii/indexer/src/processors/metadata_update.rs
+++ b/crates/torii/indexer/src/processors/metadata_update.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Error, Result};
 use async_trait::async_trait;
 use base64::engine::general_purpose;
@@ -14,6 +16,7 @@ use torii_sqlite::Sql;
 use tracing::{error, info};
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::metadata_update";
 
@@ -33,6 +36,16 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        3
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        event.keys.iter().for_each(|k| k.hash(&mut hasher));
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         _world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/mod.rs b/crates/torii/indexer/src/processors/mod.rs
index abe358b6e7..420dd798a7 100644
--- a/crates/torii/indexer/src/processors/mod.rs
+++ b/crates/torii/indexer/src/processors/mod.rs
@@ -7,6 +7,8 @@ use starknet::core::types::{Event, Felt, Transaction};
 use starknet::providers::Provider;
 use torii_sqlite::Sql;
 
+use crate::task_manager::{TaskId, TaskPriority};
+
 pub mod erc20_legacy_transfer;
 pub mod erc20_transfer;
 pub mod erc721_legacy_transfer;
@@ -53,6 +55,9 @@ where
 
     fn validate(&self, event: &Event) -> bool;
 
+    fn task_priority(&self) -> TaskPriority;
+    fn task_identifier(&self, event: &Event) -> TaskId;
+
     #[allow(clippy::too_many_arguments)]
     async fn process(
         &self,
diff --git a/crates/torii/indexer/src/processors/raw_event.rs b/crates/torii/indexer/src/processors/raw_event.rs
index c30a918fa2..8614b92695 100644
--- a/crates/torii/indexer/src/processors/raw_event.rs
+++ b/crates/torii/indexer/src/processors/raw_event.rs
@@ -6,6 +6,7 @@ use starknet::providers::Provider;
 use torii_sqlite::Sql;
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{self, TaskId, TaskPriority};
 
 #[derive(Default, Debug)]
 pub struct RawEventProcessor;
@@ -23,6 +24,15 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        1
+    }
+
+    fn task_identifier(&self, _event: &Event) -> TaskId {
+        // TODO. for now raw events are not parallelized
+        task_manager::TASK_ID_SEQUENTIAL
+    }
+
     async fn process(
         &self,
         _world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/register_event.rs b/crates/torii/indexer/src/processors/register_event.rs
index e9c94f296a..df4a536625 100644
--- a/crates/torii/indexer/src/processors/register_event.rs
+++ b/crates/torii/indexer/src/processors/register_event.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Error, Ok, Result};
 use async_trait::async_trait;
 use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
 use tracing::{debug, info};
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::register_event";
 
@@ -30,6 +33,16 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        0
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        event.keys.iter().for_each(|k| k.hash(&mut hasher));
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/register_model.rs b/crates/torii/indexer/src/processors/register_model.rs
index d630feff88..dae000613a 100644
--- a/crates/torii/indexer/src/processors/register_model.rs
+++ b/crates/torii/indexer/src/processors/register_model.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Error, Ok, Result};
 use async_trait::async_trait;
 use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
 use tracing::{debug, info};
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::register_model";
 
@@ -30,6 +33,16 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        0
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        event.keys.iter().for_each(|k| k.hash(&mut hasher));
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/store_del_record.rs b/crates/torii/indexer/src/processors/store_del_record.rs
index c129e7e7a9..d3256c0ef1 100644
--- a/crates/torii/indexer/src/processors/store_del_record.rs
+++ b/crates/torii/indexer/src/processors/store_del_record.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Error, Result};
 use async_trait::async_trait;
 use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -8,6 +10,7 @@ use torii_sqlite::Sql;
 use tracing::{debug, info};
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_del_record";
 
@@ -27,6 +30,17 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        2
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        event.keys[1].hash(&mut hasher);
+        event.keys[2].hash(&mut hasher);
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         _world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/store_set_record.rs b/crates/torii/indexer/src/processors/store_set_record.rs
index 1b30b24016..d508013381 100644
--- a/crates/torii/indexer/src/processors/store_set_record.rs
+++ b/crates/torii/indexer/src/processors/store_set_record.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Error, Result};
 use async_trait::async_trait;
 use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
 use tracing::{debug, info};
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_set_record";
 
@@ -28,6 +31,17 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        2
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        event.keys[1].hash(&mut hasher);
+        event.keys[2].hash(&mut hasher);
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         _world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs
index 30fd10882e..6c45c66154 100644
--- a/crates/torii/indexer/src/processors/store_update_member.rs
+++ b/crates/torii/indexer/src/processors/store_update_member.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Context, Error, Result};
 use async_trait::async_trait;
 use dojo_types::schema::{Struct, Ty};
@@ -10,6 +12,7 @@ use torii_sqlite::Sql;
 use tracing::{debug, info};
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_update_member";
 
@@ -29,6 +32,19 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        2
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        // model selector
+        event.keys[1].hash(&mut hasher);
+        // entity id
+        event.keys[2].hash(&mut hasher);
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         _world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/store_update_record.rs b/crates/torii/indexer/src/processors/store_update_record.rs
index a76f40293e..05984ecd72 100644
--- a/crates/torii/indexer/src/processors/store_update_record.rs
+++ b/crates/torii/indexer/src/processors/store_update_record.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Error, Result};
 use async_trait::async_trait;
 use dojo_types::schema::Ty;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
 use tracing::{debug, info};
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_update_record";
 
@@ -28,6 +31,19 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        2
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        // model selector
+        event.keys[1].hash(&mut hasher);
+        // entity id
+        event.keys[2].hash(&mut hasher);
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         _world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/upgrade_event.rs b/crates/torii/indexer/src/processors/upgrade_event.rs
index ba7966d63b..3b9597b3ad 100644
--- a/crates/torii/indexer/src/processors/upgrade_event.rs
+++ b/crates/torii/indexer/src/processors/upgrade_event.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Error, Result};
 use async_trait::async_trait;
 use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
 use tracing::{debug, info};
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::upgrade_event";
 
@@ -30,6 +33,16 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        1
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        event.keys.iter().for_each(|k| k.hash(&mut hasher));
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/processors/upgrade_model.rs b/crates/torii/indexer/src/processors/upgrade_model.rs
index 40717df30d..fe46304b72 100644
--- a/crates/torii/indexer/src/processors/upgrade_model.rs
+++ b/crates/torii/indexer/src/processors/upgrade_model.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
 use anyhow::{Error, Result};
 use async_trait::async_trait;
 use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
 use tracing::{debug, info};
 
 use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
 
 pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::upgrade_model";
 
@@ -30,6 +33,16 @@ where
         true
     }
 
+    fn task_priority(&self) -> TaskPriority {
+        1
+    }
+
+    fn task_identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        event.keys.iter().for_each(|k| k.hash(&mut hasher));
+        hasher.finish()
+    }
+
     async fn process(
         &self,
         world: &WorldContractReader<P>,
diff --git a/crates/torii/indexer/src/task_manager.rs b/crates/torii/indexer/src/task_manager.rs
new file mode 100644
index 0000000000..9a7c4a4ed9
--- /dev/null
+++ b/crates/torii/indexer/src/task_manager.rs
@@ -0,0 +1,151 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;
+
+use anyhow::Result;
+use dojo_world::contracts::WorldContractReader;
+use futures_util::future::try_join_all;
+use starknet::core::types::Event;
+use starknet::providers::Provider;
+use tokio::sync::Semaphore;
+use torii_sqlite::types::ContractType;
+use torii_sqlite::Sql;
+use tracing::{debug, error};
+
+use crate::engine::Processors;
+use crate::processors::EventProcessorConfig;
+
+pub const TASK_ID_SEQUENTIAL: TaskId = 0;
+
+const LOG_TARGET: &str = "torii::indexer::task_manager";
+
+pub type TaskId = u64;
+pub type TaskPriority = usize;
+
+#[derive(Debug)]
+pub struct ParallelizedEvent {
+    pub contract_type: ContractType,
+    pub block_number: u64,
+    pub block_timestamp: u64,
+    pub event_id: String,
+    pub event: Event,
+}
+
+pub struct TaskManager<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
+    db: Sql,
+    world: Arc<WorldContractReader<P>>,
+    tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<ParallelizedEvent>>>,
+    processors: Arc<Processors<P>>,
+    max_concurrent_tasks: usize,
+    event_processor_config: EventProcessorConfig,
+}
+
+impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> TaskManager<P> {
+    pub fn new(
+        db: Sql,
+        world: Arc<WorldContractReader<P>>,
+        processors: Arc<Processors<P>>,
+        max_concurrent_tasks: usize,
+        event_processor_config: EventProcessorConfig,
+    ) -> Self {
+        Self {
+            db,
+            world,
+            tasks: BTreeMap::new(),
+            processors,
+            max_concurrent_tasks,
+            event_processor_config,
+        }
+    }
+
+    pub fn add_parallelized_event(
+        &mut self,
+        priority: TaskPriority,
+        task_identifier: TaskId,
+        parallelized_event: ParallelizedEvent,
+    ) {
+        self.tasks
+            .entry(priority)
+            .or_default()
+            .entry(task_identifier)
+            .or_default()
+            .push(parallelized_event);
+    }
+
+    pub async fn process_tasks(&mut self) -> Result<()> {
+        let semaphore = Arc::new(Semaphore::new(self.max_concurrent_tasks));
+
+        // Process each priority level sequentially
+        for (priority, task_group) in std::mem::take(&mut self.tasks) {
+            let mut handles = Vec::new();
+
+            // Process all tasks within this priority level concurrently
+            for (task_id, events) in task_group {
+                let db = self.db.clone();
+                let world = self.world.clone();
+                let semaphore = semaphore.clone();
+                let processors = self.processors.clone();
+                let event_processor_config = self.event_processor_config.clone();
+
+                handles.push(tokio::spawn(async move {
+                    let _permit = semaphore.acquire().await?;
+                    let mut local_db = db.clone();
+
+                    // Process all events for this task sequentially
+                    for ParallelizedEvent {
+                        contract_type,
+                        event,
+                        block_number,
+                        block_timestamp,
+                        event_id,
+                    } in events
+                    {
+                        let contract_processors = processors.get_event_processor(contract_type);
+                        if let Some(processors) = contract_processors.get(&event.keys[0]) {
+                            let processor = processors
+                                .iter()
+                                .find(|p| p.validate(&event))
+                                .expect("Must find at least one processor for the event");
+
+                            debug!(
+                                target: LOG_TARGET,
+                                event_name = processor.event_key(),
+                                task_id = %task_id,
+                                priority = %priority,
+                                "Processing parallelized event."
+                            );
+
+                            if let Err(e) = processor
+                                .process(
+                                    &world,
+                                    &mut local_db,
+                                    block_number,
+                                    block_timestamp,
+                                    &event_id,
+                                    &event,
+                                    &event_processor_config,
+                                )
+                                .await
+                            {
+                                error!(
+                                    target: LOG_TARGET,
+                                    event_name = processor.event_key(),
+                                    error = %e,
+                                    task_id = %task_id,
+                                    priority = %priority,
+                                    "Processing parallelized event."
+                                );
+                            }
+                        }
+                    }
+
+                    Ok::<_, anyhow::Error>(())
+                }));
+            }
+
+            // Wait for all tasks in this priority level to complete before moving to next priority
+            try_join_all(handles).await?;
+        }
+
+        Ok(())
+    }
+}
diff --git a/crates/torii/sqlite/src/cache.rs b/crates/torii/sqlite/src/cache.rs
index bbfad566db..3d6eea38b8 100644
--- a/crates/torii/sqlite/src/cache.rs
+++ b/crates/torii/sqlite/src/cache.rs
@@ -118,14 +118,8 @@ impl ModelCache {
 
 #[derive(Debug)]
 pub struct LocalCache {
-    pub erc_cache: HashMap<(ContractType, String), I256>,
-    pub token_id_registry: HashSet<String>,
-}
-
-impl Clone for LocalCache {
-    fn clone(&self) -> Self {
-        Self { erc_cache: HashMap::new(), token_id_registry: self.token_id_registry.clone() }
-    }
+    pub erc_cache: RwLock<HashMap<(ContractType, String), I256>>,
+    pub token_id_registry: RwLock<HashSet<String>>,
 }
 
 impl LocalCache {
@@ -139,14 +133,17 @@ impl LocalCache {
 
         let token_id_registry = token_id_registry.into_iter().map(|token_id| token_id.0).collect();
 
-        Self { erc_cache: HashMap::new(), token_id_registry }
+        Self {
+            erc_cache: RwLock::new(HashMap::new()),
+            token_id_registry: RwLock::new(token_id_registry),
+        }
     }
 
-    pub fn contains_token_id(&self, token_id: &str) -> bool {
-        self.token_id_registry.contains(token_id)
+    pub async fn contains_token_id(&self, token_id: &str) -> bool {
+        self.token_id_registry.read().await.contains(token_id)
     }
 
-    pub fn register_token_id(&mut self, token_id: String) {
-        self.token_id_registry.insert(token_id);
+    pub async fn register_token_id(&self, token_id: String) {
+        self.token_id_registry.write().await.insert(token_id);
     }
 }
diff --git a/crates/torii/sqlite/src/erc.rs b/crates/torii/sqlite/src/erc.rs
index f11f7988c1..427a49a8d8 100644
--- a/crates/torii/sqlite/src/erc.rs
+++ b/crates/torii/sqlite/src/erc.rs
@@ -36,7 +36,7 @@ impl Sql {
         // contract_address
         let token_id = felt_to_sql_string(&contract_address);
 
-        let token_exists: bool = self.local_cache.contains_token_id(&token_id);
+        let token_exists: bool = self.local_cache.contains_token_id(&token_id).await;
 
         if !token_exists {
             self.register_erc20_token_metadata(contract_address, &token_id, provider).await?;
@@ -52,26 +52,25 @@ impl Sql {
             event_id,
         )?;
 
-        if from_address != Felt::ZERO {
-            // from_address/contract_address/
-            let from_balance_id = felts_to_sql_string(&[from_address, contract_address]);
-            let from_balance = self
-                .local_cache
-                .erc_cache
-                .entry((ContractType::ERC20, from_balance_id))
-                .or_default();
-            *from_balance -= I256::from(amount);
-        }
-
-        if to_address != Felt::ZERO {
-            let to_balance_id = felts_to_sql_string(&[to_address, contract_address]);
-            let to_balance =
-                self.local_cache.erc_cache.entry((ContractType::ERC20, to_balance_id)).or_default();
-            *to_balance += I256::from(amount);
+        {
+            let mut erc_cache = self.local_cache.erc_cache.write().await;
+            if from_address != Felt::ZERO {
+                // from_address/contract_address/
+                let from_balance_id = felts_to_sql_string(&[from_address, contract_address]);
+                let from_balance =
+                    erc_cache.entry((ContractType::ERC20, from_balance_id)).or_default();
+                *from_balance -= I256::from(amount);
+            }
+
+            if to_address != Felt::ZERO {
+                let to_balance_id = felts_to_sql_string(&[to_address, contract_address]);
+                let to_balance = erc_cache.entry((ContractType::ERC20, to_balance_id)).or_default();
+                *to_balance += I256::from(amount);
+            }
         }
 
         let block_id = BlockId::Number(block_number);
-        if self.local_cache.erc_cache.len() >= 100000 {
+        if self.local_cache.erc_cache.read().await.len() >= 100000 {
             self.flush().await.with_context(|| "Failed to flush in handle_erc20_transfer")?;
             self.apply_cache_diff(block_id).await?;
         }
@@ -93,7 +92,7 @@ impl Sql {
         // contract_address:id
         let actual_token_id = token_id;
         let token_id = felt_and_u256_to_sql_string(&contract_address, &token_id);
-        let token_exists: bool = self.local_cache.contains_token_id(&token_id);
+        let token_exists: bool = self.local_cache.contains_token_id(&token_id).await;
 
         if !token_exists {
             self.register_erc721_token_metadata(contract_address, &token_id, actual_token_id)
@@ -111,30 +110,31 @@ impl Sql {
         )?;
 
         // from_address/contract_address:id
-        if from_address != Felt::ZERO {
-            let from_balance_id =
-                format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&from_address), &token_id);
-            let from_balance = self
-                .local_cache
-                .erc_cache
-                .entry((ContractType::ERC721, from_balance_id))
-                .or_default();
-            *from_balance -= I256::from(1u8);
+        {
+            let mut erc_cache = self.local_cache.erc_cache.write().await;
+            if from_address != Felt::ZERO {
+                let from_balance_id = format!(
+                    "{}{SQL_FELT_DELIMITER}{}",
+                    felt_to_sql_string(&from_address),
+                    &token_id
+                );
+                let from_balance =
+                    erc_cache.entry((ContractType::ERC721, from_balance_id)).or_default();
+                *from_balance -= I256::from(1u8);
+            }
+
+            if to_address != Felt::ZERO {
+                let to_balance_id =
+                    format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id);
+                let to_balance =
+                    erc_cache.entry((ContractType::ERC721, to_balance_id)).or_default();
+                *to_balance += I256::from(1u8);
+            }
         }
 
-        if to_address != Felt::ZERO {
-            let to_balance_id =
-                format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id);
-            let to_balance = self
-                .local_cache
-                .erc_cache
-                .entry((ContractType::ERC721, to_balance_id))
-                .or_default();
-            *to_balance += I256::from(1u8);
-        }
-
         let block_id = BlockId::Number(block_number);
-        if self.local_cache.erc_cache.len() >= 100000 {
+        if self.local_cache.erc_cache.read().await.len() >= 100000 {
             self.flush().await.with_context(|| "Failed to flush in handle_erc721_transfer")?;
             self.apply_cache_diff(block_id).await?;
         }
@@ -215,7 +215,7 @@ impl Sql {
             }),
         ))?;
 
-        self.local_cache.register_token_id(token_id.to_string());
+        self.local_cache.register_token_id(token_id.to_string()).await;
 
         Ok(())
     }
@@ -240,7 +240,7 @@ impl Sql {
         // this cache is used while applying the cache diff
         // so we need to make sure that all RegisterErc*Token queries
         // are applied before the cache diff is applied
-        self.local_cache.register_token_id(token_id.to_string());
+        self.local_cache.register_token_id(token_id.to_string()).await;
 
         Ok(())
     }
@@ -279,15 +279,13 @@ impl Sql {
     }
 
     pub async fn apply_cache_diff(&mut self, block_id: BlockId) -> Result<()> {
-        if !self.local_cache.erc_cache.is_empty() {
+        if !self.local_cache.erc_cache.read().await.is_empty() {
+            let mut erc_cache = self.local_cache.erc_cache.write().await;
             self.executor.send(QueryMessage::new(
                 "".to_string(),
                 vec![],
                 QueryType::ApplyBalanceDiff(ApplyBalanceDiffQuery {
-                    erc_cache: mem::replace(
-                        &mut self.local_cache.erc_cache,
-                        HashMap::with_capacity(64),
-                    ),
+                    erc_cache: mem::replace(&mut erc_cache, HashMap::with_capacity(64)),
                     block_id,
                 }),
             ))?;
diff --git a/crates/torii/sqlite/src/lib.rs b/crates/torii/sqlite/src/lib.rs
index d8bbcc4dfa..241ac83191 100644
--- a/crates/torii/sqlite/src/lib.rs
+++ b/crates/torii/sqlite/src/lib.rs
@@ -43,8 +43,7 @@ pub struct Sql {
     pub pool: Pool<Sqlite>,
     pub executor: UnboundedSender<QueryMessage>,
     model_cache: Arc<ModelCache>,
-    // when SQL struct is cloned a empty local_cache is created
-    local_cache: LocalCache,
+    local_cache: Arc<LocalCache>,
 }
 
 #[derive(Debug, Clone)]
@@ -75,7 +74,8 @@ impl Sql {
         }
 
         let local_cache = LocalCache::new(pool.clone()).await;
 
-        let db = Self { pool: pool.clone(), executor, model_cache, local_cache };
+        let db =
+            Self { pool: pool.clone(), executor, model_cache, local_cache: Arc::new(local_cache) };
 
         db.execute().await?;