From 5e04b16c08c68e3eea7537bf9e03b419299296ec Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Thu, 14 Nov 2024 09:06:35 +0000 Subject: [PATCH 1/6] . --- .../src/vault/remote_csp_vault/tarpc_csp_vault_client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs index eecf3d3ba4e..5dc479925ad 100644 --- a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs +++ b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs @@ -100,7 +100,6 @@ impl RemoteCspVault { const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes const LONG_RPC_TIMEOUT: Duration = Duration::from_secs(3600 * 24 * 100); // 100 days -#[allow(dead_code)] impl RemoteCspVault { /// Creates a new `RemoteCspVault`-object that communicates /// with a server via a Unix socket specified by `socket_path`. From 1468f969d889316390a6c45543f29bbf96b3ebc9 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Thu, 14 Nov 2024 09:17:21 +0000 Subject: [PATCH 2/6] . --- .../tarpc_csp_vault_client.rs | 25 ++++++------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs index 5dc479925ad..20e7605a1df 100644 --- a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs +++ b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs @@ -558,23 +558,14 @@ impl TlsHandshakeCspVault for RemoteCspVault { #[instrument(skip_all)] fn tls_sign(&self, message: Vec, key_id: KeyId) -> Result { - // Here we cannot call `block_on` directly but have to wrap it in - // `block_in_place` because this method here is called via a Rustls - // callback (via our implementation of the `rustls::sign::Signer` - // trait) from the async function `tokio_rustls::TlsAcceptor::accept`, - // which in turn is called from our async function - // `TlsHandshake::perform_tls_server_handshake`. - #[allow(clippy::disallowed_methods)] - tokio::task::block_in_place(|| { - self.tokio_block_on(self.tarpc_csp_client.tls_sign( - context_with_timeout(self.rpc_timeout), - ByteBuf::from(message), - key_id, - )) - .unwrap_or_else(|rpc_error: tarpc::client::RpcError| { - Err(CspTlsSignError::TransientInternalError { - internal_error: rpc_error.to_string(), - }) + self.tokio_block_on(self.tarpc_csp_client.tls_sign( + context_with_timeout(self.rpc_timeout), + ByteBuf::from(message), + key_id, + )) + .unwrap_or_else(|rpc_error: tarpc::client::RpcError| { + Err(CspTlsSignError::TransientInternalError { + internal_error: rpc_error.to_string(), }) }) } From 55be9f3f9ee8586c6db59bf32489b6e690e60d98 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Thu, 14 Nov 2024 12:30:44 +0000 Subject: [PATCH 3/6] . --- rs/crypto/node_key_generation/tests/tests.rs | 1 - rs/crypto/src/lib.rs | 10 ++++++++-- rs/crypto/tests/integration_test.rs | 3 --- rs/crypto/tests/request_id_signatures.rs | 2 +- rs/crypto/tests/webauthn_signatures.rs | 2 +- rs/orchestrator/src/orchestrator.rs | 1 - rs/replica/src/main.rs | 8 ++------ rs/replica/src/setup.rs | 11 +---------- rs/replica_tests/src/lib.rs | 1 - 9 files changed, 13 insertions(+), 26 deletions(-) diff --git a/rs/crypto/node_key_generation/tests/tests.rs b/rs/crypto/node_key_generation/tests/tests.rs index 8064ea38bb4..a638b6db32c 100644 --- a/rs/crypto/node_key_generation/tests/tests.rs +++ b/rs/crypto/node_key_generation/tests/tests.rs @@ -83,7 +83,6 @@ fn crypto_component( let metrics_registry = MetricsRegistry::new(); Arc::new(CryptoComponent::new( config, - tokio_runtime_handle, Arc::new(registry_client), logger, Some(&metrics_registry), diff --git a/rs/crypto/src/lib.rs b/rs/crypto/src/lib.rs index df68627be9e..70f14d6c65e 100644 --- a/rs/crypto/src/lib.rs +++ b/rs/crypto/src/lib.rs @@ -184,15 +184,21 @@ impl CryptoComponentImpl { /// ``` pub fn new( config: &CryptoConfig, - tokio_runtime_handle: Option, registry_client: Arc, logger: ReplicaLogger, metrics_registry: Option<&MetricsRegistry>, ) -> Self { + let tokio_runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .thread_name("Crypto".to_string()) + .enable_all() + .build() + .unwrap(); + let metrics = Arc::new(CryptoMetrics::new(metrics_registry)); let vault = vault_from_config( config, - tokio_runtime_handle, + Some(tokio_runtime.handle().clone()), new_logger!(&logger), Arc::clone(&metrics), ); diff --git a/rs/crypto/tests/integration_test.rs b/rs/crypto/tests/integration_test.rs index ce3c5f0e8e2..3601db218a2 100644 --- a/rs/crypto/tests/integration_test.rs +++ b/rs/crypto/tests/integration_test.rs @@ -51,7 +51,6 @@ fn should_successfully_construct_crypto_component_with_default_config() { let registry_client = FakeRegistryClient::new(Arc::new(ProtoRegistryDataProvider::new())); CryptoComponent::new( &config, - None, Arc::new(registry_client), no_op_logger(), None, @@ -71,7 +70,6 @@ fn should_successfully_construct_crypto_component_with_remote_csp_vault() { let registry_client = FakeRegistryClient::new(Arc::new(ProtoRegistryDataProvider::new())); CryptoComponent::new( &config, - Some(tokio_rt.handle().clone()), Arc::new(registry_client), no_op_logger(), None, @@ -89,7 +87,6 @@ fn should_not_construct_crypto_component_if_remote_csp_vault_is_missing() { let registry_client = FakeRegistryClient::new(Arc::new(ProtoRegistryDataProvider::new())); CryptoComponent::new( &config, - Some(tokio_rt.handle().clone()), Arc::new(registry_client), no_op_logger(), None, diff --git a/rs/crypto/tests/request_id_signatures.rs b/rs/crypto/tests/request_id_signatures.rs index d600a4070d9..a53ca9a0c8a 100644 --- a/rs/crypto/tests/request_id_signatures.rs +++ b/rs/crypto/tests/request_id_signatures.rs @@ -416,5 +416,5 @@ fn crypto_component(config: &CryptoConfig) -> CryptoComponent { ); ic_crypto_node_key_generation::generate_node_signing_keys(vault.as_ref()); - CryptoComponent::new(config, None, Arc::new(dummy_registry), no_op_logger(), None) + CryptoComponent::new(config, Arc::new(dummy_registry), no_op_logger(), None) } diff --git a/rs/crypto/tests/webauthn_signatures.rs b/rs/crypto/tests/webauthn_signatures.rs index 528633454d5..549789a7353 100644 --- a/rs/crypto/tests/webauthn_signatures.rs +++ b/rs/crypto/tests/webauthn_signatures.rs @@ -158,5 +158,5 @@ fn crypto_component(config: &CryptoConfig) -> CryptoComponent { ); ic_crypto_node_key_generation::generate_node_signing_keys(vault.as_ref()); - CryptoComponent::new(config, None, Arc::new(dummy_registry), no_op_logger(), None) + CryptoComponent::new(config, Arc::new(dummy_registry), no_op_logger(), None) } diff --git a/rs/orchestrator/src/orchestrator.rs b/rs/orchestrator/src/orchestrator.rs index 6d732310a9b..3bdcdb84354 100644 --- a/rs/orchestrator/src/orchestrator.rs +++ b/rs/orchestrator/src/orchestrator.rs @@ -175,7 +175,6 @@ impl Orchestrator { let crypto = tokio::task::spawn_blocking(move || { Arc::new(CryptoComponent::new( &crypto_config, - Some(tokio::runtime::Handle::current()), c_registry.get_registry_client(), c_log.clone(), Some(&c_metrics), diff --git a/rs/replica/src/main.rs b/rs/replica/src/main.rs index 4c8c661710e..f7a78cd4579 100644 --- a/rs/replica/src/main.rs +++ b/rs/replica/src/main.rs @@ -201,12 +201,8 @@ fn main() -> io::Result<()> { .set(1); } - let (registry, crypto) = setup::setup_crypto_registry( - &config, - rt_main.handle().clone(), - &metrics_registry, - logger.clone(), - ); + let (registry, crypto) = + setup::setup_crypto_registry(&config, &metrics_registry, logger.clone()); let node_id = crypto.get_node_id(); diff --git a/rs/replica/src/setup.rs b/rs/replica/src/setup.rs index 7ae35568d86..ef1a83f7377 100644 --- a/rs/replica/src/setup.rs +++ b/rs/replica/src/setup.rs @@ -178,7 +178,6 @@ pub fn get_config_source(replica_args: &Result) -> Con pub fn setup_crypto_registry( config: &Config, - tokio_runtime_handle: tokio::runtime::Handle, metrics_registry: &MetricsRegistry, logger: ReplicaLogger, ) -> (std::sync::Arc, CryptoComponent) { @@ -199,7 +198,6 @@ pub fn setup_crypto_registry( // TODO(RPL-49): pass in registry_client let crypto = setup_crypto_provider( &config.crypto, - tokio_runtime_handle, Arc::clone(®istry) as Arc, logger, metrics_registry, @@ -268,17 +266,10 @@ Examples: /// created. pub fn setup_crypto_provider( config: &CryptoConfig, - tokio_runtime_handle: tokio::runtime::Handle, registry: Arc, replica_logger: ReplicaLogger, metrics_registry: &MetricsRegistry, ) -> CryptoComponent { CryptoConfig::check_dir_has_required_permissions(&config.crypto_root).unwrap(); - CryptoComponent::new( - config, - Some(tokio_runtime_handle), - registry, - replica_logger, - Some(metrics_registry), - ) + CryptoComponent::new(config, registry, replica_logger, Some(metrics_registry)) } diff --git a/rs/replica_tests/src/lib.rs b/rs/replica_tests/src/lib.rs index 5df1d565e45..263823e5c51 100644 --- a/rs/replica_tests/src/lib.rs +++ b/rs/replica_tests/src/lib.rs @@ -300,7 +300,6 @@ where let registry = fake_registry_client.clone() as Arc; let crypto = setup_crypto_provider( &config.crypto, - rt.handle().clone(), registry.clone(), logger.clone(), &metrics_registry, From 4a67871fa621faad21d7c48020ee2d28fadd98b6 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Thu, 14 Nov 2024 13:55:03 +0000 Subject: [PATCH 4/6] . --- rs/crypto/src/lib.rs | 16 +++++++++++++--- rs/interfaces/src/crypto/sign.rs | 6 ------ .../metrics/src/adapter_metrics_registry.rs | 5 +++++ rs/orchestrator/src/orchestrator.rs | 16 ++++++---------- rs/orchestrator/src/registration.rs | 14 ++++---------- 5 files changed, 28 insertions(+), 29 deletions(-) diff --git a/rs/crypto/src/lib.rs b/rs/crypto/src/lib.rs index 70f14d6c65e..14b9e89b1a3 100644 --- a/rs/crypto/src/lib.rs +++ b/rs/crypto/src/lib.rs @@ -58,6 +58,7 @@ pub struct CryptoComponentImpl { vault: Arc, csp: C, registry_client: Arc, + tokio_rt: tokio::runtime::Runtime, // The node id of the node that instantiated this crypto component. node_id: NodeId, logger: ReplicaLogger, @@ -106,6 +107,13 @@ impl CryptoComponentImpl { metrics: Arc, time_source: Option>, ) -> Self { + let tokio_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .thread_name("Crypto-Thread".to_string()) + .enable_all() + .build() + .unwrap(); + CryptoComponentImpl { lockable_threshold_sig_data_store: LockableThresholdSigDataStore::new(), csp, @@ -114,6 +122,7 @@ impl CryptoComponentImpl { node_id, logger, metrics, + tokio_rt, time_source: time_source.unwrap_or_else(|| Arc::new(SysTimeSource::new())), } } @@ -188,9 +197,9 @@ impl CryptoComponentImpl { logger: ReplicaLogger, metrics_registry: Option<&MetricsRegistry>, ) -> Self { - let tokio_runtime = tokio::runtime::Builder::new_multi_thread() + let tokio_rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(4) - .thread_name("Crypto".to_string()) + .thread_name("Crypto-Thread".to_string()) .enable_all() .build() .unwrap(); @@ -198,7 +207,7 @@ impl CryptoComponentImpl { let metrics = Arc::new(CryptoMetrics::new(metrics_registry)); let vault = vault_from_config( config, - Some(tokio_runtime.handle().clone()), + Some(tokio_rt.handle().clone()), new_logger!(&logger), Arc::clone(&metrics), ); @@ -220,6 +229,7 @@ impl CryptoComponentImpl { let crypto_component = CryptoComponentImpl { lockable_threshold_sig_data_store: LockableThresholdSigDataStore::new(), csp, + tokio_rt, vault, registry_client, node_id, diff --git a/rs/interfaces/src/crypto/sign.rs b/rs/interfaces/src/crypto/sign.rs index 28404d8b754..7cd381db0e0 100644 --- a/rs/interfaces/src/crypto/sign.rs +++ b/rs/interfaces/src/crypto/sign.rs @@ -57,12 +57,6 @@ pub trait BasicSigner { /// * `CryptoError::MalformedSecretKey`: if the secret key is malformed. /// * `CryptoError::InvalidArgument`: if the signature algorithm is not /// supported. - /// - /// When called within a Tokio runtime the function should be wrapped inside - /// 'tokio::task::spawn_blocking' when in async function or - /// 'tokio::task::block_in_place' when in sync function (using 'block_in_place' - /// should be very rare event). Otherwise the call panics because the - /// implementation of 'sign_basic' calls 'tokio::runtime::Runtime.block_on'. fn sign_basic( &self, message: &T, diff --git a/rs/monitoring/metrics/src/adapter_metrics_registry.rs b/rs/monitoring/metrics/src/adapter_metrics_registry.rs index 435b1f74e05..34df9160065 100644 --- a/rs/monitoring/metrics/src/adapter_metrics_registry.rs +++ b/rs/monitoring/metrics/src/adapter_metrics_registry.rs @@ -54,6 +54,7 @@ impl AdapterMetricsRegistry { /// Write accesses to here can not be starved by `gather()` calls, due to /// the write-preffering behaviour of the used `tokio::sync::RwLock`. pub fn register(&self, adapter_metrics: AdapterMetrics) -> Result<(), Error> { + /* if self .adapters .blocking_read() @@ -63,11 +64,13 @@ impl AdapterMetricsRegistry { return Err(Error::AlreadyReg); } self.adapters.blocking_write().push(adapter_metrics); + */ Ok(()) } /// Concurrently scrapes metrics from all registered adapters. pub async fn gather(&self, timeout: Duration) -> Vec { + /* join_all( self.adapters .read() @@ -101,5 +104,7 @@ impl AdapterMetricsRegistry { .into_iter() .flatten() .collect() + */ + vec![] } } diff --git a/rs/orchestrator/src/orchestrator.rs b/rs/orchestrator/src/orchestrator.rs index 3bdcdb84354..1595338de94 100644 --- a/rs/orchestrator/src/orchestrator.rs +++ b/rs/orchestrator/src/orchestrator.rs @@ -172,16 +172,12 @@ impl Orchestrator { let c_registry = registry.clone(); let crypto_config = config.crypto.clone(); let c_metrics = metrics_registry.clone(); - let crypto = tokio::task::spawn_blocking(move || { - Arc::new(CryptoComponent::new( - &crypto_config, - c_registry.get_registry_client(), - c_log.clone(), - Some(&c_metrics), - )) - }) - .await - .unwrap(); + let crypto = Arc::new(CryptoComponent::new( + &crypto_config, + c_registry.get_registry_client(), + c_log.clone(), + Some(&c_metrics), + )); let slog_logger = logger.inner_logger.root.clone(); let (metrics, _metrics_runtime) = diff --git a/rs/orchestrator/src/registration.rs b/rs/orchestrator/src/registration.rs index afe9b88d497..f074262147a 100644 --- a/rs/orchestrator/src/registration.rs +++ b/rs/orchestrator/src/registration.rs @@ -408,16 +408,10 @@ impl NodeRegistration { let key_handler = self.key_handler.clone(); let sign_cmd = move |msg: &MessageId| { - // Implementation of 'sign_basic' uses Tokio's 'block_on' when issuing a RPC - // to the crypto service. 'block_on' panics when called from async context - // that's why we need to wrap 'sign_basic' in 'block_in_place'. - #[allow(clippy::disallowed_methods)] - tokio::task::block_in_place(|| { - key_handler - .sign_basic(msg, node_id, registry_version) - .map_err(|e| Box::new(e) as Box) - .map(|value| value.get().0) - }) + key_handler + .sign_basic(msg, node_id, registry_version) + .map_err(|e| Box::new(e) as Box) + .map(|value| value.get().0) }; let sender = Sender::Node { From ce59879c7260885908ddb41c43cc12b5f1e46e1f Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Thu, 14 Nov 2024 14:46:35 +0000 Subject: [PATCH 5/6] . --- .../tarpc_csp_vault_client.rs | 17 ++++++++++----- rs/crypto/node_key_generation/src/lib.rs | 8 ++++++- rs/crypto/src/lib.rs | 10 ++++----- rs/crypto/tests/integration_test.rs | 21 +++---------------- rs/crypto/tests/webauthn_signatures.rs | 2 +- .../metrics/src/adapter_metrics_registry.rs | 4 ++-- rs/orchestrator/src/orchestrator.rs | 17 +++++++++------ rs/orchestrator/src/registration.rs | 14 +++++++++---- 8 files changed, 51 insertions(+), 42 deletions(-) diff --git a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs index 20e7605a1df..9b12d99c9c2 100644 --- a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs +++ b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs @@ -127,7 +127,7 @@ impl RemoteCspVault { pub struct RemoteCspVaultBuilder { socket_path: PathBuf, - rt_handle: tokio::runtime::Handle, + tokio_rt: tokio::runtime::Runtime, max_frame_length: usize, rpc_timeout: Duration, long_rpc_timeout: Duration, @@ -139,9 +139,16 @@ pub struct RemoteCspVaultBuilder { impl RemoteCspVaultBuilder { pub fn new(socket_path: PathBuf, rt_handle: tokio::runtime::Handle) -> Self { + let tokio_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .thread_name("Crypto-Thread3".to_string()) + .enable_all() + .build() + .unwrap(); + RemoteCspVaultBuilder { socket_path, - rt_handle, + tokio_rt, max_frame_length: FOUR_GIGA_BYTES, rpc_timeout: DEFAULT_RPC_TIMEOUT, long_rpc_timeout: LONG_RPC_TIMEOUT, @@ -194,7 +201,7 @@ impl RemoteCspVaultBuilder { pub fn build(self) -> Result { let conn = self - .rt_handle + .tokio_rt .block_on(robust_unix_socket::connect( self.socket_path.clone(), new_logger!(&self.logger), @@ -213,7 +220,7 @@ impl RemoteCspVaultBuilder { ), ); let client = { - let _enter_guard = self.rt_handle.enter(); + let _enter_guard = self.tokio_rt.enter(); TarpcCspVaultClient::new(Default::default(), transport).spawn() }; debug!(self.logger, "Instantiated remote CSP vault client"); @@ -221,7 +228,7 @@ impl RemoteCspVaultBuilder { tarpc_csp_client: client, rpc_timeout: self.rpc_timeout, long_rpc_timeout: self.long_rpc_timeout, - tokio_runtime_handle: self.rt_handle, + tokio_runtime_handle: self.tokio_rt.handle().clone(), logger: self.logger, metrics: self.metrics, #[cfg(test)] diff --git a/rs/crypto/node_key_generation/src/lib.rs b/rs/crypto/node_key_generation/src/lib.rs index e2995d018ee..ed3918ddae0 100644 --- a/rs/crypto/node_key_generation/src/lib.rs +++ b/rs/crypto/node_key_generation/src/lib.rs @@ -138,9 +138,15 @@ pub fn generate_node_keys_once( config: &CryptoConfig, tokio_runtime_handle: Option, ) -> Result { + let tokio_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .thread_name("Crypto-Thread2".to_string()) + .enable_all() + .build() + .unwrap(); let vault = vault_from_config( config, - tokio_runtime_handle, + Some(tokio_rt.handle().clone()), no_op_logger(), Arc::new(CryptoMetrics::none()), ); diff --git a/rs/crypto/src/lib.rs b/rs/crypto/src/lib.rs index 14b9e89b1a3..2899c415b10 100644 --- a/rs/crypto/src/lib.rs +++ b/rs/crypto/src/lib.rs @@ -108,11 +108,11 @@ impl CryptoComponentImpl { time_source: Option>, ) -> Self { let tokio_rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(4) - .thread_name("Crypto-Thread".to_string()) - .enable_all() - .build() - .unwrap(); + .worker_threads(4) + .thread_name("Crypto-Thread".to_string()) + .enable_all() + .build() + .unwrap(); CryptoComponentImpl { lockable_threshold_sig_data_store: LockableThresholdSigDataStore::new(), diff --git a/rs/crypto/tests/integration_test.rs b/rs/crypto/tests/integration_test.rs index 3601db218a2..ff803c51f60 100644 --- a/rs/crypto/tests/integration_test.rs +++ b/rs/crypto/tests/integration_test.rs @@ -49,12 +49,7 @@ fn should_successfully_construct_crypto_component_with_default_config() { CryptoConfig::run_with_temp_config(|config| { generate_node_keys_once(&config, None).expect("error generating node public keys"); let registry_client = FakeRegistryClient::new(Arc::new(ProtoRegistryDataProvider::new())); - CryptoComponent::new( - &config, - Arc::new(registry_client), - no_op_logger(), - None, - ); + CryptoComponent::new(&config, Arc::new(registry_client), no_op_logger(), None); }) } @@ -68,12 +63,7 @@ fn should_successfully_construct_crypto_component_with_remote_csp_vault() { generate_node_keys_once(&config, Some(tokio_rt.handle().clone())) .expect("error generating node public keys"); let registry_client = FakeRegistryClient::new(Arc::new(ProtoRegistryDataProvider::new())); - CryptoComponent::new( - &config, - Arc::new(registry_client), - no_op_logger(), - None, - ); + CryptoComponent::new(&config, Arc::new(registry_client), no_op_logger(), None); } #[test] @@ -85,12 +75,7 @@ fn should_not_construct_crypto_component_if_remote_csp_vault_is_missing() { let config = CryptoConfig::new_with_unix_socket_vault(crypto_root, socket_path, None); let tokio_rt = new_tokio_runtime(); let registry_client = FakeRegistryClient::new(Arc::new(ProtoRegistryDataProvider::new())); - CryptoComponent::new( - &config, - Arc::new(registry_client), - no_op_logger(), - None, - ); + CryptoComponent::new(&config, Arc::new(registry_client), no_op_logger(), None); } // TODO(CRP-430): check/improve the test coverage of SKS checks. diff --git a/rs/crypto/tests/webauthn_signatures.rs b/rs/crypto/tests/webauthn_signatures.rs index 549789a7353..0b9d3a3e914 100644 --- a/rs/crypto/tests/webauthn_signatures.rs +++ b/rs/crypto/tests/webauthn_signatures.rs @@ -158,5 +158,5 @@ fn crypto_component(config: &CryptoConfig) -> CryptoComponent { ); ic_crypto_node_key_generation::generate_node_signing_keys(vault.as_ref()); - CryptoComponent::new(config, Arc::new(dummy_registry), no_op_logger(), None) + CryptoComponent::new(config, Arc::new(dummy_registry), no_op_logger(), None) } diff --git a/rs/monitoring/metrics/src/adapter_metrics_registry.rs b/rs/monitoring/metrics/src/adapter_metrics_registry.rs index 34df9160065..2a20fccb5d4 100644 --- a/rs/monitoring/metrics/src/adapter_metrics_registry.rs +++ b/rs/monitoring/metrics/src/adapter_metrics_registry.rs @@ -54,7 +54,7 @@ impl AdapterMetricsRegistry { /// Write accesses to here can not be starved by `gather()` calls, due to /// the write-preffering behaviour of the used `tokio::sync::RwLock`. pub fn register(&self, adapter_metrics: AdapterMetrics) -> Result<(), Error> { - /* + /* if self .adapters .blocking_read() @@ -70,7 +70,7 @@ impl AdapterMetricsRegistry { /// Concurrently scrapes metrics from all registered adapters. pub async fn gather(&self, timeout: Duration) -> Vec { - /* + /* join_all( self.adapters .read() diff --git a/rs/orchestrator/src/orchestrator.rs b/rs/orchestrator/src/orchestrator.rs index 1595338de94..6d732310a9b 100644 --- a/rs/orchestrator/src/orchestrator.rs +++ b/rs/orchestrator/src/orchestrator.rs @@ -172,12 +172,17 @@ impl Orchestrator { let c_registry = registry.clone(); let crypto_config = config.crypto.clone(); let c_metrics = metrics_registry.clone(); - let crypto = Arc::new(CryptoComponent::new( - &crypto_config, - c_registry.get_registry_client(), - c_log.clone(), - Some(&c_metrics), - )); + let crypto = tokio::task::spawn_blocking(move || { + Arc::new(CryptoComponent::new( + &crypto_config, + Some(tokio::runtime::Handle::current()), + c_registry.get_registry_client(), + c_log.clone(), + Some(&c_metrics), + )) + }) + .await + .unwrap(); let slog_logger = logger.inner_logger.root.clone(); let (metrics, _metrics_runtime) = diff --git a/rs/orchestrator/src/registration.rs b/rs/orchestrator/src/registration.rs index f074262147a..afe9b88d497 100644 --- a/rs/orchestrator/src/registration.rs +++ b/rs/orchestrator/src/registration.rs @@ -408,10 +408,16 @@ impl NodeRegistration { let key_handler = self.key_handler.clone(); let sign_cmd = move |msg: &MessageId| { - key_handler - .sign_basic(msg, node_id, registry_version) - .map_err(|e| Box::new(e) as Box) - .map(|value| value.get().0) + // Implementation of 'sign_basic' uses Tokio's 'block_on' when issuing a RPC + // to the crypto service. 'block_on' panics when called from async context + // that's why we need to wrap 'sign_basic' in 'block_in_place'. + #[allow(clippy::disallowed_methods)] + tokio::task::block_in_place(|| { + key_handler + .sign_basic(msg, node_id, registry_version) + .map_err(|e| Box::new(e) as Box) + .map(|value| value.get().0) + }) }; let sender = Sender::Node { From 52e2a5a4fb4a639376d08a43f3a371ab2bf1be4a Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Thu, 14 Nov 2024 15:46:19 +0000 Subject: [PATCH 6/6] . --- .../crypto_service_provider/src/vault/mod.rs | 2 +- .../tarpc_csp_vault_client.rs | 32 +++++++++++-------- .../src/vault/remote_csp_vault/tests.rs | 1 - rs/crypto/temp_crypto/temp_vault/src/lib.rs | 1 - rs/orchestrator/src/orchestrator.rs | 1 - 5 files changed, 19 insertions(+), 18 deletions(-) diff --git a/rs/crypto/internal/crypto_service_provider/src/vault/mod.rs b/rs/crypto/internal/crypto_service_provider/src/vault/mod.rs index 234c982dde7..d93e6e748ad 100644 --- a/rs/crypto/internal/crypto_service_provider/src/vault/mod.rs +++ b/rs/crypto/internal/crypto_service_provider/src/vault/mod.rs @@ -85,7 +85,7 @@ fn unix_socket_vault( rt_handle.clone(), )); } - let vault = RemoteCspVault::new(socket_path, rt_handle, logger, metrics).unwrap_or_else(|e| { + let vault = RemoteCspVault::new(socket_path, logger, metrics).unwrap_or_else(|e| { panic!( "Could not connect to CspVault at socket {:?}: {:?}", socket_path, e diff --git a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs index 9b12d99c9c2..020b08cb45a 100644 --- a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs +++ b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tarpc_csp_vault_client.rs @@ -68,21 +68,19 @@ use ic_logger::replica_logger::no_op_logger; use slog_async::AsyncGuard; /// An implementation of `CspVault`-trait that talks to a remote CSP vault. -#[allow(dead_code)] pub struct RemoteCspVault { tarpc_csp_client: TarpcCspVaultClient, // default timeout for RPC calls that can timeout. rpc_timeout: Duration, // special, long timeout for RPC calls that should not really timeout. long_rpc_timeout: Duration, - tokio_runtime_handle: tokio::runtime::Handle, + tokio_rt: tokio::runtime::Runtime, logger: ReplicaLogger, metrics: Arc, #[cfg(test)] _logger_guard: Option, } -#[allow(dead_code)] #[derive(Clone, Eq, PartialEq, Hash, Debug, Deserialize, Serialize)] pub enum RemoteCspVaultError { TransportError { @@ -92,8 +90,13 @@ pub enum RemoteCspVaultError { } impl RemoteCspVault { - fn tokio_block_on(&self, task: T) -> T::Output { - self.tokio_runtime_handle.block_on(task) + fn tokio_block_on(&self, task: T) -> T::Output where ::Output: std::marker::Send, ::Output: 'static { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.tokio_rt.handle().spawn(async move { + let res = task.await; + tx.send(res); + }); + rx.blocking_recv().unwrap() } } @@ -107,11 +110,10 @@ impl RemoteCspVault { /// otherwise the constructor will fail. pub fn new( socket_path: &Path, - rt_handle: tokio::runtime::Handle, logger: ReplicaLogger, metrics: Arc, ) -> Result { - RemoteCspVaultBuilder::new(socket_path.to_path_buf(), rt_handle) + RemoteCspVaultBuilder::new(socket_path.to_path_buf()) .with_logger(logger) .with_metrics(metrics) .build() @@ -119,9 +121,8 @@ impl RemoteCspVault { pub fn builder( socket_path: PathBuf, - rt_handle: tokio::runtime::Handle, ) -> RemoteCspVaultBuilder { - RemoteCspVaultBuilder::new(socket_path, rt_handle) + RemoteCspVaultBuilder::new(socket_path) } } @@ -138,7 +139,7 @@ pub struct RemoteCspVaultBuilder { } impl RemoteCspVaultBuilder { - pub fn new(socket_path: PathBuf, rt_handle: tokio::runtime::Handle) -> Self { + pub fn new(socket_path: PathBuf) -> Self { let tokio_rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(4) .thread_name("Crypto-Thread3".to_string()) @@ -228,7 +229,7 @@ impl RemoteCspVaultBuilder { tarpc_csp_client: client, rpc_timeout: self.rpc_timeout, long_rpc_timeout: self.long_rpc_timeout, - tokio_runtime_handle: self.tokio_rt.handle().clone(), + tokio_rt: self.tokio_rt, logger: self.logger, metrics: self.metrics, #[cfg(test)] @@ -262,12 +263,15 @@ impl BasicSignatureCspVault for RemoteCspVault { message: Vec, key_id: KeyId, ) -> Result { - self.tokio_block_on(self.tarpc_csp_client.sign( - context_with_timeout(self.rpc_timeout), + let c = self.tarpc_csp_client.clone(); + let t = self.rpc_timeout.clone(); + self.tokio_block_on(async move { + c.sign( + context_with_timeout(t), algorithm_id, ByteBuf::from(message), key_id, - )) + ).await}) .unwrap_or_else(|rpc_error: tarpc::client::RpcError| { Err(CspBasicSignatureError::TransientInternalError { internal_error: rpc_error.to_string(), diff --git a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tests.rs b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tests.rs index 023a46a8028..2a37d7a5a84 100644 --- a/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tests.rs +++ b/rs/crypto/internal/crypto_service_provider/src/vault/remote_csp_vault/tests.rs @@ -329,7 +329,6 @@ mod logging { let socket_path = start_new_remote_csp_vault_server_for_test(tokio_rt.handle()); let csp_vault = RemoteCspVault::new( &socket_path, - tokio_rt.handle().clone(), ReplicaLogger::from(&in_memory_logger), Arc::new(CryptoMetrics::none()), ) diff --git a/rs/crypto/temp_crypto/temp_vault/src/lib.rs b/rs/crypto/temp_crypto/temp_vault/src/lib.rs index 80ca6393ee7..6d4a8440ace 100644 --- a/rs/crypto/temp_crypto/temp_vault/src/lib.rs +++ b/rs/crypto/temp_crypto/temp_vault/src/lib.rs @@ -34,7 +34,6 @@ impl RemoteVaultEnvironment { pub fn new_vault_client_builder(&self) -> RemoteCspVaultBuilder { RemoteCspVault::builder( self.vault_server.vault_socket_path(), - self.vault_client_runtime.handle().clone(), ) } diff --git a/rs/orchestrator/src/orchestrator.rs b/rs/orchestrator/src/orchestrator.rs index 6d732310a9b..3bdcdb84354 100644 --- a/rs/orchestrator/src/orchestrator.rs +++ b/rs/orchestrator/src/orchestrator.rs @@ -175,7 +175,6 @@ impl Orchestrator { let crypto = tokio::task::spawn_blocking(move || { Arc::new(CryptoComponent::new( &crypto_config, - Some(tokio::runtime::Handle::current()), c_registry.get_registry_client(), c_log.clone(), Some(&c_metrics),