-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(torii-indexer): task manager & parallelize erc transfers #2913
Changes from all commits
4bebac9
49e91b0
dc5af0e
f7902f3
8c2c497
e2f4860
3490207
c3272bb
a046def
4d506b9
0c82bba
2a65785
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,14 @@ | ||
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; | ||
use std::fmt::Debug; | ||
use std::hash::{DefaultHasher, Hash, Hasher}; | ||
use std::hash::Hash; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
|
||
use anyhow::Result; | ||
use bitflags::bitflags; | ||
use cainome::cairo_serde::CairoSerde; | ||
use dojo_utils::provider as provider_utils; | ||
use dojo_world::contracts::world::WorldContractReader; | ||
use futures_util::future::{join_all, try_join_all}; | ||
use futures_util::future::join_all; | ||
use hashlink::LinkedHashMap; | ||
use starknet::core::types::{ | ||
BlockHashAndNumber, BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, | ||
|
@@ -18,7 +17,7 @@ use starknet::core::types::{ | |
}; | ||
use starknet::core::utils::get_selector_from_name; | ||
use starknet::providers::Provider; | ||
use starknet_crypto::{poseidon_hash_many, Felt}; | ||
use starknet_crypto::Felt; | ||
use tokio::sync::broadcast::Sender; | ||
use tokio::sync::mpsc::Sender as BoundedSender; | ||
use tokio::sync::Semaphore; | ||
|
@@ -47,6 +46,7 @@ use crate::processors::upgrade_model::UpgradeModelProcessor; | |
use crate::processors::{ | ||
BlockProcessor, EventProcessor, EventProcessorConfig, TransactionProcessor, | ||
}; | ||
use crate::task_manager::{self, ParallelizedEvent, TaskManager}; | ||
|
||
type EventProcessorMap<P> = HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>>; | ||
|
||
|
@@ -200,17 +200,6 @@ pub struct FetchPendingResult { | |
pub block_number: u64, | ||
} | ||
|
||
#[derive(Debug)] | ||
pub struct ParallelizedEvent { | ||
pub block_number: u64, | ||
pub block_timestamp: u64, | ||
pub event_id: String, | ||
pub event: Event, | ||
} | ||
|
||
type TaskPriority = usize; | ||
type TaskId = u64; | ||
|
||
#[allow(missing_debug_implementations)] | ||
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> { | ||
world: Arc<WorldContractReader<P>>, | ||
|
@@ -220,7 +209,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> { | |
config: EngineConfig, | ||
shutdown_tx: Sender<()>, | ||
block_tx: Option<BoundedSender<u64>>, | ||
tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<(ContractType, ParallelizedEvent)>>>, | ||
task_manager: TaskManager<P>, | ||
contracts: Arc<HashMap<Felt, ContractType>>, | ||
} | ||
|
||
|
@@ -244,17 +233,27 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
let contracts = Arc::new( | ||
contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(), | ||
); | ||
let world = Arc::new(world); | ||
let processors = Arc::new(processors); | ||
let max_concurrent_tasks = config.max_concurrent_tasks; | ||
let event_processor_config = config.event_processor_config.clone(); | ||
|
||
Self { | ||
world: Arc::new(world), | ||
db, | ||
world: world.clone(), | ||
db: db.clone(), | ||
provider: Arc::new(provider), | ||
processors: Arc::new(processors), | ||
processors: processors.clone(), | ||
config, | ||
shutdown_tx, | ||
block_tx, | ||
contracts, | ||
tasks: BTreeMap::new(), | ||
task_manager: TaskManager::new( | ||
db, | ||
world, | ||
processors, | ||
max_concurrent_tasks, | ||
event_processor_config, | ||
), | ||
} | ||
} | ||
|
||
|
@@ -542,7 +541,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
} | ||
|
||
// Process parallelized events | ||
self.process_tasks().await?; | ||
self.task_manager.process_tasks().await?; | ||
|
||
self.db.update_cursors( | ||
data.block_number - 1, | ||
|
@@ -589,7 +588,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
} | ||
|
||
// Process parallelized events | ||
self.process_tasks().await?; | ||
self.task_manager.process_tasks().await?; | ||
|
||
let last_block_timestamp = | ||
get_block_timestamp(&self.provider, data.latest_block_number).await?; | ||
|
@@ -599,77 +598,6 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
Ok(()) | ||
} | ||
|
||
async fn process_tasks(&mut self) -> Result<()> { | ||
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); | ||
|
||
// Process each priority level sequentially | ||
for (priority, task_group) in std::mem::take(&mut self.tasks) { | ||
let mut handles = Vec::new(); | ||
|
||
// Process all tasks within this priority level concurrently | ||
for (task_id, events) in task_group { | ||
let db = self.db.clone(); | ||
let world = self.world.clone(); | ||
let semaphore = semaphore.clone(); | ||
let processors = self.processors.clone(); | ||
let event_processor_config = self.config.event_processor_config.clone(); | ||
|
||
handles.push(tokio::spawn(async move { | ||
let _permit = semaphore.acquire().await?; | ||
let mut local_db = db.clone(); | ||
|
||
// Process all events for this task sequentially | ||
for (contract_type, event) in events { | ||
let contract_processors = processors.get_event_processor(contract_type); | ||
if let Some(processors) = contract_processors.get(&event.event.keys[0]) { | ||
let processor = processors | ||
.iter() | ||
.find(|p| p.validate(&event.event)) | ||
.expect("Must find at least one processor for the event"); | ||
|
||
debug!( | ||
target: LOG_TARGET, | ||
event_name = processor.event_key(), | ||
task_id = %task_id, | ||
priority = %priority, | ||
"Processing parallelized event." | ||
); | ||
|
||
if let Err(e) = processor | ||
.process( | ||
&world, | ||
&mut local_db, | ||
event.block_number, | ||
event.block_timestamp, | ||
&event.event_id, | ||
&event.event, | ||
&event_processor_config, | ||
) | ||
.await | ||
{ | ||
error!( | ||
target: LOG_TARGET, | ||
event_name = processor.event_key(), | ||
error = %e, | ||
task_id = %task_id, | ||
priority = %priority, | ||
"Processing parallelized event." | ||
); | ||
} | ||
} | ||
} | ||
|
||
Ok::<_, anyhow::Error>(()) | ||
})); | ||
} | ||
|
||
// Wait for all tasks in this priority level to complete before moving to next priority | ||
try_join_all(handles).await?; | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
async fn process_transaction_with_events( | ||
&mut self, | ||
transaction_hash: Felt, | ||
|
@@ -881,50 +809,21 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
.find(|p| p.validate(event)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fix code formatting issue. Ohayo, sensei! The pipeline indicates a long line that needs to be split. Let's fix that! - let processor = processors.iter().find(|p| p.validate(event)).expect("Must find atleast one processor for the event");
+ let processor = processors
+ .iter()
+ .find(|p| p.validate(event))
+ .expect("Must find atleast one processor for the event");
🧰 Tools🪛 GitHub Actions: ci[warning] 809-809: Code formatting issue: Long line needs to be split into multiple lines |
||
.expect("Must find atleast one processor for the event"); | ||
|
||
let (task_priority, task_identifier) = match processor.event_key().as_str() { | ||
"ModelRegistered" | "EventRegistered" => { | ||
let mut hasher = DefaultHasher::new(); | ||
event.keys.iter().for_each(|k| k.hash(&mut hasher)); | ||
let hash = hasher.finish(); | ||
(0usize, hash) // Priority 0 (highest) for model/event registration | ||
} | ||
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { | ||
let mut hasher = DefaultHasher::new(); | ||
event.keys[1].hash(&mut hasher); | ||
event.keys[2].hash(&mut hasher); | ||
let hash = hasher.finish(); | ||
(2usize, hash) // Priority 2 (lower) for store operations | ||
} | ||
"EventEmitted" => { | ||
let mut hasher = DefaultHasher::new(); | ||
|
||
let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| { | ||
panic!("Expected EventEmitted keys to be well formed: {:?}", e); | ||
}); | ||
|
||
// selector | ||
event.keys[1].hash(&mut hasher); | ||
// entity id | ||
let entity_id = poseidon_hash_many(&keys); | ||
entity_id.hash(&mut hasher); | ||
let (task_priority, task_identifier) = | ||
(processor.task_priority(), processor.task_identifier(event)); | ||
|
||
let hash = hasher.finish(); | ||
(2usize, hash) // Priority 2 for event messages | ||
} | ||
_ => (0, 0), // No parallelization for other events | ||
}; | ||
|
||
if task_identifier != 0 { | ||
self.tasks.entry(task_priority).or_default().entry(task_identifier).or_default().push( | ||
( | ||
// if our event can be parallelized, we add it to the task manager | ||
if task_identifier != task_manager::TASK_ID_SEQUENTIAL { | ||
self.task_manager.add_parallelized_event( | ||
task_priority, | ||
task_identifier, | ||
ParallelizedEvent { | ||
contract_type, | ||
ParallelizedEvent { | ||
event_id: event_id.to_string(), | ||
event: event.clone(), | ||
block_number, | ||
block_timestamp, | ||
}, | ||
), | ||
event_id: event_id.to_string(), | ||
event: event.clone(), | ||
block_number, | ||
block_timestamp, | ||
}, | ||
); | ||
} else { | ||
// Process non-parallelized events immediately | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,5 +6,6 @@ mod test; | |
|
||
pub mod engine; | ||
pub mod processors; | ||
mod task_manager; | ||
|
||
pub use engine::Engine; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
use std::hash::{DefaultHasher, Hash, Hasher}; | ||
|
||
use anyhow::Error; | ||
use async_trait::async_trait; | ||
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; | ||
|
@@ -8,6 +10,7 @@ use torii_sqlite::Sql; | |
use tracing::debug; | ||
|
||
use super::{EventProcessor, EventProcessorConfig}; | ||
use crate::task_manager::{TaskId, TaskPriority}; | ||
|
||
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_legacy_transfer"; | ||
|
||
|
@@ -34,6 +37,24 @@ where | |
false | ||
} | ||
|
||
fn task_priority(&self) -> TaskPriority { | ||
1 | ||
} | ||
|
||
fn task_identifier(&self, event: &Event) -> TaskId { | ||
let mut hasher = DefaultHasher::new(); | ||
// Hash the event key (Transfer) | ||
event.keys[0].hash(&mut hasher); | ||
|
||
// Take the max of from/to addresses to get a canonical representation | ||
// This ensures transfers between the same pair of addresses are grouped together | ||
// regardless of direction (A->B or B->A) | ||
let canonical_pair = std::cmp::max(event.data[0], event.data[1]); | ||
canonical_pair.hash(&mut hasher); | ||
|
||
hasher.finish() | ||
} | ||
Comment on lines
+44
to
+56
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add bounds checking for data array access, sensei! The current implementation assumes `event.data` contains at least two elements, which may not hold for malformed events and would panic on indexing. Apply this diff to add bounds checking: fn task_identifier(&self, event: &Event) -> TaskId {
+ if event.data.len() < 2 {
+ let mut hasher = DefaultHasher::new();
+ event.hash(&mut hasher);
+ return hasher.finish();
+ }
let mut hasher = DefaultHasher::new();
event.keys[0].hash(&mut hasher);
let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
canonical_pair.hash(&mut hasher);
hasher.finish()
}
|
||
|
||
async fn process( | ||
&self, | ||
world: &WorldContractReader<P>, | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,3 +1,5 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use std::hash::{DefaultHasher, Hash, Hasher}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use anyhow::Error; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use async_trait::async_trait; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -8,6 +10,7 @@ use torii_sqlite::Sql; | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tracing::debug; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use super::{EventProcessor, EventProcessorConfig}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
use crate::task_manager::{TaskId, TaskPriority}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -34,6 +37,24 @@ where | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
false | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fn task_priority(&self) -> TaskPriority { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fn task_identifier(&self, event: &Event) -> TaskId { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut hasher = DefaultHasher::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Hash the event key (Transfer) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event.keys[0].hash(&mut hasher); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Take the max of from/to addresses to get a canonical representation | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// This ensures transfers between the same pair of addresses are grouped together | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// regardless of direction (A->B or B->A) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
canonical_pair.hash(&mut hasher); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
hasher.finish() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+44
to
+56
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add bounds checking for array access, sensei! The current implementation assumes `event.keys` contains at least three elements, which may not hold for malformed events and would panic on indexing. Apply this diff to add bounds checking: fn task_identifier(&self, event: &Event) -> TaskId {
+ if event.keys.len() < 3 {
+ let mut hasher = DefaultHasher::new();
+ event.hash(&mut hasher);
+ return hasher.finish();
+ }
let mut hasher = DefaultHasher::new();
event.keys[0].hash(&mut hasher);
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);
hasher.finish()
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn process( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
world: &WorldContractReader<P>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
use std::hash::{DefaultHasher, Hash, Hasher}; | ||
|
||
use anyhow::Error; | ||
use async_trait::async_trait; | ||
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; | ||
|
@@ -8,6 +10,7 @@ use torii_sqlite::Sql; | |
use tracing::debug; | ||
|
||
use super::{EventProcessor, EventProcessorConfig}; | ||
use crate::task_manager::{TaskId, TaskPriority}; | ||
|
||
pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer"; | ||
|
||
|
@@ -34,6 +37,32 @@ where | |
false | ||
} | ||
|
||
fn task_priority(&self) -> TaskPriority { | ||
1 | ||
} | ||
|
||
fn task_identifier(&self, event: &Event) -> TaskId { | ||
let mut hasher = DefaultHasher::new(); | ||
// Hash the event key (Transfer) | ||
event.keys[0].hash(&mut hasher); | ||
|
||
// Take the max of from/to addresses to get a canonical representation | ||
// This ensures transfers between the same pair of addresses are grouped together | ||
// regardless of direction (A->B or B->A) | ||
let canonical_pair = std::cmp::max(event.data[0], event.data[1]); | ||
canonical_pair.hash(&mut hasher); | ||
|
||
// For ERC721, we can safely parallelize by token ID since each token is unique | ||
// and can only be owned by one address at a time. This means: | ||
// 1. Transfers of different tokens can happen in parallel | ||
// 2. Multiple transfers of the same token must be sequential | ||
// 3. The canonical address pair ensures related transfers stay together | ||
event.data[2].hash(&mut hasher); | ||
event.data[3].hash(&mut hasher); | ||
|
||
hasher.finish() | ||
} | ||
Comment on lines
+44
to
+64
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add bounds checking and consider sharing code with non-legacy version, sensei! The implementation mirrors the non-legacy version but operates on `event.data` (indices 0 through 3) rather than on the event keys, and likewise assumes the array is long enough — add the same bounds checking, and consider extracting the shared hashing logic into a common helper.
|
||
|
||
async fn process( | ||
&self, | ||
_world: &WorldContractReader<P>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initialize `task_manager` field in `Engine`. Ohayo, sensei! On line 211, the `Engine` struct has a new field `task_manager` but it's not initialized in the constructor. Make sure to initialize `task_manager` in the `Engine::new` method to prevent a runtime error. Apply this diff to initialize `task_manager`: