From 54a891ee45d0f593c7d0cea28c5b72f56008412e Mon Sep 17 00:00:00 2001 From: Larko <59736843+Larkooo@users.noreply.github.com> Date: Wed, 21 Aug 2024 16:32:00 -0400 Subject: [PATCH] opt(torii): indexing model schema retrieval (#2319) * opt(torii): indexing model schema retrieval * :fix and optimize grpc * chore: clippy --- crates/torii/core/src/cache.rs | 75 +++++++++++++++---- .../core/src/processors/event_message.rs | 5 +- .../core/src/processors/store_del_record.rs | 5 +- .../core/src/processors/store_set_record.rs | 5 +- .../src/processors/store_update_member.rs | 7 +- .../src/processors/store_update_record.rs | 7 +- crates/torii/core/src/sql.rs | 20 ++--- crates/torii/grpc/src/server/mod.rs | 49 +++++------- .../grpc/src/server/subscriptions/entity.rs | 2 +- .../src/server/subscriptions/event_message.rs | 2 +- crates/torii/libp2p/src/server/mod.rs | 8 +- 11 files changed, 106 insertions(+), 79 deletions(-) diff --git a/crates/torii/core/src/cache.rs b/crates/torii/core/src/cache.rs index a680f39ec7..a77c88642f 100644 --- a/crates/torii/core/src/cache.rs +++ b/crates/torii/core/src/cache.rs @@ -1,17 +1,36 @@ use std::collections::HashMap; use dojo_types::schema::Ty; +use dojo_world::contracts::abi::model::Layout; use sqlx::SqlitePool; use starknet_crypto::Felt; use tokio::sync::RwLock; -use crate::error::{Error, QueryError}; +use crate::error::{Error, ParseError, QueryError}; use crate::model::{parse_sql_model_members, SqlModelMember}; +#[derive(Debug, Clone)] +pub struct Model { + /// Namespace of the model + pub namespace: String, + /// The name of the model + pub name: String, + /// The selector of the model + pub selector: Felt, + /// The class hash of the model + pub class_hash: Felt, + /// The contract address of the model + pub contract_address: Felt, + pub packed_size: u32, + pub unpacked_size: u32, + pub layout: Layout, + pub schema: Ty, +} + #[derive(Debug)] pub struct ModelCache { pool: SqlitePool, - cache: RwLock>, + cache: RwLock>, } impl ModelCache { @@ -19,16 +38,16 @@ impl ModelCache { Self { pool, cache: RwLock::new(HashMap::new()) } } - pub async fn schemas(&self, selectors: &[Felt]) -> Result, Error> { + pub async fn models(&self, selectors: &[Felt]) -> Result, Error> { let mut schemas = Vec::with_capacity(selectors.len()); for selector in selectors { - schemas.push(self.schema(selector).await?); + schemas.push(self.model(selector).await?); } Ok(schemas) } - pub async fn schema(&self, selector: &Felt) -> Result { + pub async fn model(&self, selector: &Felt) -> Result { { let cache = self.cache.read().await; if let Some(model) = cache.get(selector).cloned() { @@ -36,17 +55,33 @@ impl ModelCache { } } - self.update_schema(selector).await + self.update_model(selector).await } - async fn update_schema(&self, selector: &Felt) -> Result { + async fn update_model(&self, selector: &Felt) -> Result { let formatted_selector = format!("{:#x}", selector); - let (namespace, name): (String, String) = - sqlx::query_as("SELECT namespace, name FROM models WHERE id = ?") - .bind(formatted_selector.clone()) - .fetch_one(&self.pool) - .await?; + let (namespace, name, class_hash, contract_address, packed_size, unpacked_size, layout): ( + String, + String, + String, + String, + u32, + u32, + String, + ) = sqlx::query_as( + "SELECT namespace, name, class_hash, contract_address, packed_size, unpacked_size, \ + layout FROM models WHERE id = ?", + ) + .bind(format!("{:#x}", selector)) + .fetch_one(&self.pool) + .await?; + + let class_hash = Felt::from_hex(&class_hash).map_err(ParseError::FromStr)?; + let contract_address = Felt::from_hex(&contract_address).map_err(ParseError::FromStr)?; + + let layout = serde_json::from_str(&layout).map_err(ParseError::FromJsonStr)?; + let model_members: Vec = sqlx::query_as( "SELECT id, model_idx, member_idx, name, type, type_enum, enum_options, key FROM \ model_members WHERE model_id = ? ORDER BY model_idx ASC, member_idx ASC", @@ -61,9 +96,21 @@ impl ModelCache { let schema = parse_sql_model_members(&namespace, &name, &model_members); let mut cache = self.cache.write().await; - cache.insert(*selector, schema.clone()); - Ok(schema) + let model = Model { + namespace, + name, + selector: *selector, + class_hash, + contract_address, + packed_size, + unpacked_size, + layout, + schema, + }; + cache.insert(*selector, model.clone()); + + Ok(model) } pub async fn clear(&self) { diff --git a/crates/torii/core/src/processors/event_message.rs b/crates/torii/core/src/processors/event_message.rs index a3c06fc762..dcbe66aa58 100644 --- a/crates/torii/core/src/processors/event_message.rs +++ b/crates/torii/core/src/processors/event_message.rs @@ -1,6 +1,5 @@ use anyhow::{Error, Result}; use async_trait::async_trait; -use dojo_world::contracts::model::ModelReader; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, TransactionReceiptWithBlockInfo}; use starknet::providers::Provider; @@ -54,7 +53,7 @@ where info!( target: LOG_TARGET, - model = %model.name(), + model = %model.name, "Store event message." ); @@ -63,7 +62,7 @@ where let mut keys_and_unpacked = [event.keys[1..event.keys.len() - 1].to_vec(), event.data.clone()].concat(); - let mut entity = model.schema().await?; + let mut entity = model.schema.clone(); entity.deserialize(&mut keys_and_unpacked)?; db.set_event_message(entity, event_id, block_timestamp).await?; diff --git a/crates/torii/core/src/processors/store_del_record.rs b/crates/torii/core/src/processors/store_del_record.rs index 05c3b0e3a7..fcc901bd20 100644 --- a/crates/torii/core/src/processors/store_del_record.rs +++ b/crates/torii/core/src/processors/store_del_record.rs @@ -1,6 +1,5 @@ use anyhow::{Error, Ok, Result}; use async_trait::async_trait; -use dojo_world::contracts::model::ModelReader; use dojo_world::contracts::world::WorldContractReader; use starknet::core::types::{Event, TransactionReceiptWithBlockInfo}; use starknet::providers::Provider; @@ -53,12 +52,12 @@ where info!( target: LOG_TARGET, - name = %model.name(), + name = %model.name, "Store delete record." ); let entity_id = event.data[ENTITY_ID_INDEX]; - let entity = model.schema().await?; + let entity = model.schema; db.delete_entity(entity_id, entity, event_id, block_timestamp).await?; diff --git a/crates/torii/core/src/processors/store_set_record.rs b/crates/torii/core/src/processors/store_set_record.rs index 6641e19b16..6fa9fc5394 100644 --- a/crates/torii/core/src/processors/store_set_record.rs +++ b/crates/torii/core/src/processors/store_set_record.rs @@ -1,6 +1,5 @@ use anyhow::{Context, Error, Ok, Result}; use async_trait::async_trait; -use dojo_world::contracts::model::ModelReader; use dojo_world::contracts::world::WorldContractReader; use num_traits::ToPrimitive; use starknet::core::types::{Event, TransactionReceiptWithBlockInfo}; @@ -54,7 +53,7 @@ where info!( target: LOG_TARGET, - name = %model.name(), + name = %model.name, "Store set record.", ); @@ -72,7 +71,7 @@ where let values = event.data[values_start..values_end].to_vec(); let mut keys_and_unpacked = [keys, values].concat(); - let mut entity = model.schema().await?; + let mut entity = model.schema; entity.deserialize(&mut keys_and_unpacked)?; db.set_entity(entity, event_id, block_timestamp).await?; diff --git a/crates/torii/core/src/processors/store_update_member.rs b/crates/torii/core/src/processors/store_update_member.rs index a7ddcab839..ef17b6f321 100644 --- a/crates/torii/core/src/processors/store_update_member.rs +++ b/crates/torii/core/src/processors/store_update_member.rs @@ -1,6 +1,5 @@ use anyhow::{Context, Error, Result}; use async_trait::async_trait; -use dojo_world::contracts::model::ModelReader; use dojo_world::contracts::naming; use dojo_world::contracts::world::WorldContractReader; use num_traits::ToPrimitive; @@ -57,7 +56,7 @@ where let member_selector = event.data[MEMBER_INDEX]; let model = db.model(selector).await?; - let schema = model.schema().await?; + let schema = model.schema; let mut member = schema .as_struct() @@ -73,7 +72,7 @@ where info!( target: LOG_TARGET, - name = %model.name(), + name = %model.name, entity_id = format!("{:#x}", entity_id), member = %member.name, "Store update member.", @@ -86,7 +85,7 @@ where // Skip the length to only get the values as they will be deserialized. let mut values = event.data[values_start + 1..=values_end].to_vec(); - let tag = naming::get_tag(model.namespace(), model.name()); + let tag = naming::get_tag(&model.namespace, &model.name); if !db.does_entity_exist(tag.clone(), entity_id).await? { warn!( diff --git a/crates/torii/core/src/processors/store_update_record.rs b/crates/torii/core/src/processors/store_update_record.rs index 9fdc3a03c7..5dd2309c14 100644 --- a/crates/torii/core/src/processors/store_update_record.rs +++ b/crates/torii/core/src/processors/store_update_record.rs @@ -1,6 +1,5 @@ use anyhow::{Context, Error, Ok, Result}; use async_trait::async_trait; -use dojo_world::contracts::model::ModelReader; use dojo_world::contracts::naming; use dojo_world::contracts::world::WorldContractReader; use num_traits::ToPrimitive; @@ -56,7 +55,7 @@ where info!( target: LOG_TARGET, - name = %model.name(), + name = %model.name, entity_id = format!("{:#x}", entity_id), "Store update record.", ); @@ -68,14 +67,14 @@ where // Skip the length to only get the values as they will be deserialized. let values = event.data[values_start + 1..=values_end].to_vec(); - let tag = naming::get_tag(model.namespace(), model.name()); + let tag = naming::get_tag(&model.namespace, &model.name); // Keys are read from the db, since we don't have access to them when only // the entity id is passed. let keys = db.get_entity_keys(entity_id, &tag).await?; let mut keys_and_unpacked = [keys, values].concat(); - let mut entity = model.schema().await?; + let mut entity = model.schema; entity.deserialize(&mut keys_and_unpacked)?; db.set_entity(entity, event_id, block_timestamp).await?; diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 026d4dde1a..49284a52e3 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -1,5 +1,6 @@ use std::convert::TryInto; use std::str::FromStr; +use std::sync::Arc; use anyhow::{anyhow, Result}; use chrono::Utc; @@ -13,7 +14,7 @@ use sqlx::{Pool, Row, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; -use crate::model::ModelSQLReader; +use crate::cache::{Model, ModelCache}; use crate::query_queue::{Argument, QueryQueue}; use crate::simple_broker::SimpleBroker; use crate::types::{ @@ -37,6 +38,7 @@ pub struct Sql { world_address: Felt, pub pool: Pool, query_queue: QueryQueue, + model_cache: Arc, } impl Sql { @@ -54,7 +56,12 @@ impl Sql { query_queue.execute_all().await?; - Ok(Self { pool, world_address, query_queue }) + Ok(Self { + pool: pool.clone(), + world_address, + query_queue, + model_cache: Arc::new(ModelCache::new(pool)), + }) } pub async fn head(&self) -> Result<(u64, Option)> { @@ -411,13 +418,8 @@ impl Sql { Ok(()) } - pub async fn model(&self, selector: Felt) -> Result { - match ModelSQLReader::new(selector, self.pool.clone()).await { - Ok(reader) => Ok(reader), - Err(e) => { - Err(anyhow::anyhow!("Failed to get model from db for selector {selector:#x}: {e}")) - } - } + pub async fn model(&self, selector: Felt) -> Result { + self.model_cache.model(&selector).await.map_err(|e| e.into()) } /// Retrieves the keys definition for a given model. diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index d015d590b4..f833881939 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -161,8 +161,9 @@ impl DojoWorld { for model in models { let schema = self .model_cache - .schema(&Felt::from_str(&model.id).map_err(ParseError::FromStr)?) - .await?; + .model(&Felt::from_str(&model.id).map_err(ParseError::FromStr)?) + .await? + .schema; models_metadata.push(proto::types::ModelMetadata { namespace: model.namespace, name: model.name, @@ -292,7 +293,8 @@ impl DojoWorld { .map(Felt::from_str) .collect::>() .map_err(ParseError::FromStr)?; - let schemas = self.model_cache.schemas(&model_ids).await?; + let schemas = + self.model_cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); let (entity_query, arrays_queries, _) = build_sql_query( &schemas, @@ -433,7 +435,8 @@ impl DojoWorld { .map(Felt::from_str) .collect::>() .map_err(ParseError::FromStr)?; - let schemas = self.model_cache.schemas(&model_ids).await?; + let schemas = + self.model_cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); let (entity_query, arrays_queries, _) = build_sql_query( &schemas, @@ -525,7 +528,8 @@ impl DojoWorld { .map(Felt::from_str) .collect::, _>>() .map_err(ParseError::FromStr)?; - let schemas = self.model_cache.schemas(&model_ids).await?; + let schemas = + self.model_cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); let table_name = member_clause.model; let column_name = format!("external_{}", member_clause.member); @@ -702,7 +706,8 @@ impl DojoWorld { .map(Felt::from_str) .collect::>() .map_err(ParseError::FromStr)?; - let schemas = self.model_cache.schemas(&model_ids).await?; + let schemas = + self.model_cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); let (entity_query, arrays_queries, _) = build_sql_query( &schemas, @@ -735,33 +740,17 @@ impl DojoWorld { // selector let model = compute_selector_from_names(namespace, name); - let (name, class_hash, contract_address, packed_size, unpacked_size, layout): ( - String, - String, - String, - u32, - u32, - String, - ) = sqlx::query_as( - "SELECT namespace, name, class_hash, contract_address, packed_size, unpacked_size, \ - layout FROM models WHERE id = ?", - ) - .bind(format!("{:#x}", model)) - .fetch_one(&self.pool) - .await?; - - let schema = self.model_cache.schema(&model).await?; - let layout = layout.as_bytes().to_vec(); + let model = self.model_cache.model(&model).await?; Ok(proto::types::ModelMetadata { namespace: namespace.to_string(), - name, - layout, - class_hash, - contract_address, - packed_size, - unpacked_size, - schema: serde_json::to_vec(&schema).unwrap(), + name: name.to_string(), + class_hash: format!("{:#x}", model.class_hash), + contract_address: format!("{:#x}", model.contract_address), + packed_size: model.packed_size, + unpacked_size: model.unpacked_size, + layout: serde_json::to_vec(&model.layout).unwrap(), + schema: serde_json::to_vec(&model.schema).unwrap(), }) } diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 2f412e0c62..7c03d096a7 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -218,7 +218,7 @@ impl Service { .map(Felt::from_str) .collect::>() .map_err(ParseError::FromStr)?; - let schemas = cache.schemas(&model_ids).await?; + let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); let (entity_query, arrays_queries, _) = build_sql_query( &schemas, diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index 71a708b49d..fabd5510aa 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -197,7 +197,7 @@ impl Service { .map(Felt::from_str) .collect::>() .map_err(ParseError::FromStr)?; - let schemas = cache.schemas(&model_ids).await?; + let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); let (entity_query, arrays_queries, _) = build_sql_query( &schemas, diff --git a/crates/torii/libp2p/src/server/mod.rs b/crates/torii/libp2p/src/server/mod.rs index f96aebe08b..43b6d4a8e0 100644 --- a/crates/torii/libp2p/src/server/mod.rs +++ b/crates/torii/libp2p/src/server/mod.rs @@ -32,8 +32,6 @@ use crate::errors::Error; mod events; -use dojo_world::contracts::model::ModelReader; - use crate::server::events::ServerEvent; use crate::typed_data::{parse_value_to_ty, PrimitiveType}; use crate::types::Message; @@ -455,11 +453,7 @@ async fn validate_message( .model(selector) .await .map_err(|e| Error::InvalidMessageError(format!("Model {} not found: {}", model, e)))? - .schema() - .await - .map_err(|e| { - Error::InvalidMessageError(format!("Failed to get schema for model {}: {}", model, e)) - })?; + .schema; if let Some(object) = message.get(model) { parse_value_to_ty(object, &mut ty)?;