Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement file descriptor budget pool #284

Merged
merged 16 commits into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub struct Factory {
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
pub fd_total_budget: i32,
fd_budget: i32,
}

impl Factory {
Expand All @@ -212,7 +212,7 @@ impl Factory {
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
fd_total_budget: i32,
fd_budget: i32,
) -> Self {
let mut config = config.clone();
#[cfg(feature = "devnet-prealloc")]
Expand All @@ -228,7 +228,7 @@ impl Factory {
notification_root,
counters,
tx_script_cache_counters,
fd_total_budget,
fd_budget,
};
factory.delete_inactive_consensus_entries();
factory
Expand Down Expand Up @@ -258,7 +258,7 @@ impl ConsensusFactory for Factory {
let db = kaspa_database::prelude::ConnBuilder::default()
biryukovmaxim marked this conversation as resolved.
Show resolved Hide resolved
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(200.max(self.fd_total_budget * 70 / 100))
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

Expand Down Expand Up @@ -292,7 +292,7 @@ impl ConsensusFactory for Factory {
let db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(10.max(self.fd_total_budget * 10 / 100))
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

Expand Down
3 changes: 3 additions & 0 deletions core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ impl Core {
}
}

// Drop all services and cleanup
self.services.lock().unwrap().clear();

trace!("... core is shut down");
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/task/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ impl AsyncRuntime {
.collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
try_join_all(futures).await.unwrap();

// Drop all services and cleanup
self.services.lock().unwrap().clear();

trace!("async-runtime worker stopped");
}

Expand Down
18 changes: 12 additions & 6 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ const DEFAULT_DATA_DIR: &str = "datadir";
const CONSENSUS_DB: &str = "consensus";
const UTXOINDEX_DB: &str = "utxoindex";
const META_DB: &str = "meta";
const UTXO_INDEX_DB_FILE_LIMIT: i32 = 100;
const META_DB_FILE_LIMIT: i32 = 5;
const DEFAULT_LOG_DIR: &str = "logs";

Expand Down Expand Up @@ -177,7 +176,14 @@ pub fn create_core(args: Args, fd_total_budget: i32) -> (Arc<Core>, Arc<RpcCoreS
///
pub fn create_core_with_runtime(runtime: &Runtime, args: &Args, fd_total_budget: i32) -> (Arc<Core>, Arc<RpcCoreService>) {
let network = args.network();

let mut fd_remaining = fd_total_budget;
let utxo_files_limit = if args.utxoindex {
let utxo_files_limit = fd_remaining * 10 / 100;
fd_remaining -= utxo_files_limit;
utxo_files_limit
} else {
0
};
// Make sure args forms a valid set of properties
if let Err(err) = validate_args(args) {
println!("{}", err);
Expand Down Expand Up @@ -233,7 +239,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
// DB used for addresses store and for multi-consensus management
let mut meta_db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(meta_db_dir.clone())
.with_files_limit(META_DB_FILE_LIMIT.max(fd_total_budget * 5 / 100))
.with_files_limit(META_DB_FILE_LIMIT)
.build()
.unwrap();

Expand All @@ -258,7 +264,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
// Reopen the DB
meta_db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(meta_db_dir)
.with_files_limit(META_DB_FILE_LIMIT.max(fd_total_budget * 5 / 100))
.with_files_limit(META_DB_FILE_LIMIT)
.build()
.unwrap();
}
Expand Down Expand Up @@ -299,7 +305,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
notification_root.clone(),
processing_counters.clone(),
tx_script_cache_counters.clone(),
fd_total_budget,
fd_remaining,
));
let consensus_manager = Arc::new(ConsensusManager::new(consensus_factory));
let consensus_monitor = Arc::new(ConsensusMonitor::new(processing_counters.clone(), tick_service.clone()));
Expand All @@ -323,7 +329,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
// Use only a single thread for non-consensus databases
let utxoindex_db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(utxoindex_db_dir)
.with_files_limit(UTXO_INDEX_DB_FILE_LIMIT.max(fd_total_budget * 15 / 100))
.with_files_limit(utxo_files_limit)
.build()
.unwrap();
let utxoindex = UtxoIndexProxy::new(UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap());
Expand Down
4 changes: 2 additions & 2 deletions kaspad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ extern crate kaspa_hashes;
use std::sync::Arc;

use kaspa_core::{info, signals::Signals};
use kaspa_utils::fd_budget::get_limit;
use kaspa_utils::fd_budget::limit;
use kaspad_lib::{args::parse_args, daemon::create_core};

#[cfg(feature = "heap")]
Expand All @@ -17,7 +17,7 @@ pub fn main() {
let _profiler = dhat::Profiler::builder().file_name("kaspad-heap.json").build();

let args = parse_args();
let fd_total_budget = get_limit() - 64;
let fd_total_budget = limit() - args.rpc_max_clients as i32 - args.inbound_limit as i32 - args.outbound_target as i32;
biryukovmaxim marked this conversation as resolved.
Show resolved Hide resolved
let (core, _) = create_core(args, fd_total_budget);

// Bind the keyboard signal to the core
Expand Down
6 changes: 4 additions & 2 deletions simpa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use kaspa_database::prelude::ConnBuilder;
use kaspa_database::{create_temp_db, load_existing_db};
use kaspa_hashes::Hash;
use kaspa_perf_monitor::builder::Builder;
use kaspa_utils::fd_budget::limit;
use simulator::network::KaspaNetworkSimulator;
use std::{collections::VecDeque, sync::Arc, time::Duration};

Expand Down Expand Up @@ -169,7 +170,8 @@ fn main() {
builder = builder.set_archival();
}
let config = Arc::new(builder.build());
let mut conn_builder = ConnBuilder::default().with_parallelism(num_cpus::get()).with_files_limit(200);
let default_fd = limit() / 2;
let mut conn_builder = ConnBuilder::default().with_parallelism(num_cpus::get()).with_files_limit(default_fd);
if let Some(rocksdb_files_limit) = args.rocksdb_files_limit {
conn_builder = conn_builder.with_files_limit(rocksdb_files_limit);
}
Expand Down Expand Up @@ -220,7 +222,7 @@ fn main() {
}

// Benchmark the DAG validation time
let (_lifetime2, db2) = create_temp_db!(ConnBuilder::default().with_parallelism(num_cpus::get()).with_files_limit(200));
let (_lifetime2, db2) = create_temp_db!(ConnBuilder::default().with_parallelism(num_cpus::get()).with_files_limit(default_fd));
let (dummy_notification_sender, _) = unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let consensus2 = Arc::new(Consensus::new(
Expand Down
3 changes: 2 additions & 1 deletion simpa/src/simulator/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use kaspa_consensus_core::block::Block;
use kaspa_database::prelude::ConnBuilder;
use kaspa_database::utils::DbLifetime;
use kaspa_database::{create_permanent_db, create_temp_db};
use kaspa_utils::fd_budget::limit;
use kaspa_utils::sim::Simulation;

type ConsensusWrapper = (Arc<Consensus>, Vec<JoinHandle<()>>, DbLifetime);
Expand Down Expand Up @@ -53,7 +54,7 @@ impl KaspaNetworkSimulator {
let secp = secp256k1::Secp256k1::new();
let mut rng = rand::thread_rng();
for i in 0..num_miners {
let mut builder = ConnBuilder::default().with_files_limit(200);
let mut builder = ConnBuilder::default().with_files_limit(limit() / 2 / num_miners as i32);
if let Some(rocksdb_files_limit) = rocksdb_files_limit {
builder = builder.with_files_limit(rocksdb_files_limit);
}
Expand Down
4 changes: 2 additions & 2 deletions testing/integration/src/mempool_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use kaspa_notify::{
};
use kaspa_rpc_core::{api::rpc::RpcApi, Notification, RpcError};
use kaspa_txscript::pay_to_address_script;
use kaspa_utils::fd_budget::get_limit;
use kaspa_utils::fd_budget::limit;
use kaspad_lib::args::Args;
use parking_lot::Mutex;
use rand::thread_rng;
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn bench_bbt_latency() {
verify_tx_dag(&utxoset, &txs);
info!("Generated overall {} txs", txs.len());

let fd_total_budget = get_limit() - 64;
let fd_total_budget = limit();
let mut daemon = Daemon::new_random_with_args(args, fd_total_budget);
let client = daemon.start().await;
let bbt_client = daemon.new_client().await;
Expand Down
12 changes: 8 additions & 4 deletions utils/src/fd_budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Error {
pub fn acquire_guard(value: i32) -> Result<FDGuard, Error> {
loop {
let acquired = ACQUIRED_FD.load(Ordering::SeqCst); // todo ordering??
let limit = get_limit();
let limit = limit();
if acquired + value > limit {
return Err(Error { acquired, limit });
}
Expand All @@ -50,7 +50,7 @@ pub fn acquire_guard(value: i32) -> Result<FDGuard, Error> {
}
}

pub fn get_limit() -> i32 {
pub fn limit() -> i32 {
cfg_if::cfg_if! {
if #[cfg(test)] {
100
Expand All @@ -62,17 +62,21 @@ pub fn get_limit() -> i32 {
rlimit::getrlimit(rlimit::Resource::NOFILE).unwrap().0 as i32
}
else {
panic!("unsupported OS")
512
}
}
}

/// Returns how much of the file descriptor limit is still unclaimed.
///
/// Computed as [`limit`] minus the amount currently recorded in `ACQUIRED_FD`.
/// The counter is read with `Ordering::Relaxed`, so the value is a snapshot
/// that may already be stale when the caller uses it.
pub fn remainder() -> i32 {
    let total = limit();
    let in_use = ACQUIRED_FD.load(Ordering::Relaxed);
    total - in_use
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
fn test_acquire_and_release_guards() {
let guard = acquire_guard(30).unwrap();
assert_eq!(guard.acquired(), 30);
assert_eq!(ACQUIRED_FD.load(Ordering::Relaxed), 30);
Expand Down