feat(torii-indexer): task manager & task id for every event
Larkooo committed Jan 15, 2025
1 parent a8af3d6 commit 15cb6e9
Showing 19 changed files with 305 additions and 9 deletions.
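The new task_manager module itself is not among the hunks shown on this page, so the TaskManager API is not visible here. As rough orientation only: the processor-side signatures added below (task_priority() -> usize, task_identifier(&Event) -> TaskId, with the id produced by DefaultHasher::finish(), i.e. a u64) suggest a structure along the lines of the sketch that follows. Everything in it other than the TaskId alias is an assumption, not the committed implementation.

    // Sketch only: the task_manager.rs introduced by this commit is not shown
    // on this page, so TaskGroups/QueuedEvent here are hypothetical names
    // inferred from the processor trait methods, not the committed code.
    use std::collections::{BTreeMap, HashMap};

    pub type TaskId = u64; // matches DefaultHasher::finish() used by the processors

    // Stand-in for whatever payload the engine queues per event (the old
    // `tasks` HashMap in engine.rs held (ContractType, ParallelizedEvent)).
    struct QueuedEvent {
        event_id: String,
    }

    #[derive(Default)]
    struct TaskGroups {
        // Assumed scheme: lower priorities drain first; within a priority,
        // events sharing a TaskId stay sequential, while distinct TaskIds can
        // be processed in parallel.
        groups: BTreeMap<usize, HashMap<TaskId, Vec<QueuedEvent>>>,
    }

    impl TaskGroups {
        fn enqueue(&mut self, priority: usize, task_id: TaskId, event: QueuedEvent) {
            self.groups
                .entry(priority)
                .or_default()
                .entry(task_id)
                .or_default()
                .push(event);
        }
    }

    fn main() {
        let mut groups = TaskGroups::default();
        groups.enqueue(0, 0x2a, QueuedEvent { event_id: "0x01:0x00:0x00".into() });
        groups.enqueue(1, 0x2a, QueuedEvent { event_id: "0x02:0x00:0x01".into() });
        println!("priority 0, task 0x2a first event: {}", groups.groups[&0][&0x2a][0].event_id);
    }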
10 changes: 1 addition & 9 deletions crates/torii/indexer/src/engine.rs
@@ -199,14 +199,6 @@ pub struct FetchPendingResult {
pub block_number: u64,
}

#[derive(Debug)]
pub struct ParallelizedEvent {
pub block_number: u64,
pub block_timestamp: u64,
pub event_id: String,
pub event: Event,
}

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
@@ -216,7 +208,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>,
task_manager: TaskManager<P>,
contracts: Arc<HashMap<Felt, ContractType>>,
}

1 change: 1 addition & 0 deletions crates/torii/indexer/src/lib.rs
@@ -4,6 +4,7 @@ mod constants;
#[path = "test.rs"]
mod test;

mod task_manager;
pub mod engine;
pub mod processors;

22 changes: 22 additions & 0 deletions crates/torii/indexer/src/processors/erc20_legacy_transfer.rs
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -7,6 +9,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_legacy_transfer";
@@ -34,6 +38,24 @@ where
false
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
canonical_pair.hash(&mut hasher);

hasher.finish()
}

async fn process(
&self,
world: &WorldContractReader<P>,
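A concrete illustration of the canonical-pair comment above (the non-legacy erc20_transfer processor below applies the same idea to event.keys[1] and event.keys[2]): hashing max(from, to) instead of the ordered pair makes A->B and B->A transfers collapse onto one task id. The snippet is a standalone sketch using u64 stand-ins for the Felt values, not the actual starknet types.

    // Standalone illustration of the canonical-pair grouping used by the ERC20
    // transfer processors; u64 values stand in for Felt field elements.
    use std::hash::{DefaultHasher, Hash, Hasher};

    fn transfer_task_id(selector: u64, from: u64, to: u64) -> u64 {
        let mut hasher = DefaultHasher::new();
        selector.hash(&mut hasher);
        // Direction-insensitive: max(from, to) is the same for A->B and B->A.
        std::cmp::max(from, to).hash(&mut hasher);
        hasher.finish()
    }

    fn main() {
        let (a, b, transfer_selector) = (0x111, 0x222, 0x99);
        // A->B and B->A collapse onto one task, so balance updates for the
        // same pair of accounts are applied sequentially.
        assert_eq!(
            transfer_task_id(transfer_selector, a, b),
            transfer_task_id(transfer_selector, b, a)
        );
        // A different counterparty yields a different task id (with
        // overwhelming probability for a 64-bit hash).
        assert_ne!(
            transfer_task_id(transfer_selector, a, b),
            transfer_task_id(transfer_selector, a, 0x333)
        );
    }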
22 changes: 22 additions & 0 deletions crates/torii/indexer/src/processors/erc20_transfer.rs
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -7,6 +9,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer";
@@ -34,6 +38,24 @@ where
false
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);

hasher.finish()
}

async fn process(
&self,
world: &WorldContractReader<P>,
30 changes: 30 additions & 0 deletions crates/torii/indexer/src/processors/erc721_legacy_transfer.rs
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -7,6 +9,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer";
@@ -34,6 +38,32 @@ where
false
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
canonical_pair.hash(&mut hasher);

// For ERC721, we can safely parallelize by token ID since each token is unique
// and can only be owned by one address at a time. This means:
// 1. Transfers of different tokens can happen in parallel
// 2. Multiple transfers of the same token must be sequential
// 3. The canonical address pair ensures related transfers stay together
event.data[2].hash(&mut hasher);
event.data[3].hash(&mut hasher);

hasher.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,
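To make the ERC721 comment above concrete: besides the canonical address pair, the token id (a U256 spread over two felts, event.data[2] and event.data[3] here, event.keys[3] and event.keys[4] in the non-legacy processor below) also feeds the hash, so different tokens land in different tasks while repeated transfers of one token between the same pair stay ordered. Standalone sketch with u64 stand-ins for the Felt values:

    // Illustration of the ERC721 grouping: the address pair plus the token id
    // (U256 as two words) feed the hash, so distinct tokens map to distinct tasks.
    use std::hash::{DefaultHasher, Hash, Hasher};

    fn erc721_task_id(selector: u64, from: u64, to: u64, token_low: u64, token_high: u64) -> u64 {
        let mut hasher = DefaultHasher::new();
        selector.hash(&mut hasher);
        std::cmp::max(from, to).hash(&mut hasher);
        token_low.hash(&mut hasher);
        token_high.hash(&mut hasher);
        hasher.finish()
    }

    fn main() {
        let (a, b, sel) = (0x111, 0x222, 0x99);
        // Same pair, different token ids: independent tasks, may run in parallel.
        assert_ne!(erc721_task_id(sel, a, b, 1, 0), erc721_task_id(sel, a, b, 2, 0));
        // Same token moving back and forth between the pair: same task, stays ordered.
        assert_eq!(erc721_task_id(sel, a, b, 1, 0), erc721_task_id(sel, b, a, 1, 0));
    }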
29 changes: 29 additions & 0 deletions crates/torii/indexer/src/processors/erc721_transfer.rs
@@ -1,3 +1,4 @@
use std::hash::{DefaultHasher, Hash, Hasher};
use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
@@ -7,6 +8,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_transfer";
@@ -34,6 +37,32 @@ where
false
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);

// For ERC721, we can safely parallelize by token ID since each token is unique
// and can only be owned by one address at a time. This means:
// 1. Transfers of different tokens can happen in parallel
// 2. Multiple transfers of the same token must be sequential
// 3. The canonical address pair ensures related transfers stay together
event.keys[3].hash(&mut hasher);
event.keys[4].hash(&mut hasher);

hasher.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,
9 changes: 9 additions & 0 deletions crates/torii/indexer/src/processors/event_message.rs
@@ -28,6 +28,15 @@ where
true
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, _event: &Event) -> u64 {
// TODO. for now event messages are not parallelized
0
}

async fn process(
&self,
_world: &WorldContractReader<P>,
9 changes: 9 additions & 0 deletions crates/torii/indexer/src/processors/metadata_update.rs
@@ -34,6 +34,15 @@ where
true
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, _event: &Event) -> u64 {
// TODO. for now metadata updates are not parallelized
0
}

async fn process(
&self,
_world: &WorldContractReader<P>,
5 changes: 5 additions & 0 deletions crates/torii/indexer/src/processors/mod.rs
@@ -7,6 +7,8 @@ use starknet::core::types::{Event, Felt, Transaction};
use starknet::providers::Provider;
use torii_sqlite::Sql;

use crate::task_manager::TaskId;

pub mod erc20_legacy_transfer;
pub mod erc20_transfer;
pub mod erc721_legacy_transfer;
@@ -53,6 +55,9 @@ where

fn validate(&self, event: &Event) -> bool;

fn task_priority(&self) -> usize;
fn task_identifier(&self, event: &Event) -> TaskId;

#[allow(clippy::too_many_arguments)]
async fn process(
&self,
9 changes: 9 additions & 0 deletions crates/torii/indexer/src/processors/raw_event.rs
@@ -23,6 +23,15 @@ where
true
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, _event: &Event) -> u64 {
// TODO. for now raw events are not parallelized
0
}

async fn process(
&self,
_world: &WorldContractReader<P>,
12 changes: 12 additions & 0 deletions crates/torii/indexer/src/processors/register_event.rs
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -30,6 +32,16 @@ where
true
}

fn task_priority(&self) -> usize {
0
}

fn task_identifier(&self, event: &Event) -> u64 {
let mut hasher = DefaultHasher::new();
event.keys.iter().for_each(|k| k.hash(&mut hasher));
hasher.finish()
}

async fn process(
&self,
world: &WorldContractReader<P>,
12 changes: 12 additions & 0 deletions crates/torii/indexer/src/processors/register_model.rs
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -30,6 +32,16 @@ where
true
}

fn task_priority(&self) -> usize {
0
}

fn task_identifier(&self, event: &Event) -> u64 {
let mut hasher = DefaultHasher::new();
event.keys.iter().for_each(|k| k.hash(&mut hasher));
hasher.finish()
}

async fn process(
&self,
world: &WorldContractReader<P>,
15 changes: 15 additions & 0 deletions crates/torii/indexer/src/processors/store_del_record.rs
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -7,6 +9,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::{debug, info};

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_del_record";
@@ -27,6 +31,17 @@ where
true
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
hasher.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,
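One consequence worth spelling out, assuming keys[1] and keys[2] of the world's StoreDelRecord/StoreSetRecord/StoreUpdateMember events are the model selector and the entity id: the task id hashes only those two keys and not the event selector itself, so under the grouping sketched at the top of this page a set and a later delete (or member update) of the same entity of the same model share one task id and therefore keep their order. Standalone sketch with u64 stand-ins for the Felt keys:

    // Illustration: StoreSetRecord and StoreDelRecord hash only keys[1] and
    // keys[2] (assumed here to be model selector and entity id), so writes to
    // one (model, entity) pair fall into one task regardless of event type.
    use std::hash::{DefaultHasher, Hash, Hasher};

    fn record_task_id(keys: &[u64]) -> u64 {
        let mut hasher = DefaultHasher::new();
        keys[1].hash(&mut hasher); // assumed: model selector
        keys[2].hash(&mut hasher); // assumed: entity id
        hasher.finish()
    }

    fn main() {
        // keys[0] is the event selector and differs between StoreSetRecord and
        // StoreDelRecord, but it is deliberately left out of the hash.
        let set_keys = [0x5e7_u64, 0xaaaa, 0xbeef];
        let del_keys = [0xde1_u64, 0xaaaa, 0xbeef];
        assert_eq!(record_task_id(&set_keys), record_task_id(&del_keys));
        // A different entity of the same model hashes to a different task.
        assert_ne!(record_task_id(&set_keys), record_task_id(&[0x5e7, 0xaaaa, 0xcafe]));
    }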
13 changes: 13 additions & 0 deletions crates/torii/indexer/src/processors/store_set_record.rs
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
@@ -28,6 +30,17 @@ where
true
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, event: &Event) -> u64 {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
hasher.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,
12 changes: 12 additions & 0 deletions crates/torii/indexer/src/processors/store_update_member.rs
@@ -1,3 +1,4 @@
use std::hash::{DefaultHasher, Hash, Hasher};
use anyhow::{Context, Error, Result};
use async_trait::async_trait;
use dojo_types::schema::{Struct, Ty};
@@ -29,6 +30,17 @@ where
true
}

fn task_priority(&self) -> usize {
1
}

fn task_identifier(&self, event: &Event) -> u64 {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
hasher.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,