From 5550d4b212796da82bdaed67e51801ccae1cf3b8 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 15:35:22 -0400 Subject: [PATCH 1/6] refactor: update to use executor --- crates/torii/core/src/executor.rs | 38 ++++++++++++++++++- crates/torii/core/src/types.rs | 32 ++++++++++++++++ .../grpc/src/server/subscriptions/entity.rs | 14 +++++-- .../src/server/subscriptions/event_message.rs | 11 ++++-- 4 files changed, 86 insertions(+), 9 deletions(-) diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 503759e43f..2d285be452 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -15,7 +15,7 @@ use tracing::{debug, error}; use crate::simple_broker::SimpleBroker; use crate::types::{ Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, - Model as ModelRegistered, + Model as ModelRegistered, OptimisticEntity, OptimisticEventMessage, }; pub(crate) const LOG_TARGET: &str = "torii_core::executor"; @@ -185,6 +185,19 @@ impl<'c> Executor<'c> { let mut entity_updated = EntityUpdated::from_row(&row)?; entity_updated.updated_model = Some(entity); entity_updated.deleted = false; + + let optimistic_entity = OptimisticEntity { + id: entity_updated.id.clone(), + keys: entity_updated.keys.clone(), + event_id: entity_updated.event_id.clone(), + executed_at: entity_updated.executed_at.clone(), + created_at: entity_updated.created_at.clone(), + updated_at: entity_updated.updated_at.clone(), + updated_model: entity_updated.updated_model.clone(), + deleted: entity_updated.deleted, + }; + SimpleBroker::publish(optimistic_entity); + let broker_message = BrokerMessage::EntityUpdated(entity_updated); self.publish_queue.push(broker_message); } @@ -225,6 +238,17 @@ impl<'c> Executor<'c> { entity_updated.deleted = true; } + let optimistic_entity = OptimisticEntity { + id: entity_updated.id.clone(), + keys: entity_updated.keys.clone(), + event_id: entity_updated.event_id.clone(), + executed_at: entity_updated.executed_at.clone(), + created_at: entity_updated.created_at.clone(), + updated_at: entity_updated.updated_at.clone(), + updated_model: entity_updated.updated_model.clone(), + deleted: entity_updated.deleted, + }; + SimpleBroker::publish(optimistic_entity); let broker_message = BrokerMessage::EntityUpdated(entity_updated); self.publish_queue.push(broker_message); } @@ -241,6 +265,18 @@ impl<'c> Executor<'c> { })?; let mut event_message = EventMessageUpdated::from_row(&row)?; event_message.updated_model = Some(entity); + + let optimistic_event_message = OptimisticEventMessage { + id: event_message.id.clone(), + keys: event_message.keys.clone(), + event_id: event_message.event_id.clone(), + executed_at: event_message.executed_at.clone(), + created_at: event_message.created_at.clone(), + updated_at: event_message.updated_at.clone(), + updated_model: event_message.updated_model.clone(), + }; + SimpleBroker::publish(optimistic_event_message); + let broker_message = BrokerMessage::EventMessageUpdated(event_message); self.publish_queue.push(broker_message); } diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index de75fca94a..7d9bd9bf21 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -46,6 +46,23 @@ pub struct Entity { pub deleted: bool, } +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OptimisticEntity { + pub id: String, + pub keys: String, + pub event_id: String, + pub executed_at: DateTime, + pub created_at: DateTime, + pub updated_at: DateTime, + + // this should never be None + #[sqlx(skip)] + pub updated_model: Option, + #[sqlx(skip)] + pub deleted: bool, +} + #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct EventMessage { @@ -61,6 +78,21 @@ pub struct EventMessage { pub updated_model: Option, } +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OptimisticEventMessage { + pub id: String, + pub keys: String, + pub event_id: String, + pub executed_at: DateTime, + pub created_at: DateTime, + pub updated_at: DateTime, + + // this should never be None + #[sqlx(skip)] + pub updated_model: Option, +} + #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Model { diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index d7b03ae26e..33b3b5f472 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -14,7 +14,7 @@ use tokio::sync::RwLock; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::FELT_DELIMITER; -use torii_core::types::Entity; +use torii_core::types::OptimisticEntity; use tracing::{error, trace}; use crate::proto; @@ -78,15 +78,21 @@ impl EntityManager { #[allow(missing_debug_implementations)] pub struct Service { subs_manager: Arc, - simple_broker: Pin + Send>>, + simple_broker: Pin + Send>>, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + Self { + subs_manager, + simple_broker: Box::pin(SimpleBroker::::subscribe()), + } } - async fn publish_updates(subs: Arc, entity: &Entity) -> Result<(), Error> { + async fn publish_updates( + subs: Arc, + entity: &OptimisticEntity, + ) -> Result<(), Error> { let mut closed_stream = Vec::new(); let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?; let keys = entity diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index 9bac36fb84..ab2611d1ac 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -14,7 +14,7 @@ use tokio::sync::RwLock; use torii_core::error::{Error, ParseError}; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::FELT_DELIMITER; -use torii_core::types::EventMessage; +use torii_core::types::OptimisticEventMessage; use tracing::{error, trace}; use super::entity::EntitiesSubscriber; @@ -72,17 +72,20 @@ impl EventMessageManager { #[allow(missing_debug_implementations)] pub struct Service { subs_manager: Arc, - simple_broker: Pin + Send>>, + simple_broker: Pin + Send>>, } impl Service { pub fn new(subs_manager: Arc) -> Self { - Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } + Self { + subs_manager, + simple_broker: Box::pin(SimpleBroker::::subscribe()), + } } async fn publish_updates( subs: Arc, - entity: &EventMessage, + entity: &OptimisticEventMessage, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?; From 7f43a45204c6a2bc59fe92647ace1df070381f2c Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 15:50:22 -0400 Subject: [PATCH 2/6] clippy --- crates/torii/core/src/executor.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 2d285be452..f450d963be 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -190,9 +190,9 @@ impl<'c> Executor<'c> { id: entity_updated.id.clone(), keys: entity_updated.keys.clone(), event_id: entity_updated.event_id.clone(), - executed_at: entity_updated.executed_at.clone(), - created_at: entity_updated.created_at.clone(), - updated_at: entity_updated.updated_at.clone(), + executed_at: entity_updated.executed_at, + created_at: entity_updated.created_at, + updated_at: entity_updated.updated_at, updated_model: entity_updated.updated_model.clone(), deleted: entity_updated.deleted, }; @@ -242,9 +242,9 @@ impl<'c> Executor<'c> { id: entity_updated.id.clone(), keys: entity_updated.keys.clone(), event_id: entity_updated.event_id.clone(), - executed_at: entity_updated.executed_at.clone(), - created_at: entity_updated.created_at.clone(), - updated_at: entity_updated.updated_at.clone(), + executed_at: entity_updated.executed_at, + created_at: entity_updated.created_at, + updated_at: entity_updated.updated_at, updated_model: entity_updated.updated_model.clone(), deleted: entity_updated.deleted, }; @@ -270,9 +270,9 @@ impl<'c> Executor<'c> { id: event_message.id.clone(), keys: event_message.keys.clone(), event_id: event_message.event_id.clone(), - executed_at: event_message.executed_at.clone(), - created_at: event_message.created_at.clone(), - updated_at: event_message.updated_at.clone(), + executed_at: event_message.executed_at, + created_at: event_message.created_at, + updated_at: event_message.updated_at, updated_model: event_message.updated_model.clone(), }; SimpleBroker::publish(optimistic_event_message); From 10945429c636f77b82feba9cda96cc3276790927 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 17:06:38 -0400 Subject: [PATCH 3/6] make sure order --- crates/torii/core/src/simple_broker.rs | 35 ++++++++++++++++++++------ 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/crates/torii/core/src/simple_broker.rs b/crates/torii/core/src/simple_broker.rs index 37bdb2118d..b05810c564 100644 --- a/crates/torii/core/src/simple_broker.rs +++ b/crates/torii/core/src/simple_broker.rs @@ -1,5 +1,5 @@ use std::any::{Any, TypeId}; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Mutex; @@ -13,7 +13,10 @@ use slab::Slab; static SUBSCRIBERS: Lazy>>> = Lazy::new(Default::default); #[derive(Debug)] -pub struct Senders(pub Slab>); +pub struct Senders { + pub slab: Slab>, + pub message_queue: VecDeque, +} struct BrokerStream(usize, UnboundedReceiver); @@ -25,13 +28,16 @@ where let mut map = SUBSCRIBERS.lock().unwrap(); let senders = map .entry(TypeId::of::>()) - .or_insert_with(|| Box::new(Senders::(Default::default()))); + .or_insert_with(|| Box::new(Senders:: { + slab: Default::default(), + message_queue: VecDeque::new(), + })); f(senders.downcast_mut::>().unwrap()) } impl Drop for BrokerStream { fn drop(&mut self) { - with_senders::(|senders| senders.0.remove(self.0)); + with_senders::(|senders| senders.slab.remove(self.0)); } } @@ -51,17 +57,30 @@ impl SimpleBroker { /// Publish a message that all subscription streams can receive. pub fn publish(msg: T) { with_senders::(|senders| { - for (_, sender) in senders.0.iter_mut() { - sender.start_send(msg.clone()).ok(); - } + senders.message_queue.push_back(msg.clone()); + Self::send_messages(senders); }); } + fn send_messages(senders: &mut Senders) { + while let Some(msg) = senders.message_queue.pop_front() { + let mut failed_senders = Vec::new(); + for (id, sender) in senders.slab.iter_mut() { + if sender.start_send(msg.clone()).is_err() { + failed_senders.push(id); + } + } + for id in failed_senders { + senders.slab.remove(id); + } + } + } + /// Subscribe to the message of the specified type and returns a `Stream`. pub fn subscribe() -> impl Stream { with_senders::(|senders| { let (tx, rx) = mpsc::unbounded(); - let id = senders.0.insert(tx); + let id = senders.slab.insert(tx); BrokerStream(id, rx) }) } From 7e3fe7c915cdf4b5a70af57cf759f453eb65ebb2 Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 4 Oct 2024 17:08:20 -0400 Subject: [PATCH 4/6] fmt --- crates/torii/core/src/simple_broker.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/torii/core/src/simple_broker.rs b/crates/torii/core/src/simple_broker.rs index b05810c564..895f6369c6 100644 --- a/crates/torii/core/src/simple_broker.rs +++ b/crates/torii/core/src/simple_broker.rs @@ -26,12 +26,9 @@ where F: FnOnce(&mut Senders) -> R, { let mut map = SUBSCRIBERS.lock().unwrap(); - let senders = map - .entry(TypeId::of::>()) - .or_insert_with(|| Box::new(Senders:: { - slab: Default::default(), - message_queue: VecDeque::new(), - })); + let senders = map.entry(TypeId::of::>()).or_insert_with(|| { + Box::new(Senders:: { slab: Default::default(), message_queue: VecDeque::new() }) + }); f(senders.downcast_mut::>().unwrap()) } From 597f38e2d2a37743fdfe7083e41c7fc11791c9a9 Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 8 Oct 2024 10:38:28 -0400 Subject: [PATCH 5/6] c --- crates/torii/core/src/simple_broker.rs | 36 +++++++------------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/crates/torii/core/src/simple_broker.rs b/crates/torii/core/src/simple_broker.rs index 895f6369c6..37bdb2118d 100644 --- a/crates/torii/core/src/simple_broker.rs +++ b/crates/torii/core/src/simple_broker.rs @@ -1,5 +1,5 @@ use std::any::{Any, TypeId}; -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Mutex; @@ -13,10 +13,7 @@ use slab::Slab; static SUBSCRIBERS: Lazy>>> = Lazy::new(Default::default); #[derive(Debug)] -pub struct Senders { - pub slab: Slab>, - pub message_queue: VecDeque, -} +pub struct Senders(pub Slab>); struct BrokerStream(usize, UnboundedReceiver); @@ -26,15 +23,15 @@ where F: FnOnce(&mut Senders) -> R, { let mut map = SUBSCRIBERS.lock().unwrap(); - let senders = map.entry(TypeId::of::>()).or_insert_with(|| { - Box::new(Senders:: { slab: Default::default(), message_queue: VecDeque::new() }) - }); + let senders = map + .entry(TypeId::of::>()) + .or_insert_with(|| Box::new(Senders::(Default::default()))); f(senders.downcast_mut::>().unwrap()) } impl Drop for BrokerStream { fn drop(&mut self) { - with_senders::(|senders| senders.slab.remove(self.0)); + with_senders::(|senders| senders.0.remove(self.0)); } } @@ -54,30 +51,17 @@ impl SimpleBroker { /// Publish a message that all subscription streams can receive. pub fn publish(msg: T) { with_senders::(|senders| { - senders.message_queue.push_back(msg.clone()); - Self::send_messages(senders); - }); - } - - fn send_messages(senders: &mut Senders) { - while let Some(msg) = senders.message_queue.pop_front() { - let mut failed_senders = Vec::new(); - for (id, sender) in senders.slab.iter_mut() { - if sender.start_send(msg.clone()).is_err() { - failed_senders.push(id); - } - } - for id in failed_senders { - senders.slab.remove(id); + for (_, sender) in senders.0.iter_mut() { + sender.start_send(msg.clone()).ok(); } - } + }); } /// Subscribe to the message of the specified type and returns a `Stream`. pub fn subscribe() -> impl Stream { with_senders::(|senders| { let (tx, rx) = mpsc::unbounded(); - let id = senders.slab.insert(tx); + let id = senders.0.insert(tx); BrokerStream(id, rx) }) } From 37b5f60ace5ae6d85c93af11a5a1f48b07685881 Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 8 Oct 2024 10:49:15 -0400 Subject: [PATCH 6/6] fmt --- crates/torii/core/src/executor.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 30249797db..a1e5bf2e43 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -14,8 +14,9 @@ use tracing::{debug, error}; use crate::simple_broker::SimpleBroker; use crate::types::{ - Contract as ContractUpdated, Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, - Model as ModelRegistered, OptimisticEntity, OptimisticEventMessage, + Contract as ContractUpdated, Entity as EntityUpdated, Event as EventEmitted, + EventMessage as EventMessageUpdated, Model as ModelRegistered, OptimisticEntity, + OptimisticEventMessage, }; pub(crate) const LOG_TARGET: &str = "torii_core::executor";