This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit 4d6cf3b
correct output file VM missing and remove Task from DB
musitdev committed Feb 22, 2024
1 parent f246a34

Showing 12 changed files with 196 additions and 159 deletions.
2 changes: 1 addition & 1 deletion crates/node/src/networking/p2p/pea2pea.rs
@@ -431,8 +431,8 @@ mod tests {
use crate::txvalidation::CallbackSender;
use crate::txvalidation::EventProcessError;
use eyre::Result;
use gevulot_node::types::transaction::Payload;
use gevulot_node::types::transaction::Received;
use gevulot_node::types::{transaction::Payload, Transaction};
use libsecp256k1::SecretKey;
use rand::{rngs::StdRng, SeedableRng};
use tokio::sync::mpsc::UnboundedReceiver;
3 changes: 2 additions & 1 deletion crates/node/src/scheduler/mod.rs
@@ -131,6 +131,7 @@ impl Scheduler {

let mut task = match self.pick_task().await {
Some(t) => {
tracing::trace!("Pick a new task:{t:#?}");
tracing::debug!("task {}/{} scheduled for running", t.id, t.tx);
t
}
@@ -410,7 +411,7 @@ impl TaskManager for Scheduler {
id: task.tx.to_string(),
name: task.name.clone(),
args: task.args.clone(),
files: task.files.iter().map(|x| x.name.clone()).collect(),
files: task.files.iter().map(|x| x.file_path.clone()).collect(),
});
}
}
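The change in the second hunk swaps the bare file name for a pre-computed relative path when the scheduler builds the VM task request, so the program inside the VM can find its files under a per-transaction directory. Below is a minimal standalone sketch of that mapping; the `Task` and `TaskVmFile` structs are simplified stand-ins for the node's real types, not the actual API.

// Simplified stand-ins for the node's task types (illustration only).
struct TaskVmFile {
    file_path: String, // e.g. "<tx_hash>/input.bin"
}

struct Task {
    files: Vec<TaskVmFile>,
}

// Collect the per-task relative paths that get handed to the VM.
fn vm_request_files(task: &Task) -> Vec<String> {
    task.files.iter().map(|f| f.file_path.clone()).collect()
}

fn main() {
    let task = Task {
        files: vec![
            TaskVmFile { file_path: "abc123/input.bin".to_string() },
            TaskVmFile { file_path: "abc123/params.json".to_string() },
        ],
    };
    assert_eq!(
        vm_request_files(&task),
        vec!["abc123/input.bin".to_string(), "abc123/params.json".to_string()]
    );
}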
193 changes: 95 additions & 98 deletions crates/node/src/storage/database/postgres.rs
@@ -5,14 +5,13 @@ use crate::types::file::DbFile;
use crate::types::{
self,
transaction::{ProgramData, Validated},
Hash, Program, Task,
Hash, Program,
};
use eyre::Result;
use gevulot_node::types::program::ResourceRequest;
use libsecp256k1::PublicKey;
use sqlx::{self, postgres::PgPoolOptions, FromRow, Row};
use sqlx::{postgres::PgPoolOptions, FromRow, Row};
use std::time::Duration;
use uuid::Uuid;

const MAX_DB_CONNS: u32 = 64;
const DB_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
@@ -133,96 +132,96 @@ impl Database {
Ok(programs)
}

pub async fn add_task(&self, t: &Task) -> Result<()> {
let mut tx = self.pool.begin().await?;

if let Err(err) = sqlx::query(
"INSERT INTO task ( id, name, args, state, program_id ) VALUES ( $1, $2, $3, $4, $5 )",
)
.bind(t.id)
.bind(&t.name)
.bind(&t.args)
.bind(&t.state)
.bind(t.program_id)
.execute(&mut *tx)
.await
{
tx.rollback().await?;
return Err(err.into());
}

{
let mut query_builder =
sqlx::QueryBuilder::new("INSERT INTO file ( task_id, name, url, checksum )");
query_builder.push_values(&t.files, |mut b, new_file| {
b.push_bind(t.id)
.push_bind(&new_file.name)
.push_bind(&new_file.url)
.push_bind(new_file.checksum);
});

let query = query_builder.build();
if let Err(err) = query.execute(&mut *tx).await {
tx.rollback().await?;
return Err(err.into());
}
}

tx.commit().await.map_err(|e| e.into())
}

pub async fn find_task(&self, id: Uuid) -> Result<Option<Task>> {
let mut tx = self.pool.begin().await?;

// non-macro query_as used because of sqlx limitations with enums.
let task = sqlx::query_as::<_, Task>("SELECT * FROM task WHERE id = $1")
.bind(id)
.fetch_optional(&mut *tx)
.await?;

// Fetch accompanied Files for the Task.
match task {
Some(mut task) => {
let mut files =
sqlx::query_as::<_, DbFile>("SELECT * FROM file WHERE task_id = $1")
.bind(id)
.fetch_all(&mut *tx)
.await?;
task.files.append(&mut files);
Ok(Some(task))
}
None => Ok(None),
}
}

pub async fn get_tasks(&self) -> Result<Vec<Task>> {
let mut tx = self.pool.begin().await?;

// non-macro query_as used because of sqlx limitations with enums.
let mut tasks = sqlx::query_as::<_, Task>("SELECT * FROM task")
.fetch_all(&mut *tx)
.await?;

for task in &mut tasks {
let mut files = sqlx::query_as::<_, DbFile>("SELECT * FROM file WHERE task_id = $1")
.bind(task.id)
.fetch_all(&mut *tx)
.await?;

task.files.append(&mut files);
}

Ok(tasks)
}

pub async fn update_task_state(&self, t: &Task) -> Result<()> {
sqlx::query("UPDATE task SET state = $1 WHERE id = $2")
.bind(&t.state)
.bind(t.id)
.execute(&self.pool)
.await?;
Ok(())
}
// pub async fn add_task_old(&self, t: &Task) -> Result<()> {
// let mut tx = self.pool.begin().await?;

// if let Err(err) = sqlx::query(
// "INSERT INTO task ( id, name, args, state, program_id ) VALUES ( $1, $2, $3, $4, $5 )",
// )
// .bind(t.id)
// .bind(&t.name)
// .bind(&t.args)
// .bind(&t.state)
// .bind(t.program_id)
// .execute(&mut *tx)
// .await
// {
// tx.rollback().await?;
// return Err(err.into());
// }

// {
// let mut query_builder =
// sqlx::QueryBuilder::new("INSERT INTO file ( task_id, name, url, checksum )");
// query_builder.push_values(&t.files, |mut b, new_file| {
// b.push_bind(t.id)
// .push_bind(&new_file.name)
// .push_bind(&new_file.url)
// .push_bind(new_file.checksum);
// });

// let query = query_builder.build();
// if let Err(err) = query.execute(&mut *tx).await {
// tx.rollback().await?;
// return Err(err.into());
// }
// }

// tx.commit().await.map_err(|e| e.into())
// }

// pub async fn find_task(&self, id: Uuid) -> Result<Option<Task>> {
// let mut tx = self.pool.begin().await?;

// // non-macro query_as used because of sqlx limitations with enums.
// let task = sqlx::query_as::<_, Task>("SELECT * FROM task WHERE id = $1")
// .bind(id)
// .fetch_optional(&mut *tx)
// .await?;

// // Fetch accompanied Files for the Task.
// match task {
// Some(mut task) => {
// let mut files =
// sqlx::query_as::<_, DbFile>("SELECT * FROM file WHERE task_id = $1")
// .bind(id)
// .fetch_all(&mut *tx)
// .await?;
// task.files.append(&mut files);
// Ok(Some(task))
// }
// None => Ok(None),
// }
// }

// pub async fn get_tasks(&self) -> Result<Vec<Task>> {
// let mut tx = self.pool.begin().await?;

// // non-macro query_as used because of sqlx limitations with enums.
// let mut tasks = sqlx::query_as::<_, Task>("SELECT * FROM task")
// .fetch_all(&mut *tx)
// .await?;

// for task in &mut tasks {
// let mut files = sqlx::query_as::<_, DbFile>("SELECT * FROM file WHERE task_id = $1")
// .bind(task.id)
// .fetch_all(&mut *tx)
// .await?;

// task.files.append(&mut files);
// }

// Ok(tasks)
// }

// pub async fn update_task_state(&self, t: &Task) -> Result<()> {
// sqlx::query("UPDATE task SET state = $1 WHERE id = $2")
// .bind(&t.state)
// .bind(t.id)
// .execute(&self.pool)
// .await?;
// Ok(())
// }

// NOTE: There are plenty of opportunities for optimizations in following
// transaction related operations. They are implemented naively on purpose
@@ -681,14 +680,12 @@

#[cfg(test)]
mod tests {
use crate::types::transaction::Payload;
use gevulot_node::types::transaction::Created;
use libsecp256k1::{PublicKey, SecretKey};
use libsecp256k1::SecretKey;
use rand::{rngs::StdRng, Rng, SeedableRng};

use crate::types::{
transaction::{Payload, ProgramMetadata},
Signature, Transaction,
};
use crate::types::{transaction::ProgramMetadata, Signature, Transaction};

use super::*;

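The Task-related queries above are only commented out, but the pattern they used is worth keeping in mind: begin a transaction, roll back on the first failed statement, bulk-insert child rows with `sqlx::QueryBuilder::push_values`, then commit. A compact sketch of that pattern follows, with illustrative `parent`/`item` tables rather than the node's actual schema; it needs the `sqlx` Postgres driver and a reachable database to actually execute.

use eyre::Result;
use sqlx::{postgres::PgPool, QueryBuilder};

struct Item {
    name: String,
    url: String,
}

// Insert one parent row and its child rows atomically, rolling back on any error.
async fn add_parent_with_items(pool: &PgPool, parent_id: i64, items: &[Item]) -> Result<()> {
    let mut tx = pool.begin().await?;

    if let Err(err) = sqlx::query("INSERT INTO parent ( id ) VALUES ( $1 )")
        .bind(parent_id)
        .execute(&mut *tx)
        .await
    {
        tx.rollback().await?;
        return Err(err.into());
    }

    // Bulk-insert the child rows in a single statement.
    let mut query_builder = QueryBuilder::new("INSERT INTO item ( parent_id, name, url )");
    query_builder.push_values(items, |mut b, item| {
        b.push_bind(parent_id)
            .push_bind(&item.name)
            .push_bind(&item.url);
    });
    if let Err(err) = query_builder.build().execute(&mut *tx).await {
        tx.rollback().await?;
        return Err(err.into());
    }

    tx.commit().await.map_err(Into::into)
}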
78 changes: 51 additions & 27 deletions crates/node/src/types/file.rs
@@ -8,54 +8,78 @@ use std::path::PathBuf;

pub async fn open_task_file(
data_dir: &PathBuf,
task_id: &str,
path: &str,
) -> Result<tokio::io::BufReader<tokio::fs::File>> {
let mut path = Path::new(path);
if path.is_absolute() {
path = path.strip_prefix("/")?;
}
let path = PathBuf::new().join(data_dir).join(task_id).join(path);
let path = PathBuf::new().join(data_dir).join(path);
let fd = tokio::fs::File::open(path).await?;
Ok(tokio::io::BufReader::new(fd))
}

// Describes file data that is stored in the database.
// To manipulate files on disk, use the equivalent type-state definition File<T>.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, sqlx::FromRow)]
pub struct DbFile {
pub name: String,
pub url: String,
pub checksum: Hash,
// Describes a file used by an executed task.
#[derive(Clone, Debug)]
pub struct TaskVMFile {
pub file_path: String,
}

impl DbFile {
impl TaskVMFile {
pub fn try_from_prg_data(
tx_hash: Hash,
parent_output_files: &[File<ProofVerif>],
value: &transaction::ProgramData,
) -> Result<Option<Self>, &'static str> {
let file = match value {
transaction::ProgramData::Input {
file_name,
file_url,
checksum,
} => Some(DbFile {
name: file_name.clone(),
url: file_url.clone(),
checksum: checksum.clone().into(),
}),
) -> Result<TaskVMFile, String> {
match value {
transaction::ProgramData::Input { file_name, .. } => {
let mut file_path = Path::new(file_name);
if file_path.is_absolute() {
file_path = file_path.strip_prefix("/").unwrap(); // Safe to unwrap: `is_absolute()` guarantees the "/" prefix.
}

let file_path = PathBuf::new()
.join(tx_hash.to_string())
.join(file_path)
.to_str()
.unwrap()
.to_string();

Ok(TaskVMFile { file_path })
}
transaction::ProgramData::Output {
source_program: _,
file_name: _,
file_name,
} => {
//Output file are not use by Tx management.
None
// Get the file path from the parent tx's output file list.
match parent_output_files
.iter()
.find(|file| &file.name == file_name)
{
Some(file) => {
let file_path =
file.get_relatif_path(tx_hash).to_str().unwrap().to_string();
Ok(TaskVMFile { file_path })
}
None => Err(format!(
"Tx:{} program output file:{file_name} not found",
tx_hash,
)),
}
}
};

Ok(file)
}
}
}

// Describes file data that is stored in the database.
// To manipulate files on disk, use the equivalent type-state definition File<T>.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, sqlx::FromRow)]
pub struct DbFile {
pub name: String,
pub url: String,
pub checksum: Hash,
}

// Type state definition of a file to manage all the different case of file manipulation.
// Download: Use to download the file data and move them to the right place. Constructed using the other type.
// ProofVerif: A file attached to a proof or verify Tx: Only downloaded on distant host. Nothing done on the host when the Tx has been executed.
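The new `TaskVMFile::try_from_prg_data` above derives a VM-visible path of the form `<tx_hash>/<file_name>` for input files, and resolves output files through the parent transaction's output file list. The following standalone sketch shows just the path-normalisation step for the input case; `tx_hash` is a plain string here instead of the node's `Hash` type, and Unix path semantics are assumed.

use std::path::{Path, PathBuf};

// Turn an (optionally absolute) file name into a path relative to the
// transaction's own directory, mirroring the Input branch above.
fn vm_file_path(tx_hash: &str, file_name: &str) -> String {
    let mut path = Path::new(file_name);
    if path.is_absolute() {
        // Safe on Unix: an absolute path always starts with "/".
        path = path.strip_prefix("/").unwrap();
    }
    PathBuf::from(tx_hash).join(path).to_str().unwrap().to_string()
}

fn main() {
    assert_eq!(vm_file_path("abc123", "/input.bin"), "abc123/input.bin");
    assert_eq!(vm_file_path("abc123", "proofs/out.json"), "abc123/proofs/out.json");
}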
2 changes: 1 addition & 1 deletion crates/node/src/types/hash.rs
@@ -1,7 +1,7 @@
use libsecp256k1::Message;
use rand::Rng;
use serde::{de, Deserialize, Serialize};
use sqlx::{self, Decode, Encode, Postgres, Type};
use sqlx::{Decode, Encode, Postgres, Type};
use std::fmt;

pub const HASH_SIZE: usize = 32;
2 changes: 1 addition & 1 deletion crates/node/src/types/signature.rs
@@ -1,5 +1,5 @@
use serde::{de, Deserialize, Serialize};
use sqlx::{self, Decode, Encode, Postgres, Type};
use sqlx::{Decode, Encode, Postgres, Type};
use std::fmt;
use thiserror::Error;

