-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
opt(torii-core): move off queryqueue for executing tx #2460
Changes from 12 commits
01ce338
e0ec767
6097a60
043f669
9d7d0e7
9314438
f9a136f
b883343
7771fdf
60c9069
cd52f0f
045eed0
045e4ae
388ba1e
b7acef5
b94ad7a
c13ff59
7fc27d5
260845c
a7e4f1f
8cf4452
2bcf226
3242ac4
ef3e4ba
299c0b9
e4404f1
663234a
c998428
994abc5
65612fa
13b1ba7
0c31327
afa2a0a
ef9fafc
4ec379c
d393896
1730bfc
b708081
7758cf9
607cd06
d48dd30
6d4b99f
baf7f35
c4f288a
5dac220
28633b4
fd3c377
4cabea5
ee86042
43246b6
706c7fb
9c9e0a3
6b6f5a6
61f0a4b
63cca75
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,237 @@ | ||
use std::collections::VecDeque; | ||
use std::mem; | ||
|
||
use anyhow::{Context, Result}; | ||
use dojo_types::schema::{Struct, Ty}; | ||
use sqlx::query::Query; | ||
use sqlx::sqlite::SqliteArguments; | ||
use sqlx::{FromRow, Pool, Sqlite, Transaction}; | ||
use starknet::core::types::Felt; | ||
use tokio::sync::broadcast::{Receiver, Sender}; | ||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; | ||
|
||
use crate::simple_broker::SimpleBroker; | ||
use crate::types::{ | ||
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, | ||
Model as ModelRegistered, | ||
}; | ||
|
||
#[derive(Debug, Clone)] | ||
pub enum Argument { | ||
Null, | ||
Int(i64), | ||
Bool(bool), | ||
String(String), | ||
FieldElement(Felt), | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub enum BrokerMessage { | ||
ModelRegistered(ModelRegistered), | ||
EntityUpdated(EntityUpdated), | ||
EventMessageUpdated(EventMessageUpdated), | ||
EventEmitted(EventEmitted), | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct DeleteEntityQuery { | ||
pub entity_id: String, | ||
pub event_id: String, | ||
pub block_timestamp: String, | ||
pub ty: Ty, | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub enum QueryType { | ||
SetEntity(Ty), | ||
DeleteEntity(DeleteEntityQuery), | ||
EventMessage(Ty), | ||
RegisterModel, | ||
StoreEvent, | ||
Execute, | ||
Other, | ||
} | ||
|
||
pub struct Executor<'c> { | ||
pool: Pool<Sqlite>, | ||
transaction: Transaction<'c, Sqlite>, | ||
publish_queue: VecDeque<BrokerMessage>, | ||
rx: UnboundedReceiver<QueryMessage>, | ||
shutdown_rx: Receiver<()>, | ||
} | ||
|
||
pub struct QueryMessage { | ||
pub statement: String, | ||
pub arguments: Vec<Argument>, | ||
pub query_type: QueryType, | ||
} | ||
|
||
impl QueryMessage { | ||
pub fn new(statement: String, arguments: Vec<Argument>, query_type: QueryType) -> Self { | ||
Self { statement, arguments, query_type } | ||
} | ||
|
||
pub fn other(statement: String, arguments: Vec<Argument>) -> Self { | ||
Self { statement, arguments, query_type: QueryType::Other } | ||
} | ||
|
||
pub fn execute() -> Self { | ||
Self { statement: "".to_string(), arguments: vec![], query_type: QueryType::Execute } | ||
} | ||
} | ||
|
||
impl<'c> Executor<'c> { | ||
pub async fn new( | ||
pool: Pool<Sqlite>, | ||
shutdown_tx: Sender<()>, | ||
) -> Result<(Self, UnboundedSender<QueryMessage>)> { | ||
let (tx, rx) = unbounded_channel(); | ||
lambda-0x marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let transaction = pool.begin().await?; | ||
let publish_queue = VecDeque::new(); | ||
let shutdown_rx = shutdown_tx.subscribe(); | ||
|
||
Ok((Executor { pool, transaction, publish_queue, rx, shutdown_rx }, tx)) | ||
} | ||
|
||
pub async fn run(&mut self) -> Result<()> { | ||
loop { | ||
tokio::select! { | ||
_ = self.shutdown_rx.recv() => { | ||
break Ok(()); | ||
} | ||
Some(msg) = self.rx.recv() => { | ||
let QueryMessage { statement, arguments, query_type } = msg; | ||
let mut query = sqlx::query(&statement); | ||
|
||
for arg in &arguments { | ||
query = match arg { | ||
Argument::Null => query.bind(None::<String>), | ||
Argument::Int(integer) => query.bind(integer), | ||
Argument::Bool(bool) => query.bind(bool), | ||
Argument::String(string) => query.bind(string), | ||
Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)), | ||
} | ||
} | ||
|
||
self.handle_query_type(query, query_type, &statement, &arguments).await?; | ||
} | ||
} | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider improving error handling in the transaction lifecycle. Ohayo, sensei! While the overall implementation is solid, there's room for improvement in error handling, particularly in the transaction lifecycle. Currently, if an error occurs during query execution, the transaction may remain open, potentially leading to database inconsistencies. Consider wrapping the main loop in a pub async fn run(&mut self) -> Result<()> {
loop {
match self.process_messages().await {
Ok(should_break) => {
if should_break {
break;
}
}
Err(e) => {
// Rollback the transaction on error
self.transaction.rollback().await?;
// Start a new transaction for the next iteration
self.transaction = self.pool.begin().await?;
// Log the error or handle it as appropriate
eprintln!("Error processing messages: {:?}", e);
}
}
}
Ok(())
}
async fn process_messages(&mut self) -> Result<bool> {
tokio::select! {
_ = self.shutdown_rx.recv() => {
return Ok(true);
}
Some(msg) = self.rx.recv() => {
let QueryMessage { statement, arguments, query_type } = msg;
// ... (rest of the existing code)
self.handle_query_type(query, query_type, &statement, &arguments).await?;
}
}
Ok(false)
} This approach ensures that the transaction is rolled back in case of errors, maintaining database consistency. |
||
|
||
async fn handle_query_type<'a>( | ||
&mut self, | ||
query: Query<'a, Sqlite, SqliteArguments<'a>>, | ||
query_type: QueryType, | ||
statement: &str, | ||
arguments: &[Argument], | ||
) -> Result<()> { | ||
let tx = &mut self.transaction; | ||
|
||
match query_type { | ||
QueryType::SetEntity(entity) => { | ||
let row = query.fetch_one(&mut **tx).await.with_context(|| { | ||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||
})?; | ||
let mut entity_updated = EntityUpdated::from_row(&row)?; | ||
entity_updated.updated_model = Some(entity); | ||
entity_updated.deleted = false; | ||
let broker_message = BrokerMessage::EntityUpdated(entity_updated); | ||
self.publish_queue.push_back(broker_message); | ||
} | ||
QueryType::DeleteEntity(entity) => { | ||
let delete_model = query.execute(&mut **tx).await.with_context(|| { | ||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||
})?; | ||
if delete_model.rows_affected() == 0 { | ||
return Ok(()); | ||
} | ||
|
||
let row = sqlx::query( | ||
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? \ | ||
WHERE id = ? RETURNING *", | ||
) | ||
.bind(entity.block_timestamp) | ||
.bind(entity.event_id) | ||
.bind(entity.entity_id) | ||
.fetch_one(&mut **tx) | ||
.await?; | ||
let mut entity_updated = EntityUpdated::from_row(&row)?; | ||
entity_updated.updated_model = | ||
Some(Ty::Struct(Struct { name: entity.ty.name(), children: vec![] })); | ||
|
||
let count = sqlx::query_scalar::<_, i64>( | ||
"SELECT count(*) FROM entity_model WHERE entity_id = ?", | ||
) | ||
.bind(entity_updated.id.clone()) | ||
.fetch_one(&mut **tx) | ||
.await?; | ||
|
||
// Delete entity if all of its models are deleted | ||
if count == 0 { | ||
sqlx::query("DELETE FROM entities WHERE id = ?") | ||
.bind(entity_updated.id.clone()) | ||
.execute(&mut **tx) | ||
.await?; | ||
entity_updated.deleted = true; | ||
} | ||
|
||
let broker_message = BrokerMessage::EntityUpdated(entity_updated); | ||
self.publish_queue.push_back(broker_message); | ||
} | ||
QueryType::RegisterModel => { | ||
let row = query.fetch_one(&mut **tx).await.with_context(|| { | ||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||
})?; | ||
let model_registered = ModelRegistered::from_row(&row)?; | ||
self.publish_queue.push_back(BrokerMessage::ModelRegistered(model_registered)); | ||
} | ||
QueryType::EventMessage(entity) => { | ||
let row = query.fetch_one(&mut **tx).await.with_context(|| { | ||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||
})?; | ||
let mut event_message = EventMessageUpdated::from_row(&row)?; | ||
event_message.updated_model = Some(entity); | ||
let broker_message = BrokerMessage::EventMessageUpdated(event_message); | ||
self.publish_queue.push_back(broker_message); | ||
} | ||
QueryType::StoreEvent => { | ||
let row = query.fetch_one(&mut **tx).await.with_context(|| { | ||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||
})?; | ||
let event = EventEmitted::from_row(&row)?; | ||
self.publish_queue.push_back(BrokerMessage::EventEmitted(event)); | ||
} | ||
QueryType::Execute => { | ||
self.execute().await?; | ||
} | ||
QueryType::Other => { | ||
query.execute(&mut **tx).await.with_context(|| { | ||
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) | ||
})?; | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
async fn execute(&mut self) -> Result<()> { | ||
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); | ||
transaction.commit().await?; | ||
|
||
while let Some(message) = self.publish_queue.pop_front() { | ||
send_broker_message(message); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider asynchronous message publishing for improved performance. Ohayo, sensei! To potentially improve performance, especially under heavy load, consider publishing broker messages asynchronously. This approach can prevent blocking the main execution flow while messages are being published. Here's a suggestion for modifying the loop: use futures::future::join_all;
// ...
let publish_futures: Vec<_> = self.publish_queue
.drain(..)
.map(|message| {
tokio::spawn(async move {
send_broker_message(message);
})
})
.collect();
join_all(publish_futures).await; This modification spawns a new task for each message, allowing them to be published concurrently. The
Larkooo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
fn send_broker_message(message: BrokerMessage) { | ||
match message { | ||
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), | ||
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), | ||
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), | ||
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,7 +64,7 @@ where | |
uri = %uri_str, | ||
"Resource metadata set." | ||
); | ||
db.set_metadata(resource, &uri_str, block_timestamp); | ||
db.set_metadata(resource, &uri_str, block_timestamp)?; | ||
|
||
let db = db.clone(); | ||
let resource = *resource; | ||
|
@@ -83,9 +83,7 @@ where | |
async fn try_retrieve(mut db: Sql, resource: Felt, uri_str: String) { | ||
match metadata(uri_str.clone()).await { | ||
Ok((metadata, icon_img, cover_img)) => { | ||
db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img) | ||
.await | ||
.unwrap(); | ||
db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo, sensei! We might want to reconsider this change. The modification to
Consider the following improvements: match db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).await {
Ok(_) => info!(
target: LOG_TARGET,
resource = %format!("{:#x}", resource),
"Updated resource metadata from ipfs."
),
Err(e) => error!(
target: LOG_TARGET,
resource = %format!("{:#x}", resource),
error = %e,
"Failed to update resource metadata from ipfs."
),
} This approach maintains the asynchronous nature of the call and provides proper error handling, logging any issues that might occur during the update process. |
||
info!( | ||
target: LOG_TARGET, | ||
resource = %format!("{:#x}", resource), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sensei, handle errors from joined tasks in the
JoinSet
The loop
while let Some(_) = set.join_next().await {}
ignores the results of the tasks. By not handling theResult
fromjoin_next()
, any errors occurring in the spawned tasks may be missed. Consider capturing and handling these results to ensure that any task failures are properly addressed.