Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-indexer): task manager & parallelize erc transfers #2913

Merged
merged 12 commits into from
Jan 22, 2025
169 changes: 34 additions & 135 deletions crates/torii/indexer/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use bitflags::bitflags;
use cainome::cairo_serde::CairoSerde;
use dojo_utils::provider as provider_utils;
use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::{join_all, try_join_all};
use futures_util::future::join_all;
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockHashAndNumber, BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage,
Expand All @@ -18,7 +17,7 @@ use starknet::core::types::{
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
use starknet_crypto::{poseidon_hash_many, Felt};
use starknet_crypto::Felt;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Sender as BoundedSender;
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -47,6 +46,7 @@ use crate::processors::upgrade_model::UpgradeModelProcessor;
use crate::processors::{
BlockProcessor, EventProcessor, EventProcessorConfig, TransactionProcessor,
};
use crate::task_manager::{self, ParallelizedEvent, TaskManager};

type EventProcessorMap<P> = HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>>;

Expand Down Expand Up @@ -200,17 +200,6 @@ pub struct FetchPendingResult {
pub block_number: u64,
}

#[derive(Debug)]
pub struct ParallelizedEvent {
pub block_number: u64,
pub block_timestamp: u64,
pub event_id: String,
pub event: Event,
}

type TaskPriority = usize;
type TaskId = u64;

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
Expand All @@ -220,7 +209,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<(ContractType, ParallelizedEvent)>>>,
task_manager: TaskManager<P>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Initialize task_manager field in Engine

Ohayo, sensei! On line 211, the Engine struct has a new field task_manager but it's not initialized in the constructor. Make sure to initialize task_manager in the Engine::new method; otherwise the struct literal is missing a field and the code will fail to compile.

Apply this diff to initialize task_manager:

          block_tx,
          contracts,
-         tasks: HashMap::new(),
+         task_manager: TaskManager::new(processors.clone()),
      }

Committable suggestion skipped: line range outside the PR's diff.

contracts: Arc<HashMap<Felt, ContractType>>,
}

Expand All @@ -244,17 +233,27 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
let contracts = Arc::new(
contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(),
);
let world = Arc::new(world);
let processors = Arc::new(processors);
let max_concurrent_tasks = config.max_concurrent_tasks;
let event_processor_config = config.event_processor_config.clone();

Self {
world: Arc::new(world),
db,
world: world.clone(),
db: db.clone(),
provider: Arc::new(provider),
processors: Arc::new(processors),
processors: processors.clone(),
config,
shutdown_tx,
block_tx,
contracts,
tasks: BTreeMap::new(),
task_manager: TaskManager::new(
db,
world,
processors,
max_concurrent_tasks,
event_processor_config,
),
}
}

Expand Down Expand Up @@ -542,7 +541,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

// Process parallelized events
self.process_tasks().await?;
self.task_manager.process_tasks().await?;

self.db.update_cursors(
data.block_number - 1,
Expand Down Expand Up @@ -589,7 +588,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

// Process parallelized events
self.process_tasks().await?;
self.task_manager.process_tasks().await?;

let last_block_timestamp =
get_block_timestamp(&self.provider, data.latest_block_number).await?;
Expand All @@ -599,77 +598,6 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
Ok(())
}

async fn process_tasks(&mut self) -> Result<()> {
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));

// Process each priority level sequentially
for (priority, task_group) in std::mem::take(&mut self.tasks) {
let mut handles = Vec::new();

// Process all tasks within this priority level concurrently
for (task_id, events) in task_group {
let db = self.db.clone();
let world = self.world.clone();
let semaphore = semaphore.clone();
let processors = self.processors.clone();
let event_processor_config = self.config.event_processor_config.clone();

handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();

// Process all events for this task sequentially
for (contract_type, event) in events {
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
let processor = processors
.iter()
.find(|p| p.validate(&event.event))
.expect("Must find at least one processor for the event");

debug!(
target: LOG_TARGET,
event_name = processor.event_key(),
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);

if let Err(e) = processor
.process(
&world,
&mut local_db,
event.block_number,
event.block_timestamp,
&event.event_id,
&event.event,
&event_processor_config,
)
.await
{
error!(
target: LOG_TARGET,
event_name = processor.event_key(),
error = %e,
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);
}
}
}

Ok::<_, anyhow::Error>(())
}));
}

// Wait for all tasks in this priority level to complete before moving to next priority
try_join_all(handles).await?;
}

Ok(())
}

