Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii-core): enqueue models & events #2471

Merged
merged 4 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub struct DeleteEntityQuery {
pub enum QueryType {
SetEntity(Ty),
DeleteEntity(DeleteEntityQuery),
EventMessage(Ty),
RegisterModel,
StoreEvent,
Other,
}

Expand Down Expand Up @@ -137,6 +140,29 @@ impl QueryQueue {
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
}
QueryType::RegisterModel => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let model_registered = ModelRegistered::from_row(&row)?;
self.push_publish(BrokerMessage::ModelRegistered(model_registered));
}
QueryType::EventMessage(entity) => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let mut event_message = EventMessageUpdated::from_row(&row)?;
event_message.updated_model = Some(entity);
let broker_message = BrokerMessage::EventMessageUpdated(event_message);
self.push_publish(broker_message);
}
QueryType::StoreEvent => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let event = EventEmitted::from_row(&row)?;
self.push_publish(BrokerMessage::EventEmitted(event));
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring to reduce code duplication in query execution and error handling.

Sensei, the QueryType::RegisterModel, QueryType::EventMessage, and QueryType::StoreEvent variants share similar logic for executing a query, handling errors, fetching a row, and publishing a broker message. Refactoring this repeated code into a helper function would enhance maintainability and reduce redundancy.

Here's how you might implement the refactor:

  1. Create a helper function for executing queries and publishing messages:

    async fn execute_query_and_publish<T, F>(
        &mut self,
        query: sqlx::Query<'_, sqlx::Sqlite, sqlx::SqliteArguments<'_>>,
        tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
        statement: &str,
        arguments: &Vec<Argument>,
        map_row: F,
    ) -> Result<()>
    where
        T: for<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> + Send,
        F: Fn(T) -> BrokerMessage,
    {
        let row = query.fetch_one(tx).await.with_context(|| {
            format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
        })?;
        let item = T::from_row(&row)?;
        self.push_publish(map_row(item));
        Ok(())
    }
  2. Use the helper function in each variant:

    QueryType::RegisterModel => {
        self.execute_query_and_publish(
            query,
            &mut tx,
            &statement,
            &arguments,
            |model_registered| BrokerMessage::ModelRegistered(model_registered),
        ).await?;
    }
    QueryType::EventMessage(entity) => {
        self.execute_query_and_publish(
            query,
            &mut tx,
            &statement,
            &arguments,
            move |mut event_message| {
                event_message.updated_model = Some(entity.clone());
                BrokerMessage::EventMessageUpdated(event_message)
            },
        ).await?;
    }
    QueryType::StoreEvent => {
        self.execute_query_and_publish(
            query,
            &mut tx,
            &statement,
            &arguments,
            |event| BrokerMessage::EventEmitted(event),
        ).await?;
    }

This refactor reduces duplication and makes the code more modular and easier to maintain.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure proper handling when no rows are returned in StoreEvent.

Sensei, in the QueryType::StoreEvent variant, if fetch_one does not find a matching event, it will result in an error. Consider using fetch_optional and handling the case where no event is found to prevent potential runtime errors.

Here is a suggested change to handle this scenario:

 let row = query.fetch_one(&mut *tx).await.with_context(|| {
     format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
 })?;
 let event = EventEmitted::from_row(&row)?;
 self.push_publish(BrokerMessage::EventEmitted(event));

Apply this diff:

