Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Commit

Permalink
Different approach: Don't start scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
tuommaki committed Mar 11, 2024
1 parent f9ffb38 commit 8a00c96
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 206 deletions.
175 changes: 51 additions & 124 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use cli::{
};
use eyre::Result;
use gevulot_node::types;
use gevulot_node::types::transaction::Received;
use libsecp256k1::{PublicKey, SecretKey};
use pea2pea::Pea2Pea;
use rand::{rngs::StdRng, SeedableRng};
Expand All @@ -18,18 +19,15 @@ use std::{
io::{ErrorKind, Write},
net::ToSocketAddrs,
path::PathBuf,
sync::{Arc, Mutex},
sync::Arc,
thread::sleep,
time::Duration,
};
use systemstat::{ByteSize, Platform, System};
use tokio::sync::mpsc;
use tokio::sync::{Mutex as TMutex, RwLock};
use tokio::sync::RwLock;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Server;
use tracing_subscriber::{filter::LevelFilter, fmt::format::FmtSpan, EnvFilter};
use types::{transaction::Validated, Hash, Transaction};
use workflow::WorkflowEngine;

mod cli;
mod mempool;
Expand All @@ -46,6 +44,7 @@ use mempool::Mempool;
use storage::{database::entity, Database};

use crate::networking::WhitelistSyncer;
use crate::txvalidation::{CallbackSender, ValidatedTxReceiver};

