diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 6ea55ce344..0d91e95ed3 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -284,6 +284,8 @@ impl Engine

{ Err(e) => { error!(target: LOG_TARGET, error = %e, "Processing fetched data."); erroring_out = true; + // incase of error rollback the transaction + self.db.rollback().await?; sleep(backoff_delay).await; if backoff_delay < max_backoff_delay { backoff_delay *= 2; diff --git a/crates/torii/core/src/executor/mod.rs b/crates/torii/core/src/executor/mod.rs index c823aa1255..18c76f6d64 100644 --- a/crates/torii/core/src/executor/mod.rs +++ b/crates/torii/core/src/executor/mod.rs @@ -116,6 +116,8 @@ pub enum QueryType { // similar to execute but doesn't create a new transaction Flush, Execute, + // rollback's the current transaction and starts a new one + Rollback, Other, } @@ -208,6 +210,19 @@ impl QueryMessage { rx, ) } + + pub fn rollback_recv() -> (Self, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); + ( + Self { + statement: "".to_string(), + arguments: vec![], + query_type: QueryType::Rollback, + tx: Some(tx), + }, + rx, + ) + } } impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { @@ -733,6 +748,20 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { // defer executing these queries since they depend on TokenRegister queries self.deferred_query_messages.push(query_message); } + QueryType::Rollback => { + debug!(target: LOG_TARGET, "Rolling back the transaction."); + // rollback's the current transaction and starts a new one + let res = self.rollback().await; + debug!(target: LOG_TARGET, "Rolled back the transaction."); + + if let Some(sender) = query_message.tx { + sender + .send(res) + .map_err(|_| anyhow::anyhow!("Failed to send rollback result"))?; + } else { + res?; + } + } QueryType::Other => { query.execute(&mut **tx).await.with_context(|| { format!( @@ -785,6 +814,16 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { Ok(()) } + + async fn rollback(&mut self) -> Result<()> { + let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); + transaction.rollback().await?; + + // NOTE: clear doesn't reset the capacity + self.publish_queue.clear(); + self.deferred_query_messages.clear(); + Ok(()) + } } fn send_broker_message(message: BrokerMessage) { diff --git a/crates/torii/core/src/sql/mod.rs b/crates/torii/core/src/sql/mod.rs index 11eedf6a3c..9c61405d99 100644 --- a/crates/torii/core/src/sql/mod.rs +++ b/crates/torii/core/src/sql/mod.rs @@ -1311,4 +1311,10 @@ impl Sql { self.executor.send(flush)?; recv.await? } + + pub async fn rollback(&self) -> Result<()> { + let (rollback, recv) = QueryMessage::rollback_recv(); + self.executor.send(rollback)?; + recv.await? + } }