Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Add flag for --no-execution to disable tx exec (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuommaki authored Mar 13, 2024
1 parent 84ba84e commit 692f7c4
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 115 deletions.
8 changes: 8 additions & 0 deletions crates/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ pub struct Config {
)]
pub log_directory: PathBuf,

#[arg(
long,
long_help = "No execution flag. When set as true, the node does not execute transactions.",
env = "GEVULOT_NODE_NO_EXECUTION",
default_value_t = false
)]
pub no_execution: bool,

#[arg(
long,
long_help = "File where the node key is persisted",
Expand Down
148 changes: 51 additions & 97 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use cli::{
};
use eyre::Result;
use gevulot_node::types;
use gevulot_node::types::transaction::Received;
use libsecp256k1::{PublicKey, SecretKey};
use pea2pea::Pea2Pea;
use rand::{rngs::StdRng, SeedableRng};
Expand All @@ -18,18 +19,15 @@ use std::{
io::{ErrorKind, Write},
net::ToSocketAddrs,
path::PathBuf,
sync::{Arc, Mutex},
sync::Arc,
thread::sleep,
time::Duration,
};
use systemstat::{ByteSize, Platform, System};
use tokio::sync::mpsc;
use tokio::sync::{Mutex as TMutex, RwLock};
use tokio::sync::RwLock;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Server;
use tracing_subscriber::{filter::LevelFilter, fmt::format::FmtSpan, EnvFilter};
use types::{transaction::Validated, Hash, Transaction};
use workflow::WorkflowEngine;

mod cli;
mod mempool;
Expand All @@ -46,6 +44,7 @@ use mempool::Mempool;
use storage::{database::entity, Database};

use crate::networking::WhitelistSyncer;
use crate::txvalidation::{CallbackSender, ValidatedTxReceiver};

fn start_logger(default_level: LevelFilter) {
let filter = match EnvFilter::try_from_default_env() {
Expand Down Expand Up @@ -185,22 +184,6 @@ async fn run(config: Arc<Config>) -> Result<()> {

let database = Arc::new(Database::new(&config.db_url).await?);

let http_peer_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>> =
Default::default();

let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?));

// Start Tx process event loop.
let (txevent_loop_jh, tx_sender, p2p_stream) = txvalidation::spawn_event_loop(
config.data_directory.clone(),
config.p2p_listen_addr,
config.http_download_port,
http_peer_list.clone(),
database.clone(),
mempool.clone(),
)
.await?;

// Launch the ACL whitelist syncing early in the startup.
if let Some(ref whitelist_url) = config.acl_whitelist_url {
let acl_whitelist_syncer = WhitelistSyncer::new(whitelist_url.clone(), database.clone());
Expand All @@ -216,6 +199,53 @@ async fn run(config: Arc<Config>) -> Result<()> {
});
}

let http_peer_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>> =
Default::default();

// Define the channel that receives transactions from the outside of the node.
// These transactions must be validated first.
let (tx_sender, rcv_tx_event_rx) =
mpsc::unbounded_channel::<(Transaction<Received>, Option<CallbackSender>)>();

// To illustrate the idea; this should be driven by the config definition.
let new_validated_tx_receiver: Arc<RwLock<dyn ValidatedTxReceiver>> = if !config.no_execution {
let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?));

let scheduler = scheduler::start_scheduler(
config.clone(),
database.clone(),
mempool.clone(),
node_key,
tx_sender.clone(),
)
.await;

// Run Scheduler in its own task.
tokio::spawn(async move { scheduler.run().await });
mempool
} else {
struct ArchiveMempool(Arc<Database>);
#[async_trait]
impl ValidatedTxReceiver for ArchiveMempool {
async fn send_new_tx(&mut self, tx: Transaction<Validated>) -> eyre::Result<()> {
self.0.as_ref().add_transaction(&tx).await
}
}
Arc::new(RwLock::new(ArchiveMempool(database.clone())))
};

// Start Tx process event loop.
let (txevent_loop_jh, p2p_stream) = txvalidation::spawn_event_loop(
config.data_directory.clone(),
config.p2p_listen_addr,
config.http_download_port,
http_peer_list.clone(),
database.clone(),
rcv_tx_event_rx,
new_validated_tx_receiver.clone(),
)
.await?;

