Skip to content

Commit

Permalink
fix(torii/core): rollback transaction when engine retries
Browse files Browse the repository at this point in the history
commit-id:16f58fa9
  • Loading branch information
lambda-0x committed Nov 3, 2024
1 parent 7068232 commit 0f3f2ba
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
2 changes: 2 additions & 0 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
Err(e) => {
error!(target: LOG_TARGET, error = %e, "Processing fetched data.");
erroring_out = true;
// incase of error rollback the transaction
self.db.rollback().await?;
sleep(backoff_delay).await;
if backoff_delay < max_backoff_delay {
backoff_delay *= 2;
Expand Down
39 changes: 39 additions & 0 deletions crates/torii/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ pub enum QueryType {
// similar to execute but doesn't create a new transaction
Flush,
Execute,
// rollback's the current transaction and starts a new one
Rollback,
Other,
}

Expand Down Expand Up @@ -207,6 +209,19 @@ impl QueryMessage {
rx,
)
}

pub fn rollback_recv() -> (Self, oneshot::Receiver<Result<()>>) {
let (tx, rx) = oneshot::channel();
(
Self {
statement: "".to_string(),
arguments: vec![],
query_type: QueryType::Rollback,
tx: Some(tx),
},
rx,
)
}

Check warning on line 224 in crates/torii/core/src/executor/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/mod.rs#L213-L224

Added lines #L213 - L224 were not covered by tests
}

impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
Expand Down Expand Up @@ -730,6 +745,20 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
// defer executing these queries since they depend on TokenRegister queries
self.deferred_query_messages.push(query_message);
}
QueryType::Rollback => {
debug!(target: LOG_TARGET, "Rolling back the transaction.");

Check warning on line 749 in crates/torii/core/src/executor/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/mod.rs#L749

Added line #L749 was not covered by tests
// rollback's the current transaction and starts a new one
let res = self.rollback().await;
debug!(target: LOG_TARGET, "Rolled back the transaction.");

Check warning on line 752 in crates/torii/core/src/executor/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/mod.rs#L751-L752

Added lines #L751 - L752 were not covered by tests

if let Some(sender) = query_message.tx {
sender
.send(res)
.map_err(|_| anyhow::anyhow!("Failed to send rollback result"))?;

Check warning on line 757 in crates/torii/core/src/executor/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/mod.rs#L754-L757

Added lines #L754 - L757 were not covered by tests
} else {
res?;

Check warning on line 759 in crates/torii/core/src/executor/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/mod.rs#L759

Added line #L759 was not covered by tests
}
}
QueryType::Other => {
query.execute(&mut **tx).await.with_context(|| {
format!(
Expand Down Expand Up @@ -782,6 +811,16 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {

Ok(())
}

async fn rollback(&mut self) -> Result<()> {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
transaction.rollback().await?;

Check warning on line 817 in crates/torii/core/src/executor/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/mod.rs#L815-L817

Added lines #L815 - L817 were not covered by tests

// NOTE: clear doesn't reset the capacity
self.publish_queue.clear();
self.deferred_query_messages.clear();
Ok(())
}

Check warning on line 823 in crates/torii/core/src/executor/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/executor/mod.rs#L820-L823

Added lines #L820 - L823 were not covered by tests
}

fn send_broker_message(message: BrokerMessage) {
Expand Down
6 changes: 6 additions & 0 deletions crates/torii/core/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1268,4 +1268,10 @@ impl Sql {
self.executor.send(flush)?;
recv.await?
}

pub async fn rollback(&self) -> Result<()> {
let (rollback, recv) = QueryMessage::rollback_recv();
self.executor.send(rollback)?;
recv.await?
}

Check warning on line 1276 in crates/torii/core/src/sql/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql/mod.rs#L1272-L1276

Added lines #L1272 - L1276 were not covered by tests
}

0 comments on commit 0f3f2ba

Please sign in to comment.