Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-grpc): start rework to use 1 single query #2817

Merged
merged 23 commits on Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions crates/torii/core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ pub fn build_sql_query(
entity_relation_column: &str,
where_clause: Option<&str>,
order_by: Option<&str>,
limit: Option<u32>,
offset: Option<u32>,
) -> Result<(String, String), Error> {
fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
match ty {
Expand Down Expand Up @@ -205,13 +203,7 @@ pub fn build_sql_query(
query += &format!(" ORDER BY {}.event_id DESC", table_name);
}

if let Some(limit) = limit {
query += &format!(" LIMIT {}", limit);
}

if let Some(offset) = offset {
query += &format!(" OFFSET {}", offset);
}
query += " LIMIT ? OFFSET ?";

Ok((query, count_query))
}
Expand Down
4 changes: 4 additions & 0 deletions crates/torii/core/src/sql/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl ModelCache {
}

pub async fn models(&self, selectors: &[Felt]) -> Result<Vec<Model>, Error> {
if selectors.is_empty() {
return Ok(self.model_cache.read().await.values().cloned().collect());
}

let mut schemas = Vec::with_capacity(selectors.len());
for selector in selectors {
schemas.push(self.model(selector).await?);
Expand Down
157 changes: 72 additions & 85 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use crypto_bigint::rand_core::le;
use dojo_types::naming::compute_selector_from_tag;
use dojo_types::primitive::{Primitive, PrimitiveError};
use dojo_types::schema::Ty;
Expand Down Expand Up @@ -388,20 +389,16 @@ impl DojoWorld {
async fn fetch_historical_event_messages(
&self,
query: &str,
keys_pattern: Option<&str>,
bind_values: Vec<String>,
limit: Option<u32>,
offset: Option<u32>,
) -> Result<Vec<proto::types::Entity>, Error> {
let db_entities: Vec<(String, String, String, String)> = if keys_pattern.is_some() {
sqlx::query_as(query)
.bind(keys_pattern.unwrap())
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await?
} else {
sqlx::query_as(query).bind(limit).bind(offset).fetch_all(&self.pool).await?
};
let mut query = sqlx::query_as(query);
for value in bind_values {
query = query.bind(value);
}
let db_entities: Vec<(String, String, String, String)> =
query.bind(limit).bind(offset).fetch_all(&self.pool).await?;
Comment on lines +267 to +276
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Verify the parameter binding order in fetch_historical_event_messages

Ensure that the bind_values, limit, and offset parameters are bound in the correct order to match the placeholders in your SQL query. Misalignment may lead to unexpected behavior or SQL errors.


let mut entities = HashMap::new();
for (id, data, model_id, _) in db_entities {
Expand Down Expand Up @@ -463,96 +460,86 @@ impl DojoWorld {
}
}
};

// count query that matches filter_ids
let count_query = format!(
r#"
SELECT count(*)
FROM {table}
{where_clause}
"#
);

// total count of rows without limit and offset
let mut count_query = sqlx::query_scalar(&count_query);
if let Some(hashed_keys) = &hashed_keys {
for key in &hashed_keys.hashed_keys {
let key = Felt::from_bytes_be_slice(key);
count_query = count_query.bind(format!("{:#x}", key));
}
let mut bind_values = vec![];
if let Some(hashed_keys) = hashed_keys {
bind_values = hashed_keys
.hashed_keys
.iter()
.map(|key| format!("{:#x}", Felt::from_bytes_be_slice(key)))
.collect::<Vec<_>>()
}

if let Some(entity_updated_after) = entity_updated_after.clone() {
count_query = count_query.bind(entity_updated_after);
bind_values.push(entity_updated_after);
}
let total_count = count_query.fetch_optional(&self.pool).await?.unwrap_or(0);
if total_count == 0 {
return Ok((Vec::new(), 0));

if let Some(limit) = limit {
bind_values.push(limit.to_string());
}
if let Some(offset) = offset {
bind_values.push(offset.to_string());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Ensure correct data types when binding limit and offset

Currently, limit and offset are converted to strings before being added to bind_values. When binding these values to the query, ensure they are bound as integers to match the expected data types in the SQL query and prevent potential type mismatch issues.

}

// Query to get entity IDs and their model IDs
let mut query = if table == EVENT_MESSAGES_HISTORICAL_TABLE {
format!(
r#"
SELECT {table}.id, {table}.data, {table}.model_id, group_concat({model_relation_table}.model_id) as model_ids
FROM {table}
JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
{where_clause}
GROUP BY {table}.event_id
ORDER BY {table}.event_id DESC
"#
)
} else {
format!(
r#"
SELECT {table}.id, group_concat({model_relation_table}.model_id) as model_ids
let count_query = format!(
r#"
SELECT count(*)
FROM {table}
JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
{where_clause}
GROUP BY {table}.id
ORDER BY {table}.event_id DESC
"#
)
};

if limit.is_some() {
query += " LIMIT ?"
"#
);
let mut count_query = sqlx::query_scalar(&count_query);
for value in &bind_values {
count_query = count_query.bind(value);
}

if offset.is_some() {
query += " OFFSET ?"
let total_count = count_query.fetch_one(&self.pool).await?;
if total_count == 0 {
return Ok((Vec::new(), 0));
}

if table == EVENT_MESSAGES_HISTORICAL_TABLE {
let entities =
self.fetch_historical_event_messages(&query, None, limit, offset).await?;
self.fetch_historical_event_messages(&format!(
r#"
SELECT {table}.id, {table}.data, {table}.model_id, group_concat({model_relation_table}.model_id) as model_ids
FROM {table}
JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
{where_clause}
GROUP BY {table}.event_id
ORDER BY {table}.event_id DESC
"#
), bind_values, limit, offset).await?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Consider refactoring for readability and performance.

The logic that constructs a separate count query and then conditionally fetches historical event messages is lengthy. Splitting it into smaller helper functions could improve maintainability and testability.

return Ok((entities, total_count));
}

let mut query = sqlx::query_as(&query);
if let Some(hashed_keys) = hashed_keys {
for key in hashed_keys.hashed_keys {
let key = Felt::from_bytes_be_slice(&key);
query = query.bind(format!("{:#x}", key));
}
}

if let Some(entity_updated_after) = entity_updated_after.clone() {
query = query.bind(entity_updated_after);
// retrieve all schemas
let schemas = self
.model_cache
.models(&[])
.await?
.iter()
.map(|m| m.schema.clone())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Performance concern when loading all models.

In lines 505-511, calling self.model_cache.models(&[]).await? with an empty slice loads all models from the cache. If the cache contains many models, this could impact performance.

Consider specifying only the required model selectors or implementing pagination to limit the number of models loaded.

.collect::<Vec<_>>();
let (query, _) = build_sql_query(
&schemas,
table,
entity_relation_column,
if where_clause.is_empty() {
None
} else {
Some(&where_clause)
},
order_by,
)?;
println!("query: {}", query);
let mut query = sqlx::query(&query);
for value in &bind_values {
query = query.bind(value);
}
query = query.bind(limit).bind(offset);
let db_entities: Vec<(String, String)> = query.fetch_all(&self.pool).await?;
let entities = query.fetch_all(&self.pool).await?;
let entities = entities
.iter()
.map(|row| map_row_to_entity(row, &schemas, dont_include_hashed_keys))
.collect::<Result<Vec<_>, Error>>()?;

let entities = self
.fetch_entities(
table,
entity_relation_column,
db_entities,
dont_include_hashed_keys,
order_by,
entity_models,
)
.await?;
Ok((entities, total_count))
}

Expand Down Expand Up @@ -702,7 +689,7 @@ impl DojoWorld {

if table == EVENT_MESSAGES_HISTORICAL_TABLE {
let entities = self
.fetch_historical_event_messages(&models_query, Some(&keys_pattern), limit, offset)
.fetch_historical_event_messages(&models_query, vec![keys_pattern], limit, offset)
.await?;
return Ok((entities, total_count));
}
Expand Down
Loading