Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-grpc): start rework to use 1 single query #2817

Merged
merged 23 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/torii/core/src/sql/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl ModelCache {
}

pub async fn models(&self, selectors: &[Felt]) -> Result<Vec<Model>, Error> {
if selectors.is_empty() {
return Ok(self.model_cache.read().await.values().cloned().collect());
}

let mut schemas = Vec::with_capacity(selectors.len());
for selector in selectors {
schemas.push(self.model(selector).await?);
Expand Down
126 changes: 54 additions & 72 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,95 +464,77 @@ impl DojoWorld {
}
};

// count query that matches filter_ids
let count_query = format!(
r#"
SELECT count(*)
FROM {table}
{where_clause}
"#
);

// total count of rows without limit and offset
let mut count_query = sqlx::query_scalar(&count_query);
if let Some(hashed_keys) = &hashed_keys {
for key in &hashed_keys.hashed_keys {
let key = Felt::from_bytes_be_slice(key);
count_query = count_query.bind(format!("{:#x}", key));
if table == EVENT_MESSAGES_HISTORICAL_TABLE {
let count_query = format!(
r#"
SELECT count(*)
FROM {table}
{where_clause}
"#
);
let total_count = sqlx::query_scalar(&count_query)
.bind(entity_updated_after.clone())
.fetch_one(&self.pool)
.await?;
if total_count == 0 {
return Ok((Vec::new(), 0));
}
}

if let Some(entity_updated_after) = entity_updated_after.clone() {
count_query = count_query.bind(entity_updated_after);
}
let total_count = count_query.fetch_optional(&self.pool).await?.unwrap_or(0);
if total_count == 0 {
return Ok((Vec::new(), 0));
let entities =
self.fetch_historical_event_messages(&format!(
r#"
SELECT {table}.id, {table}.data, {table}.model_id, group_concat({model_relation_table}.model_id) as model_ids
FROM {table}
JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
{where_clause}
GROUP BY {table}.event_id
ORDER BY {table}.event_id DESC
"#
), None, limit, offset).await?;
return Ok((entities, total_count));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Conditional binding of entity_updated_after.

In lines 476-477, entity_updated_after.clone() is bound to the count_query without checking if it's Some. If entity_updated_after is None, this could lead to unintended SQL queries or errors. Ensure that you bind entity_updated_after only when it's present.

Apply this diff to conditionally bind entity_updated_after:

 let total_count = sqlx::query_scalar(&count_query)
-    .bind(entity_updated_after.clone())
+    .apply(|query| {
+        if let Some(updated_after) = &entity_updated_after {
+            query.bind(updated_after.clone())
+        } else {
+            query
+        }
+    })
     .fetch_one(&self.pool)
     .await?;

This ensures that entity_updated_after is only bound when it's available.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if table == EVENT_MESSAGES_HISTORICAL_TABLE {
let count_query = format!(
r#"
SELECT count(*)
FROM {table}
{where_clause}
"#
);
let total_count = sqlx::query_scalar(&count_query)
.bind(entity_updated_after.clone())
.fetch_one(&self.pool)
.await?;
if total_count == 0 {
return Ok((Vec::new(), 0));
}
}
if let Some(entity_updated_after) = entity_updated_after.clone() {
count_query = count_query.bind(entity_updated_after);
}
let total_count = count_query.fetch_optional(&self.pool).await?.unwrap_or(0);
if total_count == 0 {
return Ok((Vec::new(), 0));
let entities =
self.fetch_historical_event_messages(&format!(
r#"
SELECT {table}.id, {table}.data, {table}.model_id, group_concat({model_relation_table}.model_id) as model_ids
FROM {table}
JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
{where_clause}
GROUP BY {table}.event_id
ORDER BY {table}.event_id DESC
"#
), None, limit, offset).await?;
return Ok((entities, total_count));
if table == EVENT_MESSAGES_HISTORICAL_TABLE {
let count_query = format!(
r#"
SELECT count(*)
FROM {table}
{where_clause}
"#
);
let total_count = sqlx::query_scalar(&count_query)
.apply(|query| {
if let Some(updated_after) = &entity_updated_after {
query.bind(updated_after.clone())
} else {
query
}
})
.fetch_one(&self.pool)
.await?;
if total_count == 0 {
return Ok((Vec::new(), 0));
}
let entities =
self.fetch_historical_event_messages(&format!(
r#"
SELECT {table}.id, {table}.data, {table}.model_id, group_concat({model_relation_table}.model_id) as model_ids
FROM {table}
JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
{where_clause}
GROUP BY {table}.event_id
ORDER BY {table}.event_id DESC
"#
), None, limit, offset).await?;
return Ok((entities, total_count));

}

// Query to get entity IDs and their model IDs
let mut query = if table == EVENT_MESSAGES_HISTORICAL_TABLE {
format!(
r#"
SELECT {table}.id, {table}.data, {table}.model_id, group_concat({model_relation_table}.model_id) as model_ids
FROM {table}
JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
{where_clause}
GROUP BY {table}.event_id
ORDER BY {table}.event_id DESC
"#
)
} else {
format!(
r#"
SELECT {table}.id, group_concat({model_relation_table}.model_id) as model_ids
let count_query = format!(
r#"
SELECT count(*)
FROM {table}
JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
{where_clause}
GROUP BY {table}.id
ORDER BY {table}.event_id DESC
"#
)
};

if limit.is_some() {
query += " LIMIT ?"
}

if offset.is_some() {
query += " OFFSET ?"
}

if table == EVENT_MESSAGES_HISTORICAL_TABLE {
let entities =
self.fetch_historical_event_messages(&query, None, limit, offset).await?;
return Ok((entities, total_count));
}
"#
);

let mut query = sqlx::query_as(&query);
// retrieve all schemas
let schemas = self
.model_cache
.models(&[])
.await?
.iter()
.map(|m| m.schema.clone())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Performance concern when loading all models.

In lines 505-511, calling self.model_cache.models(&[]).await? with an empty slice loads all models from the cache. If the cache contains many models, this could impact performance.

Consider specifying only the required model selectors or implementing pagination to limit the number of models loaded.

.collect::<Vec<_>>();
let (query, count_query) = build_sql_query(
&schemas,
table,
entity_relation_column,
Some(&where_clause),
order_by,
limit,
offset,
)?;
let query = sqlx::query(&query);
if let Some(hashed_keys) = hashed_keys {
for key in hashed_keys.hashed_keys {
let key = Felt::from_bytes_be_slice(&key);
query = query.bind(format!("{:#x}", key));
}
}

if let Some(entity_updated_after) = entity_updated_after.clone() {
query = query.bind(entity_updated_after);
}
query = query.bind(limit).bind(offset);
let db_entities: Vec<(String, String)> = query.fetch_all(&self.pool).await?;
let entities = query.fetch_all(&self.pool).await?;
let entities = entities
.iter()
.map(|row| map_row_to_entity(row, &schemas, dont_include_hashed_keys))
.collect::<Result<Vec<_>, Error>>()?;

let entities = self
.fetch_entities(
table,
entity_relation_column,
db_entities,
dont_include_hashed_keys,
order_by,
entity_models,
)
.await?;
Ok((entities, total_count))
}

Expand Down
Loading