fn start_logger(default_level: LevelFilter) {
let filter = match EnvFilter::try_from_default_env() {
Expand Down Expand Up @@ -185,24 +184,6 @@ async fn run(config: Arc<Config>) -> Result<()> {

let database = Arc::new(Database::new(&config.db_url).await?);

let http_peer_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>> =
Default::default();

let mempool = Arc::new(RwLock::new(
Mempool::new(database.clone(), config.no_execution).await?,
));

// Start Tx process event loop.
let (txevent_loop_jh, tx_sender, p2p_stream) = txvalidation::spawn_event_loop(
config.data_directory.clone(),
config.p2p_listen_addr,
config.http_download_port,
http_peer_list.clone(),
database.clone(),
mempool.clone(),
)
.await?;

// Launch the ACL whitelist syncing early in the startup.
if let Some(ref whitelist_url) = config.acl_whitelist_url {
let acl_whitelist_syncer = WhitelistSyncer::new(whitelist_url.clone(), database.clone());
Expand All @@ -218,6 +199,53 @@ async fn run(config: Arc<Config>) -> Result<()> {
});
}

let http_peer_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>> =
Default::default();

// Define the channel that receives transactions from the outside of the node.
// These transactions must be validated first.
let (tx_sender, rcv_tx_event_rx) =
mpsc::unbounded_channel::<(Transaction<Received>, Option<CallbackSender>)>();

//To show to idea. Should use your config definition
let new_validated_tx_receiver: Arc<RwLock<dyn ValidatedTxReceiver>> = if !config.no_execution {
let mempool = Arc::new(RwLock::new(Mempool::new(database.clone()).await?));

let scheduler = scheduler::start_scheduler(
config.clone(),
database.clone(),
mempool.clone(),
node_key,
tx_sender.clone(),
)
.await;

// Run Scheduler in its own task.
tokio::spawn(async move { scheduler.run().await });
mempool
} else {
struct ArchiveMempool(Arc<Database>);
#[async_trait]
impl ValidatedTxReceiver for ArchiveMempool {
async fn send_new_tx(&mut self, tx: Transaction<Validated>) -> eyre::Result<()> {
self.0.as_ref().add_transaction(&tx).await
}
}
Arc::new(RwLock::new(ArchiveMempool(database.clone())))
};

// Start Tx process event loop.
let (txevent_loop_jh, p2p_stream) = txvalidation::spawn_event_loop(
config.data_directory.clone(),
config.p2p_listen_addr,
config.http_download_port,
http_peer_list.clone(),
database.clone(),
rcv_tx_event_rx,
new_validated_tx_receiver.clone(),
)
.await?;

let public_node_key = PublicKey::from_secret_key(&node_key);
let p2p = Arc::new(
networking::P2P::new(
Expand All @@ -237,107 +265,6 @@ async fn run(config: Arc<Config>) -> Result<()> {
let p2p_listen_addr = p2p.node().start_listening().await?;
tracing::info!("listening for p2p at {}", p2p_listen_addr);

let sys = System::new();
let num_gpus = if config.gpu_devices.is_some() { 1 } else { 0 };
let num_cpus = match config.num_cpus {
Some(cpus) => cpus,
None => num_cpus::get() as u64,
};
let available_mem = match config.mem_gb {
Some(mem_gb) => mem_gb * 1024 * 1024 * 1024,
None => {
let mem = sys.memory()?;
mem.total.as_u64()
}
};

tracing::info!(
"node configured with {} CPUs, {} MEM and {} GPUs",
num_cpus,
ByteSize(available_mem).to_string_as(true),
num_gpus
);

let resource_manager = Arc::new(Mutex::new(scheduler::ResourceManager::new(
available_mem,
num_cpus,
num_gpus,
)));

let workflow_engine = Arc::new(WorkflowEngine::new(database.clone()));
let download_url_prefix = format!(
"http://{}:{}",
config
.p2p_advertised_listen_addr
.unwrap_or(p2p_listen_addr)
.ip(),
config.http_download_port
);

let scheduler = if config.no_execution {
let provider = Arc::new(tokio::sync::Mutex::new(vmm::NoExecProvider::new()));
let program_manager = scheduler::ProgramManager::new(
database.clone(),
provider.clone(),
resource_manager.clone(),
);

Arc::new(scheduler::Scheduler::new(
mempool.clone(),
database.clone(),
program_manager,
workflow_engine,
node_key,
config.data_directory.clone(),
download_url_prefix,
txvalidation::TxEventSender::<txvalidation::TxResultSender>::build(tx_sender.clone()),
))
} else {
let qemu_provider = vmm::qemu::Qemu::new(config.clone());
let vsock_stream = qemu_provider.vm_server_listener().expect("vsock bind");

let provider = Arc::new(TMutex::new(qemu_provider));

let program_manager = scheduler::ProgramManager::new(
database.clone(),
provider.clone(),
resource_manager.clone(),
);

let scheduler = Arc::new(scheduler::Scheduler::new(
mempool.clone(),
database.clone(),
program_manager,
workflow_engine,
node_key,
config.data_directory.clone(),
download_url_prefix,
txvalidation::TxEventSender::<txvalidation::TxResultSender>::build(tx_sender.clone()),
));

let vm_server = vmm::vm_server::VMServer::new(
scheduler.clone(),
provider,
config.data_directory.clone(),
);

// Start gRPC VSOCK server.
tokio::spawn(async move {
Server::builder()
.add_service(vm_server.grpc_server())
.serve_with_incoming(vsock_stream)
.await
});

scheduler
};

// Start Scheduler.
tokio::spawn({
let scheduler = scheduler.clone();
async move { scheduler.run().await }
});

for addr in config.p2p_discovery_addrs.clone() {
tracing::info!("connecting to p2p peer {}", addr);
match addr.to_socket_addrs() {
Expand Down
35 changes: 12 additions & 23 deletions crates/node/src/mempool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::txvalidation::ValidatedTxReceiver;
use crate::types::{transaction::Validated, Hash, Transaction};
use async_trait::async_trait;
use eyre::Result;
Expand All @@ -23,27 +24,14 @@ pub enum MempoolError {
pub struct Mempool {
storage: Arc<dyn Storage>,
deque: VecDeque<Transaction<Validated>>,

persist_only: bool,
}

impl Mempool {
// If `persist_only` flag is set as `true`, the mempool won't populate
// `deque` with transactions. This can be used to disable transaction
// execution, while keeping rest of the node functionality as-is.
// Main use case for this is as a JSON-RPC API + Archive node.
pub async fn new(storage: Arc<dyn Storage>, persist_only: bool) -> Result<Self> {
pub async fn new(storage: Arc<dyn Storage>) -> Result<Self> {
let mut deque = VecDeque::new();
storage.fill_deque(&mut deque).await?;

if !persist_only {
storage.fill_deque(&mut deque).await?;
}

Ok(Self {
storage,
deque,
persist_only,
})
Ok(Self { storage, deque })
}

pub fn next(&mut self) -> Option<Transaction<Validated>> {
Expand All @@ -57,17 +45,18 @@ impl Mempool {

pub async fn add(&mut self, tx: Transaction<Validated>) -> Result<()> {
self.storage.set(&tx).await?;

// If `persist_only` is set, don't provide transactions available
// for consumption.
if !self.persist_only {
self.deque.push_back(tx);
}

self.deque.push_back(tx);
Ok(())
}

pub fn size(&self) -> usize {
self.deque.len()
}
}

#[async_trait::async_trait]
impl ValidatedTxReceiver for Mempool {
async fn send_new_tx(&mut self, tx: Transaction<Validated>) -> eyre::Result<()> {
self.add(tx).await
}
}
Loading

0 comments on commit 8a00c96

Please sign in to comment.