From d8083d372b969cc1b8b4e0841cac2a1abf027692 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Thu, 25 Jan 2024 20:04:35 +0000 Subject: [PATCH] Add `check_node_connections` function. --- .../cluster_async/connections_container.rs | 4 +- redis/src/cluster_async/connections_logic.rs | 87 +++++- redis/src/cluster_async/mod.rs | 3 +- redis/tests/support/mock_cluster.rs | 45 ++- .../test_async_cluster_connections_logic.rs | 256 ++++++++++++++++-- redis/tests/test_cluster_async.rs | 4 +- 6 files changed, 352 insertions(+), 47 deletions(-) diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index f207864ef..acd69c1e8 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -11,7 +11,7 @@ use crate::cluster_topology::TopologyHash; type IdentifierType = ArcStr; #[derive(Clone, Eq, PartialEq, Debug)] -pub(crate) struct ClusterNode { +pub struct ClusterNode { pub user_connection: Connection, pub management_connection: Option, pub ip: Option, @@ -21,7 +21,7 @@ impl ClusterNode where Connection: Clone, { - pub(crate) fn new( + pub fn new( user_connection: Connection, management_connection: Option, ip: Option, diff --git a/redis/src/cluster_async/connections_logic.rs b/redis/src/cluster_async/connections_logic.rs index 490ca30a4..48dd6f317 100644 --- a/redis/src/cluster_async/connections_logic.rs +++ b/redis/src/cluster_async/connections_logic.rs @@ -1,3 +1,8 @@ +use std::{ + iter::Iterator, + net::{IpAddr, SocketAddr}, +}; + use super::{AsyncClusterNode, Connect}; use crate::{ aio::{get_socket_addrs, ConnectionLike}, @@ -5,18 +10,28 @@ use crate::{ cluster_client::ClusterParams, RedisResult, }; + use futures_time::future::FutureExt; -use std::{ - iter::Iterator, - net::{IpAddr, SocketAddr}, -}; +use futures_util::join; +use tracing::warn; + +#[doc(hidden)] +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum RefreshConnectionType { + // Refresh only 
user connections + OnlyUserConnection, + // Refresh only management connections + OnlyManagementConnection, + // Refresh all connections: both management and user connections. + AllConnections, +} /// Return true if a DNS change is detected, otherwise return false. /// This function takes a node's address, examines if its host has encountered a DNS change, where the node's endpoint now leads to a different IP address. /// If no socket addresses are discovered for the node's host address, or if it's a non-DNS address, it returns false. /// In case the node's host address resolves to socket addresses and none of them match the current connection's IP, /// a DNS change is detected, so the current connection isn't valid anymore and a new connection should be made. -async fn is_dns_changed(addr: &str, curr_ip: &IpAddr) -> bool { +async fn has_dns_changed(addr: &str, curr_ip: &IpAddr) -> bool { let (host, port) = match get_host_and_port_from_addr(addr) { Some((host, port)) => (host, port), None => return false, @@ -40,7 +55,7 @@ where if let Some(node) = node { let mut conn = node.user_connection.await; if let Some(ref ip) = node.ip { - if is_dns_changed(addr, ip).await { + if has_dns_changed(addr, ip).await { return connect_and_check(addr, params.clone(), None).await; } }; @@ -77,6 +92,66 @@ where Ok((conn, ip)) } +/// The function returns None if the checked connection/s are healthy. Otherwise, it returns the type of the unhealthy connection/s. 
+#[allow(dead_code)] +#[doc(hidden)] +pub async fn check_node_connections( + node: &AsyncClusterNode, + params: &ClusterParams, + conn_type: RefreshConnectionType, + address: &str, +) -> Option +where + C: ConnectionLike + Send + 'static + Clone, +{ + let timeout = params.connection_timeout.into(); + let (check_mgmt_connection, check_user_connection) = match conn_type { + RefreshConnectionType::OnlyUserConnection => (false, true), + RefreshConnectionType::OnlyManagementConnection => (true, false), + RefreshConnectionType::AllConnections => (true, true), + }; + let check = |conn, timeout, conn_type| async move { + match check_connection(&mut conn.await, timeout).await { + Ok(_) => false, + Err(err) => { + warn!( + "The {} connection for node {} is unhealthy. Error: {:?}", + conn_type, address, err + ); + true + } + } + }; + let (mgmt_failed, user_failed) = join!( + async { + if !check_mgmt_connection { + return false; + } + match node.management_connection.clone() { + Some(conn) => check(conn, timeout, "management").await, + None => { + warn!("The management connection for node {} isn't set", address); + true + } + } + }, + async { + if !check_user_connection { + return false; + } + let conn = node.user_connection.clone(); + check(conn, timeout, "user").await + }, + ); + + match (mgmt_failed, user_failed) { + (true, true) => Some(RefreshConnectionType::AllConnections), + (true, false) => Some(RefreshConnectionType::OnlyManagementConnection), + (false, true) => Some(RefreshConnectionType::OnlyUserConnection), + (false, false) => None, + } +} + async fn check_connection(conn: &mut C, timeout: futures_time::time::Duration) -> RedisResult<()> where C: ConnectionLike + Send + 'static, diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index e2aee4d77..4467d6673 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -200,7 +200,8 @@ where } type ConnectionFuture = future::Shared>; -type AsyncClusterNode = 
ClusterNode>; +/// Cluster node for async connections +pub type AsyncClusterNode = ClusterNode>; type ConnectionMap = connections_container::ConnectionsMap>; type ConnectionsContainer = self::connections_container::ConnectionsContainer>; diff --git a/redis/tests/support/mock_cluster.rs b/redis/tests/support/mock_cluster.rs index 77cc64c46..81e95cbce 100644 --- a/redis/tests/support/mock_cluster.rs +++ b/redis/tests/support/mock_cluster.rs @@ -39,7 +39,7 @@ pub struct MockConnectionBehavior { } impl MockConnectionBehavior { - pub fn new(id: &str, handler: Handler) -> Self { + fn new(id: &str, handler: Handler) -> Self { Self { id: id.to_string(), handler, @@ -49,23 +49,20 @@ impl MockConnectionBehavior { } } + #[must_use] + pub fn register_new(id: &str, handler: Handler) -> RemoveHandler { + get_behaviors().insert(id.to_string(), Self::new(id, handler)); + RemoveHandler(vec![id.to_string()]) + } + fn get_handler(&self) -> Handler { self.handler.clone() } } -pub fn add_new_mock_connection_behavior(name: &str, handler: Handler) { - MOCK_CONN_BEHAVIORS - .write() - .unwrap() - .insert(name.to_string(), MockConnectionBehavior::new(name, handler)); -} - pub fn modify_mock_connection_behavior(name: &str, func: impl FnOnce(&mut MockConnectionBehavior)) { func( - MOCK_CONN_BEHAVIORS - .write() - .unwrap() + get_behaviors() .get_mut(name) .expect("Handler `{name}` was not installed"), ); @@ -80,9 +77,26 @@ pub fn get_mock_connection_handler(name: &str) -> Handler { .get_handler() } +pub fn get_mock_connection(name: &str, id: usize) -> MockConnection { + get_mock_connection_with_port(name, id, 6379) +} + +pub fn get_mock_connection_with_port(name: &str, id: usize, port: u16) -> MockConnection { + MockConnection { + id, + handler: get_mock_connection_handler(name), + port, + } +} + static MOCK_CONN_BEHAVIORS: Lazy>> = Lazy::new(Default::default); +fn get_behaviors() -> std::sync::RwLockWriteGuard<'static, HashMap> +{ + MOCK_CONN_BEHAVIORS.write().unwrap() +} + 
#[derive(Default)] pub enum ConnectionIPReturnType { /// New connections' IP will be returned as None @@ -410,7 +424,7 @@ pub struct RemoveHandler(Vec); impl Drop for RemoveHandler { fn drop(&mut self) { for id in &self.0 { - MOCK_CONN_BEHAVIORS.write().unwrap().remove(id); + get_behaviors().remove(id); } } } @@ -440,7 +454,10 @@ impl MockEnv { .unwrap(); let id = id.to_string(); - add_new_mock_connection_behavior(&id, Arc::new(move |cmd, port| handler(cmd, port))); + let handler = MockConnectionBehavior::register_new( + &id, + Arc::new(move |cmd, port| handler(cmd, port)), + ); let client = client_builder.build().unwrap(); let connection = client.get_generic_connection().unwrap(); #[cfg(feature = "cluster-async")] @@ -454,7 +471,7 @@ impl MockEnv { connection, #[cfg(feature = "cluster-async")] async_connection, - handler: RemoveHandler(vec![id]), + handler, } } } diff --git a/redis/tests/test_async_cluster_connections_logic.rs b/redis/tests/test_async_cluster_connections_logic.rs index 99d519c7e..a78791298 100644 --- a/redis/tests/test_async_cluster_connections_logic.rs +++ b/redis/tests/test_async_cluster_connections_logic.rs @@ -1,39 +1,251 @@ #![cfg(feature = "cluster-async")] mod support; -mod async_cluster_connection_logic { - use super::*; - use redis::{cluster_async::connections_logic::connect_and_check, ClusterParams}; - use std::net::{IpAddr, Ipv4Addr}; - use support::{respond_startup, ConnectionIPReturnType, MockEnv}; +use futures_util::FutureExt; +use redis::{ + cluster_async::{connections_logic::RefreshConnectionType, AsyncClusterNode}, + ClusterParams, ErrorKind, +}; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::Arc; +use support::{ + get_mock_connection, get_mock_connection_with_port, modify_mock_connection_behavior, + respond_startup, ConnectionIPReturnType, MockConnection, MockConnectionBehavior, +}; - use crate::support::{modify_mock_connection_behavior, MockConnection}; +mod test_connect_and_check { + use super::*; + use 
redis::cluster_async::connections_logic::connect_and_check; - #[test] - fn test_connect_and_check_connect_successfully() { + #[tokio::test] + async fn test_connect_and_check_connect_successfully() { // Test that upon refreshing all connections, if both connections were successful, // the returned node contains both user and management connection let name = "test_connect_and_check_connect_successfully"; - let mock_env = MockEnv::new(name, move |cmd, _| { - respond_startup(name, cmd)?; - Ok(()) - }); + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); let expected_ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); modify_mock_connection_behavior(name, |behavior| { behavior.returned_ip_type = ConnectionIPReturnType::Specified(expected_ip) }); - mock_env.runtime.block_on(async { - let (_conn, ip) = connect_and_check::( - &format!("{name}:6379"), - ClusterParams::default(), - None, - ) - .await - .unwrap(); - assert_eq!(ip, Some(expected_ip)); - }) + let (_conn, ip) = connect_and_check::( + &format!("{name}:6379"), + ClusterParams::default(), + None, + ) + .await + .unwrap(); + assert_eq!(ip, Some(expected_ip)); + } +} + +mod test_check_node_connections { + + use super::*; + use redis::cluster_async::connections_logic::check_node_connections; + + #[tokio::test] + async fn test_check_node_connections_find_no_problem() { + // Test that upon checking both connections, if both connections are healthy, no issue is returned. 
+ let name = "test_check_node_connections_find_no_problem"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + + let node = AsyncClusterNode::new( + async { get_mock_connection(name, 1) }.boxed().shared(), + Some(async { get_mock_connection(name, 2) }.boxed().shared()), + None, + ); + let response = check_node_connections::( + &node, + &ClusterParams::default(), + RefreshConnectionType::AllConnections, + name, + ) + .await; + assert_eq!(response, None); + } + + #[tokio::test] + async fn test_check_node_connections_find_management_connection_issue() { + // Test that upon checking both connections, if management connection isn't responding to pings, `OnlyManagementConnection` will be returned. + let name = "test_check_node_connections_find_management_connection_issue"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, port| { + if port == 6381 { + return Err(Err((ErrorKind::ClientError, "some error").into())); + } + respond_startup(name, cmd)?; + Ok(()) + }), + ); + + let node = AsyncClusterNode::new( + async { get_mock_connection_with_port(name, 1, 6380) } + .boxed() + .shared(), + Some( + async { get_mock_connection_with_port(name, 2, 6381) } + .boxed() + .shared(), + ), + None, + ); + let response = check_node_connections::( + &node, + &ClusterParams::default(), + RefreshConnectionType::AllConnections, + name, + ) + .await; + assert_eq!( + response, + Some(RefreshConnectionType::OnlyManagementConnection) + ); + } + + #[tokio::test] + async fn test_check_node_connections_find_missing_management_connection() { + // Test that upon checking both connections, if management connection isn't present, `OnlyManagementConnection` will be returned. 
+ let name = "test_check_node_connections_find_missing_management_connection"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + + let node = AsyncClusterNode::new( + async { get_mock_connection(name, 1) }.boxed().shared(), + None, + None, + ); + let response = check_node_connections::( + &node, + &ClusterParams::default(), + RefreshConnectionType::AllConnections, + name, + ) + .await; + assert_eq!( + response, + Some(RefreshConnectionType::OnlyManagementConnection) + ); + } + + #[tokio::test] + async fn test_check_node_connections_find_both_connections_issue() { + // Test that upon checking both connections, if neither connection is responding to pings, `AllConnections` will be returned. + let name = "test_check_node_connections_find_both_connections_issue"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|_, _| Err(Err((ErrorKind::ClientError, "some error").into()))), + ); + + let node = AsyncClusterNode::new( + async { get_mock_connection_with_port(name, 1, 6380) } + .boxed() + .shared(), + Some( + async { get_mock_connection_with_port(name, 2, 6381) } + .boxed() + .shared(), + ), + None, + ); + let response = check_node_connections::( + &node, + &ClusterParams::default(), + RefreshConnectionType::AllConnections, + name, + ) + .await; + assert_eq!(response, Some(RefreshConnectionType::AllConnections)); + } + + #[tokio::test] + async fn test_check_node_connections_find_user_connection_issue() { + // Test that upon checking both connections, if user connection isn't responding to pings, `OnlyUserConnection` will be returned. 
+ let name = "test_check_node_connections_find_user_connection_issue"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, port| { + if port == 6380 { + return Err(Err((ErrorKind::ClientError, "some error").into())); + } + respond_startup(name, cmd)?; + Ok(()) + }), + ); + + let node = AsyncClusterNode::new( + async { get_mock_connection_with_port(name, 1, 6380) } + .boxed() + .shared(), + Some( + async { get_mock_connection_with_port(name, 2, 6381) } + .boxed() + .shared(), + ), + None, + ); + let response = check_node_connections::( + &node, + &ClusterParams::default(), + RefreshConnectionType::AllConnections, + name, + ) + .await; + assert_eq!(response, Some(RefreshConnectionType::OnlyUserConnection)); + } + + #[tokio::test] + async fn test_check_node_connections_ignore_missing_management_connection_when_refreshing_user() + { + // Test that upon checking only user connection, issues with management connection won't affect the result. + let name = + "test_check_node_connections_ignore_management_connection_issue_when_refreshing_user"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + + let node = AsyncClusterNode::new( + async { get_mock_connection(name, 1) }.boxed().shared(), + None, + None, + ); + let response = check_node_connections::( + &node, + &ClusterParams::default(), + RefreshConnectionType::OnlyUserConnection, + name, + ) + .await; + assert_eq!(response, None); } } diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index 38246811c..e783dd424 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -591,8 +591,8 @@ fn test_cluster_async_cannot_connect_to_server_with_unknown_host_name() { } }; let client_builder = ClusterClient::builder(vec![&*format!("redis://{name}")]); - let client = client_builder.build().unwrap(); - add_new_mock_connection_behavior(name, 
Arc::new(handler)); + let client: ClusterClient = client_builder.build().unwrap(); + let _handler = MockConnectionBehavior::register_new(name, Arc::new(handler)); let connection = client.get_generic_connection::(); assert!(connection.is_err()); let err = connection.err().unwrap();