diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index e0a99700f8..a1e5bf2e43 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -15,7 +15,8 @@ use tracing::{debug, error}; use crate::simple_broker::SimpleBroker; use crate::types::{ Contract as ContractUpdated, Entity as EntityUpdated, Event as EventEmitted, - EventMessage as EventMessageUpdated, Model as ModelRegistered, + EventMessage as EventMessageUpdated, Model as ModelRegistered, OptimisticEntity, + OptimisticEventMessage, }; pub(crate) const LOG_TARGET: &str = "torii_core::executor"; @@ -224,6 +225,19 @@ impl<'c> Executor<'c> { let mut entity_updated = EntityUpdated::from_row(&row)?; entity_updated.updated_model = Some(entity); entity_updated.deleted = false; + + let optimistic_entity = OptimisticEntity { + id: entity_updated.id.clone(), + keys: entity_updated.keys.clone(), + event_id: entity_updated.event_id.clone(), + executed_at: entity_updated.executed_at, + created_at: entity_updated.created_at, + updated_at: entity_updated.updated_at, + updated_model: entity_updated.updated_model.clone(), + deleted: entity_updated.deleted, + }; + SimpleBroker::publish(optimistic_entity); + let broker_message = BrokerMessage::EntityUpdated(entity_updated); self.publish_queue.push(broker_message); } @@ -264,6 +278,17 @@ impl<'c> Executor<'c> { entity_updated.deleted = true; } + let optimistic_entity = OptimisticEntity { + id: entity_updated.id.clone(), + keys: entity_updated.keys.clone(), + event_id: entity_updated.event_id.clone(), + executed_at: entity_updated.executed_at, + created_at: entity_updated.created_at, + updated_at: entity_updated.updated_at, + updated_model: entity_updated.updated_model.clone(), + deleted: entity_updated.deleted, + }; + SimpleBroker::publish(optimistic_entity); let broker_message = BrokerMessage::EntityUpdated(entity_updated); self.publish_queue.push(broker_message); } @@ -280,6 +305,18 @@ impl<'c> Executor<'c> { })?; let mut event_message = EventMessageUpdated::from_row(&row)?; event_message.updated_model = Some(entity); + + let optimistic_event_message = OptimisticEventMessage { + id: event_message.id.clone(), + keys: event_message.keys.clone(), + event_id: event_message.event_id.clone(), + executed_at: event_message.executed_at, + created_at: event_message.created_at, + updated_at: event_message.updated_at, + updated_model: event_message.updated_model.clone(), + }; + SimpleBroker::publish(optimistic_event_message); + let broker_message = BrokerMessage::EventMessageUpdated(event_message); self.publish_queue.push(broker_message); } diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index e87a1205e7..f24607a91a 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -46,6 +46,23 @@ pub struct Entity { pub deleted: bool, } +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OptimisticEntity { + pub id: String, + pub keys: String, + pub event_id: String, + pub executed_at: DateTime, + pub created_at: DateTime, + pub updated_at: DateTime, + + // this should never be None + #[sqlx(skip)] + pub updated_model: Option, + #[sqlx(skip)] + pub deleted: bool, +} + #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct EventMessage { @@ -61,6 +78,21 @@ pub struct EventMessage { pub updated_model: Option, } +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OptimisticEventMessage { + pub id: String, + pub keys: String, + pub event_id: String, + pub executed_at: DateTime, + pub created_at: DateTime, + pub updated_at: DateTime, + + // this should never be None + #[sqlx(skip)] + pub updated_model: Option, +} + #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Model { diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index d7b03ae26e..33b3b5f472 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -14,7 +14,7 @@ use tokio::sync::RwLock; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::FELT_DELIMITER; -use torii_core::types::Entity; +use torii_core::types::OptimisticEntity; use tracing::{error, trace}; use crate::proto; @@ -78,15 +78,21 @@ impl EntityManager { #[allow(missing_debug_implementations)] pub struct Service { subs_manager: Arc, - simple_broker: Pin + Send>>, + simple_broker: Pin + Send>>, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + Self { + subs_manager, + simple_broker: Box::pin(SimpleBroker::::subscribe()), + } } - async fn publish_updates(subs: Arc, entity: &Entity) -> Result<(), Error> { + async fn publish_updates( + subs: Arc, + entity: &OptimisticEntity, + ) -> Result<(), Error> { let mut closed_stream = Vec::new(); let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?; let keys = entity diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index 9bac36fb84..ab2611d1ac 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -14,7 +14,7 @@ use tokio::sync::RwLock; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::FELT_DELIMITER; -use torii_core::types::EventMessage; +use torii_core::types::OptimisticEventMessage; use tracing::{error, trace}; use super::entity::EntitiesSubscriber; @@ -72,17 +72,20 @@ impl EventMessageManager { #[allow(missing_debug_implementations)] pub struct Service { subs_manager: Arc, - simple_broker: Pin + Send>>, + simple_broker: Pin + Send>>, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + Self { + subs_manager, + simple_broker: Box::pin(SimpleBroker::::subscribe()), + } } async fn publish_updates( subs: Arc, - entity: &EventMessage, + entity: &OptimisticEventMessage, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;