async fn process_transaction_with_events(
&mut self,
transaction_hash: Felt,
Expand Down Expand Up @@ -881,50 +809,21 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
.find(|p| p.validate(event))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix code formatting issue.

Ohayo, sensei! The pipeline indicates a long line that needs to be split. Let's fix that!

-        let processor = processors.iter().find(|p| p.validate(event)).expect("Must find atleast one processor for the event");
+        let processor = processors
+            .iter()
+            .find(|p| p.validate(event))
+            .expect("Must find atleast one processor for the event");

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Actions: ci

[warning] 809-809: Code formatting issue: Long line needs to be split into multiple lines

.expect("Must find atleast one processor for the event");

let (task_priority, task_identifier) = match processor.event_key().as_str() {
"ModelRegistered" | "EventRegistered" => {
let mut hasher = DefaultHasher::new();
event.keys.iter().for_each(|k| k.hash(&mut hasher));
let hash = hasher.finish();
(0usize, hash) // Priority 0 (highest) for model/event registration
}
"StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
let hash = hasher.finish();
(2usize, hash) // Priority 2 (lower) for store operations
}
"EventEmitted" => {
let mut hasher = DefaultHasher::new();

let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
panic!("Expected EventEmitted keys to be well formed: {:?}", e);
});

// selector
event.keys[1].hash(&mut hasher);
// entity id
let entity_id = poseidon_hash_many(&keys);
entity_id.hash(&mut hasher);
let (task_priority, task_identifier) =
(processor.task_priority(), processor.task_identifier(event));

let hash = hasher.finish();
(2usize, hash) // Priority 2 for event messages
}
_ => (0, 0), // No parallelization for other events
};

if task_identifier != 0 {
self.tasks.entry(task_priority).or_default().entry(task_identifier).or_default().push(
(
// if our event can be parallelized, we add it to the task manager
if task_identifier != task_manager::TASK_ID_SEQUENTIAL {
self.task_manager.add_parallelized_event(
task_priority,
task_identifier,
ParallelizedEvent {
contract_type,
ParallelizedEvent {
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
},
),
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
},
);
} else {
// Process non-parallelized events immediately
Expand Down
1 change: 1 addition & 0 deletions crates/torii/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ mod test;

pub mod engine;
pub mod processors;
mod task_manager;

pub use engine::Engine;
21 changes: 21 additions & 0 deletions crates/torii/indexer/src/processors/erc20_legacy_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -8,6 +10,7 @@ use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_legacy_transfer";

Expand All @@ -34,6 +37,24 @@ where
false
}

fn task_priority(&self) -> TaskPriority {
1
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
canonical_pair.hash(&mut hasher);

hasher.finish()
}
Comment on lines +44 to +56
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add bounds checking for data array access, sensei!

The current implementation assumes event.data has at least 2 elements. Add validation to handle malformed events safely.

 fn task_identifier(&self, event: &Event) -> TaskId {
+    if event.data.len() < 2 {
+        let mut hasher = DefaultHasher::new();
+        event.hash(&mut hasher);
+        return hasher.finish();
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[0].hash(&mut hasher);
     let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
     canonical_pair.hash(&mut hasher);
     hasher.finish()
 }

Committable suggestion skipped: line range outside the PR's diff.


async fn process(
&self,
world: &WorldContractReader<P>,
Expand Down
21 changes: 21 additions & 0 deletions crates/torii/indexer/src/processors/erc20_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -8,6 +10,7 @@ use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer";

Expand All @@ -34,6 +37,24 @@ where
false
}

fn task_priority(&self) -> TaskPriority {
1
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);

hasher.finish()
}
Comment on lines +44 to +56
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add bounds checking for array access, sensei!

The current implementation assumes event.keys has at least 3 elements. Add validation to handle malformed events safely:

 fn task_identifier(&self, event: &Event) -> TaskId {
+    if event.keys.len() < 3 {
+        let mut hasher = DefaultHasher::new();
+        event.hash(&mut hasher);
+        return hasher.finish();
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[0].hash(&mut hasher);
     let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
     canonical_pair.hash(&mut hasher);
     hasher.finish()
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);
// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);
hasher.finish()
}
fn task_identifier(&self, event: &Event) -> TaskId {
if event.keys.len() < 3 {
let mut hasher = DefaultHasher::new();
event.hash(&mut hasher);
return hasher.finish();
}
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);
// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);
hasher.finish()
}


async fn process(
&self,
world: &WorldContractReader<P>,
Expand Down
29 changes: 29 additions & 0 deletions crates/torii/indexer/src/processors/erc721_legacy_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -8,6 +10,7 @@ use torii_sqlite::Sql;
use tracing::debug;

use super::{EventProcessor, EventProcessorConfig};
use crate::task_manager::{TaskId, TaskPriority};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer";

Expand All @@ -34,6 +37,32 @@ where
false
}

fn task_priority(&self) -> TaskPriority {
1
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
canonical_pair.hash(&mut hasher);

// For ERC721, we can safely parallelize by token ID since each token is unique
// and can only be owned by one address at a time. This means:
// 1. Transfers of different tokens can happen in parallel
// 2. Multiple transfers of the same token must be sequential
// 3. The canonical address pair ensures related transfers stay together
event.data[2].hash(&mut hasher);
event.data[3].hash(&mut hasher);

hasher.finish()
}
Comment on lines +44 to +64
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add bounds checking and consider sharing code with the non-legacy version, sensei!

The implementation mirrors the non-legacy version but operates on event.data instead of event.keys. Consider:

  1. Adding bounds checking for array access
  2. Extracting common logic into a shared trait or utility function


async fn process(
&self,
_world: &WorldContractReader<P>,
Expand Down
Loading
Loading