Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Persist tx execution state (#44)
Browse files Browse the repository at this point in the history
In order to avoid re-executing old transactions on every restart,
maintain execution state.
  • Loading branch information
tuommaki authored Jan 21, 2024
1 parent 6004b2b commit c4ea88a
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 3 deletions.
3 changes: 2 additions & 1 deletion crates/node/migrations/20231128120351_transactions-table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ CREATE TABLE transaction (
kind transaction_kind NOT NULL,
nonce NUMERIC NOT NULL,
signature VARCHAR(128) NOT NULL,
propagated BOOLEAN
propagated BOOLEAN,
executed BOOLEAN
);

CREATE TABLE deploy (
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl mempool::Storage for storage::Database {
}

async fn fill_deque(&self, deque: &mut std::collections::VecDeque<Transaction>) -> Result<()> {
for t in self.get_transactions().await? {
for t in self.get_unexecuted_transactions().await? {
deque.push_back(t);
}

Expand Down
7 changes: 7 additions & 0 deletions crates/node/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ impl TaskManager for Scheduler {
running_task.task_started.elapsed().as_secs()
);

if let Err(err) = self.database.mark_tx_executed(&running_task.task.tx).await {
tracing::error!(
"failed to update transaction.executed => true - tx.hash: {}",
&running_task.task.tx
);
}

let nonce = rand::thread_rng().next_u64();
let tx = match running_task.task.kind {
TaskKind::Proof => Transaction::new(
Expand Down
3 changes: 3 additions & 0 deletions crates/node/src/storage/database/entity/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Transaction {
pub nonce: sqlx::types::Decimal,
pub signature: Signature,
pub propagated: bool,
pub executed: bool,
}

impl From<&types::Transaction> for Transaction {
Expand All @@ -50,6 +51,7 @@ impl From<&types::Transaction> for Transaction {
nonce: value.nonce.into(),
signature: value.signature,
propagated: value.propagated,
executed: value.executed,
}
}
}
Expand All @@ -65,6 +67,7 @@ impl From<Transaction> for types::Transaction {
.expect("invalid nonce in db"),
signature: value.signature,
propagated: value.propagated,
executed: value.executed,
}
}
}
30 changes: 29 additions & 1 deletion crates/node/src/storage/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,19 +369,38 @@ impl Database {
Ok(txs)
}

pub async fn get_unexecuted_transactions(&self) -> Result<Vec<types::Transaction>> {
let mut db_tx = self.pool.begin().await?;
let refs: Vec<Hash> = sqlx::query("SELECT hash FROM transaction WHERE executed IS false")
.map(|row: sqlx::postgres::PgRow| row.get(0))
.fetch_all(&mut *db_tx)
.await?;

let mut txs = Vec::with_capacity(refs.len());
for tx_hash in refs {
let tx = self.find_transaction(&tx_hash).await?;
if let Some(tx) = tx {
txs.push(tx);
}
}

Ok(txs)
}

pub async fn add_transaction(&self, tx: &types::Transaction) -> Result<()> {
let entity = entity::Transaction::from(tx);

let mut db_tx = self.pool.begin().await?;

sqlx::query(
"INSERT INTO transaction ( author, hash, kind, nonce, signature, propagated ) VALUES ( $1, $2, $3, $4, $5, $6 ) ON CONFLICT (hash) DO UPDATE SET propagated = $6")
"INSERT INTO transaction ( author, hash, kind, nonce, signature, propagated, executed ) VALUES ( $1, $2, $3, $4, $5, $6, $7 ) ON CONFLICT (hash) DO UPDATE SET propagated = $6, executed = $7")
.bind(entity.author)
.bind(entity.hash)
.bind(entity.kind)
.bind(entity.nonce)
.bind(entity.signature)
.bind(entity.propagated)
.bind(entity.executed)
.execute(&mut *db_tx)
.await?;

Expand Down Expand Up @@ -499,6 +518,14 @@ impl Database {
db_tx.commit().await.map_err(|e| e.into())
}

pub async fn mark_tx_executed(&self, tx_hash: &Hash) -> Result<()> {
sqlx::query("UPDATE transaction SET executed = true WHERE id = $1")
.bind(tx_hash)
.execute(&self.pool)
.await?;
Ok(())
}

pub async fn acl_whitelist_has(&self, key: &entity::PublicKey) -> Result<bool> {
let res: Option<i32> = sqlx::query("SELECT 1 FROM acl_whitelist WHERE key = $1")
.bind(key)
Expand Down Expand Up @@ -593,6 +620,7 @@ mod tests {
nonce: 64,
signature: Signature::default(),
propagated: false,
executed: false,
};

database
Expand Down
6 changes: 6 additions & 0 deletions crates/node/src/types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ pub struct Transaction {
pub signature: Signature,
#[serde(skip_serializing, skip_deserializing)]
pub propagated: bool,
#[serde(skip_serializing, skip_deserializing)]
pub executed: bool,
}

impl Default for Transaction {
Expand All @@ -252,6 +254,7 @@ impl Default for Transaction {
nonce: 0,
signature: Signature::default(),
propagated: false,
executed: false,
}
}
}
Expand All @@ -267,6 +270,7 @@ impl Transaction {
nonce: 0,
signature: Signature::default(),
propagated: false,
executed: false,
};

tx.sign(signing_key);
Expand Down Expand Up @@ -368,6 +372,7 @@ mod tests {
nonce: 0,
signature: Signature::default(),
propagated: false,
executed: false,
};

assert!(tx.validate().is_err());
Expand All @@ -382,6 +387,7 @@ mod tests {
nonce: 0,
signature: Signature::default(),
propagated: false,
executed: false,
};

assert!(tx.validate().is_err());
Expand Down

0 comments on commit c4ea88a

Please sign in to comment.