-let row = query.fetch_one(&mut *tx).await.with_context(|| {
+let row = query.fetch_optional(&mut *tx).await.with_context(|| {
     format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
 })?;
-let event = EventEmitted::from_row(&row)?;
-self.push_publish(BrokerMessage::EventEmitted(event));
+if let Some(row) = row {
+    let event = EventEmitted::from_row(&row)?;
+    self.push_publish(BrokerMessage::EventEmitted(event));
+} else {
+    // Optionally handle the case where no event is found
+    // For example, log a warning or take alternative action
+}

This change ensures that your code gracefully handles situations where the query returns no results.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
QueryType::StoreEvent => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let event = EventEmitted::from_row(&row)?;
self.push_publish(BrokerMessage::EventEmitted(event));
}
QueryType::StoreEvent => {
let row = query.fetch_optional(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
if let Some(row) = row {
let event = EventEmitted::from_row(&row)?;
self.push_publish(BrokerMessage::EventEmitted(event));
} else {
// Optionally handle the case where no event is found
// For example, log a warning or take alternative action
}
}

QueryType::Other => {
query.execute(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
Expand Down
70 changes: 27 additions & 43 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use chrono::Utc;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{EnumOption, Member, Struct, Ty};
use dojo_world::contracts::abi::model::Layout;
Expand All @@ -16,11 +15,8 @@ use starknet_crypto::poseidon_hash_many;
use tracing::{debug, warn};

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType};
use crate::types::{
Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered,
};
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp};
use crate::query_queue::{Argument, DeleteEntityQuery, QueryQueue, QueryType};
use crate::utils::utc_dt_string_from_timestamp;

type IsEventMessage = bool;
type IsStoreUpdate = bool;
Expand Down Expand Up @@ -173,19 +169,20 @@ impl Sql {
class_hash=EXCLUDED.class_hash, layout=EXCLUDED.layout, \
packed_size=EXCLUDED.packed_size, unpacked_size=EXCLUDED.unpacked_size, \
executed_at=EXCLUDED.executed_at RETURNING *";
let model_registered: ModelRegistered = sqlx::query_as(insert_models)
// this is temporary until the model hash is precomputed
.bind(format!("{:#x}", selector))
.bind(namespace)
.bind(model.name())
.bind(format!("{class_hash:#x}"))
.bind(format!("{contract_address:#x}"))
.bind(serde_json::to_string(&layout)?)
.bind(packed_size)
.bind(unpacked_size)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

let arguments = vec![
Argument::String(format!("{:#x}", selector)),
Argument::String(namespace.to_string()),
Argument::String(model.name().to_string()),
Argument::String(format!("{class_hash:#x}")),
Argument::String(format!("{contract_address:#x}")),
Argument::String(serde_json::to_string(&layout)?),
Argument::Int(packed_size as i64),
Argument::Int(unpacked_size as i64),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

self.query_queue.enqueue(insert_models, arguments, QueryType::RegisterModel);

let mut model_idx = 0_i64;
self.build_register_queries_recursive(
Expand Down Expand Up @@ -220,7 +217,6 @@ impl Sql {
},
)
.await;
self.query_queue.push_publish(BrokerMessage::ModelRegistered(model_registered));

Ok(())
}
Expand Down Expand Up @@ -316,15 +312,16 @@ impl Sql {
VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
let mut event_message_updated: EventMessageUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(&keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;

event_message_updated.updated_model = Some(entity.clone());
self.query_queue.enqueue(
insert_entities,
vec![
Argument::String(entity_id.clone()),
Argument::String(keys_str),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
QueryType::Other,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think this should be QueryType::EntityMessage

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

);

let path = vec![namespaced_name];
self.build_set_entity_queries_recursive(
Expand All @@ -336,8 +333,6 @@ impl Sql {
&vec![],
);

self.query_queue.push_publish(BrokerMessage::EventMessageUpdated(event_message_updated));

Ok(())
}

Expand Down Expand Up @@ -504,19 +499,8 @@ impl Sql {
"INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \
(?, ?, ?, ?, ?)",
vec![id, keys, data, hash, executed_at],
QueryType::Other,
QueryType::StoreEvent,
);

let emitted = EventEmitted {
id: event_id.to_string(),
keys: felts_sql_string(&event.keys),
data: felts_sql_string(&event.data),
transaction_hash: format!("{:#x}", transaction_hash),
created_at: Utc::now(),
executed_at: must_utc_datetime_from_timestamp(block_timestamp),
};

self.query_queue.push_publish(BrokerMessage::EventEmitted(emitted));
}

#[allow(clippy::too_many_arguments)]
Expand Down
Loading