From 4bebac9b63bd3bc7814b824887d0b9509ff6ee76 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 15 Jan 2025 17:06:07 +0700 Subject: [PATCH 1/9] feat(torii-indexer): task manager & task id for every event --- crates/torii/indexer/src/engine.rs | 13 +--- crates/torii/indexer/src/lib.rs | 1 + .../src/processors/erc20_legacy_transfer.rs | 22 ++++++ .../indexer/src/processors/erc20_transfer.rs | 22 ++++++ .../src/processors/erc721_legacy_transfer.rs | 30 +++++++++ .../indexer/src/processors/erc721_transfer.rs | 29 ++++++++ .../indexer/src/processors/event_message.rs | 9 +++ .../indexer/src/processors/metadata_update.rs | 9 +++ crates/torii/indexer/src/processors/mod.rs | 5 ++ .../torii/indexer/src/processors/raw_event.rs | 9 +++ .../indexer/src/processors/register_event.rs | 12 ++++ .../indexer/src/processors/register_model.rs | 12 ++++ .../src/processors/store_del_record.rs | 15 +++++ .../src/processors/store_set_record.rs | 13 ++++ .../src/processors/store_update_member.rs | 12 ++++ .../src/processors/store_update_record.rs | 13 ++++ .../indexer/src/processors/upgrade_event.rs | 12 ++++ .../indexer/src/processors/upgrade_model.rs | 12 ++++ crates/torii/indexer/src/task_manager.rs | 67 +++++++++++++++++++ 19 files changed, 305 insertions(+), 12 deletions(-) create mode 100644 crates/torii/indexer/src/task_manager.rs diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 3e7f1c5332..e1f18a7d30 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -200,17 +200,6 @@ pub struct FetchPendingResult { pub block_number: u64, } -#[derive(Debug)] -pub struct ParallelizedEvent { - pub block_number: u64, - pub block_timestamp: u64, - pub event_id: String, - pub event: Event, -} - -type TaskPriority = usize; -type TaskId = u64; - #[allow(missing_debug_implementations)] pub struct Engine { world: Arc>, @@ -220,7 +209,7 @@ pub struct Engine { config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, - tasks: BTreeMap>>, + task_manager: TaskManager

, contracts: Arc>, } diff --git a/crates/torii/indexer/src/lib.rs b/crates/torii/indexer/src/lib.rs index 7191c5480f..e766b08dc0 100644 --- a/crates/torii/indexer/src/lib.rs +++ b/crates/torii/indexer/src/lib.rs @@ -4,6 +4,7 @@ mod constants; #[path = "test.rs"] mod test; +mod task_manager; pub mod engine; pub mod processors; diff --git a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs index 3b207d466b..9f01406089 100644 --- a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs +++ b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_legacy_transfer"; @@ -34,6 +38,24 @@ where false } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + // Hash the event key (Transfer) + event.keys[0].hash(&mut hasher); + + // Take the max of from/to addresses to get a canonical representation + // This ensures transfers between the same pair of addresses are grouped together + // regardless of direction (A->B or B->A) + let canonical_pair = std::cmp::max(event.data[0], event.data[1]); + canonical_pair.hash(&mut hasher); + + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/erc20_transfer.rs b/crates/torii/indexer/src/processors/erc20_transfer.rs index a0643abd41..f4d8426dbe 100644 --- a/crates/torii/indexer/src/processors/erc20_transfer.rs +++ b/crates/torii/indexer/src/processors/erc20_transfer.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer"; @@ -34,6 +38,24 @@ where false } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + // Hash the event key (Transfer) + event.keys[0].hash(&mut hasher); + + // Take the max of from/to addresses to get a canonical representation + // This ensures transfers between the same pair of addresses are grouped together + // regardless of direction (A->B or B->A) + let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]); + canonical_pair.hash(&mut hasher); + + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs index df6b2a88de..f329fbbaa4 100644 --- a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer"; @@ -34,6 +38,32 @@ where false } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + // Hash the event key (Transfer) + event.keys[0].hash(&mut hasher); + + // Take the max of from/to addresses to get a canonical representation + // This ensures transfers between the same pair of addresses are grouped together + // regardless of direction (A->B or B->A) + let canonical_pair = std::cmp::max(event.data[0], event.data[1]); + canonical_pair.hash(&mut hasher); + + // For ERC721, we can safely parallelize by token ID since each token is unique + // and can only be owned by one address at a time. This means: + // 1. Transfers of different tokens can happen in parallel + // 2. Multiple transfers of the same token must be sequential + // 3. The canonical address pair ensures related transfers stay together + event.data[2].hash(&mut hasher); + event.data[3].hash(&mut hasher); + + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/erc721_transfer.rs b/crates/torii/indexer/src/processors/erc721_transfer.rs index faf124360b..60a68df784 100644 --- a/crates/torii/indexer/src/processors/erc721_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_transfer.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +8,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_transfer"; @@ -34,6 +37,32 @@ where false } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + // Hash the event key (Transfer) + event.keys[0].hash(&mut hasher); + + // Take the max of from/to addresses to get a canonical representation + // This ensures transfers between the same pair of addresses are grouped together + // regardless of direction (A->B or B->A) + let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]); + canonical_pair.hash(&mut hasher); + + // For ERC721, we can safely parallelize by token ID since each token is unique + // and can only be owned by one address at a time. This means: + // 1. Transfers of different tokens can happen in parallel + // 2. Multiple transfers of the same token must be sequential + // 3. The canonical address pair ensures related transfers stay together + event.keys[3].hash(&mut hasher); + event.keys[4].hash(&mut hasher); + + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/event_message.rs b/crates/torii/indexer/src/processors/event_message.rs index 4495665bed..97954edd18 100644 --- a/crates/torii/indexer/src/processors/event_message.rs +++ b/crates/torii/indexer/src/processors/event_message.rs @@ -28,6 +28,15 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, _event: &Event) -> u64 { + // TODO. for now event messages are not parallelized + 0 + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/metadata_update.rs b/crates/torii/indexer/src/processors/metadata_update.rs index ec2c4b493d..017ad8b451 100644 --- a/crates/torii/indexer/src/processors/metadata_update.rs +++ b/crates/torii/indexer/src/processors/metadata_update.rs @@ -34,6 +34,15 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, _event: &Event) -> u64 { + // TODO. for now metadata updates are not parallelized + 0 + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/mod.rs b/crates/torii/indexer/src/processors/mod.rs index abe358b6e7..3044577bd0 100644 --- a/crates/torii/indexer/src/processors/mod.rs +++ b/crates/torii/indexer/src/processors/mod.rs @@ -7,6 +7,8 @@ use starknet::core::types::{Event, Felt, Transaction}; use starknet::providers::Provider; use torii_sqlite::Sql; +use crate::task_manager::TaskId; + pub mod erc20_legacy_transfer; pub mod erc20_transfer; pub mod erc721_legacy_transfer; @@ -53,6 +55,9 @@ where fn validate(&self, event: &Event) -> bool; + fn task_priority(&self) -> usize; + fn task_identifier(&self, event: &Event) -> TaskId; + #[allow(clippy::too_many_arguments)] async fn process( &self, diff --git a/crates/torii/indexer/src/processors/raw_event.rs b/crates/torii/indexer/src/processors/raw_event.rs index c30a918fa2..262d76cce8 100644 --- a/crates/torii/indexer/src/processors/raw_event.rs +++ b/crates/torii/indexer/src/processors/raw_event.rs @@ -23,6 +23,15 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, _event: &Event) -> u64 { + // TODO. for now raw events are not parallelized + 0 + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/register_event.rs b/crates/torii/indexer/src/processors/register_event.rs index e9c94f296a..f109f2ddd4 100644 --- a/crates/torii/indexer/src/processors/register_event.rs +++ b/crates/torii/indexer/src/processors/register_event.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Ok, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -30,6 +32,16 @@ where true } + fn task_priority(&self) -> usize { + 0 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys.iter().for_each(|k| k.hash(&mut hasher)); + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/register_model.rs b/crates/torii/indexer/src/processors/register_model.rs index d630feff88..cae4772ff1 100644 --- a/crates/torii/indexer/src/processors/register_model.rs +++ b/crates/torii/indexer/src/processors/register_model.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Ok, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -30,6 +32,16 @@ where true } + fn task_priority(&self) -> usize { + 0 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys.iter().for_each(|k| k.hash(&mut hasher)); + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/store_del_record.rs b/crates/torii/indexer/src/processors/store_del_record.rs index c129e7e7a9..943212dcf1 100644 --- a/crates/torii/indexer/src/processors/store_del_record.rs +++ b/crates/torii/indexer/src/processors/store_del_record.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -7,6 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_del_record"; @@ -27,6 +31,17 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + event.keys[1].hash(&mut hasher); + event.keys[2].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/store_set_record.rs b/crates/torii/indexer/src/processors/store_set_record.rs index 1b30b24016..2a4c856499 100644 --- a/crates/torii/indexer/src/processors/store_set_record.rs +++ b/crates/torii/indexer/src/processors/store_set_record.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -28,6 +30,17 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys[1].hash(&mut hasher); + event.keys[2].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs index 30fd10882e..c31e1721b9 100644 --- a/crates/torii/indexer/src/processors/store_update_member.rs +++ b/crates/torii/indexer/src/processors/store_update_member.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use anyhow::{Context, Error, Result}; use async_trait::async_trait; use dojo_types::schema::{Struct, Ty}; @@ -29,6 +30,17 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys[1].hash(&mut hasher); + event.keys[2].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/store_update_record.rs b/crates/torii/indexer/src/processors/store_update_record.rs index a76f40293e..31c2803c66 100644 --- a/crates/torii/indexer/src/processors/store_update_record.rs +++ b/crates/torii/indexer/src/processors/store_update_record.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_types::schema::Ty; @@ -28,6 +30,17 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys[1].hash(&mut hasher); + event.keys[2].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/upgrade_event.rs b/crates/torii/indexer/src/processors/upgrade_event.rs index ba7966d63b..2840530faa 100644 --- a/crates/torii/indexer/src/processors/upgrade_event.rs +++ b/crates/torii/indexer/src/processors/upgrade_event.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -30,6 +31,17 @@ where true } + fn task_priority(&self) -> usize { + 0 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + // event selector + event.keys[1].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/upgrade_model.rs b/crates/torii/indexer/src/processors/upgrade_model.rs index 40717df30d..2bedeee8a5 100644 --- a/crates/torii/indexer/src/processors/upgrade_model.rs +++ b/crates/torii/indexer/src/processors/upgrade_model.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -30,6 +31,17 @@ where true } + fn task_priority(&self) -> usize { + 0 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + // model selector + event.keys[1].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/task_manager.rs b/crates/torii/indexer/src/task_manager.rs new file mode 100644 index 0000000000..1dbc1b7690 --- /dev/null +++ b/crates/torii/indexer/src/task_manager.rs @@ -0,0 +1,67 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; + +use starknet::{core::types::Event, providers::Provider}; +use torii_sqlite::types::ContractType; + +use crate::engine::Processors; + +pub type TaskId = u64; +type TaskPriority = usize; + +#[derive(Debug)] +pub struct ParallelizedEvent { + pub contract_type: ContractType, + pub block_number: u64, + pub block_timestamp: u64, + pub event_id: String, + pub event: Event, +} + +pub struct TaskManager { + tasks: BTreeMap>>, + processors: Arc>, +} + +impl TaskManager

