-
Notifications
You must be signed in to change notification settings - Fork 191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(torii-core): enqueue models & events #2471
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -49,6 +49,9 @@ pub struct DeleteEntityQuery { | |||||||||||||||||||||||||||||||||||||||
pub enum QueryType { | ||||||||||||||||||||||||||||||||||||||||
SetEntity(Ty), | ||||||||||||||||||||||||||||||||||||||||
DeleteEntity(DeleteEntityQuery), | ||||||||||||||||||||||||||||||||||||||||
EventMessage(Ty), | ||||||||||||||||||||||||||||||||||||||||
RegisterModel, | ||||||||||||||||||||||||||||||||||||||||
StoreEvent, | ||||||||||||||||||||||||||||||||||||||||
Other, | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
|
@@ -137,6 +140,29 @@ impl QueryQueue { | |||||||||||||||||||||||||||||||||||||||
let broker_message = BrokerMessage::EntityUpdated(entity_updated); | ||||||||||||||||||||||||||||||||||||||||
self.push_publish(broker_message); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
QueryType::RegisterModel => { | ||||||||||||||||||||||||||||||||||||||||
let row = query.fetch_one(&mut *tx).await.with_context(|| { | ||||||||||||||||||||||||||||||||||||||||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||||||||||||||||||||||||||||||||||||||||
})?; | ||||||||||||||||||||||||||||||||||||||||
let model_registered = ModelRegistered::from_row(&row)?; | ||||||||||||||||||||||||||||||||||||||||
self.push_publish(BrokerMessage::ModelRegistered(model_registered)); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
QueryType::EventMessage(entity) => { | ||||||||||||||||||||||||||||||||||||||||
let row = query.fetch_one(&mut *tx).await.with_context(|| { | ||||||||||||||||||||||||||||||||||||||||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||||||||||||||||||||||||||||||||||||||||
})?; | ||||||||||||||||||||||||||||||||||||||||
let mut event_message = EventMessageUpdated::from_row(&row)?; | ||||||||||||||||||||||||||||||||||||||||
event_message.updated_model = Some(entity); | ||||||||||||||||||||||||||||||||||||||||
let broker_message = BrokerMessage::EventMessageUpdated(event_message); | ||||||||||||||||||||||||||||||||||||||||
self.push_publish(broker_message); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
QueryType::StoreEvent => { | ||||||||||||||||||||||||||||||||||||||||
let row = query.fetch_one(&mut *tx).await.with_context(|| { | ||||||||||||||||||||||||||||||||||||||||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||||||||||||||||||||||||||||||||||||||||
})?; | ||||||||||||||||||||||||||||||||||||||||
let event = EventEmitted::from_row(&row)?; | ||||||||||||||||||||||||||||||||||||||||
self.push_publish(BrokerMessage::EventEmitted(event)); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure proper handling when no rows are returned in Sensei, in the Here is a suggested change to handle this scenario: let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let event = EventEmitted::from_row(&row)?;
self.push_publish(BrokerMessage::EventEmitted(event)); Apply this diff: -let row = query.fetch_one(&mut *tx).await.with_context(|| {
+let row = query.fetch_optional(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
-let event = EventEmitted::from_row(&row)?;
-self.push_publish(BrokerMessage::EventEmitted(event));
+if let Some(row) = row {
+ let event = EventEmitted::from_row(&row)?;
+ self.push_publish(BrokerMessage::EventEmitted(event));
+} else {
+ // Optionally handle the case where no event is found
+ // For example, log a warning or take alternative action
+} This change ensures that your code gracefully handles situations where the query returns no results. Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
QueryType::Other => { | ||||||||||||||||||||||||||||||||||||||||
query.execute(&mut *tx).await.with_context(|| { | ||||||||||||||||||||||||||||||||||||||||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,6 @@ use std::str::FromStr; | |
use std::sync::Arc; | ||
|
||
use anyhow::{anyhow, Result}; | ||
use chrono::Utc; | ||
use dojo_types::primitive::Primitive; | ||
use dojo_types::schema::{EnumOption, Member, Struct, Ty}; | ||
use dojo_world::contracts::abi::model::Layout; | ||
|
@@ -16,11 +15,8 @@ use starknet_crypto::poseidon_hash_many; | |
use tracing::{debug, warn}; | ||
|
||
use crate::cache::{Model, ModelCache}; | ||
use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; | ||
use crate::types::{ | ||
Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, | ||
}; | ||
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp}; | ||
use crate::query_queue::{Argument, DeleteEntityQuery, QueryQueue, QueryType}; | ||
use crate::utils::utc_dt_string_from_timestamp; | ||
|
||
type IsEventMessage = bool; | ||
type IsStoreUpdate = bool; | ||
|
@@ -173,19 +169,20 @@ impl Sql { | |
class_hash=EXCLUDED.class_hash, layout=EXCLUDED.layout, \ | ||
packed_size=EXCLUDED.packed_size, unpacked_size=EXCLUDED.unpacked_size, \ | ||
executed_at=EXCLUDED.executed_at RETURNING *"; | ||
let model_registered: ModelRegistered = sqlx::query_as(insert_models) | ||
// this is temporary until the model hash is precomputed | ||
.bind(format!("{:#x}", selector)) | ||
.bind(namespace) | ||
.bind(model.name()) | ||
.bind(format!("{class_hash:#x}")) | ||
.bind(format!("{contract_address:#x}")) | ||
.bind(serde_json::to_string(&layout)?) | ||
.bind(packed_size) | ||
.bind(unpacked_size) | ||
.bind(utc_dt_string_from_timestamp(block_timestamp)) | ||
.fetch_one(&self.pool) | ||
.await?; | ||
|
||
let arguments = vec![ | ||
Argument::String(format!("{:#x}", selector)), | ||
Argument::String(namespace.to_string()), | ||
Argument::String(model.name().to_string()), | ||
Argument::String(format!("{class_hash:#x}")), | ||
Argument::String(format!("{contract_address:#x}")), | ||
Argument::String(serde_json::to_string(&layout)?), | ||
Argument::Int(packed_size as i64), | ||
Argument::Int(unpacked_size as i64), | ||
Argument::String(utc_dt_string_from_timestamp(block_timestamp)), | ||
]; | ||
|
||
self.query_queue.enqueue(insert_models, arguments, QueryType::RegisterModel); | ||
|
||
let mut model_idx = 0_i64; | ||
self.build_register_queries_recursive( | ||
|
@@ -220,7 +217,6 @@ impl Sql { | |
}, | ||
) | ||
.await; | ||
self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered)); | ||
|
||
Ok(()) | ||
} | ||
|
@@ -316,15 +312,16 @@ impl Sql { | |
VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ | ||
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ | ||
event_id=EXCLUDED.event_id RETURNING *"; | ||
let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities) | ||
.bind(&entity_id) | ||
.bind(&keys_str) | ||
.bind(event_id) | ||
.bind(utc_dt_string_from_timestamp(block_timestamp)) | ||
.fetch_one(&self.pool) | ||
.await?; | ||
|
||
event_message_updated.updated_model = Some(entity.clone()); | ||
self.query_queue.enqueue( | ||
insert_entities, | ||
vec![ | ||
Argument::String(entity_id.clone()), | ||
Argument::String(keys_str), | ||
Argument::String(event_id.to_string()), | ||
Argument::String(utc_dt_string_from_timestamp(block_timestamp)), | ||
], | ||
QueryType::Other, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think this should be QueryType::EntityMessage There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
); | ||
|
||
let path = vec![namespaced_name]; | ||
self.build_set_entity_queries_recursive( | ||
|
@@ -336,8 +333,6 @@ impl Sql { | |
&vec![], | ||
); | ||
|
||
self.query_queue.push_publish(BrokerMessage::EventMessageUpdated(event_message_updated)); | ||
|
||
Ok(()) | ||
} | ||
|
||
|
@@ -504,19 +499,8 @@ impl Sql { | |
"INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \ | ||
(?, ?, ?, ?, ?)", | ||
vec![id, keys, data, hash, executed_at], | ||
QueryType::Other, | ||
QueryType::StoreEvent, | ||
); | ||
|
||
let emitted = EventEmitted { | ||
id: event_id.to_string(), | ||
keys: felts_sql_string(&event.keys), | ||
data: felts_sql_string(&event.data), | ||
transaction_hash: format!("{:#x}", transaction_hash), | ||
created_at: Utc::now(), | ||
executed_at: must_utc_datetime_from_timestamp(block_timestamp), | ||
}; | ||
|
||
self.query_queue.push_publish(BrokerMessage::EventEmitted(emitted)); | ||
} | ||
|
||
#[allow(clippy::too_many_arguments)] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider refactoring to reduce code duplication in query execution and error handling.
Sensei, the
QueryType::RegisterModel
,QueryType::EventMessage
, andQueryType::StoreEvent
variants share similar logic for executing a query, handling errors, fetching a row, and publishing a broker message. Refactoring this repeated code into a helper function would enhance maintainability and reduce redundancy.Here's how you might implement the refactor:
Create a helper function for executing queries and publishing messages:
Use the helper function in each variant:
This refactor reduces duplication and makes the code more modular and easier to maintain.