Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement file descriptor budget pool #284

Merged
merged 16 commits into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ kaspa-wrpc-server = { version = "0.1.7", path = "rpc/wrpc/server" }
kaspa-wrpc-wasm = { version = "0.1.7", path = "rpc/wrpc/wasm" }
kaspad = { version = "0.1.7", path = "kaspad" }
kaspa-perf-monitor = { path = "metrics/perf_monitor" }

# external
thiserror = "1"
faster-hex = "0.6"
Expand Down
2 changes: 1 addition & 1 deletion components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ mod address_store_with_cache {
// Assert that initial distribution is skewed, and hence not uniform from the outset.
assert!(bucket_reduction_ratio >= 1.25);

let db = create_temp_db!(ConnBuilder::default());
let db = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let config = Config::new(SIMNET_PARAMS);
let (am, _) = AddressManager::new(Arc::new(config), db.1, Arc::new(TickService::default()));

Expand Down
29 changes: 25 additions & 4 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ pub struct Factory {
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
fd_budget: i32,
}

impl Factory {
Expand All @@ -211,15 +212,25 @@ impl Factory {
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
fd_budget: i32,
) -> Self {
assert!(fd_budget > 0, "fd_budget has to be positive");
let mut config = config.clone();
#[cfg(feature = "devnet-prealloc")]
set_genesis_utxo_commitment_from_config(&mut config);
config.process_genesis = false;
let management_store = Arc::new(RwLock::new(MultiConsensusManagementStore::new(management_db)));
management_store.write().set_is_archival_node(config.is_archival);
let factory =
Self { management_store, config, db_root_dir, db_parallelism, notification_root, counters, tx_script_cache_counters };
let factory = Self {
management_store,
config,
db_root_dir,
db_parallelism,
notification_root,
counters,
tx_script_cache_counters,
fd_budget,
};
factory.delete_inactive_consensus_entries();
factory
}
Expand All @@ -245,7 +256,12 @@ impl ConsensusFactory for Factory {
};

let dir = self.db_root_dir.join(entry.directory_name.clone());
let db = kaspa_database::prelude::ConnBuilder::default().with_db_path(dir).with_parallelism(self.db_parallelism).build();
let db = kaspa_database::prelude::ConnBuilder::default()
biryukovmaxim marked this conversation as resolved.
Show resolved Hide resolved
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
Expand Down Expand Up @@ -274,7 +290,12 @@ impl ConsensusFactory for Factory {

let entry = self.management_store.write().new_staging_consensus_entry().unwrap();
let dir = self.db_root_dir.join(entry.directory_name);
let db = kaspa_database::prelude::ConnBuilder::default().with_db_path(dir).with_parallelism(self.db_parallelism).build();
let db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/consensus/test_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl TestConsensus {

/// Creates a test consensus instance based on `config` with a temp DB and the provided `notification_sender`
pub fn with_notifier(config: &Config, notification_sender: Sender<Notification>) -> Self {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Default::default();
let tx_script_cache_counters = Default::default();
Expand All @@ -87,7 +87,7 @@ impl TestConsensus {

/// Creates a test consensus instance based on `config` with a temp DB and no notifier
pub fn new(config: &Config) -> Self {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let (dummy_notification_sender, _) = async_channel::unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let counters = Default::default();
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/model/stores/relations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl RelationsStore for MemoryRelationsStore {
mod tests {
use super::*;
use crate::processes::relations::RelationsStoreExtensions;
use kaspa_database::create_temp_db;

#[test]
fn test_memory_relations_store() {
Expand All @@ -299,9 +300,9 @@ mod tests {

#[test]
fn test_db_relations_store() {
let db_tempdir = kaspa_database::utils::get_kaspa_tempdir();
let db = Arc::new(DB::open_default(db_tempdir.path().to_owned().to_str().unwrap()).unwrap());
let (lt, db) = create_temp_db!(kaspa_database::prelude::ConnBuilder::default().with_files_limit(10));
test_relations_store(DbRelationsStore::new(db, 0, 2));
drop(lt)
}

fn test_relations_store<T: RelationsStore>(mut store: T) {
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/processes/pruning_proof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ impl PruningProofManager {
let proof_pp_header = proof[0].last().expect("checked if empty");
let proof_pp = proof_pp_header.hash;
let proof_pp_level = calc_block_level(proof_pp_header, self.max_block_level);
let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default());
let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10));
let headers_store = Arc::new(DbHeadersStore::new(db.clone(), 2 * self.pruning_proof_m)); // TODO: Think about cache size
let ghostdag_stores = (0..=self.max_block_level)
.map(|level| Arc::new(DbGhostdagStore::new(db.clone(), level, 2 * self.pruning_proof_m)))
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/processes/reachability/inquirer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ mod tests {
/// Runs a DAG test-case with full verification using the staging store mechanism.
/// Note: runtime is quadratic in the number of blocks so should be used with mildly small DAGs (~50)
fn run_dag_test_case_with_staging(test: &DagTestCase) {
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let cache_size = test.blocks.len() as u64 / 3;
let reachability = RwLock::new(DbReachabilityStore::new(db.clone(), cache_size));
let relations = RwLock::new(DbRelationsStore::with_prefix(db.clone(), &[], 0));
Expand Down Expand Up @@ -533,7 +533,7 @@ mod tests {
run_dag_test_case(&mut relations, &mut reachability, &test);

// Run with direct DB stores
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let cache_size = test.blocks.len() as u64 / 3;
let mut reachability = DbReachabilityStore::new(db.clone(), cache_size);
let mut relations = DbRelationsStore::new(db, 0, cache_size);
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/processes/relations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ mod tests {

#[test]
fn test_delete_level_relations_zero_cache() {
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let cache_size = 0;
let mut relations = DbRelationsStore::new(db.clone(), 0, cache_size);
relations.insert(ORIGIN, Default::default()).unwrap();
Expand Down
3 changes: 3 additions & 0 deletions core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ impl Core {
}
}

// Drop all services and cleanup
self.services.lock().unwrap().clear();

trace!("... core is shut down");
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/task/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ impl AsyncRuntime {
.collect::<Vec<TaskJoinHandle<AsyncServiceResult<()>>>>();
try_join_all(futures).await.unwrap();

// Drop all services and cleanup
self.services.lock().unwrap().clear();

trace!("async-runtime worker stopped");
}

Expand Down
1 change: 0 additions & 1 deletion database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ tempfile.workspace = true

enum-primitive-derive = "0.2.2"
num-traits = "0.2.15"
rlimit = "0.10.1"
29 changes: 27 additions & 2 deletions database/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,37 @@
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;

pub use conn_builder::ConnBuilder;
use kaspa_utils::fd_budget::FDGuard;

mod conn_builder;

/// The DB type used for Kaspad stores
pub type DB = DBWithThreadMode<MultiThreaded>;
pub struct DB {
inner: DBWithThreadMode<MultiThreaded>,
_fd_guard: FDGuard,
}

impl DB {
pub fn new(inner: DBWithThreadMode<MultiThreaded>, fd_guard: FDGuard) -> Self {
Self { inner, _fd_guard: fd_guard }
}
}

impl DerefMut for DB {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl Deref for DB {
type Target = DBWithThreadMode<MultiThreaded>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

/// Deletes an existing DB if it exists
pub fn delete_db(db_dir: PathBuf) {
Expand All @@ -15,5 +40,5 @@ pub fn delete_db(db_dir: PathBuf) {
}
let options = rocksdb::Options::default();
let path = db_dir.to_str().unwrap();
DB::destroy(&options, path).expect("DB is expected to be deletable");
<DBWithThreadMode<MultiThreaded>>::destroy(&options, path).expect("DB is expected to be deletable");
}
Loading