{ + pub fn new(processors: Arc>) -> Self { + Self { tasks: BTreeMap::new(), processors } + } + + pub fn add_parallelized_event(&mut self, parallelized_event: ParallelizedEvent) -> TaskId { + let event_key = parallelized_event.event.keys[0]; + let processor = self + .processors + .get_event_processor(parallelized_event.contract_type) + .get(&event_key) + .unwrap() + .iter() + .find(|p| p.validate(¶llelized_event.event)) + .unwrap(); + let priority = processor.task_priority(); + let task_id = processor.task_identifier(¶llelized_event.event); + + if task_id != 0 { + self.tasks + .entry(priority) + .or_default() + .entry(task_id) + .or_default() + .push(parallelized_event); + } + + task_id + } + + pub fn take_tasks( + &mut self, + ) -> BTreeMap>> { + std::mem::take(&mut self.tasks) + } + + pub fn is_empty(&self) -> bool { + self.tasks.is_empty() + } +} From 49e91b0e940f1a28754b813d5b5fa15fa757ba38 Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 16 Jan 2025 14:48:48 +0700 Subject: [PATCH 2/9] feat: finish up task manager --- crates/torii/indexer/src/engine.rs | 153 +++++------------------ crates/torii/indexer/src/task_manager.rs | 143 ++++++++++++++++----- crates/torii/sqlite/src/cache.rs | 16 +-- crates/torii/sqlite/src/erc.rs | 90 +++++++------ crates/torii/sqlite/src/lib.rs | 10 +- 5 files changed, 199 insertions(+), 213 deletions(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index e1f18a7d30..12eef20880 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -1,15 +1,14 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::fmt::Debug; -use std::hash::{DefaultHasher, Hash, Hasher}; +use std::hash::Hash; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use bitflags::bitflags; -use cainome::cairo_serde::CairoSerde; use dojo_utils::provider as provider_utils; use dojo_world::contracts::world::WorldContractReader; -use futures_util::future::{join_all, try_join_all}; +use futures_util::future::join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ BlockHashAndNumber, BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, @@ -18,7 +17,7 @@ use starknet::core::types::{ }; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; -use starknet_crypto::{poseidon_hash_many, Felt}; +use starknet_crypto::Felt; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::sync::Semaphore; @@ -47,6 +46,7 @@ use crate::processors::upgrade_model::UpgradeModelProcessor; use crate::processors::{ BlockProcessor, EventProcessor, EventProcessorConfig, TransactionProcessor, }; +use crate::task_manager::{ParallelizedEvent, TaskManager}; type EventProcessorMap

= HashMap>>>; @@ -233,17 +233,27 @@ impl Engine

{ let contracts = Arc::new( contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(), ); + let world = Arc::new(world); + let processors = Arc::new(processors); + let max_concurrent_tasks = config.max_concurrent_tasks; + let event_processor_config = config.event_processor_config.clone(); Self { - world: Arc::new(world), - db, + world: world.clone(), + db: db.clone(), provider: Arc::new(provider), - processors: Arc::new(processors), + processors: processors.clone(), config, shutdown_tx, block_tx, contracts, - tasks: BTreeMap::new(), + task_manager: TaskManager::new( + db, + world, + processors, + max_concurrent_tasks, + event_processor_config, + ), } } @@ -531,7 +541,7 @@ impl Engine

{ } // Process parallelized events - self.process_tasks().await?; + self.task_manager.process_tasks().await?; self.db.update_cursors( data.block_number - 1, @@ -578,7 +588,7 @@ impl Engine

{ } // Process parallelized events - self.process_tasks().await?; + self.task_manager.process_tasks().await?; let last_block_timestamp = get_block_timestamp(&self.provider, data.latest_block_number).await?; @@ -588,77 +598,6 @@ impl Engine

{ Ok(()) } - async fn process_tasks(&mut self) -> Result<()> { - let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); - - // Process each priority level sequentially - for (priority, task_group) in std::mem::take(&mut self.tasks) { - let mut handles = Vec::new(); - - // Process all tasks within this priority level concurrently - for (task_id, events) in task_group { - let db = self.db.clone(); - let world = self.world.clone(); - let semaphore = semaphore.clone(); - let processors = self.processors.clone(); - let event_processor_config = self.config.event_processor_config.clone(); - - handles.push(tokio::spawn(async move { - let _permit = semaphore.acquire().await?; - let mut local_db = db.clone(); - - // Process all events for this task sequentially - for (contract_type, event) in events { - let contract_processors = processors.get_event_processor(contract_type); - if let Some(processors) = contract_processors.get(&event.event.keys[0]) { - let processor = processors - .iter() - .find(|p| p.validate(&event.event)) - .expect("Must find at least one processor for the event"); - - debug!( - target: LOG_TARGET, - event_name = processor.event_key(), - task_id = %task_id, - priority = %priority, - "Processing parallelized event." - ); - - if let Err(e) = processor - .process( - &world, - &mut local_db, - event.block_number, - event.block_timestamp, - &event.event_id, - &event.event, - &event_processor_config, - ) - .await - { - error!( - target: LOG_TARGET, - event_name = processor.event_key(), - error = %e, - task_id = %task_id, - priority = %priority, - "Processing parallelized event." - ); - } - } - } - - Ok::<_, anyhow::Error>(()) - })); - } - - // Wait for all tasks in this priority level to complete before moving to next priority - try_join_all(handles).await?; - } - - Ok(()) - } - async fn process_transaction_with_events( &mut self, transaction_hash: Felt, @@ -870,50 +809,20 @@ impl Engine

{ .find(|p| p.validate(event)) .expect("Must find atleast one processor for the event"); - let (task_priority, task_identifier) = match processor.event_key().as_str() { - "ModelRegistered" | "EventRegistered" => { - let mut hasher = DefaultHasher::new(); - event.keys.iter().for_each(|k| k.hash(&mut hasher)); - let hash = hasher.finish(); - (0usize, hash) // Priority 0 (highest) for model/event registration - } - "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { - let mut hasher = DefaultHasher::new(); - event.keys[1].hash(&mut hasher); - event.keys[2].hash(&mut hasher); - let hash = hasher.finish(); - (2usize, hash) // Priority 2 (lower) for store operations - } - "EventEmitted" => { - let mut hasher = DefaultHasher::new(); - - let keys = Vec::::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| { - panic!("Expected EventEmitted keys to be well formed: {:?}", e); - }); - - // selector - event.keys[1].hash(&mut hasher); - // entity id - let entity_id = poseidon_hash_many(&keys); - entity_id.hash(&mut hasher); - - let hash = hasher.finish(); - (2usize, hash) // Priority 2 for event messages - } - _ => (0, 0), // No parallelization for other events - }; + let (task_priority, task_identifier) = (processor.task_priority(), processor.task_identifier(event)); + // if our event can be parallelized, we add it to the task manager if task_identifier != 0 { - self.tasks.entry(task_priority).or_default().entry(task_identifier).or_default().push( - ( + self.task_manager.add_parallelized_event( + task_priority, + task_identifier, + ParallelizedEvent { contract_type, - ParallelizedEvent { - event_id: event_id.to_string(), - event: event.clone(), - block_number, - block_timestamp, - }, - ), + event_id: event_id.to_string(), + event: event.clone(), + block_number, + block_timestamp, + }, ); } else { // Process non-parallelized events immediately diff --git a/crates/torii/indexer/src/task_manager.rs b/crates/torii/indexer/src/task_manager.rs index 1dbc1b7690..5dabe3fb20 100644 --- a/crates/torii/indexer/src/task_manager.rs +++ b/crates/torii/indexer/src/task_manager.rs @@ -2,11 +2,17 @@ use std::{ collections::{BTreeMap, HashMap}, sync::Arc, }; - +use anyhow::Result; +use dojo_world::contracts::WorldContractReader; +use futures_util::future::try_join_all; use starknet::{core::types::Event, providers::Provider}; -use torii_sqlite::types::ContractType; +use tokio::sync::Semaphore; +use torii_sqlite::{types::ContractType, Sql}; +use tracing::{debug, error}; + +use crate::{engine::Processors, processors::EventProcessorConfig}; -use crate::engine::Processors; +const LOG_TARGET: &str = "torii::indexer::task_manager"; pub type TaskId = u64; type TaskPriority = usize; @@ -21,47 +27,116 @@ pub struct ParallelizedEvent { } pub struct TaskManager { + db: Sql, + world: Arc>, tasks: BTreeMap>>, processors: Arc>, + max_concurrent_tasks: usize, + event_processor_config: EventProcessorConfig, } impl TaskManager

{ - pub fn new(processors: Arc>) -> Self { - Self { tasks: BTreeMap::new(), processors } - } - - pub fn add_parallelized_event(&mut self, parallelized_event: ParallelizedEvent) -> TaskId { - let event_key = parallelized_event.event.keys[0]; - let processor = self - .processors - .get_event_processor(parallelized_event.contract_type) - .get(&event_key) - .unwrap() - .iter() - .find(|p| p.validate(¶llelized_event.event)) - .unwrap(); - let priority = processor.task_priority(); - let task_id = processor.task_identifier(¶llelized_event.event); - - if task_id != 0 { - self.tasks - .entry(priority) - .or_default() - .entry(task_id) - .or_default() - .push(parallelized_event); + pub fn new( + db: Sql, + world: Arc>, + processors: Arc>, + max_concurrent_tasks: usize, + event_processor_config: EventProcessorConfig, + ) -> Self { + Self { + db, + world, + tasks: BTreeMap::new(), + processors, + max_concurrent_tasks, + event_processor_config, } - - task_id } - pub fn take_tasks( + pub fn add_parallelized_event( &mut self, - ) -> BTreeMap>> { - std::mem::take(&mut self.tasks) + priority: TaskPriority, + task_identifier: TaskId, + parallelized_event: ParallelizedEvent, + ) { + self.tasks + .entry(priority) + .or_default() + .entry(task_identifier) + .or_default() + .push(parallelized_event); } - pub fn is_empty(&self) -> bool { - self.tasks.is_empty() + pub async fn process_tasks( + &mut self + ) -> Result<()> { + let semaphore = Arc::new(Semaphore::new(self.max_concurrent_tasks)); + + // Process each priority level sequentially + for (priority, task_group) in std::mem::take(&mut self.tasks) { + let mut handles = Vec::new(); + + // Process all tasks within this priority level concurrently + for (task_id, events) in task_group { + let db = self.db.clone(); + let world = self.world.clone(); + let semaphore = semaphore.clone(); + let processors = self.processors.clone(); + let event_processor_config = self.event_processor_config.clone(); + + handles.push(tokio::spawn(async move { + let _permit = semaphore.acquire().await?; + let mut local_db = db.clone(); + + // Process all events for this task sequentially + for ParallelizedEvent { contract_type, event, block_number, block_timestamp, event_id } in events { + let contract_processors = processors.get_event_processor(contract_type); + if let Some(processors) = contract_processors.get(&event.keys[0]) { + let processor = processors + .iter() + .find(|p| p.validate(&event)) + .expect("Must find at least one processor for the event"); + + debug!( + target: LOG_TARGET, + event_name = processor.event_key(), + task_id = %task_id, + priority = %priority, + "Processing parallelized event." + ); + + if let Err(e) = processor + .process( + &world, + &mut local_db, + block_number, + block_timestamp, + &event_id, + &event, + &event_processor_config, + ) + .await + { + error!( + target: LOG_TARGET, + event_name = processor.event_key(), + error = %e, + task_id = %task_id, + priority = %priority, + "Processing parallelized event." + ); + } + } + } + + Ok::<_, anyhow::Error>(()) + })); + } + + // Wait for all tasks in this priority level to complete before moving to next priority + try_join_all(handles).await?; + } + + Ok(()) } } diff --git a/crates/torii/sqlite/src/cache.rs b/crates/torii/sqlite/src/cache.rs index bbfad566db..5b4dd71a1f 100644 --- a/crates/torii/sqlite/src/cache.rs +++ b/crates/torii/sqlite/src/cache.rs @@ -118,13 +118,13 @@ impl ModelCache { #[derive(Debug)] pub struct LocalCache { - pub erc_cache: HashMap<(ContractType, String), I256>, - pub token_id_registry: HashSet, + pub erc_cache: RwLock>, + pub token_id_registry: RwLock>, } impl Clone for LocalCache { fn clone(&self) -> Self { - Self { erc_cache: HashMap::new(), token_id_registry: self.token_id_registry.clone() } + Self { erc_cache: RwLock::new(HashMap::new()), token_id_registry: RwLock::new(HashSet::new()) } } } @@ -139,14 +139,14 @@ impl LocalCache { let token_id_registry = token_id_registry.into_iter().map(|token_id| token_id.0).collect(); - Self { erc_cache: HashMap::new(), token_id_registry } + Self { erc_cache: RwLock::new(HashMap::new()), token_id_registry: RwLock::new(token_id_registry) } } - pub fn contains_token_id(&self, token_id: &str) -> bool { - self.token_id_registry.contains(token_id) + pub async fn contains_token_id(&self, token_id: &str) -> bool { + self.token_id_registry.read().await.contains(token_id) } - pub fn register_token_id(&mut self, token_id: String) { - self.token_id_registry.insert(token_id); + pub async fn register_token_id(&self, token_id: String) { + self.token_id_registry.write().await.insert(token_id); } } diff --git a/crates/torii/sqlite/src/erc.rs b/crates/torii/sqlite/src/erc.rs index f11f7988c1..427a49a8d8 100644 --- a/crates/torii/sqlite/src/erc.rs +++ b/crates/torii/sqlite/src/erc.rs @@ -36,7 +36,7 @@ impl Sql { // contract_address let token_id = felt_to_sql_string(&contract_address); - let token_exists: bool = self.local_cache.contains_token_id(&token_id); + let token_exists: bool = self.local_cache.contains_token_id(&token_id).await; if !token_exists { self.register_erc20_token_metadata(contract_address, &token_id, provider).await?; @@ -52,26 +52,25 @@ impl Sql { event_id, )?; - if from_address != Felt::ZERO { - // from_address/contract_address/ - let from_balance_id = felts_to_sql_string(&[from_address, contract_address]); - let from_balance = self - .local_cache - .erc_cache - .entry((ContractType::ERC20, from_balance_id)) - .or_default(); - *from_balance -= I256::from(amount); - } - - if to_address != Felt::ZERO { - let to_balance_id = felts_to_sql_string(&[to_address, contract_address]); - let to_balance = - self.local_cache.erc_cache.entry((ContractType::ERC20, to_balance_id)).or_default(); - *to_balance += I256::from(amount); + { + let mut erc_cache = self.local_cache.erc_cache.write().await; + if from_address != Felt::ZERO { + // from_address/contract_address/ + let from_balance_id = felts_to_sql_string(&[from_address, contract_address]); + let from_balance = + erc_cache.entry((ContractType::ERC20, from_balance_id)).or_default(); + *from_balance -= I256::from(amount); + } + + if to_address != Felt::ZERO { + let to_balance_id = felts_to_sql_string(&[to_address, contract_address]); + let to_balance = erc_cache.entry((ContractType::ERC20, to_balance_id)).or_default(); + *to_balance += I256::from(amount); + } } let block_id = BlockId::Number(block_number); - if self.local_cache.erc_cache.len() >= 100000 { + if self.local_cache.erc_cache.read().await.len() >= 100000 { self.flush().await.with_context(|| "Failed to flush in handle_erc20_transfer")?; self.apply_cache_diff(block_id).await?; } @@ -93,7 +92,7 @@ impl Sql { // contract_address:id let actual_token_id = token_id; let token_id = felt_and_u256_to_sql_string(&contract_address, &token_id); - let token_exists: bool = self.local_cache.contains_token_id(&token_id); + let token_exists: bool = self.local_cache.contains_token_id(&token_id).await; if !token_exists { self.register_erc721_token_metadata(contract_address, &token_id, actual_token_id) @@ -111,30 +110,31 @@ impl Sql { )?; // from_address/contract_address:id - if from_address != Felt::ZERO { - let from_balance_id = - format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&from_address), &token_id); - let from_balance = self - .local_cache - .erc_cache - .entry((ContractType::ERC721, from_balance_id)) - .or_default(); - *from_balance -= I256::from(1u8); + { + let mut erc_cache = self.local_cache.erc_cache.write().await; + if from_address != Felt::ZERO { + let from_balance_id = format!( + "{}{SQL_FELT_DELIMITER}{}", + felt_to_sql_string(&from_address), + &token_id + ); + let from_balance = + erc_cache.entry((ContractType::ERC721, from_balance_id)).or_default(); + *from_balance -= I256::from(1u8); + } + + if to_address != Felt::ZERO { + let to_balance_id = + format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id); + let to_balance = + erc_cache.entry((ContractType::ERC721, to_balance_id)).or_default(); + *to_balance += I256::from(1u8); + } } - if to_address != Felt::ZERO { - let to_balance_id = - format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id); - let to_balance = self - .local_cache - .erc_cache - .entry((ContractType::ERC721, to_balance_id)) - .or_default(); - *to_balance += I256::from(1u8); - } let block_id = BlockId::Number(block_number); - if self.local_cache.erc_cache.len() >= 100000 { + if self.local_cache.erc_cache.read().await.len() >= 100000 { self.flush().await.with_context(|| "Failed to flush in handle_erc721_transfer")?; self.apply_cache_diff(block_id).await?; } @@ -215,7 +215,7 @@ impl Sql { }), ))?; - self.local_cache.register_token_id(token_id.to_string()); + self.local_cache.register_token_id(token_id.to_string()).await; Ok(()) } @@ -240,7 +240,7 @@ impl Sql { // this cache is used while applying the cache diff // so we need to make sure that all RegisterErc*Token queries // are applied before the cache diff is applied - self.local_cache.register_token_id(token_id.to_string()); + self.local_cache.register_token_id(token_id.to_string()).await; Ok(()) } @@ -279,15 +279,13 @@ impl Sql { } pub async fn apply_cache_diff(&mut self, block_id: BlockId) -> Result<()> { - if !self.local_cache.erc_cache.is_empty() { + if !self.local_cache.erc_cache.read().await.is_empty() { + let mut erc_cache = self.local_cache.erc_cache.write().await; self.executor.send(QueryMessage::new( "".to_string(), vec![], QueryType::ApplyBalanceDiff(ApplyBalanceDiffQuery { - erc_cache: mem::replace( - &mut self.local_cache.erc_cache, - HashMap::with_capacity(64), - ), + erc_cache: mem::replace(&mut erc_cache, HashMap::with_capacity(64)), block_id, }), ))?; diff --git a/crates/torii/sqlite/src/lib.rs b/crates/torii/sqlite/src/lib.rs index d8bbcc4dfa..4349b88718 100644 --- a/crates/torii/sqlite/src/lib.rs +++ b/crates/torii/sqlite/src/lib.rs @@ -43,8 +43,7 @@ pub struct Sql { pub pool: Pool, pub executor: UnboundedSender, model_cache: Arc, - // when SQL struct is cloned a empty local_cache is created - local_cache: LocalCache, + local_cache: Arc, } #[derive(Debug, Clone)] @@ -75,7 +74,12 @@ impl Sql { } let local_cache = LocalCache::new(pool.clone()).await; - let db = Self { pool: pool.clone(), executor, model_cache, local_cache }; + let db = Self { + pool: pool.clone(), + executor, + model_cache, + local_cache: Arc::new(local_cache), + }; db.execute().await?; From dc5af0e42525685734c3a27848f98be092250f88 Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 16 Jan 2025 14:54:06 +0700 Subject: [PATCH 3/9] event message parallel --- crates/torii/indexer/src/engine.rs | 3 +- crates/torii/indexer/src/lib.rs | 2 +- .../src/processors/erc20_legacy_transfer.rs | 5 ++-- .../indexer/src/processors/erc20_transfer.rs | 5 ++-- .../src/processors/erc721_legacy_transfer.rs | 3 +- .../indexer/src/processors/erc721_transfer.rs | 8 ++--- .../indexer/src/processors/event_message.rs | 18 ++++++++++-- .../src/processors/store_del_record.rs | 3 +- .../src/processors/store_update_member.rs | 1 + .../indexer/src/processors/upgrade_event.rs | 1 + .../indexer/src/processors/upgrade_model.rs | 1 + crates/torii/indexer/src/task_manager.rs | 29 ++++++++++++------- crates/torii/sqlite/src/cache.rs | 10 +++++-- crates/torii/sqlite/src/lib.rs | 8 ++--- 14 files changed, 59 insertions(+), 38 deletions(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 12eef20880..115cb7b880 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -809,7 +809,8 @@ impl Engine

{ .find(|p| p.validate(event)) .expect("Must find atleast one processor for the event"); - let (task_priority, task_identifier) = (processor.task_priority(), processor.task_identifier(event)); + let (task_priority, task_identifier) = + (processor.task_priority(), processor.task_identifier(event)); // if our event can be parallelized, we add it to the task manager if task_identifier != 0 { diff --git a/crates/torii/indexer/src/lib.rs b/crates/torii/indexer/src/lib.rs index e766b08dc0..dcb8787a1e 100644 --- a/crates/torii/indexer/src/lib.rs +++ b/crates/torii/indexer/src/lib.rs @@ -4,8 +4,8 @@ mod constants; #[path = "test.rs"] mod test; -mod task_manager; pub mod engine; pub mod processors; +mod task_manager; pub use engine::Engine; diff --git a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs index 9f01406089..e71069b577 100644 --- a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs +++ b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs @@ -9,9 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; -use crate::task_manager::TaskId; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::TaskId; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_legacy_transfer"; @@ -52,7 +51,7 @@ where // regardless of direction (A->B or B->A) let canonical_pair = std::cmp::max(event.data[0], event.data[1]); canonical_pair.hash(&mut hasher); - + hasher.finish() } diff --git a/crates/torii/indexer/src/processors/erc20_transfer.rs b/crates/torii/indexer/src/processors/erc20_transfer.rs index f4d8426dbe..5f6faba8f3 100644 --- a/crates/torii/indexer/src/processors/erc20_transfer.rs +++ b/crates/torii/indexer/src/processors/erc20_transfer.rs @@ -9,9 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; -use crate::task_manager::TaskId; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::TaskId; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer"; @@ -52,7 +51,7 @@ where // regardless of direction (A->B or B->A) let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]); canonical_pair.hash(&mut hasher); - + hasher.finish() } diff --git a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs index f329fbbaa4..de5b20db1f 100644 --- a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs @@ -9,9 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; -use crate::task_manager::TaskId; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::TaskId; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer"; diff --git a/crates/torii/indexer/src/processors/erc721_transfer.rs b/crates/torii/indexer/src/processors/erc721_transfer.rs index 60a68df784..a802b05eb2 100644 --- a/crates/torii/indexer/src/processors/erc721_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_transfer.rs @@ -1,4 +1,5 @@ use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -8,9 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; -use crate::task_manager::TaskId; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::TaskId; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_transfer"; @@ -49,7 +49,7 @@ where // Take the max of from/to addresses to get a canonical representation // This ensures transfers between the same pair of addresses are grouped together // regardless of direction (A->B or B->A) - let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]); + let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]); canonical_pair.hash(&mut hasher); // For ERC721, we can safely parallelize by token ID since each token is unique @@ -59,7 +59,7 @@ where // 3. The canonical address pair ensures related transfers stay together event.keys[3].hash(&mut hasher); event.keys[4].hash(&mut hasher); - + hasher.finish() } diff --git a/crates/torii/indexer/src/processors/event_message.rs b/crates/torii/indexer/src/processors/event_message.rs index 97954edd18..e127802edd 100644 --- a/crates/torii/indexer/src/processors/event_message.rs +++ b/crates/torii/indexer/src/processors/event_message.rs @@ -1,10 +1,14 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; +use cainome::cairo_serde::CairoSerde; use dojo_world::contracts::abigen::world::Event as WorldEvent; use dojo_world::contracts::naming::get_tag; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, Felt}; use starknet::providers::Provider; +use starknet_crypto::poseidon_hash_many; use torii_sqlite::Sql; use tracing::info; @@ -32,9 +36,17 @@ where 1 } - fn task_identifier(&self, _event: &Event) -> u64 { - // TODO. for now event messages are not parallelized - 0 + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + let keys = Vec::::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| { + panic!("Expected EventEmitted keys to be well formed: {:?}", e); + }); + // selector + event.keys[1].hash(&mut hasher); + // entity id + let entity_id = poseidon_hash_many(&keys); + entity_id.hash(&mut hasher); + hasher.finish() } async fn process( diff --git a/crates/torii/indexer/src/processors/store_del_record.rs b/crates/torii/indexer/src/processors/store_del_record.rs index 943212dcf1..cfb58e7879 100644 --- a/crates/torii/indexer/src/processors/store_del_record.rs +++ b/crates/torii/indexer/src/processors/store_del_record.rs @@ -9,9 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; -use crate::task_manager::TaskId; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::TaskId; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_del_record"; diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs index c31e1721b9..5fe9dc8e3e 100644 --- a/crates/torii/indexer/src/processors/store_update_member.rs +++ b/crates/torii/indexer/src/processors/store_update_member.rs @@ -1,4 +1,5 @@ use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Context, Error, Result}; use async_trait::async_trait; use dojo_types::schema::{Struct, Ty}; diff --git a/crates/torii/indexer/src/processors/upgrade_event.rs b/crates/torii/indexer/src/processors/upgrade_event.rs index 2840530faa..2884e53021 100644 --- a/crates/torii/indexer/src/processors/upgrade_event.rs +++ b/crates/torii/indexer/src/processors/upgrade_event.rs @@ -1,4 +1,5 @@ use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; diff --git a/crates/torii/indexer/src/processors/upgrade_model.rs b/crates/torii/indexer/src/processors/upgrade_model.rs index 2bedeee8a5..2bd8bccccf 100644 --- a/crates/torii/indexer/src/processors/upgrade_model.rs +++ b/crates/torii/indexer/src/processors/upgrade_model.rs @@ -1,4 +1,5 @@ use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; diff --git a/crates/torii/indexer/src/task_manager.rs b/crates/torii/indexer/src/task_manager.rs index 5dabe3fb20..eb8b113b92 100644 --- a/crates/torii/indexer/src/task_manager.rs +++ b/crates/torii/indexer/src/task_manager.rs @@ -1,16 +1,18 @@ -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; + use anyhow::Result; use dojo_world::contracts::WorldContractReader; use futures_util::future::try_join_all; -use starknet::{core::types::Event, providers::Provider}; +use starknet::core::types::Event; +use starknet::providers::Provider; use tokio::sync::Semaphore; -use torii_sqlite::{types::ContractType, Sql}; +use torii_sqlite::types::ContractType; +use torii_sqlite::Sql; use tracing::{debug, error}; -use crate::{engine::Processors, processors::EventProcessorConfig}; +use crate::engine::Processors; +use crate::processors::EventProcessorConfig; const LOG_TARGET: &str = "torii::indexer::task_manager"; @@ -67,9 +69,7 @@ impl TaskManager

{ .push(parallelized_event); } - pub async fn process_tasks( - &mut self - ) -> Result<()> { + pub async fn process_tasks(&mut self) -> Result<()> { let semaphore = Arc::new(Semaphore::new(self.max_concurrent_tasks)); // Process each priority level sequentially @@ -89,7 +89,14 @@ impl TaskManager

{ let mut local_db = db.clone(); // Process all events for this task sequentially - for ParallelizedEvent { contract_type, event, block_number, block_timestamp, event_id } in events { + for ParallelizedEvent { + contract_type, + event, + block_number, + block_timestamp, + event_id, + } in events + { let contract_processors = processors.get_event_processor(contract_type); if let Some(processors) = contract_processors.get(&event.keys[0]) { let processor = processors diff --git a/crates/torii/sqlite/src/cache.rs b/crates/torii/sqlite/src/cache.rs index 5b4dd71a1f..fce506884c 100644 --- a/crates/torii/sqlite/src/cache.rs +++ b/crates/torii/sqlite/src/cache.rs @@ -124,7 +124,10 @@ pub struct LocalCache { impl Clone for LocalCache { fn clone(&self) -> Self { - Self { erc_cache: RwLock::new(HashMap::new()), token_id_registry: RwLock::new(HashSet::new()) } + Self { + erc_cache: RwLock::new(HashMap::new()), + token_id_registry: RwLock::new(HashSet::new()), + } } } @@ -139,7 +142,10 @@ impl LocalCache { let token_id_registry = token_id_registry.into_iter().map(|token_id| token_id.0).collect(); - Self { erc_cache: RwLock::new(HashMap::new()), token_id_registry: RwLock::new(token_id_registry) } + Self { + erc_cache: RwLock::new(HashMap::new()), + token_id_registry: RwLock::new(token_id_registry), + } } pub async fn contains_token_id(&self, token_id: &str) -> bool { diff --git a/crates/torii/sqlite/src/lib.rs b/crates/torii/sqlite/src/lib.rs index 4349b88718..241ac83191 100644 --- a/crates/torii/sqlite/src/lib.rs +++ b/crates/torii/sqlite/src/lib.rs @@ -74,12 +74,8 @@ impl Sql { } let local_cache = LocalCache::new(pool.clone()).await; - let db = Self { - pool: pool.clone(), - executor, - model_cache, - local_cache: Arc::new(local_cache), - }; + let db = + Self { pool: pool.clone(), executor, model_cache, local_cache: Arc::new(local_cache) }; db.execute().await?; From f7902f33799497aaee956c10ac37c4482f066d94 Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 16 Jan 2025 15:00:02 +0700 Subject: [PATCH 4/9] dont empty up cache on sql clone --- crates/torii/sqlite/src/cache.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/crates/torii/sqlite/src/cache.rs b/crates/torii/sqlite/src/cache.rs index fce506884c..3d6eea38b8 100644 --- a/crates/torii/sqlite/src/cache.rs +++ b/crates/torii/sqlite/src/cache.rs @@ -122,15 +122,6 @@ pub struct LocalCache { pub token_id_registry: RwLock>, } -impl Clone for LocalCache { - fn clone(&self) -> Self { - Self { - erc_cache: RwLock::new(HashMap::new()), - token_id_registry: RwLock::new(HashSet::new()), - } - } -} - impl LocalCache { pub async fn new(pool: Pool) -> Self { // read existing token_id's from balances table and cache them From 3490207c1858edf9096f0623e82b61f7c999fceb Mon Sep 17 00:00:00 2001 From: Nasr Date: Thu, 16 Jan 2025 15:45:58 +0700 Subject: [PATCH 5/9] refactor: task priorities & parallelize metadata update --- .../torii/indexer/src/processors/metadata_update.rs | 11 +++++++---- .../torii/indexer/src/processors/store_del_record.rs | 2 +- .../torii/indexer/src/processors/store_set_record.rs | 2 +- .../indexer/src/processors/store_update_member.rs | 2 +- .../indexer/src/processors/store_update_record.rs | 2 +- crates/torii/indexer/src/processors/upgrade_event.rs | 5 ++--- crates/torii/indexer/src/processors/upgrade_model.rs | 5 ++--- 7 files changed, 15 insertions(+), 14 deletions(-) diff --git a/crates/torii/indexer/src/processors/metadata_update.rs b/crates/torii/indexer/src/processors/metadata_update.rs index 017ad8b451..9968e7aada 100644 --- a/crates/torii/indexer/src/processors/metadata_update.rs +++ b/crates/torii/indexer/src/processors/metadata_update.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; use base64::engine::general_purpose; @@ -35,12 +37,13 @@ where } fn task_priority(&self) -> usize { - 1 + 3 } - fn task_identifier(&self, _event: &Event) -> u64 { - // TODO. for now metadata updates are not parallelized - 0 + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys.iter().for_each(|k| k.hash(&mut hasher)); + hasher.finish() } async fn process( diff --git a/crates/torii/indexer/src/processors/store_del_record.rs b/crates/torii/indexer/src/processors/store_del_record.rs index cfb58e7879..57fddfb293 100644 --- a/crates/torii/indexer/src/processors/store_del_record.rs +++ b/crates/torii/indexer/src/processors/store_del_record.rs @@ -31,7 +31,7 @@ where } fn task_priority(&self) -> usize { - 1 + 2 } fn task_identifier(&self, event: &Event) -> TaskId { diff --git a/crates/torii/indexer/src/processors/store_set_record.rs b/crates/torii/indexer/src/processors/store_set_record.rs index 2a4c856499..ac493f7d66 100644 --- a/crates/torii/indexer/src/processors/store_set_record.rs +++ b/crates/torii/indexer/src/processors/store_set_record.rs @@ -31,7 +31,7 @@ where } fn task_priority(&self) -> usize { - 1 + 2 } fn task_identifier(&self, event: &Event) -> u64 { diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs index 5fe9dc8e3e..d2d073bb4b 100644 --- a/crates/torii/indexer/src/processors/store_update_member.rs +++ b/crates/torii/indexer/src/processors/store_update_member.rs @@ -32,7 +32,7 @@ where } fn task_priority(&self) -> usize { - 1 + 2 } fn task_identifier(&self, event: &Event) -> u64 { diff --git a/crates/torii/indexer/src/processors/store_update_record.rs b/crates/torii/indexer/src/processors/store_update_record.rs index 31c2803c66..bf4d28a2b5 100644 --- a/crates/torii/indexer/src/processors/store_update_record.rs +++ b/crates/torii/indexer/src/processors/store_update_record.rs @@ -31,7 +31,7 @@ where } fn task_priority(&self) -> usize { - 1 + 2 } fn task_identifier(&self, event: &Event) -> u64 { diff --git a/crates/torii/indexer/src/processors/upgrade_event.rs b/crates/torii/indexer/src/processors/upgrade_event.rs index 2884e53021..ae75162856 100644 --- a/crates/torii/indexer/src/processors/upgrade_event.rs +++ b/crates/torii/indexer/src/processors/upgrade_event.rs @@ -33,13 +33,12 @@ where } fn task_priority(&self) -> usize { - 0 + 1 } fn task_identifier(&self, event: &Event) -> u64 { let mut hasher = DefaultHasher::new(); - // event selector - event.keys[1].hash(&mut hasher); + event.keys.iter().for_each(|k| k.hash(&mut hasher)); hasher.finish() } diff --git a/crates/torii/indexer/src/processors/upgrade_model.rs b/crates/torii/indexer/src/processors/upgrade_model.rs index 2bd8bccccf..974bab1fec 100644 --- a/crates/torii/indexer/src/processors/upgrade_model.rs +++ b/crates/torii/indexer/src/processors/upgrade_model.rs @@ -33,13 +33,12 @@ where } fn task_priority(&self) -> usize { - 0 + 1 } fn task_identifier(&self, event: &Event) -> u64 { let mut hasher = DefaultHasher::new(); - // model selector - event.keys[1].hash(&mut hasher); + event.keys.iter().for_each(|k| k.hash(&mut hasher)); hasher.finish() } From c3272bb61b23b7e2b6a0b234bd687302af6c40d3 Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 21 Jan 2025 13:08:17 +0700 Subject: [PATCH 6/9] cleanup code and sequential task id --- crates/torii/indexer/src/engine.rs | 4 ++-- .../torii/indexer/src/processors/erc20_legacy_transfer.rs | 4 ++-- crates/torii/indexer/src/processors/erc20_transfer.rs | 4 ++-- .../indexer/src/processors/erc721_legacy_transfer.rs | 4 ++-- crates/torii/indexer/src/processors/erc721_transfer.rs | 4 ++-- crates/torii/indexer/src/processors/event_message.rs | 6 ++++-- crates/torii/indexer/src/processors/mod.rs | 4 ++-- crates/torii/indexer/src/processors/raw_event.rs | 8 +++++--- crates/torii/indexer/src/processors/register_event.rs | 6 ++++-- crates/torii/indexer/src/processors/store_del_record.rs | 4 ++-- crates/torii/indexer/src/processors/store_set_record.rs | 6 ++++-- .../torii/indexer/src/processors/store_update_member.rs | 8 ++++++-- .../torii/indexer/src/processors/store_update_record.rs | 8 ++++++-- crates/torii/indexer/src/processors/upgrade_event.rs | 6 ++++-- crates/torii/indexer/src/processors/upgrade_model.rs | 6 ++++-- crates/torii/indexer/src/task_manager.rs | 4 +++- 16 files changed, 54 insertions(+), 32 deletions(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 115cb7b880..815849184d 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -46,7 +46,7 @@ use crate::processors::upgrade_model::UpgradeModelProcessor; use crate::processors::{ BlockProcessor, EventProcessor, EventProcessorConfig, TransactionProcessor, }; -use crate::task_manager::{ParallelizedEvent, TaskManager}; +use crate::task_manager::{self, ParallelizedEvent, TaskManager}; type EventProcessorMap

= HashMap>>>; @@ -813,7 +813,7 @@ impl Engine

{ (processor.task_priority(), processor.task_identifier(event)); // if our event can be parallelized, we add it to the task manager - if task_identifier != 0 { + if task_identifier != task_manager::TASK_ID_SEQUENTIAL { self.task_manager.add_parallelized_event( task_priority, task_identifier, diff --git a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs index e71069b577..6783162419 100644 --- a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs +++ b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs @@ -10,7 +10,7 @@ use torii_sqlite::Sql; use tracing::debug; use super::{EventProcessor, EventProcessorConfig}; -use crate::task_manager::TaskId; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_legacy_transfer"; @@ -37,7 +37,7 @@ where false } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 1 } diff --git a/crates/torii/indexer/src/processors/erc20_transfer.rs b/crates/torii/indexer/src/processors/erc20_transfer.rs index 5f6faba8f3..98fb907bb0 100644 --- a/crates/torii/indexer/src/processors/erc20_transfer.rs +++ b/crates/torii/indexer/src/processors/erc20_transfer.rs @@ -10,7 +10,7 @@ use torii_sqlite::Sql; use tracing::debug; use super::{EventProcessor, EventProcessorConfig}; -use crate::task_manager::TaskId; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer"; @@ -37,7 +37,7 @@ where false } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 1 } diff --git a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs index de5b20db1f..e02304ed9c 100644 --- a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs @@ -10,7 +10,7 @@ use torii_sqlite::Sql; use tracing::debug; use super::{EventProcessor, EventProcessorConfig}; -use crate::task_manager::TaskId; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer"; @@ -37,7 +37,7 @@ where false } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 1 } diff --git a/crates/torii/indexer/src/processors/erc721_transfer.rs b/crates/torii/indexer/src/processors/erc721_transfer.rs index a802b05eb2..4a0383a4f1 100644 --- a/crates/torii/indexer/src/processors/erc721_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_transfer.rs @@ -10,7 +10,7 @@ use torii_sqlite::Sql; use tracing::debug; use super::{EventProcessor, EventProcessorConfig}; -use crate::task_manager::TaskId; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_transfer"; @@ -37,7 +37,7 @@ where false } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 1 } diff --git a/crates/torii/indexer/src/processors/event_message.rs b/crates/torii/indexer/src/processors/event_message.rs index e127802edd..ab814ae3ee 100644 --- a/crates/torii/indexer/src/processors/event_message.rs +++ b/crates/torii/indexer/src/processors/event_message.rs @@ -12,6 +12,8 @@ use starknet_crypto::poseidon_hash_many; use torii_sqlite::Sql; use tracing::info; +use crate::task_manager::{TaskId, TaskPriority}; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::event_message"; @@ -32,11 +34,11 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 1 } - fn task_identifier(&self, event: &Event) -> u64 { + fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); let keys = Vec::::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| { panic!("Expected EventEmitted keys to be well formed: {:?}", e); diff --git a/crates/torii/indexer/src/processors/mod.rs b/crates/torii/indexer/src/processors/mod.rs index 3044577bd0..420dd798a7 100644 --- a/crates/torii/indexer/src/processors/mod.rs +++ b/crates/torii/indexer/src/processors/mod.rs @@ -7,7 +7,7 @@ use starknet::core::types::{Event, Felt, Transaction}; use starknet::providers::Provider; use torii_sqlite::Sql; -use crate::task_manager::TaskId; +use crate::task_manager::{TaskId, TaskPriority}; pub mod erc20_legacy_transfer; pub mod erc20_transfer; @@ -55,7 +55,7 @@ where fn validate(&self, event: &Event) -> bool; - fn task_priority(&self) -> usize; + fn task_priority(&self) -> TaskPriority; fn task_identifier(&self, event: &Event) -> TaskId; #[allow(clippy::too_many_arguments)] diff --git a/crates/torii/indexer/src/processors/raw_event.rs b/crates/torii/indexer/src/processors/raw_event.rs index 262d76cce8..7e99356ac4 100644 --- a/crates/torii/indexer/src/processors/raw_event.rs +++ b/crates/torii/indexer/src/processors/raw_event.rs @@ -5,6 +5,8 @@ use starknet::core::types::Event; use starknet::providers::Provider; use torii_sqlite::Sql; +use crate::task_manager::{self, TaskId, TaskPriority}; + use super::{EventProcessor, EventProcessorConfig}; #[derive(Default, Debug)] @@ -23,13 +25,13 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 1 } - fn task_identifier(&self, _event: &Event) -> u64 { + fn task_identifier(&self, _event: &Event) -> TaskId { // TODO. for now raw events are not parallelized - 0 + task_manager::TASK_ID_SEQUENTIAL } async fn process( diff --git a/crates/torii/indexer/src/processors/register_event.rs b/crates/torii/indexer/src/processors/register_event.rs index f109f2ddd4..a19ceb0083 100644 --- a/crates/torii/indexer/src/processors/register_event.rs +++ b/crates/torii/indexer/src/processors/register_event.rs @@ -10,6 +10,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; +use crate::task_manager::{TaskId, TaskPriority}; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::register_event"; @@ -32,11 +34,11 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 0 } - fn task_identifier(&self, event: &Event) -> u64 { + fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); event.keys.iter().for_each(|k| k.hash(&mut hasher)); hasher.finish() diff --git a/crates/torii/indexer/src/processors/store_del_record.rs b/crates/torii/indexer/src/processors/store_del_record.rs index 57fddfb293..d3256c0ef1 100644 --- a/crates/torii/indexer/src/processors/store_del_record.rs +++ b/crates/torii/indexer/src/processors/store_del_record.rs @@ -10,7 +10,7 @@ use torii_sqlite::Sql; use tracing::{debug, info}; use super::{EventProcessor, EventProcessorConfig}; -use crate::task_manager::TaskId; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_del_record"; @@ -30,7 +30,7 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 2 } diff --git a/crates/torii/indexer/src/processors/store_set_record.rs b/crates/torii/indexer/src/processors/store_set_record.rs index ac493f7d66..3f73b9a46a 100644 --- a/crates/torii/indexer/src/processors/store_set_record.rs +++ b/crates/torii/indexer/src/processors/store_set_record.rs @@ -10,6 +10,8 @@ use torii_sqlite::utils::felts_to_sql_string; use torii_sqlite::Sql; use tracing::{debug, info}; +use crate::task_manager::{TaskId, TaskPriority}; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_set_record"; @@ -30,11 +32,11 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 2 } - fn task_identifier(&self, event: &Event) -> u64 { + fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); event.keys[1].hash(&mut hasher); event.keys[2].hash(&mut hasher); diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs index d2d073bb4b..eb168c4eaa 100644 --- a/crates/torii/indexer/src/processors/store_update_member.rs +++ b/crates/torii/indexer/src/processors/store_update_member.rs @@ -11,6 +11,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; +use crate::task_manager::{TaskId, TaskPriority}; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_update_member"; @@ -31,13 +33,15 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 2 } - fn task_identifier(&self, event: &Event) -> u64 { + fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); + // model selector event.keys[1].hash(&mut hasher); + // entity id event.keys[2].hash(&mut hasher); hasher.finish() } diff --git a/crates/torii/indexer/src/processors/store_update_record.rs b/crates/torii/indexer/src/processors/store_update_record.rs index bf4d28a2b5..de786f6752 100644 --- a/crates/torii/indexer/src/processors/store_update_record.rs +++ b/crates/torii/indexer/src/processors/store_update_record.rs @@ -10,6 +10,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; +use crate::task_manager::{TaskId, TaskPriority}; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_update_record"; @@ -30,13 +32,15 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 2 } - fn task_identifier(&self, event: &Event) -> u64 { + fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); + // model selector event.keys[1].hash(&mut hasher); + // entity id event.keys[2].hash(&mut hasher); hasher.finish() } diff --git a/crates/torii/indexer/src/processors/upgrade_event.rs b/crates/torii/indexer/src/processors/upgrade_event.rs index ae75162856..d07d74dfdf 100644 --- a/crates/torii/indexer/src/processors/upgrade_event.rs +++ b/crates/torii/indexer/src/processors/upgrade_event.rs @@ -10,6 +10,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; +use crate::task_manager::{TaskId, TaskPriority}; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::upgrade_event"; @@ -32,11 +34,11 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 1 } - fn task_identifier(&self, event: &Event) -> u64 { + fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); event.keys.iter().for_each(|k| k.hash(&mut hasher)); hasher.finish() diff --git a/crates/torii/indexer/src/processors/upgrade_model.rs b/crates/torii/indexer/src/processors/upgrade_model.rs index 974bab1fec..9c307b281b 100644 --- a/crates/torii/indexer/src/processors/upgrade_model.rs +++ b/crates/torii/indexer/src/processors/upgrade_model.rs @@ -10,6 +10,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; +use crate::task_manager::{TaskId, TaskPriority}; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::upgrade_model"; @@ -32,11 +34,11 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 1 } - fn task_identifier(&self, event: &Event) -> u64 { + fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); event.keys.iter().for_each(|k| k.hash(&mut hasher)); hasher.finish() diff --git a/crates/torii/indexer/src/task_manager.rs b/crates/torii/indexer/src/task_manager.rs index eb8b113b92..9a7c4a4ed9 100644 --- a/crates/torii/indexer/src/task_manager.rs +++ b/crates/torii/indexer/src/task_manager.rs @@ -14,10 +14,12 @@ use tracing::{debug, error}; use crate::engine::Processors; use crate::processors::EventProcessorConfig; +pub const TASK_ID_SEQUENTIAL: TaskId = 0; + const LOG_TARGET: &str = "torii::indexer::task_manager"; pub type TaskId = u64; -type TaskPriority = usize; +pub type TaskPriority = usize; #[derive(Debug)] pub struct ParallelizedEvent { From a046def64392ef69b21f2fa101dececf9b0b9c0d Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 21 Jan 2025 13:09:42 +0700 Subject: [PATCH 7/9] metadata update parallelized --- crates/torii/indexer/src/processors/metadata_update.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/torii/indexer/src/processors/metadata_update.rs b/crates/torii/indexer/src/processors/metadata_update.rs index 9968e7aada..d7ebc41e7e 100644 --- a/crates/torii/indexer/src/processors/metadata_update.rs +++ b/crates/torii/indexer/src/processors/metadata_update.rs @@ -16,6 +16,8 @@ use torii_sqlite::utils::fetch_content_from_ipfs; use torii_sqlite::Sql; use tracing::{error, info}; +use crate::task_manager::{TaskId, TaskPriority}; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::metadata_update"; @@ -36,11 +38,11 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 3 } - fn task_identifier(&self, event: &Event) -> u64 { + fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); event.keys.iter().for_each(|k| k.hash(&mut hasher)); hasher.finish() From 4d506b91cf440c041e8f3764cf873c647194edfd Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 21 Jan 2025 13:10:16 +0700 Subject: [PATCH 8/9] fmt --- crates/torii/indexer/src/processors/event_message.rs | 3 +-- crates/torii/indexer/src/processors/metadata_update.rs | 3 +-- crates/torii/indexer/src/processors/raw_event.rs | 3 +-- crates/torii/indexer/src/processors/register_event.rs | 3 +-- crates/torii/indexer/src/processors/store_set_record.rs | 3 +-- crates/torii/indexer/src/processors/store_update_member.rs | 3 +-- crates/torii/indexer/src/processors/store_update_record.rs | 3 +-- crates/torii/indexer/src/processors/upgrade_event.rs | 3 +-- crates/torii/indexer/src/processors/upgrade_model.rs | 3 +-- 9 files changed, 9 insertions(+), 18 deletions(-) diff --git a/crates/torii/indexer/src/processors/event_message.rs b/crates/torii/indexer/src/processors/event_message.rs index ab814ae3ee..f8c69f471e 100644 --- a/crates/torii/indexer/src/processors/event_message.rs +++ b/crates/torii/indexer/src/processors/event_message.rs @@ -12,9 +12,8 @@ use starknet_crypto::poseidon_hash_many; use torii_sqlite::Sql; use tracing::info; -use crate::task_manager::{TaskId, TaskPriority}; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::event_message"; diff --git a/crates/torii/indexer/src/processors/metadata_update.rs b/crates/torii/indexer/src/processors/metadata_update.rs index d7ebc41e7e..b56431d8b5 100644 --- a/crates/torii/indexer/src/processors/metadata_update.rs +++ b/crates/torii/indexer/src/processors/metadata_update.rs @@ -16,9 +16,8 @@ use torii_sqlite::utils::fetch_content_from_ipfs; use torii_sqlite::Sql; use tracing::{error, info}; -use crate::task_manager::{TaskId, TaskPriority}; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::metadata_update"; diff --git a/crates/torii/indexer/src/processors/raw_event.rs b/crates/torii/indexer/src/processors/raw_event.rs index 7e99356ac4..8614b92695 100644 --- a/crates/torii/indexer/src/processors/raw_event.rs +++ b/crates/torii/indexer/src/processors/raw_event.rs @@ -5,9 +5,8 @@ use starknet::core::types::Event; use starknet::providers::Provider; use torii_sqlite::Sql; -use crate::task_manager::{self, TaskId, TaskPriority}; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{self, TaskId, TaskPriority}; #[derive(Default, Debug)] pub struct RawEventProcessor; diff --git a/crates/torii/indexer/src/processors/register_event.rs b/crates/torii/indexer/src/processors/register_event.rs index a19ceb0083..df4a536625 100644 --- a/crates/torii/indexer/src/processors/register_event.rs +++ b/crates/torii/indexer/src/processors/register_event.rs @@ -10,9 +10,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; -use crate::task_manager::{TaskId, TaskPriority}; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::register_event"; diff --git a/crates/torii/indexer/src/processors/store_set_record.rs b/crates/torii/indexer/src/processors/store_set_record.rs index 3f73b9a46a..d508013381 100644 --- a/crates/torii/indexer/src/processors/store_set_record.rs +++ b/crates/torii/indexer/src/processors/store_set_record.rs @@ -10,9 +10,8 @@ use torii_sqlite::utils::felts_to_sql_string; use torii_sqlite::Sql; use tracing::{debug, info}; -use crate::task_manager::{TaskId, TaskPriority}; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_set_record"; diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs index eb168c4eaa..6c45c66154 100644 --- a/crates/torii/indexer/src/processors/store_update_member.rs +++ b/crates/torii/indexer/src/processors/store_update_member.rs @@ -11,9 +11,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; -use crate::task_manager::{TaskId, TaskPriority}; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_update_member"; diff --git a/crates/torii/indexer/src/processors/store_update_record.rs b/crates/torii/indexer/src/processors/store_update_record.rs index de786f6752..05984ecd72 100644 --- a/crates/torii/indexer/src/processors/store_update_record.rs +++ b/crates/torii/indexer/src/processors/store_update_record.rs @@ -10,9 +10,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; -use crate::task_manager::{TaskId, TaskPriority}; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_update_record"; diff --git a/crates/torii/indexer/src/processors/upgrade_event.rs b/crates/torii/indexer/src/processors/upgrade_event.rs index d07d74dfdf..3b9597b3ad 100644 --- a/crates/torii/indexer/src/processors/upgrade_event.rs +++ b/crates/torii/indexer/src/processors/upgrade_event.rs @@ -10,9 +10,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; -use crate::task_manager::{TaskId, TaskPriority}; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::upgrade_event"; diff --git a/crates/torii/indexer/src/processors/upgrade_model.rs b/crates/torii/indexer/src/processors/upgrade_model.rs index 9c307b281b..fe46304b72 100644 --- a/crates/torii/indexer/src/processors/upgrade_model.rs +++ b/crates/torii/indexer/src/processors/upgrade_model.rs @@ -10,9 +10,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; -use crate::task_manager::{TaskId, TaskPriority}; - use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::upgrade_model"; From 0c82bba936bef7251693aa526afd0ec298c294c3 Mon Sep 17 00:00:00 2001 From: glihm Date: Tue, 21 Jan 2025 21:21:21 -0600 Subject: [PATCH 9/9] fix: use TaskPriority and TaskId for register_model --- crates/torii/indexer/src/processors/register_model.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/torii/indexer/src/processors/register_model.rs b/crates/torii/indexer/src/processors/register_model.rs index cae4772ff1..dae000613a 100644 --- a/crates/torii/indexer/src/processors/register_model.rs +++ b/crates/torii/indexer/src/processors/register_model.rs @@ -11,6 +11,7 @@ use torii_sqlite::Sql; use tracing::{debug, info}; use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{TaskId, TaskPriority}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::register_model"; @@ -32,11 +33,11 @@ where true } - fn task_priority(&self) -> usize { + fn task_priority(&self) -> TaskPriority { 0 } - fn task_identifier(&self, event: &Event) -> u64 { + fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); event.keys.iter().for_each(|k| k.hash(&mut hasher)); hasher.finish()