diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 3e7f1c5332..815849184d 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -1,15 +1,14 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::fmt::Debug; -use std::hash::{DefaultHasher, Hash, Hasher}; +use std::hash::Hash; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use bitflags::bitflags; -use cainome::cairo_serde::CairoSerde; use dojo_utils::provider as provider_utils; use dojo_world::contracts::world::WorldContractReader; -use futures_util::future::{join_all, try_join_all}; +use futures_util::future::join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ BlockHashAndNumber, BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, @@ -18,7 +17,7 @@ use starknet::core::types::{ }; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; -use starknet_crypto::{poseidon_hash_many, Felt}; +use starknet_crypto::Felt; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::sync::Semaphore; @@ -47,6 +46,7 @@ use crate::processors::upgrade_model::UpgradeModelProcessor; use crate::processors::{ BlockProcessor, EventProcessor, EventProcessorConfig, TransactionProcessor, }; +use crate::task_manager::{self, ParallelizedEvent, TaskManager}; type EventProcessorMap
= HashMap ,
contracts: Arc {
let contracts = Arc::new(
contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(),
);
+ let world = Arc::new(world);
+ let processors = Arc::new(processors);
+ let max_concurrent_tasks = config.max_concurrent_tasks;
+ let event_processor_config = config.event_processor_config.clone();
Self {
- world: Arc::new(world),
- db,
+ world: world.clone(),
+ db: db.clone(),
provider: Arc::new(provider),
- processors: Arc::new(processors),
+ processors: processors.clone(),
config,
shutdown_tx,
block_tx,
contracts,
- tasks: BTreeMap::new(),
+ task_manager: TaskManager::new(
+ db,
+ world,
+ processors,
+ max_concurrent_tasks,
+ event_processor_config,
+ ),
}
}
@@ -542,7 +541,7 @@ impl {
}
// Process parallelized events
- self.process_tasks().await?;
+ self.task_manager.process_tasks().await?;
self.db.update_cursors(
data.block_number - 1,
@@ -589,7 +588,7 @@ impl {
}
// Process parallelized events
- self.process_tasks().await?;
+ self.task_manager.process_tasks().await?;
let last_block_timestamp =
get_block_timestamp(&self.provider, data.latest_block_number).await?;
@@ -599,77 +598,6 @@ impl {
Ok(())
}
- async fn process_tasks(&mut self) -> Result<()> {
- let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));
-
- // Process each priority level sequentially
- for (priority, task_group) in std::mem::take(&mut self.tasks) {
- let mut handles = Vec::new();
-
- // Process all tasks within this priority level concurrently
- for (task_id, events) in task_group {
- let db = self.db.clone();
- let world = self.world.clone();
- let semaphore = semaphore.clone();
- let processors = self.processors.clone();
- let event_processor_config = self.config.event_processor_config.clone();
-
- handles.push(tokio::spawn(async move {
- let _permit = semaphore.acquire().await?;
- let mut local_db = db.clone();
-
- // Process all events for this task sequentially
- for (contract_type, event) in events {
- let contract_processors = processors.get_event_processor(contract_type);
- if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
- let processor = processors
- .iter()
- .find(|p| p.validate(&event.event))
- .expect("Must find at least one processor for the event");
-
- debug!(
- target: LOG_TARGET,
- event_name = processor.event_key(),
- task_id = %task_id,
- priority = %priority,
- "Processing parallelized event."
- );
-
- if let Err(e) = processor
- .process(
- &world,
- &mut local_db,
- event.block_number,
- event.block_timestamp,
- &event.event_id,
- &event.event,
- &event_processor_config,
- )
- .await
- {
- error!(
- target: LOG_TARGET,
- event_name = processor.event_key(),
- error = %e,
- task_id = %task_id,
- priority = %priority,
- "Processing parallelized event."
- );
- }
- }
- }
-
- Ok::<_, anyhow::Error>(())
- }));
- }
-
- // Wait for all tasks in this priority level to complete before moving to next priority
- try_join_all(handles).await?;
- }
-
- Ok(())
- }
-
async fn process_transaction_with_events(
&mut self,
transaction_hash: Felt,
@@ -881,50 +809,21 @@ impl {
.find(|p| p.validate(event))
.expect("Must find atleast one processor for the event");
- let (task_priority, task_identifier) = match processor.event_key().as_str() {
- "ModelRegistered" | "EventRegistered" => {
- let mut hasher = DefaultHasher::new();
- event.keys.iter().for_each(|k| k.hash(&mut hasher));
- let hash = hasher.finish();
- (0usize, hash) // Priority 0 (highest) for model/event registration
- }
- "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
- let mut hasher = DefaultHasher::new();
- event.keys[1].hash(&mut hasher);
- event.keys[2].hash(&mut hasher);
- let hash = hasher.finish();
- (2usize, hash) // Priority 2 (lower) for store operations
- }
- "EventEmitted" => {
- let mut hasher = DefaultHasher::new();
-
- let keys = Vec:: ,
diff --git a/crates/torii/indexer/src/processors/erc20_transfer.rs b/crates/torii/indexer/src/processors/erc20_transfer.rs
index a0643abd41..98fb907bb0 100644
--- a/crates/torii/indexer/src/processors/erc20_transfer.rs
+++ b/crates/torii/indexer/src/processors/erc20_transfer.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -8,6 +10,7 @@ use torii_sqlite::Sql;
use tracing::debug;
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer";
@@ -34,6 +37,24 @@ where
false
}
+ fn task_priority(&self) -> TaskPriority {
+ 1
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ // Hash the event key (Transfer)
+ event.keys[0].hash(&mut hasher);
+
+ // Take the max of from/to addresses to get a canonical representation
+ // This ensures transfers between the same pair of addresses are grouped together
+ // regardless of direction (A->B or B->A)
+ let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
+ canonical_pair.hash(&mut hasher);
+
+ hasher.finish()
+ }
+
async fn process(
&self,
world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs
index df6b2a88de..e02304ed9c 100644
--- a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs
+++ b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -8,6 +10,7 @@ use torii_sqlite::Sql;
use tracing::debug;
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer";
@@ -34,6 +37,32 @@ where
false
}
+ fn task_priority(&self) -> TaskPriority {
+ 1
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ // Hash the event key (Transfer)
+ event.keys[0].hash(&mut hasher);
+
+ // Take the max of from/to addresses to get a canonical representation
+ // This ensures transfers between the same pair of addresses are grouped together
+ // regardless of direction (A->B or B->A)
+ let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
+ canonical_pair.hash(&mut hasher);
+
+ // For ERC721, we can safely parallelize by token ID since each token is unique
+ // and can only be owned by one address at a time. This means:
+ // 1. Transfers of different tokens can happen in parallel
+ // 2. Multiple transfers of the same token must be sequential
+ // 3. The canonical address pair ensures related transfers stay together
+ event.data[2].hash(&mut hasher);
+ event.data[3].hash(&mut hasher);
+
+ hasher.finish()
+ }
+
async fn process(
&self,
_world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/erc721_transfer.rs b/crates/torii/indexer/src/processors/erc721_transfer.rs
index faf124360b..4a0383a4f1 100644
--- a/crates/torii/indexer/src/processors/erc721_transfer.rs
+++ b/crates/torii/indexer/src/processors/erc721_transfer.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -8,6 +10,7 @@ use torii_sqlite::Sql;
use tracing::debug;
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_transfer";
@@ -34,6 +37,32 @@ where
false
}
+ fn task_priority(&self) -> TaskPriority {
+ 1
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ // Hash the event key (Transfer)
+ event.keys[0].hash(&mut hasher);
+
+ // Take the max of from/to addresses to get a canonical representation
+ // This ensures transfers between the same pair of addresses are grouped together
+ // regardless of direction (A->B or B->A)
+ let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
+ canonical_pair.hash(&mut hasher);
+
+ // For ERC721, we can safely parallelize by token ID since each token is unique
+ // and can only be owned by one address at a time. This means:
+ // 1. Transfers of different tokens can happen in parallel
+ // 2. Multiple transfers of the same token must be sequential
+ // 3. The canonical address pair ensures related transfers stay together
+ event.keys[3].hash(&mut hasher);
+ event.keys[4].hash(&mut hasher);
+
+ hasher.finish()
+ }
+
async fn process(
&self,
_world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/event_message.rs b/crates/torii/indexer/src/processors/event_message.rs
index 4495665bed..f8c69f471e 100644
--- a/crates/torii/indexer/src/processors/event_message.rs
+++ b/crates/torii/indexer/src/processors/event_message.rs
@@ -1,14 +1,19 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Error, Result};
use async_trait::async_trait;
+use cainome::cairo_serde::CairoSerde;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
use dojo_world::contracts::naming::get_tag;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, Felt};
use starknet::providers::Provider;
+use starknet_crypto::poseidon_hash_many;
use torii_sqlite::Sql;
use tracing::info;
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::event_message";
@@ -28,6 +33,23 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 1
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ let keys = Vec:: ,
diff --git a/crates/torii/indexer/src/processors/metadata_update.rs b/crates/torii/indexer/src/processors/metadata_update.rs
index c4b71c4a52..a3d40eb5cb 100644
--- a/crates/torii/indexer/src/processors/metadata_update.rs
+++ b/crates/torii/indexer/src/processors/metadata_update.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Error, Result};
use async_trait::async_trait;
use base64::engine::general_purpose;
@@ -14,6 +16,7 @@ use torii_sqlite::Sql;
use tracing::{error, info};
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::metadata_update";
@@ -33,6 +36,16 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 3
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ event.keys.iter().for_each(|k| k.hash(&mut hasher));
+ hasher.finish()
+ }
+
async fn process(
&self,
_world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/mod.rs b/crates/torii/indexer/src/processors/mod.rs
index abe358b6e7..420dd798a7 100644
--- a/crates/torii/indexer/src/processors/mod.rs
+++ b/crates/torii/indexer/src/processors/mod.rs
@@ -7,6 +7,8 @@ use starknet::core::types::{Event, Felt, Transaction};
use starknet::providers::Provider;
use torii_sqlite::Sql;
+use crate::task_manager::{TaskId, TaskPriority};
+
pub mod erc20_legacy_transfer;
pub mod erc20_transfer;
pub mod erc721_legacy_transfer;
@@ -53,6 +55,9 @@ where
fn validate(&self, event: &Event) -> bool;
+ fn task_priority(&self) -> TaskPriority;
+ fn task_identifier(&self, event: &Event) -> TaskId;
+
#[allow(clippy::too_many_arguments)]
async fn process(
&self,
diff --git a/crates/torii/indexer/src/processors/raw_event.rs b/crates/torii/indexer/src/processors/raw_event.rs
index c30a918fa2..8614b92695 100644
--- a/crates/torii/indexer/src/processors/raw_event.rs
+++ b/crates/torii/indexer/src/processors/raw_event.rs
@@ -6,6 +6,7 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{self, TaskId, TaskPriority};
#[derive(Default, Debug)]
pub struct RawEventProcessor;
@@ -23,6 +24,15 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 1
+ }
+
+ fn task_identifier(&self, _event: &Event) -> TaskId {
+ // TODO. for now raw events are not parallelized
+ task_manager::TASK_ID_SEQUENTIAL
+ }
+
async fn process(
&self,
_world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/register_event.rs b/crates/torii/indexer/src/processors/register_event.rs
index e9c94f296a..df4a536625 100644
--- a/crates/torii/indexer/src/processors/register_event.rs
+++ b/crates/torii/indexer/src/processors/register_event.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
use tracing::{debug, info};
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::register_event";
@@ -30,6 +33,16 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 0
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ event.keys.iter().for_each(|k| k.hash(&mut hasher));
+ hasher.finish()
+ }
+
async fn process(
&self,
world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/register_model.rs b/crates/torii/indexer/src/processors/register_model.rs
index d630feff88..dae000613a 100644
--- a/crates/torii/indexer/src/processors/register_model.rs
+++ b/crates/torii/indexer/src/processors/register_model.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
use tracing::{debug, info};
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::register_model";
@@ -30,6 +33,16 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 0
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ event.keys.iter().for_each(|k| k.hash(&mut hasher));
+ hasher.finish()
+ }
+
async fn process(
&self,
world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/store_del_record.rs b/crates/torii/indexer/src/processors/store_del_record.rs
index c129e7e7a9..d3256c0ef1 100644
--- a/crates/torii/indexer/src/processors/store_del_record.rs
+++ b/crates/torii/indexer/src/processors/store_del_record.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -8,6 +10,7 @@ use torii_sqlite::Sql;
use tracing::{debug, info};
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_del_record";
@@ -27,6 +30,17 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 2
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ event.keys[1].hash(&mut hasher);
+ event.keys[2].hash(&mut hasher);
+ hasher.finish()
+ }
+
async fn process(
&self,
_world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/store_set_record.rs b/crates/torii/indexer/src/processors/store_set_record.rs
index 1b30b24016..d508013381 100644
--- a/crates/torii/indexer/src/processors/store_set_record.rs
+++ b/crates/torii/indexer/src/processors/store_set_record.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
use tracing::{debug, info};
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_set_record";
@@ -28,6 +31,17 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 2
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ event.keys[1].hash(&mut hasher);
+ event.keys[2].hash(&mut hasher);
+ hasher.finish()
+ }
+
async fn process(
&self,
_world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs
index 30fd10882e..6c45c66154 100644
--- a/crates/torii/indexer/src/processors/store_update_member.rs
+++ b/crates/torii/indexer/src/processors/store_update_member.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Context, Error, Result};
use async_trait::async_trait;
use dojo_types::schema::{Struct, Ty};
@@ -10,6 +12,7 @@ use torii_sqlite::Sql;
use tracing::{debug, info};
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_update_member";
@@ -29,6 +32,19 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 2
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ // model selector
+ event.keys[1].hash(&mut hasher);
+ // entity id
+ event.keys[2].hash(&mut hasher);
+ hasher.finish()
+ }
+
async fn process(
&self,
_world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/store_update_record.rs b/crates/torii/indexer/src/processors/store_update_record.rs
index a76f40293e..05984ecd72 100644
--- a/crates/torii/indexer/src/processors/store_update_record.rs
+++ b/crates/torii/indexer/src/processors/store_update_record.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_types::schema::Ty;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
use tracing::{debug, info};
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_update_record";
@@ -28,6 +31,19 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 2
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ // model selector
+ event.keys[1].hash(&mut hasher);
+ // entity id
+ event.keys[2].hash(&mut hasher);
+ hasher.finish()
+ }
+
async fn process(
&self,
_world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/upgrade_event.rs b/crates/torii/indexer/src/processors/upgrade_event.rs
index ba7966d63b..3b9597b3ad 100644
--- a/crates/torii/indexer/src/processors/upgrade_event.rs
+++ b/crates/torii/indexer/src/processors/upgrade_event.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
use tracing::{debug, info};
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::upgrade_event";
@@ -30,6 +33,16 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 1
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ event.keys.iter().for_each(|k| k.hash(&mut hasher));
+ hasher.finish()
+ }
+
async fn process(
&self,
world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/processors/upgrade_model.rs b/crates/torii/indexer/src/processors/upgrade_model.rs
index 40717df30d..fe46304b72 100644
--- a/crates/torii/indexer/src/processors/upgrade_model.rs
+++ b/crates/torii/indexer/src/processors/upgrade_model.rs
@@ -1,3 +1,5 @@
+use std::hash::{DefaultHasher, Hash, Hasher};
+
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -9,6 +11,7 @@ use torii_sqlite::Sql;
use tracing::{debug, info};
use super::{EventProcessor, EventProcessorConfig};
+use crate::task_manager::{TaskId, TaskPriority};
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::upgrade_model";
@@ -30,6 +33,16 @@ where
true
}
+ fn task_priority(&self) -> TaskPriority {
+ 1
+ }
+
+ fn task_identifier(&self, event: &Event) -> TaskId {
+ let mut hasher = DefaultHasher::new();
+ event.keys.iter().for_each(|k| k.hash(&mut hasher));
+ hasher.finish()
+ }
+
async fn process(
&self,
world: &WorldContractReader ,
diff --git a/crates/torii/indexer/src/task_manager.rs b/crates/torii/indexer/src/task_manager.rs
new file mode 100644
index 0000000000..9a7c4a4ed9
--- /dev/null
+++ b/crates/torii/indexer/src/task_manager.rs
@@ -0,0 +1,151 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;
+
+use anyhow::Result;
+use dojo_world::contracts::WorldContractReader;
+use futures_util::future::try_join_all;
+use starknet::core::types::Event;
+use starknet::providers::Provider;
+use tokio::sync::Semaphore;
+use torii_sqlite::types::ContractType;
+use torii_sqlite::Sql;
+use tracing::{debug, error};
+
+use crate::engine::Processors;
+use crate::processors::EventProcessorConfig;
+
+pub const TASK_ID_SEQUENTIAL: TaskId = 0;
+
+const LOG_TARGET: &str = "torii::indexer::task_manager";
+
+pub type TaskId = u64;
+pub type TaskPriority = usize;
+
+#[derive(Debug)]
+pub struct ParallelizedEvent {
+ pub contract_type: ContractType,
+ pub block_number: u64,
+ pub block_timestamp: u64,
+ pub event_id: String,
+ pub event: Event,
+}
+
+pub struct TaskManager {
+ pub fn new(
+ db: Sql,
+ world: Arc