refactor-opt(torii-grpc): subscriptions no db fetch #2455

Merged (4 commits) on Sep 20, 2024. Changes shown below are from 1 commit.
crates/torii/core/src/query_queue.rs (6 additions, 3 deletions)
@@ -1,7 +1,7 @@
use std::collections::VecDeque;

use anyhow::{Context, Result};
-use dojo_types::schema::Ty;
+use dojo_types::schema::{Struct, Ty};
use sqlx::{FromRow, Pool, Sqlite};
use starknet::core::types::Felt;

@@ -42,7 +42,7 @@ pub struct DeleteEntityQuery {
pub entity_id: String,
pub event_id: String,
pub block_timestamp: String,
-pub entity: Ty,
+pub ty: Ty,
}

#[derive(Debug, Clone)]
@@ -115,7 +115,10 @@ impl QueryQueue {
.fetch_one(&mut *tx)
.await?;
let mut entity_updated = EntityUpdated::from_row(&row)?;
-entity_updated.updated_model = Some(entity.entity);
+entity_updated.updated_model = Some(Ty::Struct(Struct {
+name: entity.ty.name(),
+children: vec![],
+}));

let count = sqlx::query_scalar::<_, i64>(
"SELECT count(*) FROM entity_model WHERE entity_id = ?",
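The delete path above no longer reads anything back from SQLite before notifying subscribers: it publishes a bare `Ty::Struct` carrying only the model name. A minimal sketch of that shape, reusing the `dojo_types::schema` types imported above (the helper name is hypothetical, not part of this PR):

```rust
use dojo_types::schema::{Struct, Ty};

// Placeholder published when an entity's model is deleted: the name tells
// subscribers which model was removed, while the empty `children` carries
// no member values, so nothing has to be fetched from the database.
fn deletion_placeholder(ty: &Ty) -> Ty {
    Ty::Struct(Struct { name: ty.name(), children: vec![] })
}
```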
crates/torii/core/src/sql.rs (1 addition, 1 deletion)
@@ -361,7 +361,7 @@ impl Sql {
entity_id: entity_id.clone(),
event_id: event_id.to_string(),
block_timestamp: utc_dt_string_from_timestamp(block_timestamp),
-entity: entity.clone(),
+ty: entity.clone(),
}),
);

crates/torii/grpc/src/server/subscriptions/entity.rs (8 additions, 49 deletions)
@@ -8,21 +8,17 @@ use std::task::{Context, Poll};
use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
-use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
-use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
-use torii_core::model::build_sql_query;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::Entity;
use tracing::{error, trace};

use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
-use crate::server::map_row_to_entity;
use crate::types::{EntityKeysClause, PatternMatching};

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";
@@ -81,30 +77,22 @@ impl EntityManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
-pool: Pool<Sqlite>,
subs_manager: Arc<EntityManager>,
-model_cache: Arc<ModelCache>,
simple_broker: Pin<Box<dyn Stream<Item = Entity> + Send>>,
}

impl Service {
pub fn new(
-pool: Pool<Sqlite>,
subs_manager: Arc<EntityManager>,
-model_cache: Arc<ModelCache>,
) -> Self {
Self {
-pool,
subs_manager,
-model_cache,
simple_broker: Box::pin(SimpleBroker::<Entity>::subscribe()),
}
}

async fn publish_updates(
subs: Arc<EntityManager>,
-cache: Arc<ModelCache>,
-pool: Pool<Sqlite>,
entity: &Entity,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
@@ -204,41 +192,14 @@ impl Service {
continue;
}

let models_query = r#"
SELECT group_concat(entity_model.model_id) as model_ids
FROM entities
JOIN entity_model ON entities.id = entity_model.entity_id
WHERE entities.id = ?
GROUP BY entities.id
"#;
let (model_ids,): (String,) =
sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?;
let model_ids: Vec<Felt> = model_ids
.split(',')
.map(Felt::from_str)
.collect::<Result<_, _>>()
.map_err(ParseError::FromStr)?;
let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect();

let (entity_query, arrays_queries, _) = build_sql_query(
&schemas,
"entities",
"entity_id",
Some("entities.id = ?"),
Some("entities.id = ?"),
None,
None,
)?;

let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?;
let mut arrays_rows = HashMap::new();
for (name, query) in arrays_queries {
let row = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?;
arrays_rows.insert(name, row);
}

let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone();
let resp = proto::world::SubscribeEntityResponse {
entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?),
entity: Some(proto::types::Entity {
hashed_keys: hashed.to_bytes_be().to_vec(),
models: vec![
model.into()
],
}),
subscription_id: *idx,
};

@@ -264,10 +225,8 @@ impl Future for Service {

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
-let cache = Arc::clone(&pin.model_cache);
-let pool = pin.pool.clone();
tokio::spawn(async move {
-if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
+if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
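With the pool and model cache gone from `Service`, construction takes a single argument. A hypothetical call site (the real wiring lives outside this diff, and `EntityManager::default()` is assumed here purely for illustration):

```rust
use std::sync::Arc;

// Sketch only: the subscription service no longer needs a SQLite pool or a
// ModelCache, because every update is served straight from the broker payload.
fn spawn_entity_subscriptions() -> (Arc<EntityManager>, Service) {
    let manager = Arc::new(EntityManager::default());
    let service = Service::new(Arc::clone(&manager));
    (manager, service)
}
```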
crates/torii/grpc/src/server/subscriptions/event_message.rs (8 additions, 50 deletions)
@@ -8,13 +8,10 @@ use std::task::{Context, Poll};
use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
-use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::RwLock;
-use torii_core::cache::ModelCache;
use torii_core::error::{Error, ParseError};
-use torii_core::model::build_sql_query;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::FELT_DELIMITER;
use torii_core::types::EventMessage;
@@ -23,7 +20,6 @@ use tracing::{error, trace};
use super::entity::EntitiesSubscriber;
use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
-use crate::server::map_row_to_entity;
use crate::types::{EntityKeysClause, PatternMatching};

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message";
@@ -75,30 +71,22 @@ impl EventMessageManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
-pool: Pool<Sqlite>,
subs_manager: Arc<EventMessageManager>,
-model_cache: Arc<ModelCache>,
simple_broker: Pin<Box<dyn Stream<Item = EventMessage> + Send>>,
}

impl Service {
pub fn new(
-pool: Pool<Sqlite>,
subs_manager: Arc<EventMessageManager>,
-model_cache: Arc<ModelCache>,
) -> Self {
Self {
-pool,
subs_manager,
-model_cache,
simple_broker: Box::pin(SimpleBroker::<EventMessage>::subscribe()),
}
}

async fn publish_updates(
subs: Arc<EventMessageManager>,
-cache: Arc<ModelCache>,
-pool: Pool<Sqlite>,
entity: &EventMessage,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
@@ -182,42 +170,14 @@ impl Service {
continue;
}

-// publish all updates if ids is empty or only ids that are subscribed to
-let models_query = r#"
-SELECT group_concat(event_model.model_id) as model_ids
-FROM event_messages
-JOIN event_model ON event_messages.id = event_model.entity_id
-WHERE event_messages.id = ?
-GROUP BY event_messages.id
-"#;
-let (model_ids,): (String,) =
-sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?;
-let model_ids: Vec<Felt> = model_ids
-.split(',')
-.map(Felt::from_str)
-.collect::<Result<_, _>>()
-.map_err(ParseError::FromStr)?;
-let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect();

-let (entity_query, arrays_queries, _) = build_sql_query(
-&schemas,
-"event_messages",
-"event_message_id",
-Some("event_messages.id = ?"),
-Some("event_messages.id = ?"),
-None,
-None,
-)?;

-let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?;
-let mut arrays_rows = HashMap::new();
-for (name, query) in arrays_queries {
-let rows = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?;
-arrays_rows.insert(name, rows);
-}

+let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone();
let resp = proto::world::SubscribeEntityResponse {
-entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?),
+entity: Some(proto::types::Entity {
+hashed_keys: hashed.to_bytes_be().to_vec(),
+models: vec![
+model.into()
+],
+}),
subscription_id: *idx,
};

@@ -243,10 +203,8 @@ impl Future for Service {

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
-let cache = Arc::clone(&pin.model_cache);
-let pool = pin.pool.clone();
tokio::spawn(async move {
-if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await {
+if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
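Both subscription services now follow the same shape: the broker message already carries the updated model, so the fan-out task never touches the database. A standalone sketch of that pattern, using illustrative types and tokio's broadcast channel rather than torii's `SimpleBroker`:

```rust
use tokio::sync::broadcast;

// Stand-in for the payload torii's broker now carries: everything a
// subscriber needs travels in the message itself.
#[derive(Clone, Debug)]
struct EntityUpdate {
    hashed_keys: Vec<u8>,
    model_name: String,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<EntityUpdate>(16);

    // Publisher side: the writer already holds the model in memory when it
    // commits the row, so it ships it on the broker instead of just an id.
    tx.send(EntityUpdate { hashed_keys: vec![1, 2, 3], model_name: "Position".into() })
        .expect("at least one receiver is alive");

    // Subscriber side: build the outgoing response straight from the payload,
    // with no follow-up query.
    if let Ok(update) = rx.recv().await {
        println!("push model {} for keys {:?}", update.model_name, update.hashed_keys);
    }
}
```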