diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/query_queue.rs index 5dfad77113..dd80b04e9e 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/query_queue.rs @@ -32,9 +32,6 @@ pub enum BrokerMessage { pub struct QueryQueue { pool: Pool, pub queue: VecDeque<(String, Vec, QueryType)>, - // publishes that are related to queries in the queue, they should be sent - // after the queries are executed - pub publish_queue: VecDeque, } #[derive(Debug, Clone)] @@ -49,12 +46,15 @@ pub struct DeleteEntityQuery { pub enum QueryType { SetEntity(Ty), DeleteEntity(DeleteEntityQuery), + EventMessage(Ty), + RegisterModel, + StoreEvent, Other, } impl QueryQueue { pub fn new(pool: Pool) -> Self { - QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() } + QueryQueue { pool, queue: VecDeque::new() } } pub fn enqueue>( @@ -66,12 +66,11 @@ impl QueryQueue { self.queue.push_back((statement.into(), arguments, query_type)); } - pub fn push_publish(&mut self, value: BrokerMessage) { - self.publish_queue.push_back(value); - } - pub async fn execute_all(&mut self) -> Result<()> { let mut tx = self.pool.begin().await?; + // publishes that are related to queries in the queue, they should be sent + // after the queries are executed + let mut publish_queue = VecDeque::new(); while let Some((statement, arguments, query_type)) = self.queue.pop_front() { let mut query = sqlx::query(&statement); @@ -95,7 +94,7 @@ impl QueryQueue { entity_updated.updated_model = Some(entity); entity_updated.deleted = false; let broker_message = BrokerMessage::EntityUpdated(entity_updated); - self.push_publish(broker_message); + publish_queue.push_back(broker_message); } QueryType::DeleteEntity(entity) => { let delete_model = query.execute(&mut *tx).await.with_context(|| { @@ -135,7 +134,30 @@ impl QueryQueue { } let broker_message = BrokerMessage::EntityUpdated(entity_updated); - self.push_publish(broker_message); + publish_queue.push_back(broker_message); + } + QueryType::RegisterModel => { + let row = query.fetch_one(&mut *tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let model_registered = ModelRegistered::from_row(&row)?; + publish_queue.push_back(BrokerMessage::ModelRegistered(model_registered)); + } + QueryType::EventMessage(entity) => { + let row = query.fetch_one(&mut *tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let mut event_message = EventMessageUpdated::from_row(&row)?; + event_message.updated_model = Some(entity); + let broker_message = BrokerMessage::EventMessageUpdated(event_message); + publish_queue.push_back(broker_message); + } + QueryType::StoreEvent => { + let row = query.fetch_one(&mut *tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let event = EventEmitted::from_row(&row)?; + publish_queue.push_back(BrokerMessage::EventEmitted(event)); } QueryType::Other => { query.execute(&mut *tx).await.with_context(|| { @@ -147,7 +169,7 @@ impl QueryQueue { tx.commit().await?; - while let Some(message) = self.publish_queue.pop_front() { + while let Some(message) = publish_queue.pop_front() { send_broker_message(message); } diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 249a3c4fef..acd0f04f83 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -3,7 +3,6 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Result}; -use chrono::Utc; use dojo_types::primitive::Primitive; use dojo_types::schema::{EnumOption, Member, Struct, Ty}; use dojo_world::contracts::abi::model::Layout; @@ -16,11 +15,8 @@ use starknet_crypto::poseidon_hash_many; use tracing::{debug, warn}; use crate::cache::{Model, ModelCache}; -use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; -use crate::types::{ - Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, -}; -use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp}; +use crate::query_queue::{Argument, DeleteEntityQuery, QueryQueue, QueryType}; +use crate::utils::utc_dt_string_from_timestamp; type IsEventMessage = bool; type IsStoreUpdate = bool; @@ -79,7 +75,6 @@ impl Sql { pub fn merge(&mut self, other: Sql) -> Result<()> { // Merge query queue self.query_queue.queue.extend(other.query_queue.queue); - self.query_queue.publish_queue.extend(other.query_queue.publish_queue); // This should never happen if self.world_address != other.world_address { @@ -173,19 +168,20 @@ impl Sql { class_hash=EXCLUDED.class_hash, layout=EXCLUDED.layout, \ packed_size=EXCLUDED.packed_size, unpacked_size=EXCLUDED.unpacked_size, \ executed_at=EXCLUDED.executed_at RETURNING *"; - let model_registered: ModelRegistered = sqlx::query_as(insert_models) - // this is temporary until the model hash is precomputed - .bind(format!("{:#x}", selector)) - .bind(namespace) - .bind(model.name()) - .bind(format!("{class_hash:#x}")) - .bind(format!("{contract_address:#x}")) - .bind(serde_json::to_string(&layout)?) - .bind(packed_size) - .bind(unpacked_size) - .bind(utc_dt_string_from_timestamp(block_timestamp)) - .fetch_one(&self.pool) - .await?; + + let arguments = vec![ + Argument::String(format!("{:#x}", selector)), + Argument::String(namespace.to_string()), + Argument::String(model.name().to_string()), + Argument::String(format!("{class_hash:#x}")), + Argument::String(format!("{contract_address:#x}")), + Argument::String(serde_json::to_string(&layout)?), + Argument::Int(packed_size as i64), + Argument::Int(unpacked_size as i64), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ]; + + self.query_queue.enqueue(insert_models, arguments, QueryType::RegisterModel); let mut model_idx = 0_i64; self.build_register_queries_recursive( @@ -220,7 +216,6 @@ impl Sql { }, ) .await; - self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered)); Ok(()) } @@ -304,6 +299,21 @@ impl Sql { let entity_id = format!("{:#x}", poseidon_hash_many(&keys)); let model_id = format!("{:#x}", compute_selector_from_names(model_namespace, model_name)); + let keys_str = felts_sql_string(&keys); + let insert_entities = "INSERT INTO event_messages (id, keys, event_id, executed_at) \ + VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ + updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ + event_id=EXCLUDED.event_id RETURNING *"; + self.query_queue.enqueue( + insert_entities, + vec![ + Argument::String(entity_id.clone()), + Argument::String(keys_str), + Argument::String(event_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ], + QueryType::Other, + ); self.query_queue.enqueue( "INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ model_id) DO NOTHING", @@ -311,21 +321,6 @@ impl Sql { QueryType::Other, ); - let keys_str = felts_sql_string(&keys); - let insert_entities = "INSERT INTO event_messages (id, keys, event_id, executed_at) \ - VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ - updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ - event_id=EXCLUDED.event_id RETURNING *"; - let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities) - .bind(&entity_id) - .bind(&keys_str) - .bind(event_id) - .bind(utc_dt_string_from_timestamp(block_timestamp)) - .fetch_one(&self.pool) - .await?; - - event_message_updated.updated_model = Some(entity.clone()); - let path = vec![namespaced_name]; self.build_set_entity_queries_recursive( path, @@ -336,8 +331,6 @@ impl Sql { &vec![], ); - self.query_queue.push_publish(BrokerMessage::EventMessageUpdated(event_message_updated)); - Ok(()) } @@ -502,21 +495,10 @@ impl Sql { self.query_queue.enqueue( "INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \ - (?, ?, ?, ?, ?)", + (?, ?, ?, ?, ?) RETURNING *", vec![id, keys, data, hash, executed_at], - QueryType::Other, + QueryType::StoreEvent, ); - - let emitted = EventEmitted { - id: event_id.to_string(), - keys: felts_sql_string(&event.keys), - data: felts_sql_string(&event.data), - transaction_hash: format!("{:#x}", transaction_hash), - created_at: Utc::now(), - executed_at: must_utc_datetime_from_timestamp(block_timestamp), - }; - - self.query_queue.push_publish(BrokerMessage::EventEmitted(emitted)); } #[allow(clippy::too_many_arguments)]