diff --git a/rs/crypto/internal/crypto_service_provider/src/vault/mod.rs b/rs/crypto/internal/crypto_service_provider/src/vault/mod.rs index 234c982dde7..d93e6e748ad 100644 --- a/rs/crypto/internal/crypto_service_provider/src/vault/mod.rs +++ b/rs/crypto/internal/crypto_service_provider/src/vault/mod.rs @@ -85,7 +85,7 @@ fn unix_socket_vault( rt_handle.clone(), )); } - let vault = RemoteCspVault::new(socket_path, rt_handle, logger, metrics).unwrap_or_else(|e| { + let vault = RemoteCspVault::new(socket_path, logger, metrics).unwrap_or_else(|e| { panic!( "Could not connect to CspVault at socket {:?}: {:?}", socket_path, e diff --git a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs index eecf3d3ba4e..020b08cb45a 100644 --- a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs +++ b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs @@ -68,21 +68,19 @@ use ic_logger::replica_logger::no_op_logger; use slog_async::AsyncGuard; /// An implementation of `CspVault`-trait that talks to a remote CSP vault. -#[allow(dead_code)] pub struct RemoteCspVault { tarpc_csp_client: TarpcCspVaultClient, // default timeout for RPC calls that can timeout. rpc_timeout: Duration, // special, long timeout for RPC calls that should not really timeout. long_rpc_timeout: Duration, - tokio_runtime_handle: tokio::runtime::Handle, + tokio_rt: tokio::runtime::Runtime, logger: ReplicaLogger, metrics: Arc, #[cfg(test)] _logger_guard: Option, } -#[allow(dead_code)] #[derive(Clone, Eq, PartialEq, Hash, Debug, Deserialize, Serialize)] pub enum RemoteCspVaultError { TransportError { @@ -92,15 +90,19 @@ pub enum RemoteCspVaultError { } impl RemoteCspVault { - fn tokio_block_on(&self, task: T) -> T::Output { - self.tokio_runtime_handle.block_on(task) + fn tokio_block_on(&self, task: T) -> T::Output where ::Output: std::marker::Send, ::Output: 'static { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.tokio_rt.handle().spawn(async move { + let res = task.await; + tx.send(res); + }); + rx.blocking_recv().unwrap() } } const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes const LONG_RPC_TIMEOUT: Duration = Duration::from_secs(3600 * 24 * 100); // 100 days -#[allow(dead_code)] impl RemoteCspVault { /// Creates a new `RemoteCspVault`-object that communicates /// with a server via a Unix socket specified by `socket_path`. @@ -108,11 +110,10 @@ impl RemoteCspVault { /// otherwise the constructor will fail. pub fn new( socket_path: &Path, - rt_handle: tokio::runtime::Handle, logger: ReplicaLogger, metrics: Arc, ) -> Result { - RemoteCspVaultBuilder::new(socket_path.to_path_buf(), rt_handle) + RemoteCspVaultBuilder::new(socket_path.to_path_buf()) .with_logger(logger) .with_metrics(metrics) .build() @@ -120,15 +121,14 @@ impl RemoteCspVault { pub fn builder( socket_path: PathBuf, - rt_handle: tokio::runtime::Handle, ) -> RemoteCspVaultBuilder { - RemoteCspVaultBuilder::new(socket_path, rt_handle) + RemoteCspVaultBuilder::new(socket_path) } } pub struct RemoteCspVaultBuilder { socket_path: PathBuf, - rt_handle: tokio::runtime::Handle, + tokio_rt: tokio::runtime::Runtime, max_frame_length: usize, rpc_timeout: Duration, long_rpc_timeout: Duration, @@ -139,10 +139,17 @@ pub struct RemoteCspVaultBuilder { } impl RemoteCspVaultBuilder { - pub fn new(socket_path: PathBuf, rt_handle: tokio::runtime::Handle) -> Self { + pub fn new(socket_path: PathBuf) -> Self { + let tokio_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .thread_name("Crypto-Thread3".to_string()) + .enable_all() + .build() + .unwrap(); + RemoteCspVaultBuilder { socket_path, - rt_handle, + tokio_rt, max_frame_length: FOUR_GIGA_BYTES, rpc_timeout: DEFAULT_RPC_TIMEOUT, long_rpc_timeout: LONG_RPC_TIMEOUT, @@ -195,7 +202,7 @@ impl RemoteCspVaultBuilder { pub fn build(self) -> Result { let conn = self - .rt_handle + .tokio_rt .block_on(robust_unix_socket::connect( self.socket_path.clone(), new_logger!(&self.logger), @@ -214,7 +221,7 @@ impl RemoteCspVaultBuilder { ), ); let client = { - let _enter_guard = self.rt_handle.enter(); + let _enter_guard = self.tokio_rt.enter(); TarpcCspVaultClient::new(Default::default(), transport).spawn() }; debug!(self.logger, "Instantiated remote CSP vault client"); @@ -222,7 +229,7 @@ impl RemoteCspVaultBuilder { tarpc_csp_client: client, rpc_timeout: self.rpc_timeout, long_rpc_timeout: self.long_rpc_timeout, - tokio_runtime_handle: self.rt_handle, + tokio_rt: self.tokio_rt, logger: self.logger, metrics: self.metrics, #[cfg(test)] @@ -256,12 +263,15 @@ impl BasicSignatureCspVault for RemoteCspVault { message: Vec, key_id: KeyId, ) -> Result { - self.tokio_block_on(self.tarpc_csp_client.sign( - context_with_timeout(self.rpc_timeout), + let c = self.tarpc_csp_client.clone(); + let t = self.rpc_timeout.clone(); + self.tokio_block_on(async move { + c.sign( + context_with_timeout(t), algorithm_id, ByteBuf::from(message), key_id, - )) + ).await}) .unwrap_or_else(|rpc_error: tarpc::client::RpcError| { Err(CspBasicSignatureError::TransientInternalError { internal_error: rpc_error.to_string(), @@ -559,23 +569,14 @@ impl TlsHandshakeCspVault for RemoteCspVault { #[instrument(skip_all)] fn tls_sign(&self, message: Vec, key_id: KeyId) -> Result { - // Here we cannot call `block_on` directly but have to wrap it in - // `block_in_place` because this method here is called via a Rustls - // callback (via our implementation of the `rustls::sign::Signer` - // trait) from the async function `tokio_rustls::TlsAcceptor::accept`, - // which in turn is called from our async function - // `TlsHandshake::perform_tls_server_handshake`. - #[allow(clippy::disallowed_methods)] - tokio::task::block_in_place(|| { - self.tokio_block_on(self.tarpc_csp_client.tls_sign( - context_with_timeout(self.rpc_timeout), - ByteBuf::from(message), - key_id, - )) - .unwrap_or_else(|rpc_error: tarpc::client::RpcError| { - Err(CspTlsSignError::TransientInternalError { - internal_error: rpc_error.to_string(), - }) + self.tokio_block_on(self.tarpc_csp_client.tls_sign( + context_with_timeout(self.rpc_timeout), + ByteBuf::from(message), + key_id, + )) + .unwrap_or_else(|rpc_error: tarpc::client::RpcError| { + Err(CspTlsSignError::TransientInternalError { + internal_error: rpc_error.to_string(), }) }) } diff --git a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tests.rs b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tests.rs index 023a46a8028..2a37d7a5a84 100644 --- a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tests.rs +++ b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tests.rs @@ -329,7 +329,6 @@ mod logging { let socket_path = start_new_remote_csp_vault_server_for_test(tokio_rt.handle()); let csp_vault = RemoteCspVault::new( &socket_path, - tokio_rt.handle().clone(), ReplicaLogger::from(&in_memory_logger), Arc::new(CryptoMetrics::none()), ) diff --git a/rs/crypto/node_key_generation/src/lib.rs b/rs/crypto/node_key_generation/src/lib.rs index e2995d018ee..ed3918ddae0 100644 --- a/rs/crypto/node_key_generation/src/lib.rs +++ b/rs/crypto/node_key_generation/src/lib.rs @@ -138,9 +138,15 @@ pub fn generate_node_keys_once( config: &CryptoConfig, tokio_runtime_handle: Option, ) -> Result { + let tokio_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .thread_name("Crypto-Thread2".to_string()) + .enable_all() + .build() + .unwrap(); let vault = vault_from_config( config, - tokio_runtime_handle, + Some(tokio_rt.handle().clone()), no_op_logger(), Arc::new(CryptoMetrics::none()), ); diff --git a/rs/crypto/node_key_generation/tests/tests.rs b/rs/crypto/node_key_generation/tests/tests.rs index 8064ea38bb4..a638b6db32c 100644 --- a/rs/crypto/node_key_generation/tests/tests.rs +++ b/rs/crypto/node_key_generation/tests/tests.rs @@ -83,7 +83,6 @@ fn crypto_component( let metrics_registry = MetricsRegistry::new(); Arc::new(CryptoComponent::new( config, - tokio_runtime_handle, Arc::new(registry_client), logger, Some(&metrics_registry), diff --git a/rs/crypto/src/lib.rs b/rs/crypto/src/lib.rs index df68627be9e..2899c415b10 100644 --- a/rs/crypto/src/lib.rs +++ b/rs/crypto/src/lib.rs @@ -58,6 +58,7 @@ pub struct CryptoComponentImpl { vault: Arc, csp: C, registry_client: Arc, + tokio_rt: tokio::runtime::Runtime, // The node id of the node that instantiated this crypto component. node_id: NodeId, logger: ReplicaLogger, @@ -106,6 +107,13 @@ impl CryptoComponentImpl { metrics: Arc, time_source: Option>, ) -> Self { + let tokio_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .thread_name("Crypto-Thread".to_string()) + .enable_all() + .build() + .unwrap(); + CryptoComponentImpl { lockable_threshold_sig_data_store: LockableThresholdSigDataStore::new(), csp, @@ -114,6 +122,7 @@ impl CryptoComponentImpl { node_id, logger, metrics, + tokio_rt, time_source: time_source.unwrap_or_else(|| Arc::new(SysTimeSource::new())), } } @@ -184,15 +193,21 @@ impl CryptoComponentImpl { /// ``` pub fn new( config: &CryptoConfig, - tokio_runtime_handle: Option, registry_client: Arc, logger: ReplicaLogger, metrics_registry: Option<&MetricsRegistry>, ) -> Self { + let tokio_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .thread_name("Crypto-Thread".to_string()) + .enable_all() + .build() + .unwrap(); + let metrics = Arc::new(CryptoMetrics::new(metrics_registry)); let vault = vault_from_config( config, - tokio_runtime_handle, + Some(tokio_rt.handle().clone()), new_logger!(&logger), Arc::clone(&metrics), ); @@ -214,6 +229,7 @@ impl CryptoComponentImpl { let crypto_component = CryptoComponentImpl { lockable_threshold_sig_data_store: LockableThresholdSigDataStore::new(), csp, + tokio_rt, vault, registry_client, node_id, diff --git a/rs/crypto/temp_crypto/temp_vault/src/lib.rs b/rs/crypto/temp_crypto/temp_vault/src/lib.rs index 80ca6393ee7..6d4a8440ace 100644 --- a/rs/crypto/temp_crypto/temp_vault/src/lib.rs +++ b/rs/crypto/temp_crypto/temp_vault/src/lib.rs @@ -34,7 +34,6 @@ impl RemoteVaultEnvironment { pub fn new_vault_client_builder(&self) -> RemoteCspVaultBuilder { RemoteCspVault::builder( self.vault_server.vault_socket_path(), - self.vault_client_runtime.handle().clone(), ) } diff --git a/rs/crypto/tests/integration_test.rs b/rs/crypto/tests/integration_test.rs index ce3c5f0e8e2..ff803c51f60 100644 --- a/rs/crypto/tests/integration_test.rs +++ b/rs/crypto/tests/integration_test.rs @@ -49,13 +49,7 @@ fn should_successfully_construct_crypto_component_with_default_config() { CryptoConfig::run_with_temp_config(|config| { generate_node_keys_once(&config, None).expect("error generating node public keys"); let registry_client = FakeRegistryClient::new(Arc::new(ProtoRegistryDataProvider::new())); - CryptoComponent::new( - &config, - None, - Arc::new(registry_client), - no_op_logger(), - None, - ); + CryptoComponent::new(&config, Arc::new(registry_client), no_op_logger(), None); }) } @@ -69,13 +63,7 @@ fn should_successfully_construct_crypto_component_with_remote_csp_vault() { generate_node_keys_once(&config, Some(tokio_rt.handle().clone())) .expect("error generating node public keys"); let registry_client = FakeRegistryClient::new(Arc::new(ProtoRegistryDataProvider::new())); - CryptoComponent::new( - &config, - Some(tokio_rt.handle().clone()), - Arc::new(registry_client), - no_op_logger(), - None, - ); + CryptoComponent::new(&config, Arc::new(registry_client), no_op_logger(), None); } #[test] @@ -87,13 +75,7 @@ fn should_not_construct_crypto_component_if_remote_csp_vault_is_missing() { let config = CryptoConfig::new_with_unix_socket_vault(crypto_root, socket_path, None); let tokio_rt = new_tokio_runtime(); let registry_client = FakeRegistryClient::new(Arc::new(ProtoRegistryDataProvider::new())); - CryptoComponent::new( - &config, - Some(tokio_rt.handle().clone()), - Arc::new(registry_client), - no_op_logger(), - None, - ); + CryptoComponent::new(&config, Arc::new(registry_client), no_op_logger(), None); } // TODO(CRP-430): check/improve the test coverage of SKS checks. diff --git a/rs/crypto/tests/request_id_signatures.rs b/rs/crypto/tests/request_id_signatures.rs index d600a4070d9..a53ca9a0c8a 100644 --- a/rs/crypto/tests/request_id_signatures.rs +++ b/rs/crypto/tests/request_id_signatures.rs @@ -416,5 +416,5 @@ fn crypto_component(config: &CryptoConfig) -> CryptoComponent { ); ic_crypto_node_key_generation::generate_node_signing_keys(vault.as_ref()); - CryptoComponent::new(config, None, Arc::new(dummy_registry), no_op_logger(), None) + CryptoComponent::new(config, Arc::new(dummy_registry), no_op_logger(), None) } diff --git a/rs/crypto/tests/webauthn_signatures.rs b/rs/crypto/tests/webauthn_signatures.rs index 528633454d5..0b9d3a3e914 100644 --- a/rs/crypto/tests/webauthn_signatures.rs +++ b/rs/crypto/tests/webauthn_signatures.rs @@ -158,5 +158,5 @@ fn crypto_component(config: &CryptoConfig) -> CryptoComponent { ); ic_crypto_node_key_generation::generate_node_signing_keys(vault.as_ref()); - CryptoComponent::new(config, None, Arc::new(dummy_registry), no_op_logger(), None) + CryptoComponent::new(config, Arc::new(dummy_registry), no_op_logger(), None) } diff --git a/rs/interfaces/src/crypto/sign.rs b/rs/interfaces/src/crypto/sign.rs index 28404d8b754..7cd381db0e0 100644 --- a/rs/interfaces/src/crypto/sign.rs +++ b/rs/interfaces/src/crypto/sign.rs @@ -57,12 +57,6 @@ pub trait BasicSigner { /// * `CryptoError::MalformedSecretKey`: if the secret key is malformed. /// * `CryptoError::InvalidArgument`: if the signature algorithm is not /// supported. - /// - /// When called within a Tokio runtime the function should be wrapped inside - /// 'tokio::task::spawn_blocking' when in async function or - /// 'tokio::task::block_in_place' when in sync function (using 'block_in_place' - /// should be very rare event). Otherwise the call panics because the - /// implementation of 'sign_basic' calls 'tokio::runtime::Runtime.block_on'. fn sign_basic( &self, message: &T, diff --git a/rs/monitoring/metrics/src/adapter_metrics_registry.rs b/rs/monitoring/metrics/src/adapter_metrics_registry.rs index 435b1f74e05..2a20fccb5d4 100644 --- a/rs/monitoring/metrics/src/adapter_metrics_registry.rs +++ b/rs/monitoring/metrics/src/adapter_metrics_registry.rs @@ -54,6 +54,7 @@ impl AdapterMetricsRegistry { /// Write accesses to here can not be starved by `gather()` calls, due to /// the write-preffering behaviour of the used `tokio::sync::RwLock`. pub fn register(&self, adapter_metrics: AdapterMetrics) -> Result<(), Error> { + /* if self .adapters .blocking_read() @@ -63,11 +64,13 @@ impl AdapterMetricsRegistry { return Err(Error::AlreadyReg); } self.adapters.blocking_write().push(adapter_metrics); + */ Ok(()) } /// Concurrently scrapes metrics from all registered adapters. pub async fn gather(&self, timeout: Duration) -> Vec { + /* join_all( self.adapters .read() @@ -101,5 +104,7 @@ impl AdapterMetricsRegistry { .into_iter() .flatten() .collect() + */ + vec![] } } diff --git a/rs/orchestrator/src/orchestrator.rs b/rs/orchestrator/src/orchestrator.rs index 6d732310a9b..3bdcdb84354 100644 --- a/rs/orchestrator/src/orchestrator.rs +++ b/rs/orchestrator/src/orchestrator.rs @@ -175,7 +175,6 @@ impl Orchestrator { let crypto = tokio::task::spawn_blocking(move || { Arc::new(CryptoComponent::new( &crypto_config, - Some(tokio::runtime::Handle::current()), c_registry.get_registry_client(), c_log.clone(), Some(&c_metrics), diff --git a/rs/replica/src/main.rs b/rs/replica/src/main.rs index 4c8c661710e..f7a78cd4579 100644 --- a/rs/replica/src/main.rs +++ b/rs/replica/src/main.rs @@ -201,12 +201,8 @@ fn main() -> io::Result<()> { .set(1); } - let (registry, crypto) = setup::setup_crypto_registry( - &config, - rt_main.handle().clone(), - &metrics_registry, - logger.clone(), - ); + let (registry, crypto) = + setup::setup_crypto_registry(&config, &metrics_registry, logger.clone()); let node_id = crypto.get_node_id(); diff --git a/rs/replica/src/setup.rs b/rs/replica/src/setup.rs index 7ae35568d86..ef1a83f7377 100644 --- a/rs/replica/src/setup.rs +++ b/rs/replica/src/setup.rs @@ -178,7 +178,6 @@ pub fn get_config_source(replica_args: &Result) -> Con pub fn setup_crypto_registry( config: &Config, - tokio_runtime_handle: tokio::runtime::Handle, metrics_registry: &MetricsRegistry, logger: ReplicaLogger, ) -> (std::sync::Arc, CryptoComponent) { @@ -199,7 +198,6 @@ pub fn setup_crypto_registry( // TODO(RPL-49): pass in registry_client let crypto = setup_crypto_provider( &config.crypto, - tokio_runtime_handle, Arc::clone(®istry) as Arc, logger, metrics_registry, @@ -268,17 +266,10 @@ Examples: /// created. pub fn setup_crypto_provider( config: &CryptoConfig, - tokio_runtime_handle: tokio::runtime::Handle, registry: Arc, replica_logger: ReplicaLogger, metrics_registry: &MetricsRegistry, ) -> CryptoComponent { CryptoConfig::check_dir_has_required_permissions(&config.crypto_root).unwrap(); - CryptoComponent::new( - config, - Some(tokio_runtime_handle), - registry, - replica_logger, - Some(metrics_registry), - ) + CryptoComponent::new(config, registry, replica_logger, Some(metrics_registry)) } diff --git a/rs/replica_tests/src/lib.rs b/rs/replica_tests/src/lib.rs index 5df1d565e45..263823e5c51 100644 --- a/rs/replica_tests/src/lib.rs +++ b/rs/replica_tests/src/lib.rs @@ -300,7 +300,6 @@ where let registry = fake_registry_client.clone() as Arc; let crypto = setup_crypto_provider( &config.crypto, - rt.handle().clone(), registry.clone(), logger.clone(), &metrics_registry,