let public_node_key = PublicKey::from_secret_key(&node_key);
let p2p = Arc::new(
networking::P2P::new(
Expand All @@ -235,82 +265,6 @@ async fn run(config: Arc<Config>) -> Result<()> {
let p2p_listen_addr = p2p.node().start_listening().await?;
tracing::info!("listening for p2p at {}", p2p_listen_addr);

let sys = System::new();
let num_gpus = if config.gpu_devices.is_some() { 1 } else { 0 };
let num_cpus = match config.num_cpus {
Some(cpus) => cpus,
None => num_cpus::get() as u64,
};
let available_mem = match config.mem_gb {
Some(mem_gb) => mem_gb * 1024 * 1024 * 1024,
None => {
let mem = sys.memory()?;
mem.total.as_u64()
}
};

tracing::info!(
"node configured with {} CPUs, {} MEM and {} GPUs",
num_cpus,
ByteSize(available_mem).to_string_as(true),
num_gpus
);

let resource_manager = Arc::new(Mutex::new(scheduler::ResourceManager::new(
available_mem,
num_cpus,
num_gpus,
)));

// TODO(tuommaki): Handle provider from config.
let qemu_provider = vmm::qemu::Qemu::new(config.clone());
let vsock_stream = qemu_provider.vm_server_listener().expect("vsock bind");

let provider = Arc::new(TMutex::new(qemu_provider));
let program_manager = scheduler::ProgramManager::new(
database.clone(),
provider.clone(),
resource_manager.clone(),
);

let workflow_engine = Arc::new(WorkflowEngine::new(database.clone()));
let download_url_prefix = format!(
"http://{}:{}",
config
.p2p_advertised_listen_addr
.unwrap_or(p2p_listen_addr)
.ip(),
config.http_download_port
);

let scheduler = Arc::new(scheduler::Scheduler::new(
mempool.clone(),
database.clone(),
program_manager,
workflow_engine,
node_key,
config.data_directory.clone(),
download_url_prefix,
txvalidation::TxEventSender::<txvalidation::TxResultSender>::build(tx_sender.clone()),
));

let vm_server =
vmm::vm_server::VMServer::new(scheduler.clone(), provider, config.data_directory.clone());

// Start gRPC VSOCK server.
tokio::spawn(async move {
Server::builder()
.add_service(vm_server.grpc_server())
.serve_with_incoming(vsock_stream)
.await
});

// Start Scheduler.
tokio::spawn({
let scheduler = scheduler.clone();
async move { scheduler.run().await }
});

for addr in config.p2p_discovery_addrs.clone() {
tracing::info!("connecting to p2p peer {}", addr);
match addr.to_socket_addrs() {
Expand Down
8 changes: 8 additions & 0 deletions crates/node/src/mempool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::txvalidation::ValidatedTxReceiver;
use crate::types::{transaction::Validated, Hash, Transaction};
use async_trait::async_trait;
use eyre::Result;
Expand Down Expand Up @@ -52,3 +53,10 @@ impl Mempool {
self.deque.len()
}
}

#[async_trait::async_trait]
impl ValidatedTxReceiver for Mempool {
async fn send_new_tx(&mut self, tx: Transaction<Validated>) -> eyre::Result<()> {
self.add(tx).await
}
}
1 change: 1 addition & 0 deletions crates/node/src/rpc_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ mod tests {
db_url: "postgres://gevulot:gevulot@localhost/gevulot".to_string(),
json_rpc_listen_addr: "127.0.0.1:0".parse().unwrap(),
log_directory: temp_dir(),
no_execution: true,
node_key_file: PathBuf::new().join("node.key"),
p2p_discovery_addrs: vec![],
p2p_listen_addr: "127.0.0.1:9999".parse().unwrap(),
Expand Down
85 changes: 85 additions & 0 deletions crates/node/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
mod program_manager;
mod resource_manager;

use crate::cli::Config;
use crate::storage::Database;
use crate::txvalidation;
use crate::txvalidation::CallbackSender;
use crate::txvalidation::TxEventSender;
use crate::txvalidation::TxResultSender;
use crate::types::file::{move_vmfile, Output, TaskVmFile, TxFile, VmOutput};
use crate::types::TaskState;
use crate::vmm::qemu::Qemu;
use crate::vmm::vm_server::grpc;
use crate::vmm::vm_server::TaskManager;
use crate::vmm::vm_server::VMServer;
use crate::workflow::{WorkflowEngine, WorkflowError};
use crate::{
mempool::Mempool,
Expand All @@ -16,6 +21,7 @@ use crate::{
use async_trait::async_trait;
use eyre::Result;
use gevulot_node::types::transaction::Payload;
use gevulot_node::types::transaction::Received;
use gevulot_node::types::{TaskKind, Transaction};
use libsecp256k1::SecretKey;
pub use program_manager::ProgramManager;
Expand All @@ -28,10 +34,15 @@ use std::{
sync::Arc,
time::Duration,
};
use systemstat::ByteSize;
use systemstat::Platform;
use systemstat::System;
use tokio::sync::mpsc::UnboundedSender;
use tokio::{
sync::{Mutex, RwLock},
time::sleep,
};
use tonic::transport::Server;

use self::program_manager::{ProgramError, ProgramHandle};
use self::resource_manager::ResourceError;
Expand Down Expand Up @@ -85,6 +96,80 @@ pub struct Scheduler {
tx_sender: TxEventSender<TxResultSender>,
}

/// Builds and wires up the scheduler subsystem, then returns a shared handle
/// to it. Concretely: sizes compute resources from `config` (falling back to
/// autodetection), creates the QEMU VM provider, spawns the gRPC VSOCK server
/// that VMs report task results to, and constructs the `Scheduler` around the
/// given mempool and storage.
///
/// Parameters:
/// - `config`: node configuration (CPU/mem/GPU limits, data directory, ports).
/// - `storage`: database handle shared by the program manager, workflow engine
///   and scheduler.
/// - `mempool`: mempool the scheduler drains validated transactions from.
/// - `node_key`: node secret key the scheduler uses when producing txs.
/// - `tx_sender`: channel for submitting result transactions back into the
///   tx-validation event loop.
///
/// Panics if system memory cannot be queried or the VSOCK listener cannot be
/// bound — both are treated as unrecoverable startup failures.
pub async fn start_scheduler(
    config: Arc<Config>,
    storage: Arc<Database>,
    mempool: Arc<RwLock<Mempool>>,
    node_key: SecretKey,
    tx_sender: UnboundedSender<(Transaction<Received>, Option<CallbackSender>)>,
) -> Arc<Scheduler> {
    let sys = System::new();
    // GPU passthrough is all-or-nothing here: any configured device list
    // counts as exactly one schedulable GPU resource.
    let num_gpus = if config.gpu_devices.is_some() { 1 } else { 0 };
    let num_cpus = match config.num_cpus {
        Some(cpus) => cpus,
        None => num_cpus::get() as u64,
    };
    // Memory budget in bytes: explicit GiB from config, otherwise total
    // physical memory reported by the OS.
    let available_mem = match config.mem_gb {
        Some(mem_gb) => mem_gb * 1024 * 1024 * 1024,
        None => {
            let mem = sys
                .memory()
                .expect("failed to lookup available system memory");
            mem.total.as_u64()
        }
    };

    tracing::info!(
        "node configured with {} CPUs, {} MEM and {} GPUs",
        num_cpus,
        ByteSize(available_mem).to_string_as(true),
        num_gpus
    );

    // std Mutex (not tokio): the resource manager is only ever locked for
    // short, non-await bookkeeping sections.
    let resource_manager = Arc::new(std::sync::Mutex::new(ResourceManager::new(
        available_mem,
        num_cpus,
        num_gpus,
    )));

    // TODO(tuommaki): Handle provider from config.
    let qemu_provider = Qemu::new(config.clone());
    let vsock_stream = qemu_provider.vm_server_listener().expect("vsock bind");

    let provider = Arc::new(Mutex::new(qemu_provider));
    let program_manager =
        ProgramManager::new(storage.clone(), provider.clone(), resource_manager.clone());

    let workflow_engine = Arc::new(WorkflowEngine::new(storage.clone()));
    // NOTE(review): this uses the configured `p2p_listen_addr`, whereas the
    // inline code this function replaced derived the prefix from
    // `p2p_advertised_listen_addr.unwrap_or(<actual bound addr>)`. Confirm
    // this does not break download URLs for NAT'd / advertised-address
    // deployments.
    let download_url_prefix = format!(
        "http://{}:{}",
        config.p2p_listen_addr.ip(),
        config.http_download_port
    );

    let scheduler = Arc::new(Scheduler::new(
        mempool.clone(),
        storage.clone(),
        program_manager,
        workflow_engine,
        node_key,
        config.data_directory.clone(),
        download_url_prefix,
        txvalidation::TxEventSender::<txvalidation::TxResultSender>::build(tx_sender.clone()),
    ));

    let vm_server = VMServer::new(scheduler.clone(), provider, config.data_directory.clone());

    // Start gRPC VSOCK server. The task is detached and its Result is
    // discarded; a server failure here is not propagated to the caller.
    tokio::spawn(async move {
        Server::builder()
            .add_service(vm_server.grpc_server())
            .serve_with_incoming(vsock_stream)
            .await
    });
    scheduler
}

impl Scheduler {
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down
12 changes: 8 additions & 4 deletions crates/node/src/txvalidation/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ use crate::types::{
transaction::{Received, Validated},
Transaction,
};
use crate::Mempool;
use futures::future::join_all;
use futures_util::TryFutureExt;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use tokio::sync::mpsc::UnboundedSender;

use super::ValidatedTxReceiver;

//event type.
#[derive(Debug, Clone)]
pub struct ReceivedTx;
Expand Down Expand Up @@ -175,7 +176,10 @@ impl TxEvent<PropagateTx> {
}

impl TxEvent<NewTx> {
pub async fn process_event(self, mempool: &mut Mempool) -> Result<(), EventProcessError> {
pub async fn process_event(
self,
newtx_receiver: &mut dyn ValidatedTxReceiver,
) -> Result<(), EventProcessError> {
let tx = Transaction {
author: self.tx.author,
hash: self.tx.hash,
Expand All @@ -192,8 +196,8 @@ impl TxEvent<NewTx> {
tx.hash.to_string(),
tx.payload
);
mempool
.add(tx)
newtx_receiver
.send_new_tx(tx)
.map_err(|err| EventProcessError::SaveTxError(format!("{err}")))
.await
}
Expand Down
Loading

0 comments on commit 692f7c4

Please sign in to comment.