
feat(torii-indexer): task manager & parallelize erc transfers #2913

Merged
merged 12 commits into main from better-task-management
Jan 22, 2025

Conversation

@Larkooo Larkooo (Collaborator) commented Jan 15, 2025

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced a new task management system for event processing.
    • Added task prioritization and unique event identification across various event processors.
  • Improvements

    • Enhanced event processing capabilities with more granular task handling.
    • Implemented a flexible TaskManager to manage parallelized events more efficiently.
    • Updated local cache for improved concurrency and performance.
  • Technical Updates

    • Restructured event processing logic in the indexer.
    • Added methods for task priority and task identification to event processors.
    • Transitioned to asynchronous operations for cache management and event handling.
    • Updated the LocalCache for concurrent access using reference counting.


coderabbitai bot commented Jan 15, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

Ohayo, sensei! This pull request introduces substantial modifications to the event processing system within the Torii indexer. Key changes include the removal of the ParallelizedEvent struct, the introduction of a TaskManager struct to enhance task handling, and the addition of task_priority and task_identifier methods across various event processors. These updates reflect a shift towards a more modular and efficient approach to managing tasks and events, streamlining the overall processing logic.
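For orientation, here is a minimal sketch of the two trait hooks this PR adds, reconstructed from the summary above; the placeholder Event type and exact signatures are assumptions, not the merged code:

// Sketch only; in torii the event type is starknet's emitted Event.
pub struct Event {
    pub keys: Vec<u64>,
    pub data: Vec<u64>,
}

pub type TaskId = u64;

pub trait EventProcessor {
    /// Lower values run first; all tasks at one priority level complete
    /// before the next level starts.
    fn task_priority(&self) -> usize;

    /// Events sharing a TaskId are processed sequentially within one task;
    /// returning 0 opts the event out of parallelization.
    fn task_identifier(&self, event: &Event) -> TaskId;

    // ...existing methods such as validate() and process() elided...
}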

Changes

  • crates/torii/indexer/src/engine.rs: Removed ParallelizedEvent struct, replaced tasks with task_manager field, updated task processing logic.
  • crates/torii/indexer/src/lib.rs: Added new task_manager module.
  • crates/torii/indexer/src/processors/*: Added task_priority() and task_identifier() methods to multiple event processors, enhancing task management capabilities.
  • crates/torii/indexer/src/task_manager.rs: New implementation of TaskManager and ParallelizedEvent structs, including methods for managing and processing tasks.
  • crates/torii/sqlite/src/cache.rs: Updated LocalCache fields to use RwLock for concurrency, modified method signatures to be asynchronous.
  • crates/torii/sqlite/src/erc.rs: Made handle_erc20_transfer and handle_erc721_transfer methods asynchronous, updated cache handling logic.
  • crates/torii/sqlite/src/lib.rs: Changed local_cache field type from LocalCache to Arc<LocalCache> for shared ownership.


📜 Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4d506b9 and 2a65785.

📒 Files selected for processing (2)
  • crates/torii/indexer/src/processors/metadata_update.rs (3 hunks)
  • crates/torii/indexer/src/processors/register_model.rs (3 hunks)



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🔭 Outside diff range comments (1)
crates/torii/indexer/src/engine.rs (1)

Line range hint 473-495: Update process_tasks method to use task_manager

Ohayo, sensei! The process_tasks method still references self.tasks, which should now be managed by task_manager. Update the method to retrieve tasks from task_manager and process them accordingly.

🧹 Nitpick comments (10)
crates/torii/indexer/src/task_manager.rs (2)

58-66: Consider returning a reference in take_tasks for efficiency

Ohayo, sensei! In the take_tasks method (lines 58-62), using std::mem::take moves the tasks out of the TaskManager, leaving an empty BTreeMap. If the intention is to process tasks without clearing them, consider returning a reference or cloning the tasks.
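For contrast, a minimal sketch of the two options, using placeholder value types rather than the real event structs:

use std::collections::BTreeMap;

struct TaskManager {
    tasks: BTreeMap<u64, Vec<String>>,
}

impl TaskManager {
    // Current approach: std::mem::take drains the map, leaving it empty.
    fn take_tasks(&mut self) -> BTreeMap<u64, Vec<String>> {
        std::mem::take(&mut self.tasks)
    }

    // Alternative: borrow the tasks without clearing them.
    fn tasks(&self) -> &BTreeMap<u64, Vec<String>> {
        &self.tasks
    }
}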


24-26: Add visibility modifier to tasks field if needed

Ohayo, sensei! The tasks field in TaskManager is currently private. If external modules need to access tasks, consider adding a visibility modifier like pub(crate).

crates/torii/indexer/src/engine.rs (1)

Line range hint 838-860: Remove unused function get_transaction_hash_from_event_id if obsolete

Ohayo, sensei! The function get_transaction_hash_from_event_id on lines 838-841 might be obsolete due to changes in event handling. If it's no longer used, consider removing it to keep the codebase clean.

crates/torii/indexer/src/processors/raw_event.rs (1)

26-34: Clarify task parallelization for RawEventProcessor

Ohayo, sensei! In the RawEventProcessor, the task_identifier method returns 0, indicating that raw events are not parallelized (lines 30-33). If you plan to parallelize raw event processing in the future, consider adding a TODO comment for clarity.

crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (1)

45-57: Ohayo sensei! Consider reducing code duplication.

The task identification logic is identical to Erc20TransferProcessor except for accessing event data differently. Consider extracting the common hashing logic into a shared utility function.

Here's a suggested approach:

// In a shared utils module:
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// `Felt` and `TaskId` are the crate's existing field-element and task-id types.
pub fn compute_transfer_task_id(event_key: &Felt, address1: &Felt, address2: &Felt) -> TaskId {
    let mut hasher = DefaultHasher::new();
    event_key.hash(&mut hasher);
    let canonical_pair = std::cmp::max(*address1, *address2);
    canonical_pair.hash(&mut hasher);
    hasher.finish()
}

// In Erc20TransferProcessor (transfer addresses live in `keys`):
fn task_identifier(&self, event: &Event) -> TaskId {
    compute_transfer_task_id(&event.keys[0], &event.keys[1], &event.keys[2])
}

// In Erc20LegacyTransferProcessor (transfer addresses live in `data`):
fn task_identifier(&self, event: &Event) -> TaskId {
    compute_transfer_task_id(&event.keys[0], &event.data[0], &event.data[1])
}
crates/torii/indexer/src/processors/store_del_record.rs (1)

38-43: Consider adding bounds checking and documentation.

The task identifier implementation could benefit from:

  1. Array bounds validation before accessing keys[1] and keys[2]
  2. Comments explaining what these keys represent in the context of StoreDelRecord events
 fn task_identifier(&self, event: &Event) -> TaskId {
+    // Ensure we have enough keys
+    if event.keys.len() < 3 {
+        panic!("StoreDelRecord event must have at least 3 keys");
+    }
+
+    // Keys[1]: Model selector
+    // Keys[2]: Entity ID
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }
crates/torii/indexer/src/processors/store_set_record.rs (1)

37-42: Add safety checks and documentation.

Similar to StoreDelRecord, consider:

  1. Validating array bounds
  2. Documenting the meaning of keys[1] and keys[2]
 fn task_identifier(&self, event: &Event) -> u64 {
+    // Ensure we have enough keys
+    if event.keys.len() < 3 {
+        panic!("StoreSetRecord event must have at least 3 keys");
+    }
+
+    // Keys[1]: Model selector
+    // Keys[2]: Entity ID
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }
crates/torii/indexer/src/processors/store_update_member.rs (1)

33-42: Ohayo sensei! Consider extracting common task ID generation logic.

The task_identifier implementation is identical to StoreUpdateRecordProcessor. Consider:

  1. Extracting this common logic into a trait or utility function
  2. Adding bounds checking for array access
// Example utility function
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn generate_task_id(keys: &[FieldElement]) -> u64 {
    let mut hasher = DefaultHasher::new();
    if keys.len() > 2 {
        keys[1].hash(&mut hasher);
        keys[2].hash(&mut hasher);
    }
    hasher.finish()
}
crates/torii/indexer/src/processors/register_model.rs (1)

39-43: Consider adopting this comprehensive task ID generation approach globally.

This implementation is more robust as it hashes all available keys. Consider using this approach in other processors (like StoreUpdateRecord and StoreUpdateMember) for consistency and completeness.

crates/torii/indexer/src/processors/upgrade_event.rs (1)

Line range hint 82-82: Consider improving documentation clarity, sensei.

The comment "Called model here by language, but it's an event" suggests potential confusion. Consider updating variable names to better reflect their purpose (e.g., event_model or event_definition).

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8af3d6 and 15cb6e9.

📒 Files selected for processing (19)
  • crates/torii/indexer/src/engine.rs (1 hunks)
  • crates/torii/indexer/src/lib.rs (1 hunks)
  • crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc20_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/event_message.rs (1 hunks)
  • crates/torii/indexer/src/processors/metadata_update.rs (1 hunks)
  • crates/torii/indexer/src/processors/mod.rs (2 hunks)
  • crates/torii/indexer/src/processors/raw_event.rs (1 hunks)
  • crates/torii/indexer/src/processors/register_event.rs (2 hunks)
  • crates/torii/indexer/src/processors/register_model.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_set_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_update_member.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_update_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_event.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_model.rs (2 hunks)
  • crates/torii/indexer/src/task_manager.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • crates/torii/indexer/src/lib.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: fmt
🔇 Additional comments (16)
crates/torii/indexer/src/task_manager.rs (1)

46-53: Confirm intentional exclusion of events with task_id zero

Ohayo, sensei! The current logic skips adding events to tasks when task_id is zero (lines 46-53). Please verify if this is intentional. If events with task_id zero should be processed differently, consider documenting the rationale.
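A small, hypothetical way to make that rationale explicit in code (the NO_TASK name and should_queue helper are illustrative, not from the PR):

type TaskId = u64;
const NO_TASK: TaskId = 0;

/// By convention in this PR, a zero identifier means "not parallelizable":
/// the engine processes the event inline instead of queueing it here.
fn should_queue(task_id: TaskId) -> bool {
    task_id != NO_TASK
}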

crates/torii/indexer/src/processors/mod.rs (1)

58-59: Ohayo! The trait changes look good, sensei!

The addition of task_priority and task_identifier methods to the EventProcessor trait provides a solid foundation for task management. This enables consistent prioritization and identification of tasks across all event processors.

crates/torii/indexer/src/processors/event_message.rs (1)

35-38: Ohayo sensei! Let's address the TODO for event message parallelization.

Returning 0 for all task identifiers could lead to task collisions if parallelization is implemented in the future. Consider implementing a proper task identifier based on event properties, similar to other processors.

Would you like me to propose an implementation that generates unique task identifiers based on event properties?

crates/torii/indexer/src/processors/erc20_transfer.rs (1)

45-57: Ohayo! Excellent implementation of task identification, sensei!

The use of canonical representation for transfer addresses is a clever approach. By taking the maximum of the from/to addresses, you ensure that transfers between the same pair of addresses are grouped together regardless of direction. The implementation is well-documented and efficient.

crates/torii/indexer/src/processors/store_del_record.rs (1)

34-36: Ohayo! Task priority looks good, sensei!

The constant priority of 1 is consistent with other processors.

crates/torii/indexer/src/processors/store_set_record.rs (1)

33-35: Task priority implementation looks good, sensei!

Consistent with other processors.

crates/torii/indexer/src/processors/erc721_transfer.rs (2)

40-42: Task priority looks good, sensei!

Consistent priority level of 1 matches other processors.


44-64: Excellent implementation with clear documentation!

The task identification strategy is well thought out:

  1. Uses canonical address pairs to group related transfers
  2. Enables safe parallelization by token ID
  3. Includes clear documentation explaining the approach

This is a great example of how to implement and document task identification logic!

crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (2)

41-43: Task priority implementation looks good, sensei!

Consistent with other processors.


45-65: Excellent implementation with clear documentation!

The task identification strategy mirrors the non-legacy implementation:

  1. Uses canonical address pairs to group related transfers
  2. Enables safe parallelization by token ID
  3. Includes clear documentation explaining the approach

Great consistency with the non-legacy implementation while adapting to the different event structure!

crates/torii/indexer/src/processors/store_update_record.rs (1)

1-1: Ohayo! Implementation looks good, sensei!

The priority value of 1 is appropriate for store update events, as they should be processed after system events (priority 0).

Also applies to: 33-35

crates/torii/indexer/src/processors/register_model.rs (1)

35-37: Ohayo! Excellent priority choice, sensei!

Setting priority 0 for model registration is correct, as models need to be registered before any store operations can be processed.

crates/torii/indexer/src/processors/register_event.rs (1)

35-43: Ohayo! Implementation is consistent and robust, sensei!

The implementation aligns well with RegisterModelProcessor:

  • Priority 0 is appropriate for system events
  • Comprehensive key hashing approach ensures unique task IDs
crates/torii/indexer/src/processors/upgrade_model.rs (2)

Line range hint 44-136: Process method implementation looks solid, sensei!

The extensive error handling and logging in the process method align well with its high-priority task status.


34-43: Ohayo! Add bounds checking for event.keys array access, sensei.

The task_identifier implementation assumes event.keys[1] always exists. Consider adding a bounds check to handle cases where the event keys array might be empty or have insufficient elements.

Also, consider adding documentation to explain:

  • Why priority 0 was chosen for model upgrades
  • The significance of using the second key for task identification
crates/torii/indexer/src/processors/upgrade_event.rs (1)

34-43: Extract common task identification logic, sensei!

The task identifier implementation is duplicated across multiple processors. Consider extracting this into a common utility function or trait default implementation.

Also, similar to the model upgrade processor, add bounds checking for event.keys[1] access.

Comment on lines 33 to 42
pub fn add_parallelized_event(&mut self, parallelized_event: ParallelizedEvent) -> TaskId {
let event_key = parallelized_event.event.keys[0];
let processor = self
.processors
.get_event_processor(parallelized_event.contract_type)
.get(&event_key)
.unwrap()
.iter()
.find(|p| p.validate(&parallelized_event.event))
.unwrap();

⚠️ Potential issue

Avoid using .unwrap() to prevent potential panics

Ohayo, sensei! The usage of .unwrap() on lines 39 and 42 could lead to a panic if the Option is None. To enhance the robustness of the code, consider handling the None case explicitly or propagating the error using ?.

Apply this diff to handle the potential None values:

 let processor = self
     .processors
     .get_event_processor(parallelized_event.contract_type)
-    .get(&event_key)
-    .unwrap()
+    .get(&event_key)
+    .ok_or_else(|| anyhow!("No processor found for event key"))?
     .iter()
     .find(|p| p.validate(&parallelized_event.event))
-    .unwrap();
+    .ok_or_else(|| anyhow!("No valid processor found for event"))?;

Committable suggestion skipped: line range outside the PR's diff.

@@ -216,7 +208,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>,
task_manager: TaskManager<P>,

⚠️ Potential issue

Initialize task_manager field in Engine

Ohayo, sensei! On line 211, the Engine struct has a new field task_manager but it's not initialized in the constructor. Make sure to initialize task_manager in the Engine::new method; without it, the struct literal will not compile.

Apply this diff to initialize task_manager:

          block_tx,
          contracts,
-         tasks: HashMap::new(),
+         task_manager: TaskManager::new(processors.clone()),
      }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 37 to 42
fn task_identifier(&self, event: &Event) -> u64 {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
hasher.finish()
}

⚠️ Potential issue

Ohayo sensei! Consider adding bounds checking for array access.

The current implementation assumes keys[1] and keys[2] always exist. Consider adding bounds checking to prevent potential panic:

     fn task_identifier(&self, event: &Event) -> u64 {
         let mut hasher = DefaultHasher::new();
-        event.keys[1].hash(&mut hasher);
-        event.keys[2].hash(&mut hasher);
+        if event.keys.len() > 2 {
+            event.keys[1].hash(&mut hasher);
+            event.keys[2].hash(&mut hasher);
+        }
         hasher.finish()
     }

crates/torii/indexer/src/processors/metadata_update.rs: outdated review comment (resolved)
@Larkooo Larkooo changed the title feat(torii-indexer): task manager & task id for every event feat(torii-indexer): task manager & parallelize erc transfers Jan 15, 2025
@Larkooo Larkooo force-pushed the better-task-management branch from 15cb6e9 to 49e91b0 Compare January 16, 2025 07:48

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (1)
crates/torii/indexer/src/task_manager.rs (1)

96-98: ⚠️ Potential issue

Avoid using .expect() to prevent potential panics

Ohayo, sensei! The use of .expect("Must find at least one processor for the event") on line 98 could lead to a panic if no processor is found that validates the event. To enhance the robustness of the code, consider handling the None case explicitly or propagating the error using ?.

Apply this diff to handle the potential None value:

 .find(|p| p.validate(&event))
-.expect("Must find at least one processor for the event");
+.ok_or_else(|| anyhow!("No valid processor found for the event"))?;
🧹 Nitpick comments (7)
crates/torii/indexer/src/processors/register_model.rs (2)

35-37: Consider documenting the priority scheme, sensei!

The constant priority of 0 means this processor runs before higher-numbered ones (elsewhere in this PR, priority 0 marks system events that must precede store operations at priority 1), but the reasoning and its impact on execution order isn't documented. Adding documentation would help other developers understand the priority hierarchy across different processors.

+    /// Returns the task priority for the RegisterModel processor.
+    /// Priority 0 is processed first, ahead of store operations at priority 1.
     fn task_priority(&self) -> usize {
         0
     }

39-43: Consider hash collision mitigation, sensei!

The current implementation might produce different hashes for the same logical event if the keys are in a different order. Additionally, there's no explicit handling of hash collisions.

Consider these improvements:

  1. Sort the keys before hashing to ensure consistent results
  2. Include more event fields in the hash for better uniqueness
     fn task_identifier(&self, event: &Event) -> u64 {
         let mut hasher = DefaultHasher::new();
-        event.keys.iter().for_each(|k| k.hash(&mut hasher));
+        let mut sorted_keys = event.keys.clone();
+        sorted_keys.sort();
+        sorted_keys.hash(&mut hasher);
+        event.from_address.hash(&mut hasher);
+        event.data.hash(&mut hasher);
         hasher.finish()
     }
crates/torii/indexer/src/processors/store_update_member.rs (1)

33-35: Ohayo sensei! Let's document the priority scheme!

The constant priority value of 1 would benefit from documentation explaining:

  • The range of valid priorities
  • What this priority level means
  • How it compares to other processors' priorities

Consider adding a doc comment like this:

+    /// Returns the task priority for store update member events.
+    /// Priority 1 indicates [explain significance of this priority level]
+    /// Valid priority range is [specify range]
     fn task_priority(&self) -> usize {
         1
     }
crates/torii/indexer/src/processors/store_del_record.rs (1)

34-36: Ohayo! Consider documenting the priority scheme, sensei!

The priority value of 1 is used without context. Adding documentation would help future maintainers understand the priority hierarchy across different processors.

+    /// Returns the task priority for StoreDelRecord events.
+    /// Priority 1 indicates high-priority processing (add more context about the priority scheme).
     fn task_priority(&self) -> usize {
         1
     }
crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (1)

52-52: Fix trailing whitespace.

There's a trailing whitespace on this line that needs to be removed.

-        // regardless of direction (A->B or B->A) 
+        // regardless of direction (A->B or B->A)
🧰 Tools
🪛 GitHub Actions: ci

[warning] 52-52: Code formatting issue: Trailing whitespace detected

crates/torii/indexer/src/processors/erc721_transfer.rs (1)

49-49: Fix trailing whitespace.

There are trailing whitespaces on these lines that need to be removed.

-        // regardless of direction (A->B or B->A) 
+        // regardless of direction (A->B or B->A)
-        // 2. Multiple transfers of the same token must be sequential 
+        // 2. Multiple transfers of the same token must be sequential

Also applies to: 59-59

🧰 Tools
🪛 GitHub Actions: ci

[warning] 49-49: Code formatting issue: Trailing whitespace detected

crates/torii/indexer/src/processors/upgrade_model.rs (1)

34-43: Add documentation for task management methods.

Ohayo, sensei! The implementation looks good, but could benefit from documentation explaining:

  • Why the priority is set to 0
  • Why the second key (index 1) is used for task identification
  • What the task identifier represents in the context of model upgrades
+    /// Returns the task priority for model upgrade events.
+    /// Priority 0 indicates highest priority processing.
     fn task_priority(&self) -> usize {
         0
     }

+    /// Generates a unique task identifier for model upgrade events.
+    /// Uses the model selector (second key) to ensure upgrades for the same model
+    /// are processed sequentially.
     fn task_identifier(&self, event: &Event) -> u64 {
         let mut hasher = DefaultHasher::new();
         // model selector
         event.keys[1].hash(&mut hasher);
         hasher.finish()
     }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 15cb6e9 and 49e91b0.

📒 Files selected for processing (22)
  • crates/torii/indexer/src/engine.rs (8 hunks)
  • crates/torii/indexer/src/lib.rs (1 hunks)
  • crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc20_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/event_message.rs (1 hunks)
  • crates/torii/indexer/src/processors/metadata_update.rs (1 hunks)
  • crates/torii/indexer/src/processors/mod.rs (2 hunks)
  • crates/torii/indexer/src/processors/raw_event.rs (1 hunks)
  • crates/torii/indexer/src/processors/register_event.rs (2 hunks)
  • crates/torii/indexer/src/processors/register_model.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_set_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_update_member.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_update_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_event.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_model.rs (2 hunks)
  • crates/torii/indexer/src/task_manager.rs (1 hunks)
  • crates/torii/sqlite/src/cache.rs (2 hunks)
  • crates/torii/sqlite/src/erc.rs (7 hunks)
  • crates/torii/sqlite/src/lib.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (10)
  • crates/torii/indexer/src/lib.rs
  • crates/torii/indexer/src/processors/mod.rs
  • crates/torii/indexer/src/processors/store_update_record.rs
  • crates/torii/indexer/src/processors/raw_event.rs
  • crates/torii/indexer/src/processors/event_message.rs
  • crates/torii/indexer/src/processors/metadata_update.rs
  • crates/torii/indexer/src/processors/upgrade_event.rs
  • crates/torii/indexer/src/processors/store_set_record.rs
  • crates/torii/indexer/src/processors/erc20_transfer.rs
  • crates/torii/indexer/src/processors/register_event.rs
🧰 Additional context used
🪛 GitHub Actions: ci
crates/torii/sqlite/src/lib.rs

[warning] 74-74: Code formatting issue: Inconsistent struct formatting

crates/torii/indexer/src/processors/erc20_legacy_transfer.rs

[warning] 52-52: Code formatting issue: Trailing whitespace detected

crates/torii/indexer/src/processors/erc721_transfer.rs

[warning] 49-49: Code formatting issue: Trailing whitespace detected


[warning] 59-59: Code formatting issue: Trailing whitespace detected

crates/torii/indexer/src/engine.rs

[warning] 809-809: Code formatting issue: Long line needs to be split into multiple lines

🔇 Additional comments (10)
crates/torii/indexer/src/processors/register_model.rs (1)

1-1: Ohayo! Clean import addition, sensei!

The new hash-related imports are well-organized and properly utilized by the task identifier implementation.

crates/torii/indexer/src/processors/store_update_member.rs (1)

1-1: Ohayo sensei! The hash-related imports look good!

The new imports are properly organized and effectively utilized for generating task identifiers.

crates/torii/indexer/src/processors/store_del_record.rs (1)

1-1: Ohayo! Clean import addition, sensei!

The hash-related imports are well-chosen for generating unique task identifiers.

crates/torii/sqlite/src/erc.rs (1)

Line range hint 39-69: Improvements enhance concurrency and safety

Ohayo, sensei! The asynchronous modifications and the use of RwLock for erc_cache and token_id_registry improve concurrency and thread safety in the code. The changes ensure that cache operations are performed safely in an asynchronous context.

Also applies to: 95-137, 218-243, 282-288
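For readers following along, a sketch of the cache shape these changes imply; the field and method names follow the PR summary, and the value types are placeholders:

use std::collections::{HashMap, HashSet};
use tokio::sync::RwLock;

#[derive(Default)]
pub struct LocalCache {
    // Placeholder value types; the real cache stores balance deltas and ids.
    erc_cache: RwLock<HashMap<String, i64>>,
    token_id_registry: RwLock<HashSet<String>>,
}

impl LocalCache {
    pub async fn register_token_id(&self, id: String) -> bool {
        self.token_id_registry.write().await.insert(id)
    }

    pub async fn contains_token_id(&self, id: &str) -> bool {
        self.token_id_registry.read().await.contains(id)
    }
}

Because the locks provide interior mutability, both methods take &self, which is what lets the cache be shared behind an Arc.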

crates/torii/sqlite/src/lib.rs (1)

46-46: Ohayo! The thread-safe cache implementation looks good, sensei!

The change to wrap LocalCache in Arc enables safe concurrent access. This aligns well with the parallelization objectives.

Let's verify the cache usage with this script:

Also applies to: 77-82

✅ Verification successful

Ohayo! Your Arc implementation is properly thread-safe, sensei! 🎯

The codebase shows correct usage of Arc with proper read/write locks and async patterns. All cache operations are properly synchronized through .read().await and .write().await. No unsafe mutations or problematic clone patterns were detected.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for proper Arc usage and potential thread safety issues

# Search for any direct mutations of LocalCache that might bypass Arc
rg -A 3 "local_cache\." 

# Look for potential clone() calls that might create multiple strong references
rg "local_cache\.clone\(\)"

Length of output: 2999
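In miniature, the sharing pattern this verification confirms; it builds on the hypothetical LocalCache sketch above:

use std::sync::Arc;

#[tokio::main]
async fn main() {
    let cache = Arc::new(LocalCache::default());

    // Cloning the Arc bumps a reference count; both handles point at the
    // same cache, and the RwLock serializes access across tasks.
    let cache_for_task = Arc::clone(&cache);
    let handle = tokio::spawn(async move {
        cache_for_task.register_token_id("token:1".into()).await;
    });

    handle.await.unwrap();
    assert!(cache.contains_token_id("token:1").await);
}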

crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (1)

41-57: Ohayo! Nice implementation of task management, sensei!

The task prioritization and identification logic is well thought out. The canonical pair approach ensures consistent grouping of transfers between the same addresses, regardless of direction.

🧰 Tools
🪛 GitHub Actions: ci

[warning] 52-52: Code formatting issue: Trailing whitespace detected

crates/torii/indexer/src/processors/erc721_transfer.rs (1)

40-64: Ohayo! Excellent implementation with clear documentation, sensei!

The parallelization strategy for ERC721 tokens is well thought out and clearly documented. The three key points about parallel transfers, sequential token handling, and related transfer grouping make the implementation's behavior very clear.

🧰 Tools
🪛 GitHub Actions: ci

[warning] 49-49: Code formatting issue: Trailing whitespace detected


[warning] 59-59: Code formatting issue: Trailing whitespace detected

crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (1)

41-65: Ohayo! Consistent implementation with great documentation, sensei!

The implementation aligns perfectly with the other processors and maintains the same high-quality documentation for the parallelization strategy.

crates/torii/indexer/src/engine.rs (2)

236-256: LGTM! TaskManager initialization looks good.

The TaskManager is properly initialized with all required dependencies: database, world contract, processors, concurrency limits, and processor config.


812-825: LGTM! Event parallelization logic is well-structured.

The implementation correctly:

  1. Determines if an event can be parallelized using task identifiers
  2. Routes parallelizable events through the task manager
  3. Maintains immediate processing for non-parallelizable events

Comment on lines 37 to 42
fn task_identifier(&self, event: &Event) -> u64 {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
hasher.finish()
}

🛠️ Refactor suggestion

Ohayo sensei! Let's make the task identifier generation more robust!

The current implementation has several areas for improvement:

  1. No bounds checking before accessing event.keys[1] and event.keys[2]
  2. Missing documentation about why these specific keys are chosen

Consider this safer implementation:

     fn task_identifier(&self, event: &Event) -> u64 {
+        // Ensure we have enough keys
+        if event.keys.len() < 3 {
+            // Either return a default hash or handle the error appropriately
+            return 0;
+        }
+
+        // Hash keys[1] (represents ...) and keys[2] (represents ...)
         let mut hasher = DefaultHasher::new();
         event.keys[1].hash(&mut hasher);
         event.keys[2].hash(&mut hasher);
         hasher.finish()
     }

Also, please add documentation explaining:

  • The significance of keys[1] and keys[2]
  • Why these specific keys are chosen for the identifier
  • What makes this combination unique for store update member events

Comment on lines +38 to +43
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
hasher.finish()
}

🛠️ Refactor suggestion

Add bounds checking and documentation for the task identifier, sensei!

The task identifier implementation has potential issues:

  1. No validation before accessing event.keys indices
  2. Missing documentation explaining why these specific keys are used for identification

Consider this safer implementation:

+    /// Generates a unique task identifier by hashing specific event keys.
+    /// Keys[1] represents ... and Keys[2] represents ... (please document the meaning of these keys)
     fn task_identifier(&self, event: &Event) -> TaskId {
+        if event.keys.len() < 3 {
+            // Either return a default hash or handle the error appropriately
+            let mut hasher = DefaultHasher::new();
+            event.hash(&mut hasher);
+            return hasher.finish();
+        }
         let mut hasher = DefaultHasher::new();
         event.keys[1].hash(&mut hasher);
         event.keys[2].hash(&mut hasher);
         hasher.finish()
     }

Comment on lines 127 to 129
Self { erc_cache: RwLock::new(HashMap::new()), token_id_registry: RwLock::new(HashSet::new()) }
}
}

⚠️ Potential issue

Review the Clone implementation for LocalCache

Ohayo, sensei! In the Clone implementation of LocalCache, initializing erc_cache and token_id_registry as new empty collections means the cloned cache does not retain the original state. This could lead to unexpected behavior if the cache contents need to be preserved. Consider one of the following options:

  • Option 1: Remove the Clone implementation if cloning is not necessary.
  • Option 2: Adjust the implementation to clone the contents of the caches.

Since .await cannot be used in a synchronous context, cloning the contents safely is non-trivial.
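To make the concern concrete, here is the reviewed Clone reduced to its effect, using the placeholder field types from the earlier sketch:

impl Clone for LocalCache {
    fn clone(&self) -> Self {
        // Fresh, empty collections: the clone silently drops whatever
        // state the original cache had accumulated.
        Self {
            erc_cache: RwLock::new(HashMap::new()),
            token_id_registry: RwLock::new(HashSet::new()),
        }
    }
}

Sharing a single Arc<LocalCache>, as the PR does elsewhere, sidesteps the question entirely: clones of the Arc observe the same underlying cache rather than a fresh empty one.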

@@ -881,50 +809,20 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
.find(|p| p.validate(event))

⚠️ Potential issue

Fix code formatting issue.

Ohayo, sensei! The pipeline indicates a long line that needs to be split. Let's fix that!

-        let processor = processors.iter().find(|p| p.validate(event)).expect("Must find atleast one processor for the event");
+        let processor = processors
+            .iter()
+            .find(|p| p.validate(event))
+            .expect("Must find atleast one processor for the event");

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Actions: ci

[warning] 809-809: Code formatting issue: Long line needs to be split into multiple lines


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (1)
crates/torii/indexer/src/processors/store_del_record.rs (1)

37-42: ⚠️ Potential issue

Add bounds checking for array access, sensei!

Similar to other processors, this implementation needs validation for array access safety.

🧹 Nitpick comments (3)
crates/torii/indexer/src/processors/erc20_transfer.rs (1)

40-42: Consider using an enum for task priorities, sensei!

Having a bare numeric priority of 1 across the transfer processors makes the priority levels implicit. Consider introducing an enum to make priorities more explicit and maintainable:

pub enum TaskPriority {
    High = 1,
    Medium = 2,
    Low = 3,
}
crates/torii/indexer/src/processors/store_update_member.rs (1)

34-36: Ohayo sensei! Consider documenting the priority value.

Please add documentation explaining why this priority value was chosen and its significance in the task processing order.

+    /// Returns a priority value of 1 for store update member events.
+    /// Lower values indicate higher priority in task processing.
     fn task_priority(&self) -> usize {
         1
     }
crates/torii/indexer/src/engine.rs (1)

812-826: Ohayo sensei! Let's document the parallelization logic!

Please add documentation explaining:

  1. The significance of task_identifier being 0
  2. The criteria for parallelization
  3. The relationship between task_priority and task_identifier
+        // Get task priority and identifier from the processor
+        // task_identifier of 0 indicates that the event cannot be parallelized
         let (task_priority, task_identifier) =
             (processor.task_priority(), processor.task_identifier(event));

+        // Parallelize events with non-zero task identifiers
         if task_identifier != 0 {
             self.task_manager.add_parallelized_event(
                 task_priority,
                 task_identifier,
                 ParallelizedEvent {
                     contract_type,
                     event_id: event_id.to_string(),
                     event: event.clone(),
                     block_number,
                     block_timestamp,
                 },
             );
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 49e91b0 and dc5af0e.

📒 Files selected for processing (14)
  • crates/torii/indexer/src/engine.rs (8 hunks)
  • crates/torii/indexer/src/lib.rs (1 hunks)
  • crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc20_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/event_message.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_update_member.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_event.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_model.rs (2 hunks)
  • crates/torii/indexer/src/task_manager.rs (1 hunks)
  • crates/torii/sqlite/src/cache.rs (2 hunks)
  • crates/torii/sqlite/src/lib.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • crates/torii/indexer/src/lib.rs
  • crates/torii/indexer/src/processors/upgrade_model.rs
  • crates/torii/indexer/src/processors/erc20_legacy_transfer.rs
  • crates/torii/indexer/src/processors/upgrade_event.rs
  • crates/torii/sqlite/src/cache.rs
  • crates/torii/sqlite/src/lib.rs
🔇 Additional comments (7)
crates/torii/indexer/src/processors/event_message.rs (1)

39-50: LGTM! Well-implemented task identifier with proper error handling.

The implementation properly handles deserialization errors and uses appropriate hashing mechanisms for both the selector and entity ID.

crates/torii/indexer/src/processors/store_update_member.rs (1)

38-43: Ohayo sensei! Let's make the task identifier generation more robust!

The current implementation has several areas for improvement:

  1. No bounds checking before accessing event.keys[1] and event.keys[2]
  2. Missing documentation about why these specific keys are chosen
crates/torii/indexer/src/task_manager.rs (2)

41-56: Ohayo sensei! The constructor looks good!

The initialization is well-structured and includes all necessary parameters.


58-70: Ohayo sensei! The event addition logic is clean and efficient!

The nested entry API usage ensures proper initialization of default values.
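The nested entry() pattern in isolation; Priority, TaskId, and the String event payload are placeholders for the real structures:

use std::collections::BTreeMap;

type Priority = usize;
type TaskId = u64;

fn add_event(
    tasks: &mut BTreeMap<Priority, BTreeMap<TaskId, Vec<String>>>,
    priority: Priority,
    task_id: TaskId,
    event: String,
) {
    // entry().or_default() creates the inner map and vec on first use,
    // so callers never have to pre-initialize a priority bucket.
    tasks.entry(priority).or_default().entry(task_id).or_default().push(event);
}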

crates/torii/indexer/src/engine.rs (3)

236-256: Ohayo sensei! The task manager initialization looks good!

The dependencies are properly cloned and the initialization is correct.


544-544: Ohayo sensei! The task processing placement is correct!

The task manager's process_tasks call is properly placed after event processing in the pending block handler.


591-591: Ohayo sensei! The task processing placement is correct!

The task manager's process_tasks call is properly placed after event processing in the range block handler.

Comment on lines +44 to +56
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);

hasher.finish()
}

⚠️ Potential issue

Add bounds checking for array access, sensei!

The current implementation assumes event.keys has at least 3 elements. Add validation to handle malformed events safely:

 fn task_identifier(&self, event: &Event) -> TaskId {
+    if event.keys.len() < 3 {
+        let mut hasher = DefaultHasher::new();
+        event.hash(&mut hasher);
+        return hasher.finish();
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[0].hash(&mut hasher);
     let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
     canonical_pair.hash(&mut hasher);
     hasher.finish()
 }

Comment on lines +44 to +64
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);

// For ERC721, we can safely parallelize by token ID since each token is unique
// and can only be owned by one address at a time. This means:
// 1. Transfers of different tokens can happen in parallel
// 2. Multiple transfers of the same token must be sequential
// 3. The canonical address pair ensures related transfers stay together
event.keys[3].hash(&mut hasher);
event.keys[4].hash(&mut hasher);

hasher.finish()
}

⚠️ Potential issue

Excellent implementation with clear documentation, sensei!

The task identifier implementation is well documented and shows good design choices:

  1. Canonical representation for address pairs
  2. Smart parallelization strategy using token IDs
  3. Clear comments explaining the rationale

However, please add bounds checking for array access safety.

Comment on lines +44 to +64
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
canonical_pair.hash(&mut hasher);

// For ERC721, we can safely parallelize by token ID since each token is unique
// and can only be owned by one address at a time. This means:
// 1. Transfers of different tokens can happen in parallel
// 2. Multiple transfers of the same token must be sequential
// 3. The canonical address pair ensures related transfers stay together
event.data[2].hash(&mut hasher);
event.data[3].hash(&mut hasher);

hasher.finish()
}

⚠️ Potential issue

Add bounds checking and consider sharing code with non-legacy version, sensei!

The implementation mirrors the non-legacy version but operates on event.data instead of event.keys. Consider:

  1. Adding bounds checking for array access
  2. Extracting common logic into a shared trait or utility function

Comment on lines +72 to +148
pub async fn process_tasks(&mut self) -> Result<()> {
let semaphore = Arc::new(Semaphore::new(self.max_concurrent_tasks));

// Process each priority level sequentially
for (priority, task_group) in std::mem::take(&mut self.tasks) {
let mut handles = Vec::new();

// Process all tasks within this priority level concurrently
for (task_id, events) in task_group {
let db = self.db.clone();
let world = self.world.clone();
let semaphore = semaphore.clone();
let processors = self.processors.clone();
let event_processor_config = self.event_processor_config.clone();

handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();

// Process all events for this task sequentially
for ParallelizedEvent {
contract_type,
event,
block_number,
block_timestamp,
event_id,
} in events
{
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processors) = contract_processors.get(&event.keys[0]) {
let processor = processors
.iter()
.find(|p| p.validate(&event))
.expect("Must find at least one processor for the event");

debug!(
target: LOG_TARGET,
event_name = processor.event_key(),
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);

if let Err(e) = processor
.process(
&world,
&mut local_db,
block_number,
block_timestamp,
&event_id,
&event,
&event_processor_config,
)
.await
{
error!(
target: LOG_TARGET,
event_name = processor.event_key(),
error = %e,
task_id = %task_id,
priority = %priority,
"Processing parallelized event."
);
}
}
}

Ok::<_, anyhow::Error>(())
}));
}

// Wait for all tasks in this priority level to complete before moving to next priority
try_join_all(handles).await?;
}

Ok(())
}

🛠️ Refactor suggestion

Ohayo sensei! Let's enhance the error handling!

The task processing logic is well-structured, but the error handling could be improved in two areas:

  1. Line 105: Using expect could cause a panic. Consider handling the case when no processor is found.
  2. Lines 115-135: The error is logged but not propagated, which might hide critical issues.

Apply this diff to improve error handling:

-                            let processor = processors
-                                .iter()
-                                .find(|p| p.validate(&event))
-                                .expect("Must find at least one processor for the event");
+                            let processor = processors
+                                .iter()
+                                .find(|p| p.validate(&event))
+                                .ok_or_else(|| anyhow::anyhow!("No valid processor found for event"))?;

-                            if let Err(e) = processor
+                            processor
                                 .process(
                                     &world,
                                     &mut local_db,
                                     block_number,
                                     block_timestamp,
                                     &event_id,
                                     &event,
                                     &event_processor_config,
                                 )
-                                .await
-                            {
-                                error!(
-                                    target: LOG_TARGET,
-                                    event_name = processor.event_key(),
-                                    error = %e,
-                                    task_id = %task_id,
-                                    priority = %priority,
-                                    "Processing parallelized event."
-                                );
-                            }
+                                .await
+                                .map_err(|e| {
+                                    error!(
+                                        target: LOG_TARGET,
+                                        event_name = processor.event_key(),
+                                        error = %e,
+                                        task_id = %task_id,
+                                        priority = %priority,
+                                        "Processing parallelized event."
+                                    );
+                                    e
+                                })?;

codecov bot commented Jan 16, 2025

Codecov Report

Attention: Patch coverage is 48.41499% with 179 lines in your changes missing coverage. Please review.

Project coverage is 55.74%. Comparing base (c989595) to head (2a65785).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
crates/torii/sqlite/src/erc.rs 0.00% 40 Missing ⚠️
...i/indexer/src/processors/erc721_legacy_transfer.rs 0.00% 24 Missing ⚠️
...es/torii/indexer/src/processors/erc721_transfer.rs 0.00% 24 Missing ⚠️
...ii/indexer/src/processors/erc20_legacy_transfer.rs 0.00% 16 Missing ⚠️
...tes/torii/indexer/src/processors/erc20_transfer.rs 0.00% 16 Missing ⚠️
...orii/indexer/src/processors/store_update_member.rs 0.00% 11 Missing ⚠️
crates/torii/indexer/src/task_manager.rs 87.20% 11 Missing ⚠️
...es/torii/indexer/src/processors/metadata_update.rs 0.00% 8 Missing ⚠️
...ates/torii/indexer/src/processors/upgrade_event.rs 0.00% 8 Missing ⚠️
...ates/torii/indexer/src/processors/upgrade_model.rs 0.00% 8 Missing ⚠️
... and 4 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2913      +/-   ##
==========================================
- Coverage   55.79%   55.74%   -0.05%     
==========================================
  Files         444      445       +1     
  Lines       57435    57627     +192     
==========================================
+ Hits        32045    32126      +81     
- Misses      25390    25501     +111     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Larkooo Larkooo enabled auto-merge (squash) January 17, 2025 06:26
Collaborator

@glihm glihm left a comment

Few comments and let's merge. 👍 Thanks for the work done here @Larkooo!

crates/torii/indexer/src/processors/event_message.rs
crates/torii/indexer/src/processors/event_message.rs
crates/torii/indexer/src/processors/metadata_update.rs
crates/torii/indexer/src/processors/upgrade_model.rs
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
crates/torii/indexer/src/processors/store_update_member.rs (1)

38-43: ⚠️ Potential issue

The task identifier implementation still needs improvement!

The previous review comments about bounds checking and documentation remain unaddressed.

For reference, the implementation should include bounds checking and documentation as previously suggested:

+    /// Generates a unique task identifier by combining specific event keys.
+    /// keys[1]: ... (explain what this key represents)
+    /// keys[2]: ... (explain what this key represents)
     fn task_identifier(&self, event: &Event) -> u64 {
+        // Ensure we have enough keys
+        if event.keys.len() < 3 {
+            // Either return a default hash or handle the error appropriately
+            return 0;
+        }
+
         let mut hasher = DefaultHasher::new();
         event.keys[1].hash(&mut hasher);
         event.keys[2].hash(&mut hasher);
         hasher.finish()
     }
🧹 Nitpick comments (5)
crates/torii/indexer/src/processors/store_del_record.rs (1)

33-35: Document the task priority value, sensei!

Please add documentation explaining:

  • The significance of priority level 2
  • How it compares to other processors' priorities
  • Impact on task scheduling
+    /// Returns the priority level for store deletion tasks.
+    /// Priority 2 indicates ... (please explain the significance)
     fn task_priority(&self) -> usize {
         2
     }
crates/torii/indexer/src/processors/metadata_update.rs (2)

39-41: Ohayo! Consider documenting the priority scale, sensei.

The hardcoded priority value of 3 would benefit from documentation explaining its significance in the overall task priority scale. This helps maintainers understand where metadata updates rank in relation to other processor priorities.

Add a doc comment explaining the priority scale:

+    /// Returns the task priority (3) for metadata updates.
+    /// Priority scale:
+    /// - Higher values indicate higher priority
+    /// - Current scale: X-Y (document the range)
     fn task_priority(&self) -> usize {
         3
     }

43-47: Ohayo! Great implementation of task identification, sensei!

The implementation nicely addresses the parallelization concern by using event keys for task identification. However, consider adding documentation to explain the hashing strategy.

Add documentation and consider defining a constant for default TaskId as suggested by @glihm:

+    /// Computes a unique task identifier by hashing the event keys.
+    /// This allows parallel processing of metadata updates for different resources.
+    /// 
+    /// If no keys are present, returns DEFAULT_TASK_ID for serialized processing.
+    const DEFAULT_TASK_ID: u64 = 0;
     fn task_identifier(&self, event: &Event) -> u64 {
+        if event.keys.is_empty() {
+            return Self::DEFAULT_TASK_ID;
+        }
         let mut hasher = DefaultHasher::new();
         event.keys.iter().for_each(|k| k.hash(&mut hasher));
         hasher.finish()
     }
crates/torii/indexer/src/processors/store_update_member.rs (1)

34-36: Ohayo sensei! Let's document the task priority!

The priority value of 2 needs documentation explaining:

  • Why this specific priority was chosen
  • How it relates to other processors' priorities
  • What impact it has on processing order

Add documentation above the method:

+    /// Returns a priority of 2 for store update member events.
+    /// This priority ensures that ... (explain the reasoning)
     fn task_priority(&self) -> usize {
         2
     }
crates/torii/indexer/src/processors/store_set_record.rs (1)

33-35: Document the priority scheme, sensei!

The hardcoded priority value of 2 needs documentation to explain:

  • The priority scale being used (e.g., 1-5, 1-10)
  • What this priority level means for StoreSetRecord events
  • How it relates to other processors' priorities

Add a doc comment explaining the priority scheme:

+    /// Returns the priority level for StoreSetRecord events.
+    /// Priority 2 indicates [explain significance here]
+    /// The priority scale ranges from [min] to [max].
     fn task_priority(&self) -> usize {
         2
     }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f7902f3 and 3490207.

📒 Files selected for processing (7)
  • crates/torii/indexer/src/processors/metadata_update.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_set_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_update_member.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_update_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_event.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_model.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • crates/torii/indexer/src/processors/upgrade_model.rs
  • crates/torii/indexer/src/processors/store_update_record.rs
  • crates/torii/indexer/src/processors/upgrade_event.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build
🔇 Additional comments (5)
crates/torii/indexer/src/processors/store_del_record.rs (2)

1-1: Ohayo! The imports look good, sensei!

The new imports are well-organized and properly support the task management functionality.

Also applies to: 13-13


37-42: Ohayo! The task identifier still needs bounds checking and documentation, sensei!

The implementation could panic if event.keys has fewer than 3 elements. Consider adding bounds checking and documentation as suggested in the previous review.

crates/torii/indexer/src/processors/store_update_member.rs (1)

1-1: Ohayo sensei! The hash-related imports look good!

The necessary imports for hash functionality are correctly added.

crates/torii/indexer/src/processors/store_set_record.rs (2)

1-1: Ohayo! Clean import statement, sensei!

The hash-related imports are well-organized and properly scoped for the new task identification functionality.


37-42: Verify task identifier implementation robustness, sensei!

The current implementation has several potential concerns:

  1. No bounds checking for event.keys array access
  2. Unclear why only keys[1] and keys[2] are used for identification
  3. No documentation about hash collision probability

Consider adding:

  1. Bounds checking or documentation about expected key length
  2. Comments explaining the significance of the chosen keys
  3. Error handling for cases where keys might be missing
+    /// Generates a unique identifier for the event based on specific keys.
+    /// Uses keys[1] and keys[2] because [explain significance].
+    /// 
+    /// # Panics
+    /// Panics if the event has fewer than 3 keys.
     fn task_identifier(&self, event: &Event) -> u64 {
+        debug_assert!(event.keys.len() >= 3, "Event must have at least 3 keys");
         let mut hasher = DefaultHasher::new();
         event.keys[1].hash(&mut hasher);
         event.keys[2].hash(&mut hasher);
         hasher.finish()
     }

Let's verify the key usage pattern:

✅ Verification successful

Ohayo! Task identifier implementation is verified, sensei!

The implementation is correct and intentionally designed for parallel processing:

  • keys[1] and keys[2] represent entity coordinates in the World contract
  • This ensures updates to the same entity are processed sequentially
  • Different entities can be processed in parallel for better performance
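
To make that grouping concrete, here is a minimal, runnable sketch of the strategy. It is an illustration only: the simplified Event struct, the u64 stand-in for Felt, and the fixed priority of 2 are assumptions for this sketch, not torii's actual types.

use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};

type TaskId = u64;
type TaskPriority = usize;

// Simplified event: u64 stands in for the real Felt keys.
struct Event {
    keys: Vec<u64>,
}

// Same hashing scheme as described above: model selector + entity id.
fn task_identifier(event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher); // model selector
    event.keys[2].hash(&mut hasher); // entity id
    hasher.finish()
}

fn main() {
    // Two updates to entity 7 and one to entity 9 (same model).
    let events = vec![
        Event { keys: vec![0, 1, 7] },
        Event { keys: vec![0, 1, 9] },
        Event { keys: vec![0, 1, 7] },
    ];

    // Group by (priority, task id): same entity lands in the same bucket
    // (processed sequentially); different entities land in different
    // buckets (processed in parallel). Priority 2 is assumed here.
    let mut tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<Event>>> = BTreeMap::new();
    for event in events {
        let id = task_identifier(&event);
        tasks.entry(2).or_default().entry(id).or_default().push(event);
    }

    for (priority, group) in &tasks {
        for (task_id, bucket) in group {
            println!("priority {priority}, task {task_id:#x}: {} event(s)", bucket.len());
        }
    }
}

Running this prints one task holding two events (entity 7) and one task holding a single event (entity 9), matching the sequential-per-entity, parallel-across-entities behavior described above.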
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for StoreSetRecord event key patterns
rg -A 5 "StoreSetRecord.*keys\[" 

Length of output: 34


Script:

#!/bin/bash
# Find Event struct definition
ast-grep --pattern 'struct Event {
  $$$
}'

# Find StoreSetRecord implementation and usage
rg -l "StoreSetRecord"

# Look for test files
fd "test.*\.rs$" --exec rg -l "StoreSetRecord|Event"

Length of output: 670


Script:

#!/bin/bash
# Check Event struct definition
cat crates/dojo/types/src/event.rs

# Look for StoreSetRecord usage in tests
rg -A 10 "StoreSetRecord" crates/torii/graphql/src/tests/events_test.rs

# Check engine implementation
cat crates/torii/indexer/src/engine.rs

Length of output: 34133

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
crates/torii/indexer/src/processors/erc20_transfer.rs (1)

44-56: ⚠️ Potential issue

Add bounds checking for array access, sensei!

The current implementation assumes event.keys has at least 3 elements. Add validation to handle malformed events safely.

 fn task_identifier(&self, event: &Event) -> TaskId {
+    if event.keys.len() < 3 {
+        let mut hasher = DefaultHasher::new();
+        event.hash(&mut hasher);
+        return hasher.finish();
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[0].hash(&mut hasher);
     let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
     canonical_pair.hash(&mut hasher);
     hasher.finish()
 }
🧹 Nitpick comments (4)
crates/torii/indexer/src/processors/raw_event.rs (1)

27-34: Ohayo! Add documentation for task management methods, sensei!

Consider adding doc comments to explain:

  • Why raw events have priority 1
  • Why raw events are processed sequentially
  • Future plans for parallelization (mentioned in TODO)
+    /// Returns the priority level for raw event processing.
+    /// Currently set to 1 as a baseline priority.
     fn task_priority(&self) -> TaskPriority {
         1
     }

+    /// Returns a sequential task ID for raw events.
+    /// Raw events are currently processed sequentially to maintain order consistency.
+    /// TODO: Evaluate parallelization opportunities for raw events.
     fn task_identifier(&self, _event: &Event) -> TaskId {
         // TODO. for now raw events are not parallelized
         task_manager::TASK_ID_SEQUENTIAL
     }
crates/torii/indexer/src/processors/upgrade_event.rs (1)

40-44: Consider adding a comment explaining the task identifier generation logic.

The implementation is correct, but documenting why we hash the event keys would improve maintainability.

 fn task_identifier(&self, event: &Event) -> TaskId {
+    // Generate a unique task ID by hashing event keys to allow parallel processing
+    // of different upgrade events
     let mut hasher = DefaultHasher::new();
     event.keys.iter().for_each(|k| k.hash(&mut hasher));
     hasher.finish()
 }
crates/torii/indexer/src/task_manager.rs (1)

33-40: Consider adding documentation for TaskManager fields.

Adding documentation for each field would improve code maintainability and help other developers understand the purpose of each component.

 pub struct TaskManager<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
+    /// Database connection for persisting task state
     db: Sql,
+    /// World contract reader for blockchain interaction
     world: Arc<WorldContractReader<P>>,
+    /// Tasks organized by priority and task ID
     tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<ParallelizedEvent>>>,
+    /// Available event processors
     processors: Arc<Processors<P>>,
+    /// Maximum number of concurrent tasks
     max_concurrent_tasks: usize,
+    /// Configuration for event processors
     event_processor_config: EventProcessorConfig,
 }
crates/torii/indexer/src/engine.rs (1)

250-256: Consider extracting configuration into a dedicated struct.

Ohayo sensei! The TaskManager initialization has multiple parameters. Consider grouping them into a configuration struct for better maintainability.

+#[derive(Clone)]
+pub struct TaskManagerConfig<P> {
+    pub db: Sql,
+    pub world: Arc<WorldContractReader<P>>,
+    pub processors: Arc<Processors<P>>,
+    pub max_concurrent_tasks: usize,
+    pub event_processor_config: EventProcessorConfig,
+}

-            task_manager: TaskManager::new(
-                db,
-                world,
-                processors,
-                max_concurrent_tasks,
-                event_processor_config,
-            ),
+            task_manager: TaskManager::new(TaskManagerConfig {
+                db,
+                world,
+                processors,
+                max_concurrent_tasks,
+                event_processor_config,
+            }),
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3490207 and 4d506b9.

📒 Files selected for processing (17)
  • crates/torii/indexer/src/engine.rs (8 hunks)
  • crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc20_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/event_message.rs (2 hunks)
  • crates/torii/indexer/src/processors/metadata_update.rs (3 hunks)
  • crates/torii/indexer/src/processors/mod.rs (2 hunks)
  • crates/torii/indexer/src/processors/raw_event.rs (2 hunks)
  • crates/torii/indexer/src/processors/register_event.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_set_record.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_update_member.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_update_record.rs (3 hunks)
  • crates/torii/indexer/src/processors/upgrade_event.rs (3 hunks)
  • crates/torii/indexer/src/processors/upgrade_model.rs (3 hunks)
  • crates/torii/indexer/src/task_manager.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • crates/torii/indexer/src/processors/mod.rs
  • crates/torii/indexer/src/processors/event_message.rs
  • crates/torii/indexer/src/processors/store_update_record.rs
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: test
  • GitHub Check: ensure-windows
  • GitHub Check: test-hurl
🔇 Additional comments (11)
crates/torii/indexer/src/processors/store_del_record.rs (1)

33-42: ⚠️ Potential issue

Add safety checks and documentation for task management, sensei!

The implementation needs:

  1. Documentation explaining why this processor has priority 2
  2. Bounds checking for array access in task_identifier
+    /// Returns priority 2 for store delete record processing.
+    /// Higher priority than transfer events to ensure consistent state management.
     fn task_priority(&self) -> TaskPriority {
         2
     }

+    /// Generates a task identifier from event keys to manage parallel processing.
     fn task_identifier(&self, event: &Event) -> TaskId {
+        if event.keys.len() < 3 {
+            let mut hasher = DefaultHasher::new();
+            event.hash(&mut hasher);
+            return hasher.finish();
+        }
         let mut hasher = DefaultHasher::new();
         event.keys[1].hash(&mut hasher);
         event.keys[2].hash(&mut hasher);
         hasher.finish()
     }

Likely invalid or redundant comment.

crates/torii/indexer/src/processors/erc721_transfer.rs (1)

44-64: ⚠️ Potential issue

Add bounds checking while maintaining excellent parallelization strategy, sensei!

The implementation has well-documented parallelization logic but needs safety checks for array access.

 fn task_identifier(&self, event: &Event) -> TaskId {
+    if event.keys.len() < 5 {
+        let mut hasher = DefaultHasher::new();
+        event.hash(&mut hasher);
+        return hasher.finish();
+    }
     let mut hasher = DefaultHasher::new();
     // Rest of the implementation remains unchanged as it's well-documented
     // and implements an excellent parallelization strategy
     event.keys[0].hash(&mut hasher);
     let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
     canonical_pair.hash(&mut hasher);
     event.keys[3].hash(&mut hasher);
     event.keys[4].hash(&mut hasher);
     hasher.finish()
 }

Likely invalid or redundant comment.

crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (1)

40-64: Excellent implementation of task identification, sensei!

The task identifier implementation is well-documented and thoughtfully designed:

  1. Uses canonical address pairs to group related transfers
  2. Leverages token ID for safe parallelization
  3. Clearly explains the rationale in comments
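
As a rough sketch of that canonical-pair strategy (assumptions: u64 stands in for the real Felt type, and the key layout selector/from/to/token-id-low/token-id-high mirrors the review's description rather than quoting the actual processor):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Assumed key layout: [selector, from, to, token_id_low, token_id_high].
fn erc721_task_id(keys: &[u64; 5]) -> u64 {
    let mut hasher = DefaultHasher::new();
    keys[0].hash(&mut hasher); // event selector (Transfer)
    // Canonical pair: hash the max of from/to so A->B and B->A share a task.
    std::cmp::max(keys[1], keys[2]).hash(&mut hasher);
    keys[3].hash(&mut hasher); // token id (low)
    keys[4].hash(&mut hasher); // token id (high)
    hasher.finish()
}

fn main() {
    let a_to_b = [11, 0xA, 0xB, 42, 0];
    let b_to_a = [11, 0xB, 0xA, 42, 0];
    // Same pair and token: grouped into one task, processed sequentially.
    assert_eq!(erc721_task_id(&a_to_b), erc721_task_id(&b_to_a));
    // Different token: separate task, safe to process in parallel.
    let other_token = [11, 0xA, 0xB, 43, 0];
    assert_ne!(erc721_task_id(&a_to_b), erc721_task_id(&other_token));
    println!("canonical pair grouping holds");
}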
crates/torii/indexer/src/processors/register_event.rs (1)

36-44: Well-implemented task management, sensei!

The implementation is solid:

  1. Appropriate highest priority (0) for registration events
  2. Safe iteration over all keys for identifier generation
crates/torii/indexer/src/processors/store_update_member.rs (1)

39-46: 🛠️ Refactor suggestion

Ohayo sensei! Great documentation, but let's add bounds checking!

The implementation has good documentation explaining key significance but should include bounds checking.

Consider this safer implementation:

     fn task_identifier(&self, event: &Event) -> TaskId {
+        // Ensure we have enough keys
+        if event.keys.len() < 3 {
+            return 0;
+        }
+
         let mut hasher = DefaultHasher::new();
         // model selector
         event.keys[1].hash(&mut hasher);
         // entity id
         event.keys[2].hash(&mut hasher);
         hasher.finish()
     }

Likely invalid or redundant comment.

crates/torii/indexer/src/processors/upgrade_model.rs (1)

36-44: Clean and safe implementation, sensei!

The implementation is well done:

  1. Appropriate priority level for upgrade events
  2. Safe iteration over keys for identifier generation
crates/torii/indexer/src/processors/upgrade_event.rs (1)

36-38: LGTM! Task priority is appropriate for upgrade events.

The priority of 1 is suitable for upgrade events as they are critical but not as time-sensitive as other operations.

crates/torii/indexer/src/processors/metadata_update.rs (2)

40-42: Verify the task priority value.

Ohayo sensei! The priority of 3 seems high compared to upgrade events (priority 1). Consider documenting why metadata updates need higher priority or adjusting the value.


44-48: LGTM! Task identifier implementation is consistent.

The implementation follows the same pattern as other processors, ensuring consistent task identification across the system.

crates/torii/indexer/src/task_manager.rs (1)

74-147: ⚠️ Potential issue

Enhance error handling in task processing.

Ohayo sensei! While the task processing logic is well-structured, consider improving error handling:

  1. Line 107: Using expect could cause a panic if no processor is found.
  2. Error propagation could be improved to maintain system stability.

Apply this diff to improve error handling:

-                            let processor = processors
-                                .iter()
-                                .find(|p| p.validate(&event))
-                                .expect("Must find at least one processor for the event");
+                            let processor = processors
+                                .iter()
+                                .find(|p| p.validate(&event))
+                                .ok_or_else(|| anyhow::anyhow!("No valid processor found for event"))?;

Likely invalid or redundant comment.

crates/torii/indexer/src/engine.rs (1)

815-826: LGTM! Clean implementation of parallelized event handling.

The logic for determining whether to parallelize an event and adding it to the task manager is well-implemented. The check against TASK_ID_SEQUENTIAL provides a clean way to handle non-parallelizable events.
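
For illustration, here is a minimal sketch of that dispatch check; the Dispatch enum and the concrete value of TASK_ID_SEQUENTIAL are assumptions for the sketch, not the engine's actual definitions.

// Assumed value; the real constant lives in torii's task_manager module.
const TASK_ID_SEQUENTIAL: u64 = 0;

#[derive(Debug)]
enum Dispatch {
    Inline,        // non-parallelizable: process immediately, in block order
    Parallel(u64), // parallelizable: queue under this task id
}

fn dispatch(task_identifier: u64) -> Dispatch {
    if task_identifier == TASK_ID_SEQUENTIAL {
        Dispatch::Inline
    } else {
        Dispatch::Parallel(task_identifier)
    }
}

fn main() {
    println!("{:?}", dispatch(TASK_ID_SEQUENTIAL)); // Inline
    println!("{:?}", dispatch(0x5eed));             // Parallel(24301)
}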

Comment on lines +44 to +56
fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    // Hash the event key (Transfer)
    event.keys[0].hash(&mut hasher);

    // Take the max of from/to addresses to get a canonical representation
    // This ensures transfers between the same pair of addresses are grouped together
    // regardless of direction (A->B or B->A)
    let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
    canonical_pair.hash(&mut hasher);

    hasher.finish()
}

⚠️ Potential issue

Add bounds checking for data array access, sensei!

The current implementation assumes event.data has at least 2 elements. Add validation to handle malformed events safely.

 fn task_identifier(&self, event: &Event) -> TaskId {
+    if event.data.len() < 2 {
+        let mut hasher = DefaultHasher::new();
+        event.hash(&mut hasher);
+        return hasher.finish();
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[0].hash(&mut hasher);
     let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
     canonical_pair.hash(&mut hasher);
     hasher.finish()
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +38 to +43
fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher);
    event.keys[2].hash(&mut hasher);
    hasher.finish()
}

💡 Codebase verification

Ohayo sensei! The bounds check is indeed necessary for robustness!

The codebase consistently performs bounds checking before accessing event keys, as seen in other processors like ERC20/ERC721 transfers. This pattern is crucial for handling potentially malformed events and preventing panics.

fn task_identifier(&self, event: &Event) -> TaskId {
    // Ensure we have the required keys (selector, model, entity)
    if event.keys.len() < 3 {
        return 0; // Return default hash for invalid events
    }
    
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher); // model selector
    event.keys[2].hash(&mut hasher); // entity ID
    hasher.finish()
}
🔗 Analysis chain

Ohayo sensei! Let's make the task identifier more robust!

The implementation should include:

  1. Bounds checking before accessing event.keys
  2. Documentation explaining the significance of the selected keys

Consider this safer implementation:

     fn task_identifier(&self, event: &Event) -> TaskId {
+        // Ensure we have enough keys
+        if event.keys.len() < 3 {
+            // Return a default hash for invalid events
+            return 0;
+        }
+        
+        // Hash keys[1] (model selector) and keys[2] (entity ID)
         let mut hasher = DefaultHasher::new();
         event.keys[1].hash(&mut hasher);
         event.keys[2].hash(&mut hasher);
         hasher.finish()
     }
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Find Event struct definition
ast-grep --pattern 'struct Event {$$$}'

# Find usages of event.keys
rg "event\.keys" -A 2 -B 2

# Look for documentation mentioning Event or keys
rg -i "event.*keys|keys.*event" -g "*.{md,txt,rs}"

Length of output: 65806

@glihm glihm disabled auto-merge January 22, 2025 03:23
@glihm glihm merged commit 8f54b66 into dojoengine:main Jan 22, 2025
2 of 3 checks passed