feat(torii-indexer): task manager & parallelize erc transfers #2913
Conversation
Caution: Review failed. The pull request is closed.

Walkthrough
Ohayo, sensei! This pull request introduces substantial modifications to the event processing system within the Torii indexer. Key changes include the removal of the `tasks` map from the `Engine` struct in favor of a new `TaskManager`.

Changes
Possibly related PRs
📜 Recent review details
Configuration used: .coderabbit.yaml
📒 Files selected for processing (2)
Actionable comments posted: 4
🔭 Outside diff range comments (1)
crates/torii/indexer/src/engine.rs (1)
Line range hint 473-495: Update `process_tasks` method to use `task_manager`
Ohayo, sensei! The `process_tasks` method still references `self.tasks`, which should now be managed by `task_manager`. Update the method to retrieve tasks from `task_manager` and process them accordingly.
🧹 Nitpick comments (10)
crates/torii/indexer/src/task_manager.rs (2)
58-66: Consider returning a reference in `take_tasks` for efficiency
Ohayo, sensei! In the `take_tasks` method (lines 58-62), using `std::mem::take` moves the `tasks` out of the `TaskManager`, leaving an empty `BTreeMap`. If the intention is to process tasks without clearing them, consider returning a reference or cloning the tasks.
24-26: Add visibility modifier to `tasks` field if needed
Ohayo, sensei! The `tasks` field in `TaskManager` is currently private. If external modules need to access `tasks`, consider adding a visibility modifier like `pub(crate)`.

crates/torii/indexer/src/engine.rs (1)
Line range hint 838-860: Remove unused function `get_transaction_hash_from_event_id` if obsolete
Ohayo, sensei! The function `get_transaction_hash_from_event_id` on lines 838-841 might be obsolete due to changes in event handling. If it's no longer used, consider removing it to keep the codebase clean.

crates/torii/indexer/src/processors/raw_event.rs (1)
26-34: Clarify task parallelization for `RawEventProcessor`
Ohayo, sensei! In the `RawEventProcessor`, the `task_identifier` method returns `0`, indicating that raw events are not parallelized (lines 30-33). If you plan to parallelize raw event processing in the future, consider adding a `TODO` comment for clarity.

crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (1)
45-57: Ohayo sensei! Consider reducing code duplication.
The task identification logic is identical to `Erc20TransferProcessor` except for accessing event data differently. Consider extracting the common hashing logic into a shared utility function. Here's a suggested approach:

```rust
// In a shared utils module:
pub fn compute_transfer_task_id(event_key: &Felt, address1: &Felt, address2: &Felt) -> TaskId {
    let mut hasher = DefaultHasher::new();
    event_key.hash(&mut hasher);
    let canonical_pair = std::cmp::max(*address1, *address2);
    canonical_pair.hash(&mut hasher);
    hasher.finish()
}

// In Erc20TransferProcessor:
fn task_identifier(&self, event: &Event) -> TaskId {
    compute_transfer_task_id(&event.keys[0], &event.keys[1], &event.keys[2])
}

// In Erc20LegacyTransferProcessor:
fn task_identifier(&self, event: &Event) -> TaskId {
    compute_transfer_task_id(&event.keys[0], &event.data[0], &event.data[1])
}
```

crates/torii/indexer/src/processors/store_del_record.rs (1)
38-43: Consider adding bounds checking and documentation.
The task identifier implementation could benefit from:
- Array bounds validation before accessing keys[1] and keys[2]
- Comments explaining what these keys represent in the context of StoreDelRecord events

```diff
 fn task_identifier(&self, event: &Event) -> TaskId {
+    // Ensure we have enough keys
+    if event.keys.len() < 3 {
+        panic!("StoreDelRecord event must have at least 3 keys");
+    }
+
+    // Keys[1]: Model selector
+    // Keys[2]: Entity ID
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }
```

crates/torii/indexer/src/processors/store_set_record.rs (1)
37-42: Add safety checks and documentation.
Similar to StoreDelRecord, consider:
- Validating array bounds
- Documenting the meaning of keys[1] and keys[2]

```diff
 fn task_identifier(&self, event: &Event) -> u64 {
+    // Ensure we have enough keys
+    if event.keys.len() < 3 {
+        panic!("StoreSetRecord event must have at least 3 keys");
+    }
+
+    // Keys[1]: Model selector
+    // Keys[2]: Entity ID
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }
```

crates/torii/indexer/src/processors/store_update_member.rs (1)
33-42: Ohayo sensei! Consider extracting common task ID generation logic.
The task_identifier implementation is identical to StoreUpdateRecordProcessor. Consider:
- Extracting this common logic into a trait or utility function
- Adding bounds checking for array access

```rust
// Example utility function
fn generate_task_id(keys: &[FieldElement]) -> u64 {
    let mut hasher = DefaultHasher::new();
    if keys.len() > 2 {
        keys[1].hash(&mut hasher);
        keys[2].hash(&mut hasher);
    }
    hasher.finish()
}
```

crates/torii/indexer/src/processors/register_model.rs (1)
39-43: Consider adopting this comprehensive task ID generation approach globally.
This implementation is more robust as it hashes all available keys. Consider using this approach in other processors (like StoreUpdateRecord and StoreUpdateMember) for consistency and completeness.
crates/torii/indexer/src/processors/upgrade_event.rs (1)
Line range hint 82-82: Consider improving documentation clarity, sensei.
The comment "Called model here by language, but it's an event" suggests potential confusion. Consider updating variable names to better reflect their purpose (e.g., `event_model` or `event_definition`).
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (19)
- crates/torii/indexer/src/engine.rs (1 hunks)
- crates/torii/indexer/src/lib.rs (1 hunks)
- crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc20_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc721_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/event_message.rs (1 hunks)
- crates/torii/indexer/src/processors/metadata_update.rs (1 hunks)
- crates/torii/indexer/src/processors/mod.rs (2 hunks)
- crates/torii/indexer/src/processors/raw_event.rs (1 hunks)
- crates/torii/indexer/src/processors/register_event.rs (2 hunks)
- crates/torii/indexer/src/processors/register_model.rs (2 hunks)
- crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
- crates/torii/indexer/src/processors/store_set_record.rs (2 hunks)
- crates/torii/indexer/src/processors/store_update_member.rs (2 hunks)
- crates/torii/indexer/src/processors/store_update_record.rs (2 hunks)
- crates/torii/indexer/src/processors/upgrade_event.rs (2 hunks)
- crates/torii/indexer/src/processors/upgrade_model.rs (2 hunks)
- crates/torii/indexer/src/task_manager.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- crates/torii/indexer/src/lib.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: fmt
🔇 Additional comments (16)
crates/torii/indexer/src/task_manager.rs (1)
46-53: Confirm intentional exclusion of events with `task_id` zero
Ohayo, sensei! The current logic skips adding events to `tasks` when `task_id` is zero (lines 46-53). Please verify if this is intentional. If events with `task_id` zero should be processed differently, consider documenting the rationale.

crates/torii/indexer/src/processors/mod.rs (1)
58-59: Ohayo! The trait changes look good, sensei!
The addition of `task_priority` and `task_identifier` methods to the `EventProcessor` trait provides a solid foundation for task management. This enables consistent prioritization and identification of tasks across all event processors.

crates/torii/indexer/src/processors/event_message.rs (1)
35-38: Ohayo sensei! Let's address the TODO for event message parallelization.
Returning `0` for all task identifiers could lead to task collisions if parallelization is implemented in the future. Consider implementing a proper task identifier based on event properties, similar to other processors. Would you like me to propose an implementation that generates unique task identifiers based on event properties?
crates/torii/indexer/src/processors/erc20_transfer.rs (1)
45-57: Ohayo! Excellent implementation of task identification, sensei!
The use of canonical representation for transfer addresses is a clever approach. By taking the maximum of the from/to addresses, you ensure that transfers between the same pair of addresses are grouped together regardless of direction. The implementation is well-documented and efficient.
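The direction-independence described here can be checked in isolation; a tiny sketch with plain `u64`s standing in for `Felt` addresses:

```rust
fn main() {
    // A->B and B->A reduce to the same canonical pair, so both
    // directions of a transfer hash into the same task.
    let (a, b): (u64, u64) = (0x1234, 0x5678);
    assert_eq!(std::cmp::max(a, b), std::cmp::max(b, a));
}
```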
crates/torii/indexer/src/processors/store_del_record.rs (1)
34-36: Ohayo! Task priority looks good, sensei!
The constant priority of 1 is consistent with other processors.
crates/torii/indexer/src/processors/store_set_record.rs (1)
33-35: Task priority implementation looks good, sensei!
Consistent with other processors.
crates/torii/indexer/src/processors/erc721_transfer.rs (2)
40-42: Task priority looks good, sensei!
Consistent priority level of 1 matches other processors.

44-64: Excellent implementation with clear documentation!
The task identification strategy is well thought out:
- Uses canonical address pairs to group related transfers
- Enables safe parallelization by token ID
- Includes clear documentation explaining the approach

This is a great example of how to implement and document task identification logic!
crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (2)
41-43: Task priority implementation looks good, sensei!
Consistent with other processors.

45-65: Excellent implementation with clear documentation!
The task identification strategy mirrors the non-legacy implementation:
- Uses canonical address pairs to group related transfers
- Enables safe parallelization by token ID
- Includes clear documentation explaining the approach

Great consistency with the non-legacy implementation while adapting to the different event structure!
crates/torii/indexer/src/processors/store_update_record.rs (1)
1-1: Ohayo! Implementation looks good, sensei!
The priority value of 1 is appropriate for store update events, as they should be processed after system events (priority 0).
Also applies to: 33-35
crates/torii/indexer/src/processors/register_model.rs (1)
35-37: Ohayo! Excellent priority choice, sensei!
Setting priority 0 for model registration is correct, as models need to be registered before any store operations can be processed.
crates/torii/indexer/src/processors/register_event.rs (1)
35-43: Ohayo! Implementation is consistent and robust, sensei!
The implementation aligns well with RegisterModelProcessor:
- Priority 0 is appropriate for system events
- Comprehensive key hashing approach ensures unique task IDs
crates/torii/indexer/src/processors/upgrade_model.rs (2)
Line range hint 44-136: Process method implementation looks solid, sensei!
The extensive error handling and logging in the process method align well with its high-priority task status.

34-43: Ohayo! Add bounds checking for event.keys array access, sensei.
The `task_identifier` implementation assumes `event.keys[1]` always exists. Consider adding a bounds check to handle cases where the event keys array might be empty or have insufficient elements. Also, consider adding documentation to explain:
- Why priority 0 was chosen for model upgrades
- The significance of using the second key for task identification
crates/torii/indexer/src/processors/upgrade_event.rs (1)
34-43: Extract common task identification logic, sensei!
The task identifier implementation is duplicated across multiple processors. Consider extracting this into a common utility function or trait default implementation. Also, similar to the model upgrade processor, add bounds checking for `event.keys[1]` access.
```rust
pub fn add_parallelized_event(&mut self, parallelized_event: ParallelizedEvent) -> TaskId {
    let event_key = parallelized_event.event.keys[0];
    let processor = self
        .processors
        .get_event_processor(parallelized_event.contract_type)
        .get(&event_key)
        .unwrap()
        .iter()
        .find(|p| p.validate(&parallelized_event.event))
        .unwrap();
```
Avoid using `.unwrap()` to prevent potential panics
Ohayo, sensei! The usage of `.unwrap()` on lines 39 and 42 could lead to a panic if the `Option` is `None`. To enhance the robustness of the code, consider handling the `None` case explicitly or propagating the error using `?`.

Apply this diff to handle the potential `None` values:
```diff
 let processor = self
     .processors
     .get_event_processor(parallelized_event.contract_type)
     .get(&event_key)
-    .unwrap()
+    .ok_or_else(|| anyhow!("No processor found for event key"))?
     .iter()
     .find(|p| p.validate(&parallelized_event.event))
-    .unwrap();
+    .ok_or_else(|| anyhow!("No valid processor found for event"))?;
```
```diff
@@ -216,7 +208,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
     config: EngineConfig,
     shutdown_tx: Sender<()>,
     block_tx: Option<BoundedSender<u64>>,
-    tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>,
+    task_manager: TaskManager<P>,
```
Initialize `task_manager` field in `Engine`
Ohayo, sensei! On line 211, the `Engine` struct has a new field `task_manager` but it's not initialized in the constructor. Make sure to initialize `task_manager` in the `Engine::new` method to prevent a runtime error.

Apply this diff to initialize `task_manager`:
```diff
     block_tx,
     contracts,
-    tasks: HashMap::new(),
+    task_manager: TaskManager::new(processors.clone()),
 }
```
```rust
fn task_identifier(&self, event: &Event) -> u64 {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher);
    event.keys[2].hash(&mut hasher);
    hasher.finish()
}
```
Ohayo sensei! Consider adding bounds checking for array access.
The current implementation assumes keys[1] and keys[2] always exist. Consider adding bounds checking to prevent potential panic:
```diff
 fn task_identifier(&self, event: &Event) -> u64 {
     let mut hasher = DefaultHasher::new();
-    event.keys[1].hash(&mut hasher);
-    event.keys[2].hash(&mut hasher);
+    if event.keys.len() > 2 {
+        event.keys[1].hash(&mut hasher);
+        event.keys[2].hash(&mut hasher);
+    }
     hasher.finish()
 }
```
Force-pushed from 15cb6e9 to 49e91b0
Actionable comments posted: 4
♻️ Duplicate comments (1)
crates/torii/indexer/src/task_manager.rs (1)
96-98: ⚠️ Potential issue — Avoid using `.expect()` to prevent potential panics
Ohayo, sensei! The use of `.expect("Must find at least one processor for the event")` on line 98 could lead to a panic if no processor is found that validates the event. To enhance the robustness of the code, consider handling the `None` case explicitly or propagating the error using `?`.

Apply this diff to handle the potential `None` value:

```diff
 .find(|p| p.validate(&event))
-.expect("Must find at least one processor for the event");
+.ok_or_else(|| anyhow!("No valid processor found for the event"))?;
```
🧹 Nitpick comments (7)
crates/torii/indexer/src/processors/register_model.rs (2)
35-37: Consider documenting the priority scheme, sensei!
The constant priority of 0 suggests this is a low-priority processor, but the reasoning and its impact on execution order isn't documented. Adding documentation would help other developers understand the priority hierarchy across different processors.

```diff
+/// Returns the task priority for the RegisterModel processor.
+/// Priority 0 indicates lowest priority in the processing queue.
 fn task_priority(&self) -> usize {
     0
 }
```
39-43: Consider hash collision mitigation, sensei!
The current implementation might produce different hashes for the same logical event if the keys are in a different order. Additionally, there's no explicit handling of hash collisions.

Consider these improvements:
- Sort the keys before hashing to ensure consistent results
- Include more event fields in the hash for better uniqueness

```diff
 fn task_identifier(&self, event: &Event) -> u64 {
     let mut hasher = DefaultHasher::new();
-    event.keys.iter().for_each(|k| k.hash(&mut hasher));
+    let mut sorted_keys = event.keys.clone();
+    sorted_keys.sort();
+    sorted_keys.hash(&mut hasher);
+    event.from_address.hash(&mut hasher);
+    event.data.hash(&mut hasher);
     hasher.finish()
 }
```

crates/torii/indexer/src/processors/store_update_member.rs (1)
33-35: Ohayo sensei! Let's document the priority scheme!
The constant priority value of `1` would benefit from documentation explaining:
- The range of valid priorities
- What this priority level means
- How it compares to other processors' priorities

Consider adding a doc comment like this:

```diff
+/// Returns the task priority for store update member events.
+/// Priority 1 indicates [explain significance of this priority level]
+/// Valid priority range is [specify range]
 fn task_priority(&self) -> usize {
     1
 }
```

crates/torii/indexer/src/processors/store_del_record.rs (1)
34-36: Ohayo! Consider documenting the priority scheme, sensei!
The priority value of 1 is used without context. Adding documentation would help future maintainers understand the priority hierarchy across different processors.

```diff
+/// Returns the task priority for StoreDelRecord events.
+/// Priority 1 indicates high-priority processing (add more context about the priority scheme).
 fn task_priority(&self) -> usize {
     1
 }
```

crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (1)
52-52: Fix trailing whitespace.
There's a trailing whitespace on this line that needs to be removed.

```diff
-    // regardless of direction (A->B or B->A) 
+    // regardless of direction (A->B or B->A)
```

🧰 Tools
🪛 GitHub Actions: ci
[warning] 52-52: Code formatting issue: Trailing whitespace detected

crates/torii/indexer/src/processors/erc721_transfer.rs (1)
crates/torii/indexer/src/processors/erc721_transfer.rs (1)
49-49
: Fix trailing whitespace.There are trailing whitespaces on these lines that need to be removed.
- // regardless of direction (A->B or B->A) + // regardless of direction (A->B or B->A) - // 2. Multiple transfers of the same token must be sequential + // 2. Multiple transfers of the same token must be sequentialAlso applies to: 59-59
🧰 Tools
🪛 GitHub Actions: ci
[warning] 49-49: Code formatting issue: Trailing whitespace detected
crates/torii/indexer/src/processors/upgrade_model.rs (1)
34-43
: Add documentation for task management methods.Ohayo, sensei! The implementation looks good, but could benefit from documentation explaining:
- Why the priority is set to 0
- Why the second key (index 1) is used for task identification
- What the task identifier represents in the context of model upgrades
+ /// Returns the task priority for model upgrade events. + /// Priority 0 indicates highest priority processing. fn task_priority(&self) -> usize { 0 } + /// Generates a unique task identifier for model upgrade events. + /// Uses the model selector (second key) to ensure upgrades for the same model + /// are processed sequentially. fn task_identifier(&self, event: &Event) -> u64 { let mut hasher = DefaultHasher::new(); // model selector event.keys[1].hash(&mut hasher); hasher.finish() }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
- crates/torii/indexer/src/engine.rs (8 hunks)
- crates/torii/indexer/src/lib.rs (1 hunks)
- crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc20_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc721_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/event_message.rs (1 hunks)
- crates/torii/indexer/src/processors/metadata_update.rs (1 hunks)
- crates/torii/indexer/src/processors/mod.rs (2 hunks)
- crates/torii/indexer/src/processors/raw_event.rs (1 hunks)
- crates/torii/indexer/src/processors/register_event.rs (2 hunks)
- crates/torii/indexer/src/processors/register_model.rs (2 hunks)
- crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
- crates/torii/indexer/src/processors/store_set_record.rs (2 hunks)
- crates/torii/indexer/src/processors/store_update_member.rs (2 hunks)
- crates/torii/indexer/src/processors/store_update_record.rs (2 hunks)
- crates/torii/indexer/src/processors/upgrade_event.rs (2 hunks)
- crates/torii/indexer/src/processors/upgrade_model.rs (2 hunks)
- crates/torii/indexer/src/task_manager.rs (1 hunks)
- crates/torii/sqlite/src/cache.rs (2 hunks)
- crates/torii/sqlite/src/erc.rs (7 hunks)
- crates/torii/sqlite/src/lib.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (10)
- crates/torii/indexer/src/lib.rs
- crates/torii/indexer/src/processors/mod.rs
- crates/torii/indexer/src/processors/store_update_record.rs
- crates/torii/indexer/src/processors/raw_event.rs
- crates/torii/indexer/src/processors/event_message.rs
- crates/torii/indexer/src/processors/metadata_update.rs
- crates/torii/indexer/src/processors/upgrade_event.rs
- crates/torii/indexer/src/processors/store_set_record.rs
- crates/torii/indexer/src/processors/erc20_transfer.rs
- crates/torii/indexer/src/processors/register_event.rs
🧰 Additional context used
🪛 GitHub Actions: ci
crates/torii/sqlite/src/lib.rs
[warning] 74-74: Code formatting issue: Inconsistent struct formatting
crates/torii/indexer/src/processors/erc20_legacy_transfer.rs
[warning] 52-52: Code formatting issue: Trailing whitespace detected
crates/torii/indexer/src/processors/erc721_transfer.rs
[warning] 49-49: Code formatting issue: Trailing whitespace detected
[warning] 59-59: Code formatting issue: Trailing whitespace detected
crates/torii/indexer/src/engine.rs
[warning] 809-809: Code formatting issue: Long line needs to be split into multiple lines
🔇 Additional comments (10)
crates/torii/indexer/src/processors/register_model.rs (1)
1-1: Ohayo! Clean import addition, sensei!
The new hash-related imports are well-organized and properly utilized by the task identifier implementation.
crates/torii/indexer/src/processors/store_update_member.rs (1)
1-1: Ohayo sensei! The hash-related imports look good!
The new imports are properly organized and effectively utilized for generating task identifiers.
crates/torii/indexer/src/processors/store_del_record.rs (1)
1-1: Ohayo! Clean import addition, sensei!
The hash-related imports are well-chosen for generating unique task identifiers.
crates/torii/sqlite/src/erc.rs (1)
Line range hint 39-69: Improvements enhance concurrency and safety
Ohayo, sensei! The asynchronous modifications and the use of `RwLock` for `erc_cache` and `token_id_registry` improve concurrency and thread safety in the code. The changes ensure that cache operations are performed safely in an asynchronous context.

Also applies to: 95-137, 218-243, 282-288
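The access pattern the review refers to looks roughly like this sketch using tokio's async `RwLock` (key/value types simplified; the real cache types live in the PR):

```rust
use std::collections::HashMap;
use tokio::sync::RwLock;

// Writers take an exclusive guard, readers a shared one; both await
// instead of blocking the executor thread.
async fn bump(cache: &RwLock<HashMap<String, u64>>, key: &str) {
    let mut guard = cache.write().await;
    *guard.entry(key.to_string()).or_insert(0) += 1;
}

#[tokio::main]
async fn main() {
    let cache = RwLock::new(HashMap::new());
    bump(&cache, "token").await;
    assert_eq!(*cache.read().await.get("token").unwrap(), 1);
}
```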
crates/torii/sqlite/src/lib.rs (1)
46-46: Ohayo! The thread-safe cache implementation looks good, sensei!
The change to wrap `LocalCache` in `Arc` enables safe concurrent access. This aligns well with the parallelization objectives. Let's verify the cache usage with this script:

Also applies to: 77-82
✅ Verification successful
Ohayo! Your Arc implementation is properly thread-safe, sensei! 🎯
The codebase shows correct usage of Arc with proper read/write locks and async patterns. All cache operations are properly synchronized through `.read().await` and `.write().await`. No unsafe mutations or problematic clone patterns were detected.

🏁 Scripts executed
The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Description: Check for proper Arc usage and potential thread safety issues

# Search for any direct mutations of LocalCache that might bypass Arc
rg -A 3 "local_cache\."

# Look for potential clone() calls that might create multiple strong references
rg "local_cache\.clone\(\)"
```

Length of output: 2999
crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (1)
41-57: Ohayo! Nice implementation of task management, sensei!
The task prioritization and identification logic is well thought out. The canonical pair approach ensures consistent grouping of transfers between the same addresses, regardless of direction.

🧰 Tools
🪛 GitHub Actions: ci
[warning] 52-52: Code formatting issue: Trailing whitespace detected
crates/torii/indexer/src/processors/erc721_transfer.rs (1)
40-64: Ohayo! Excellent implementation with clear documentation, sensei!
The parallelization strategy for ERC721 tokens is well thought out and clearly documented. The three key points about parallel transfers, sequential token handling, and related transfer grouping make the implementation's behavior very clear.

🧰 Tools
🪛 GitHub Actions: ci
[warning] 49-49: Code formatting issue: Trailing whitespace detected
[warning] 59-59: Code formatting issue: Trailing whitespace detected
crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (1)
41-65: Ohayo! Consistent implementation with great documentation, sensei!
The implementation aligns perfectly with the other processors and maintains the same high-quality documentation for the parallelization strategy.
crates/torii/indexer/src/engine.rs (2)
236-256: LGTM! TaskManager initialization looks good.
The TaskManager is properly initialized with all required dependencies: database, world contract, processors, concurrency limits, and processor config.

812-825: LGTM! Event parallelization logic is well-structured.
The implementation correctly:
- Determines if an event can be parallelized using task identifiers
- Routes parallelizable events through the task manager
- Maintains immediate processing for non-parallelizable events
```rust
fn task_identifier(&self, event: &Event) -> u64 {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher);
    event.keys[2].hash(&mut hasher);
    hasher.finish()
}
```
🛠️ Refactor suggestion
Ohayo sensei! Let's make the task identifier generation more robust!
The current implementation has several areas for improvement:
- No bounds checking before accessing `event.keys[1]` and `event.keys[2]`
- Missing documentation about why these specific keys are chosen

Consider this safer implementation:
```diff
 fn task_identifier(&self, event: &Event) -> u64 {
+    // Ensure we have enough keys
+    if event.keys.len() < 3 {
+        // Either return a default hash or handle the error appropriately
+        return 0;
+    }
+
+    // Hash keys[1] (represents ...) and keys[2] (represents ...)
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }
```
Also, please add documentation explaining:
- The significance of keys[1] and keys[2]
- Why these specific keys are chosen for the identifier
- What makes this combination unique for store update member events
```rust
fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher);
    event.keys[2].hash(&mut hasher);
    hasher.finish()
}
```
🛠️ Refactor suggestion
Add bounds checking and documentation for the task identifier, sensei!
The task identifier implementation has potential issues:
- No validation before accessing event.keys indices
- Missing documentation explaining why these specific keys are used for identification
Consider this safer implementation:
```diff
+/// Generates a unique task identifier by hashing specific event keys.
+/// Keys[1] represents ... and Keys[2] represents ... (please document the meaning of these keys)
 fn task_identifier(&self, event: &Event) -> TaskId {
+    if event.keys.len() < 3 {
+        // Either return a default hash or handle the error appropriately
+        let mut hasher = DefaultHasher::new();
+        event.hash(&mut hasher);
+        return hasher.finish();
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
fn task_identifier(&self, event: &Event) -> TaskId { | |
let mut hasher = DefaultHasher::new(); | |
event.keys[1].hash(&mut hasher); | |
event.keys[2].hash(&mut hasher); | |
hasher.finish() | |
} | |
/// Generates a unique task identifier by hashing specific event keys. | |
/// Keys[1] represents ... and Keys[2] represents ... (please document the meaning of these keys) | |
fn task_identifier(&self, event: &Event) -> TaskId { | |
if event.keys.len() < 3 { | |
// Either return a default hash or handle the error appropriately | |
let mut hasher = DefaultHasher::new(); | |
event.hash(&mut hasher); | |
return hasher.finish(); | |
} | |
let mut hasher = DefaultHasher::new(); | |
event.keys[1].hash(&mut hasher); | |
event.keys[2].hash(&mut hasher); | |
hasher.finish() | |
} |
crates/torii/sqlite/src/cache.rs (Outdated)

```rust
        Self { erc_cache: RwLock::new(HashMap::new()), token_id_registry: RwLock::new(HashSet::new()) }
    }
}
```
Review the `Clone` implementation for `LocalCache`
Ohayo, sensei! In the `Clone` implementation of `LocalCache`, initializing `erc_cache` and `token_id_registry` as new empty collections means the cloned cache does not retain the original state. This could lead to unexpected behavior if the cache contents need to be preserved. Consider one of the following options:
- Option 1: Remove the `Clone` implementation if cloning is not necessary.
- Option 2: Adjust the implementation to clone the contents of the caches (see the sketch below).

Since `.await` cannot be used in a synchronous context, cloning the contents safely is non-trivial.
```diff
@@ -881,50 +809,20 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
     .find(|p| p.validate(event))
```
Fix code formatting issue.
Ohayo, sensei! The pipeline indicates a long line that needs to be split. Let's fix that!

```diff
-let processor = processors.iter().find(|p| p.validate(event)).expect("Must find atleast one processor for the event");
+let processor = processors
+    .iter()
+    .find(|p| p.validate(event))
+    .expect("Must find atleast one processor for the event");
```
🧰 Tools
🪛 GitHub Actions: ci
[warning] 809-809: Code formatting issue: Long line needs to be split into multiple lines
Actionable comments posted: 4
♻️ Duplicate comments (1)
crates/torii/indexer/src/processors/store_del_record.rs (1)
37-42: ⚠️ Potential issue — Add bounds checking for array access, sensei!
Similar to other processors, this implementation needs validation for array access safety.
🧹 Nitpick comments (3)
crates/torii/indexer/src/processors/erc20_transfer.rs (1)
40-42: Consider using an enum for task priorities, sensei!
Having a constant priority value of 1 across all processors makes the priority levels implicit. Consider introducing an enum to make priorities more explicit and maintainable:

```rust
pub enum TaskPriority {
    High = 1,
    Medium = 2,
    Low = 3,
}
```

crates/torii/indexer/src/processors/store_update_member.rs (1)
34-36: Ohayo sensei! Consider documenting the priority value.
Please add documentation explaining why this priority value was chosen and its significance in the task processing order. Consider adding a doc comment like this:

```diff
+/// Returns a priority value of 1 for store update member events.
+/// Lower values indicate higher priority in task processing.
 fn task_priority(&self) -> usize {
     1
 }
```

crates/torii/indexer/src/engine.rs (1)
812-826: Ohayo sensei! Let's document the parallelization logic!
Please add documentation explaining:
- The significance of task_identifier being 0
- The criteria for parallelization
- The relationship between task_priority and task_identifier

```diff
+// Get task priority and identifier from the processor
+// task_identifier of 0 indicates that the event cannot be parallelized
 let (task_priority, task_identifier) =
     (processor.task_priority(), processor.task_identifier(event));

+// Parallelize events with non-zero task identifiers
 if task_identifier != 0 {
     self.task_manager.add_parallelized_event(
         task_priority,
         task_identifier,
         ParallelizedEvent {
             contract_type,
             event_id: event_id.to_string(),
             event: event.clone(),
             block_number,
             block_timestamp,
         },
     );
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
- crates/torii/indexer/src/engine.rs (8 hunks)
- crates/torii/indexer/src/lib.rs (1 hunks)
- crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc20_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc721_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/event_message.rs (2 hunks)
- crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
- crates/torii/indexer/src/processors/store_update_member.rs (2 hunks)
- crates/torii/indexer/src/processors/upgrade_event.rs (2 hunks)
- crates/torii/indexer/src/processors/upgrade_model.rs (2 hunks)
- crates/torii/indexer/src/task_manager.rs (1 hunks)
- crates/torii/sqlite/src/cache.rs (2 hunks)
- crates/torii/sqlite/src/lib.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- crates/torii/indexer/src/lib.rs
- crates/torii/indexer/src/processors/upgrade_model.rs
- crates/torii/indexer/src/processors/erc20_legacy_transfer.rs
- crates/torii/indexer/src/processors/upgrade_event.rs
- crates/torii/sqlite/src/cache.rs
- crates/torii/sqlite/src/lib.rs
🔇 Additional comments (7)
crates/torii/indexer/src/processors/event_message.rs (1)
39-50: LGTM! Well-implemented task identifier with proper error handling.
The implementation properly handles deserialization errors and uses appropriate hashing mechanisms for both the selector and entity ID.
crates/torii/indexer/src/processors/store_update_member.rs (1)
38-43: Ohayo sensei! Let's make the task identifier generation more robust!
The current implementation has several areas for improvement:
- No bounds checking before accessing `event.keys[1]` and `event.keys[2]`
- Missing documentation about why these specific keys are chosen
crates/torii/indexer/src/task_manager.rs (2)
41-56: Ohayo sensei! The constructor looks good!
The initialization is well-structured and includes all necessary parameters.

58-70: Ohayo sensei! The event addition logic is clean and efficient!
The nested entry API usage ensures proper initialization of default values.
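The nested entry pattern being praised, shown in isolation (the priority/id/event types are simplified stand-ins for the PR's actual generics):

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    // entry().or_default() materializes the inner map / vec on first
    // touch, so no separate "does the key exist" branch is needed.
    let mut tasks: BTreeMap<usize, HashMap<u64, Vec<String>>> = BTreeMap::new();
    tasks
        .entry(1) // priority level
        .or_default()
        .entry(42) // task id
        .or_default()
        .push("event".to_string());

    assert_eq!(tasks[&1][&42].len(), 1);
}
```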
crates/torii/indexer/src/engine.rs (3)
236-256: Ohayo sensei! The task manager initialization looks good!
The dependencies are properly cloned and the initialization is correct.

544-544: Ohayo sensei! The task processing placement is correct!
The task manager's process_tasks call is properly placed after event processing in the pending block handler.

591-591: Ohayo sensei! The task processing placement is correct!
The task manager's process_tasks call is properly placed after event processing in the range block handler.
```rust
fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    // Hash the event key (Transfer)
    event.keys[0].hash(&mut hasher);

    // Take the max of from/to addresses to get a canonical representation
    // This ensures transfers between the same pair of addresses are grouped together
    // regardless of direction (A->B or B->A)
    let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
    canonical_pair.hash(&mut hasher);

    hasher.finish()
}
```
Add bounds checking for array access, sensei!
The current implementation assumes `event.keys` has at least 3 elements. Add validation to handle malformed events safely:
```diff
 fn task_identifier(&self, event: &Event) -> TaskId {
+    if event.keys.len() < 3 {
+        let mut hasher = DefaultHasher::new();
+        event.hash(&mut hasher);
+        return hasher.finish();
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[0].hash(&mut hasher);
     let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
     canonical_pair.hash(&mut hasher);
     hasher.finish()
 }
```
```rust
fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    // Hash the event key (Transfer)
    event.keys[0].hash(&mut hasher);

    // Take the max of from/to addresses to get a canonical representation
    // This ensures transfers between the same pair of addresses are grouped together
    // regardless of direction (A->B or B->A)
    let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
    canonical_pair.hash(&mut hasher);

    // For ERC721, we can safely parallelize by token ID since each token is unique
    // and can only be owned by one address at a time. This means:
    // 1. Transfers of different tokens can happen in parallel
    // 2. Multiple transfers of the same token must be sequential
    // 3. The canonical address pair ensures related transfers stay together
    event.keys[3].hash(&mut hasher);
    event.keys[4].hash(&mut hasher);

    hasher.finish()
}
```
Excellent implementation with clear documentation, sensei!
The task identifier implementation is well documented and shows good design choices:
- Canonical representation for address pairs
- Smart parallelization strategy using token IDs
- Clear comments explaining the rationale
However, please add bounds checking for array access safety.
```rust
fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    // Hash the event key (Transfer)
    event.keys[0].hash(&mut hasher);

    // Take the max of from/to addresses to get a canonical representation
    // This ensures transfers between the same pair of addresses are grouped together
    // regardless of direction (A->B or B->A)
    let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
    canonical_pair.hash(&mut hasher);

    // For ERC721, we can safely parallelize by token ID since each token is unique
    // and can only be owned by one address at a time. This means:
    // 1. Transfers of different tokens can happen in parallel
    // 2. Multiple transfers of the same token must be sequential
    // 3. The canonical address pair ensures related transfers stay together
    event.data[2].hash(&mut hasher);
    event.data[3].hash(&mut hasher);

    hasher.finish()
}
```
Add bounds checking and consider sharing code with non-legacy version, sensei!
The implementation mirrors the non-legacy version but operates on `event.data` instead of `event.keys`. Consider:
- Adding bounds checking for array access
- Extracting common logic into a shared trait or utility function
```rust
pub async fn process_tasks(&mut self) -> Result<()> {
    let semaphore = Arc::new(Semaphore::new(self.max_concurrent_tasks));

    // Process each priority level sequentially
    for (priority, task_group) in std::mem::take(&mut self.tasks) {
        let mut handles = Vec::new();

        // Process all tasks within this priority level concurrently
        for (task_id, events) in task_group {
            let db = self.db.clone();
            let world = self.world.clone();
            let semaphore = semaphore.clone();
            let processors = self.processors.clone();
            let event_processor_config = self.event_processor_config.clone();

            handles.push(tokio::spawn(async move {
                let _permit = semaphore.acquire().await?;
                let mut local_db = db.clone();

                // Process all events for this task sequentially
                for ParallelizedEvent {
                    contract_type,
                    event,
                    block_number,
                    block_timestamp,
                    event_id,
                } in events
                {
                    let contract_processors = processors.get_event_processor(contract_type);
                    if let Some(processors) = contract_processors.get(&event.keys[0]) {
                        let processor = processors
                            .iter()
                            .find(|p| p.validate(&event))
                            .expect("Must find at least one processor for the event");

                        debug!(
                            target: LOG_TARGET,
                            event_name = processor.event_key(),
                            task_id = %task_id,
                            priority = %priority,
                            "Processing parallelized event."
                        );

                        if let Err(e) = processor
                            .process(
                                &world,
                                &mut local_db,
                                block_number,
                                block_timestamp,
                                &event_id,
                                &event,
                                &event_processor_config,
                            )
                            .await
                        {
                            error!(
                                target: LOG_TARGET,
                                event_name = processor.event_key(),
                                error = %e,
                                task_id = %task_id,
                                priority = %priority,
                                "Processing parallelized event."
                            );
                        }
                    }
                }

                Ok::<_, anyhow::Error>(())
            }));
        }

        // Wait for all tasks in this priority level to complete before moving to next priority
        try_join_all(handles).await?;
    }

    Ok(())
}
```
🛠️ Refactor suggestion
Ohayo sensei! Let's enhance the error handling!
The task processing logic is well-structured, but the error handling could be improved in two areas:
- Line 105: Using `expect` could cause a panic. Consider handling the case when no processor is found.
- Lines 115-135: The error is logged but not propagated, which might hide critical issues.
Apply this diff to improve error handling:
```diff
-let processor = processors
-    .iter()
-    .find(|p| p.validate(&event))
-    .expect("Must find at least one processor for the event");
+let processor = processors
+    .iter()
+    .find(|p| p.validate(&event))
+    .ok_or_else(|| anyhow::anyhow!("No valid processor found for event"))?;

-if let Err(e) = processor
+processor
     .process(
         &world,
         &mut local_db,
         block_number,
         block_timestamp,
         &event_id,
         &event,
         &event_processor_config,
     )
-    .await
-{
-    error!(
-        target: LOG_TARGET,
-        event_name = processor.event_key(),
-        error = %e,
-        task_id = %task_id,
-        priority = %priority,
-        "Processing parallelized event."
-    );
-}
+    .await
+    .map_err(|e| {
+        error!(
+            target: LOG_TARGET,
+            event_name = processor.event_key(),
+            error = %e,
+            task_id = %task_id,
+            priority = %priority,
+            "Processing parallelized event."
+        );
+        e
+    })?;
```
Few comments and let's merge. 👍 Thanks for the work done here @Larkooo!
Actionable comments posted: 0
♻️ Duplicate comments (1)
crates/torii/indexer/src/processors/store_update_member.rs (1)
38-43: ⚠️ Potential issue — The task identifier implementation still needs improvement!
The previous review comments about bounds checking and documentation remain unaddressed. For reference, the implementation should include bounds checking and documentation as previously suggested:

```diff
+/// Generates a unique task identifier by combining specific event keys.
+/// keys[1]: ... (explain what this key represents)
+/// keys[2]: ... (explain what this key represents)
 fn task_identifier(&self, event: &Event) -> u64 {
+    // Ensure we have enough keys
+    if event.keys.len() < 3 {
+        // Either return a default hash or handle the error appropriately
+        return 0;
+    }
+
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }
```
🧹 Nitpick comments (5)
crates/torii/indexer/src/processors/store_del_record.rs (1)
33-35: Document the task priority value, sensei!
Please add documentation explaining:
- The significance of priority level 2
- How it compares to other processors' priorities
- Impact on task scheduling

```diff
+/// Returns the priority level for store deletion tasks.
+/// Priority 2 indicates ... (please explain the significance)
 fn task_priority(&self) -> usize {
     2
 }
```

crates/torii/indexer/src/processors/metadata_update.rs (2)
39-41: Ohayo! Consider documenting the priority scale, sensei.
The hardcoded priority value of `3` would benefit from documentation explaining its significance in the overall task priority scale. This helps maintainers understand where metadata updates rank in relation to other processor priorities. Add a doc comment explaining the priority scale:

```diff
+/// Returns the task priority (3) for metadata updates.
+/// Priority scale:
+/// - Higher values indicate higher priority
+/// - Current scale: X-Y (document the range)
 fn task_priority(&self) -> usize {
     3
 }
```
43-47: Ohayo! Great implementation of task identification, sensei!
The implementation nicely addresses the parallelization concern by using event keys for task identification. However, consider adding documentation to explain the hashing strategy. Add documentation and consider defining a constant for default TaskId as suggested by @glihm:

```diff
+/// Computes a unique task identifier by hashing the event keys.
+/// This allows parallel processing of metadata updates for different resources.
+///
+/// If no keys are present, returns DEFAULT_TASK_ID for serialized processing.
+const DEFAULT_TASK_ID: u64 = 0;
 fn task_identifier(&self, event: &Event) -> u64 {
+    if event.keys.is_empty() {
+        return Self::DEFAULT_TASK_ID;
+    }
     let mut hasher = DefaultHasher::new();
     event.keys.iter().for_each(|k| k.hash(&mut hasher));
     hasher.finish()
 }
```

crates/torii/indexer/src/processors/store_update_member.rs (1)
34-36: Ohayo sensei! Let's document the task priority!
The priority value of 2 needs documentation explaining:
- Why this specific priority was chosen
- How it relates to other processors' priorities
- What impact it has on processing order

Add documentation above the method:

```diff
+/// Returns a priority of 2 for store update member events.
+/// This priority ensures that ... (explain the reasoning)
 fn task_priority(&self) -> usize {
     2
 }
```

crates/torii/indexer/src/processors/store_set_record.rs (1)
33-35: Document the priority scheme, sensei!
The hardcoded priority value of 2 needs documentation to explain:
- The priority scale being used (e.g., 1-5, 1-10)
- What this priority level means for StoreSetRecord events
- How it relates to other processors' priorities

Add a doc comment explaining the priority scheme:

```diff
+/// Returns the priority level for StoreSetRecord events.
+/// Priority 2 indicates [explain significance here]
+/// The priority scale ranges from [min] to [max].
 fn task_priority(&self) -> usize {
     2
 }
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- crates/torii/indexer/src/processors/metadata_update.rs (2 hunks)
- crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
- crates/torii/indexer/src/processors/store_set_record.rs (2 hunks)
- crates/torii/indexer/src/processors/store_update_member.rs (2 hunks)
- crates/torii/indexer/src/processors/store_update_record.rs (2 hunks)
- crates/torii/indexer/src/processors/upgrade_event.rs (2 hunks)
- crates/torii/indexer/src/processors/upgrade_model.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- crates/torii/indexer/src/processors/upgrade_model.rs
- crates/torii/indexer/src/processors/store_update_record.rs
- crates/torii/indexer/src/processors/upgrade_event.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: build
🔇 Additional comments (5)
crates/torii/indexer/src/processors/store_del_record.rs (2)
`1-1`: Ohayo! The imports look good, sensei! The new imports are well-organized and properly support the task management functionality.
Also applies to: 13-13
`37-42`: Ohayo! The task identifier still needs bounds checking and documentation, sensei! The implementation could panic if `event.keys` has fewer than 3 elements. Consider adding bounds checking and documentation as suggested in the previous review.
crates/torii/indexer/src/processors/store_update_member.rs (1)
`1-1`: Ohayo sensei! The hash-related imports look good! The necessary imports for hash functionality are correctly added.
crates/torii/indexer/src/processors/store_set_record.rs (2)
`1-1`: Ohayo! Clean import statement, sensei! The hash-related imports are well-organized and properly scoped for the new task identification functionality.
`37-42`: Verify task identifier implementation robustness, sensei! The current implementation has several potential concerns:
- No bounds checking for `event.keys` array access
- Unclear why only `keys[1]` and `keys[2]` are used for identification
- No documentation about hash collision probability
Consider adding:
- Bounds checking or documentation about expected key length
- Comments explaining the significance of the chosen keys
- Error handling for cases where keys might be missing
```diff
+ /// Generates a unique identifier for the event based on specific keys.
+ /// Uses keys[1] and keys[2] because [explain significance].
+ ///
+ /// # Panics
+ /// Panics if the event has fewer than 3 keys.
  fn task_identifier(&self, event: &Event) -> u64 {
+     debug_assert!(event.keys.len() >= 3, "Event must have at least 3 keys");
      let mut hasher = DefaultHasher::new();
      event.keys[1].hash(&mut hasher);
      event.keys[2].hash(&mut hasher);
      hasher.finish()
  }
```

Let's verify the key usage pattern:
✅ Verification successful
Ohayo! Task identifier implementation is verified, sensei!
The implementation is correct and intentionally designed for parallel processing:
- `keys[1]` and `keys[2]` represent entity coordinates in the World contract
- This ensures updates to the same entity are processed sequentially
- Different entities can be processed in parallel for better performance
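To make the grouping property concrete, here is a small runnable sketch; the `u64` parameters stand in for the `Felt` model selector and entity ID that the real processor hashes:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch only: hashing (model selector, entity id) yields one task id per
// entity, so events for the same entity land in the same sequential bucket.
fn entity_task_id(model_selector: u64, entity_id: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    model_selector.hash(&mut hasher);
    entity_id.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Same entity -> same task id (processed sequentially).
    assert_eq!(entity_task_id(7, 42), entity_task_id(7, 42));
    // Different entity -> different task id (parallelizable).
    assert_ne!(entity_task_id(7, 42), entity_task_id(7, 43));
}
```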
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Search for StoreSetRecord event key patterns
rg -A 5 "StoreSetRecord.*keys\["
```

Length of output: 34
Script:
```bash
#!/bin/bash
# Find Event struct definition
ast-grep --pattern 'struct Event { $$$ }'

# Find StoreSetRecord implementation and usage
rg -l "StoreSetRecord"

# Look for test files
fd "test.*\.rs$" --exec rg -l "StoreSetRecord|Event"
```

Length of output: 670
Script:
```bash
#!/bin/bash
# Check Event struct definition
cat crates/dojo/types/src/event.rs

# Look for StoreSetRecord usage in tests
rg -A 10 "StoreSetRecord" crates/torii/graphql/src/tests/events_test.rs

# Check engine implementation
cat crates/torii/indexer/src/engine.rs
```

Length of output: 34133
Actionable comments posted: 2
♻️ Duplicate comments (1)
crates/torii/indexer/src/processors/erc20_transfer.rs (1)
`44-56`: ⚠️ Potential issue: Add bounds checking for array access, sensei!
The current implementation assumes `event.keys` has at least 3 elements. Add validation to handle malformed events safely.

```diff
  fn task_identifier(&self, event: &Event) -> TaskId {
+     if event.keys.len() < 3 {
+         let mut hasher = DefaultHasher::new();
+         event.hash(&mut hasher);
+         return hasher.finish();
+     }
      let mut hasher = DefaultHasher::new();
      event.keys[0].hash(&mut hasher);
      let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
      canonical_pair.hash(&mut hasher);
      hasher.finish()
  }
```
🧹 Nitpick comments (4)
crates/torii/indexer/src/processors/raw_event.rs (1)
`27-34`: Ohayo! Add documentation for task management methods, sensei! Consider adding doc comments to explain:
- Why raw events have priority 1
- Why raw events are processed sequentially
- Future plans for parallelization (mentioned in TODO)
```diff
+ /// Returns the priority level for raw event processing.
+ /// Currently set to 1 as a baseline priority.
  fn task_priority(&self) -> TaskPriority {
      1
  }

+ /// Returns a sequential task ID for raw events.
+ /// Raw events are currently processed sequentially to maintain order consistency.
+ /// TODO: Evaluate parallelization opportunities for raw events.
  fn task_identifier(&self, _event: &Event) -> TaskId {
      // TODO. for now raw events are not parallelized
      task_manager::TASK_ID_SEQUENTIAL
  }
```

crates/torii/indexer/src/processors/upgrade_event.rs (1)
`40-44`: Consider adding a comment explaining the task identifier generation logic. The implementation is correct, but documenting why we hash the event keys would improve maintainability.
```diff
  fn task_identifier(&self, event: &Event) -> TaskId {
+     // Generate a unique task ID by hashing event keys to allow parallel processing
+     // of different upgrade events
      let mut hasher = DefaultHasher::new();
      event.keys.iter().for_each(|k| k.hash(&mut hasher));
      hasher.finish()
  }
```

crates/torii/indexer/src/task_manager.rs (1)
`33-40`: Consider adding documentation for TaskManager fields. Adding documentation for each field would improve code maintainability and help other developers understand the purpose of each component.
```diff
  pub struct TaskManager<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
+     /// Database connection for persisting task state
      db: Sql,
+     /// World contract reader for blockchain interaction
      world: Arc<WorldContractReader<P>>,
+     /// Tasks organized by priority and task ID
      tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<ParallelizedEvent>>>,
+     /// Available event processors
      processors: Arc<Processors<P>>,
+     /// Maximum number of concurrent tasks
      max_concurrent_tasks: usize,
+     /// Configuration for event processors
      event_processor_config: EventProcessorConfig,
  }
```

crates/torii/indexer/src/engine.rs (1)
`250-256`: Consider extracting configuration into a dedicated struct. Ohayo sensei! The TaskManager initialization has multiple parameters. Consider grouping them into a configuration struct for better maintainability.
```diff
+ #[derive(Clone)]
+ pub struct TaskManagerConfig<P> {
+     pub db: Sql,
+     pub world: Arc<WorldContractReader<P>>,
+     pub processors: Arc<Processors<P>>,
+     pub max_concurrent_tasks: usize,
+     pub event_processor_config: EventProcessorConfig,
+ }

- task_manager: TaskManager::new(
-     db,
-     world,
-     processors,
-     max_concurrent_tasks,
-     event_processor_config,
- ),
+ task_manager: TaskManager::new(TaskManagerConfig {
+     db,
+     world,
+     processors,
+     max_concurrent_tasks,
+     event_processor_config,
+ }),
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
- crates/torii/indexer/src/engine.rs (8 hunks)
- crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc20_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/erc721_transfer.rs (3 hunks)
- crates/torii/indexer/src/processors/event_message.rs (2 hunks)
- crates/torii/indexer/src/processors/metadata_update.rs (3 hunks)
- crates/torii/indexer/src/processors/mod.rs (2 hunks)
- crates/torii/indexer/src/processors/raw_event.rs (2 hunks)
- crates/torii/indexer/src/processors/register_event.rs (3 hunks)
- crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
- crates/torii/indexer/src/processors/store_set_record.rs (3 hunks)
- crates/torii/indexer/src/processors/store_update_member.rs (3 hunks)
- crates/torii/indexer/src/processors/store_update_record.rs (3 hunks)
- crates/torii/indexer/src/processors/upgrade_event.rs (3 hunks)
- crates/torii/indexer/src/processors/upgrade_model.rs (3 hunks)
- crates/torii/indexer/src/task_manager.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- crates/torii/indexer/src/processors/mod.rs
- crates/torii/indexer/src/processors/event_message.rs
- crates/torii/indexer/src/processors/store_update_record.rs
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: test
- GitHub Check: ensure-windows
- GitHub Check: test-hurl
🔇 Additional comments (11)
crates/torii/indexer/src/processors/store_del_record.rs (1)
`33-42`: ⚠️ Potential issue: Add safety checks and documentation for task management, sensei!
The implementation needs:
- Documentation explaining why this processor has priority 2
- Bounds checking for array access in task_identifier
```diff
+ /// Returns priority 2 for store delete record processing.
+ /// Higher priority than transfer events to ensure consistent state management.
  fn task_priority(&self) -> TaskPriority {
      2
  }

+ /// Generates a task identifier from event keys to manage parallel processing.
  fn task_identifier(&self, event: &Event) -> TaskId {
+     if event.keys.len() < 3 {
+         let mut hasher = DefaultHasher::new();
+         event.hash(&mut hasher);
+         return hasher.finish();
+     }
      let mut hasher = DefaultHasher::new();
      event.keys[1].hash(&mut hasher);
      event.keys[2].hash(&mut hasher);
      hasher.finish()
  }
```

Likely invalid or redundant comment.
crates/torii/indexer/src/processors/erc721_transfer.rs (1)
`44-64`: ⚠️ Potential issue: Add bounds checking while maintaining excellent parallelization strategy, sensei!
The implementation has well-documented parallelization logic but needs safety checks for array access.
```diff
  fn task_identifier(&self, event: &Event) -> TaskId {
+     if event.keys.len() < 5 {
+         let mut hasher = DefaultHasher::new();
+         event.hash(&mut hasher);
+         return hasher.finish();
+     }
      let mut hasher = DefaultHasher::new();
      // Rest of the implementation remains unchanged as it's well-documented
      // and implements an excellent parallelization strategy
      event.keys[0].hash(&mut hasher);
      let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
      canonical_pair.hash(&mut hasher);
      event.keys[3].hash(&mut hasher);
      event.keys[4].hash(&mut hasher);
      hasher.finish()
  }
```

Likely invalid or redundant comment.
crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (1)
`40-64`: Excellent implementation of task identification, sensei! The task identifier implementation is well-documented and thoughtfully designed:
- Uses canonical address pairs to group related transfers
- Leverages token ID for safe parallelization
- Clearly explains the rationale in comments
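A minimal, self-contained sketch of the canonical-pair idea, with `u64` stand-ins for the `Felt` addresses and token ID, and omitting the event-selector hash the real processor also includes:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch only: hashing max(from, to) plus the token id makes the task id
// direction-independent, so A->B and B->A transfers share one bucket.
fn transfer_task_id(from: u64, to: u64, token_id: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    std::cmp::max(from, to).hash(&mut hasher);
    token_id.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Same pair, either direction -> same task id.
    assert_eq!(transfer_task_id(1, 2, 99), transfer_task_id(2, 1, 99));
    // Same pair but a different token -> safe to parallelize.
    assert_ne!(transfer_task_id(1, 2, 99), transfer_task_id(1, 2, 100));
}
```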
crates/torii/indexer/src/processors/register_event.rs (1)
`36-44`: Well-implemented task management, sensei! The implementation is solid:
- Appropriate highest priority (0) for registration events
- Safe iteration over all keys for identifier generation
crates/torii/indexer/src/processors/store_update_member.rs (1)
`39-46`: 🛠️ Refactor suggestion: Ohayo sensei! Great documentation, but let's add bounds checking!
The implementation has good documentation explaining key significance but should include bounds checking.
Consider this safer implementation:
```diff
  fn task_identifier(&self, event: &Event) -> TaskId {
+     // Ensure we have enough keys
+     if event.keys.len() < 3 {
+         return 0;
+     }
+
      let mut hasher = DefaultHasher::new();
      // model selector
      event.keys[1].hash(&mut hasher);
      // entity id
      event.keys[2].hash(&mut hasher);
      hasher.finish()
  }
```

Likely invalid or redundant comment.
crates/torii/indexer/src/processors/upgrade_model.rs (1)
`36-44`: Clean and safe implementation, sensei! The implementation is well done:
- Appropriate priority level for upgrade events
- Safe iteration over keys for identifier generation
crates/torii/indexer/src/processors/upgrade_event.rs (1)
`36-38`: LGTM! Task priority is appropriate for upgrade events. The priority of 1 is suitable for upgrade events as they are critical but not as time-sensitive as other operations.
crates/torii/indexer/src/processors/metadata_update.rs (2)
`40-42`: Verify the task priority value. Ohayo sensei! The priority of 3 seems high compared to upgrade events (priority 1). Consider documenting why metadata updates need higher priority or adjusting the value.
`44-48`: LGTM! Task identifier implementation is consistent. The implementation follows the same pattern as other processors, ensuring consistent task identification across the system.
crates/torii/indexer/src/task_manager.rs (1)
`74-147`: ⚠️ Potential issue: Enhance error handling in task processing.
Ohayo sensei! While the task processing logic is well-structured, consider improving error handling:
- Line 107: Using `expect` could cause a panic if no processor is found.
- Error propagation could be improved to maintain system stability.
Apply this diff to improve error handling:
```diff
- let processor = processors
-     .iter()
-     .find(|p| p.validate(&event))
-     .expect("Must find at least one processor for the event");
+ let processor = processors
+     .iter()
+     .find(|p| p.validate(&event))
+     .ok_or_else(|| anyhow::anyhow!("No valid processor found for event"))?;
```

Likely invalid or redundant comment.
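As background for the ordering this loop depends on, a hedged sketch of the `tasks` map shape from this PR; the `ParallelizedEvent` stand-in and the sample entries are illustrative only:

```rust
use std::collections::{BTreeMap, HashMap};

type TaskPriority = usize;
type TaskId = u64;

// Reduced stand-in for the real ParallelizedEvent, just to show the shape.
#[derive(Debug)]
struct ParallelizedEvent(&'static str);

fn main() {
    let mut tasks: BTreeMap<TaskPriority, HashMap<TaskId, Vec<ParallelizedEvent>>> =
        BTreeMap::new();

    tasks.entry(0).or_default().entry(11).or_default().push(ParallelizedEvent("register model"));
    tasks.entry(2).or_default().entry(42).or_default().push(ParallelizedEvent("set record A"));
    tasks.entry(2).or_default().entry(42).or_default().push(ParallelizedEvent("update record A"));
    tasks.entry(2).or_default().entry(43).or_default().push(ParallelizedEvent("set record B"));

    // BTreeMap iteration drains priorities in ascending order; within one
    // priority, each TaskId bucket can go to its own worker, while events
    // inside a bucket keep their insertion order.
    for (priority, groups) in &tasks {
        println!("priority {priority}: {} parallel group(s)", groups.len());
    }
}
```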
crates/torii/indexer/src/engine.rs (1)
`815-826`: LGTM! Clean implementation of parallelized event handling. The logic for determining whether to parallelize an event and adding it to the task manager is well-implemented. The check against `TASK_ID_SEQUENTIAL` provides a clean way to handle non-parallelizable events.
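For illustration, a sketch of that dispatch branch; the constant's value (0) and both helper functions are assumptions, not the actual engine code:

```rust
// Assumed value for illustration; the real constant lives in task_manager.
const TASK_ID_SEQUENTIAL: u64 = 0;

// Hypothetical helpers standing in for the engine's two code paths.
fn process_inline(task_id: u64) {
    println!("processing event inline, in block order (task {task_id})");
}

fn queue_on_task_manager(task_id: u64) {
    println!("queued task {task_id} for parallel processing");
}

fn dispatch(task_id: u64) {
    if task_id == TASK_ID_SEQUENTIAL {
        // Non-parallelizable events (e.g. raw events) are handled inline.
        process_inline(task_id);
    } else {
        // Everything else goes to the TaskManager.
        queue_on_task_manager(task_id);
    }
}

fn main() {
    dispatch(TASK_ID_SEQUENTIAL); // e.g. a raw event
    dispatch(0x9f3a); // e.g. a transfer with a computed hash
}
```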
```rust
fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    // Hash the event key (Transfer)
    event.keys[0].hash(&mut hasher);

    // Take the max of from/to addresses to get a canonical representation
    // This ensures transfers between the same pair of addresses are grouped together
    // regardless of direction (A->B or B->A)
    let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
    canonical_pair.hash(&mut hasher);

    hasher.finish()
}
```
Add bounds checking for data array access, sensei!
The current implementation assumes `event.data` has at least 2 elements. Add validation to handle malformed events safely.
```diff
  fn task_identifier(&self, event: &Event) -> TaskId {
+     if event.data.len() < 2 {
+         let mut hasher = DefaultHasher::new();
+         event.hash(&mut hasher);
+         return hasher.finish();
+     }
      let mut hasher = DefaultHasher::new();
      event.keys[0].hash(&mut hasher);
      let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
      canonical_pair.hash(&mut hasher);
      hasher.finish()
  }
```
Committable suggestion skipped: line range outside the PR's diff.
```rust
fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher);
    event.keys[2].hash(&mut hasher);
    hasher.finish()
}
```
💡 Codebase verification
Ohayo sensei! The bounds check is indeed necessary for robustness!
The codebase consistently performs bounds checking before accessing event keys, as seen in other processors like ERC20/ERC721 transfers. This pattern is crucial for handling potentially malformed events and preventing panics.
```rust
fn task_identifier(&self, event: &Event) -> TaskId {
    // Ensure we have the required keys (selector, model, entity)
    if event.keys.len() < 3 {
        return 0; // Return default hash for invalid events
    }
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher); // model selector
    event.keys[2].hash(&mut hasher); // entity ID
    hasher.finish()
}
```
🔗 Analysis chain
Ohayo sensei! Let's make the task identifier more robust!
The implementation should include:
- Bounds checking before accessing `event.keys`
- Documentation explaining the significance of the selected keys
Consider this safer implementation:
```diff
  fn task_identifier(&self, event: &Event) -> TaskId {
+     // Ensure we have enough keys
+     if event.keys.len() < 3 {
+         // Return a default hash for invalid events
+         return 0;
+     }
+
+     // Hash keys[1] (model selector) and keys[2] (entity ID)
      let mut hasher = DefaultHasher::new();
      event.keys[1].hash(&mut hasher);
      event.keys[2].hash(&mut hasher);
      hasher.finish()
  }
```
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
```bash
#!/bin/bash
# Find Event struct definition
ast-grep --pattern 'struct Event {$$$}'

# Find usages of event.keys
rg "event\.keys" -A 2 -B 2

# Look for documentation mentioning Event or keys
rg -i "event.*keys|keys.*event" -g "*.{md,txt,rs}"
```
Length of output: 65806
Summary by CodeRabbit
Release Notes
New Features
Improvements

- `TaskManager` to manage parallelized events more efficiently.

Technical Updates

- `LocalCache` for concurrent access using reference counting.
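A brief sketch of the reference-counting pattern that note implies; `LocalCache` here is a hypothetical stand-in, not torii's actual type:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for torii's LocalCache; only the sharing is shown.
#[derive(Debug, Default)]
struct LocalCache;

fn main() {
    let cache = Arc::new(LocalCache::default());
    let handle = {
        let cache = Arc::clone(&cache); // cheap clone: bumps the refcount
        thread::spawn(move || {
            // each worker holds its own handle to the single shared cache
            println!("worker sees {:?}", cache);
        })
    };
    handle.join().unwrap();
    println!("remaining strong refs: {}", Arc::strong_count(&cache));
}
```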