diff --git a/redis/src/cluster_async/connections_logic.rs b/redis/src/cluster_async/connections_logic.rs index 946d191c9a..a48409dc73 100644 --- a/redis/src/cluster_async/connections_logic.rs +++ b/redis/src/cluster_async/connections_logic.rs @@ -8,9 +8,10 @@ use crate::{ aio::{get_socket_addrs, ConnectionLike}, cluster::get_connection_info, cluster_client::ClusterParams, - RedisResult, + ErrorKind, RedisError, RedisResult, }; +use futures::prelude::*; use futures_time::future::FutureExt; use futures_util::{future::BoxFuture, join}; use tracing::warn; @@ -53,48 +54,406 @@ pub(crate) async fn get_or_create_conn( addr: &str, node: Option>, params: &ClusterParams, -) -> RedisResult<(C, Option)> +) -> RedisResult> where - C: ConnectionLike + Connect + Send + Clone + 'static, + C: ConnectionLike + Send + Clone + Sync + Connect + 'static, { if let Some(node) = node { let mut conn = node.user_connection.await; if let Some(ref ip) = node.ip { if has_dns_changed(addr, ip).await { - return connect_and_check(addr, params.clone(), None).await; + return connect_and_check( + addr, + params.clone(), + None, + RefreshConnectionType::OnlyUserConnection, + None, + ) + .await + .get_node(); } }; match check_connection(&mut conn, params.connection_timeout.into()).await { - Ok(_) => Ok((conn, node.ip)), - Err(_) => connect_and_check(addr, params.clone(), None).await, + Ok(_) => Ok(AsyncClusterNode::new( + async { conn }.boxed().shared(), + None, + node.ip, + )), + Err(_) => connect_and_check( + addr, + params.clone(), + None, + RefreshConnectionType::OnlyUserConnection, + None, + ) + .await + .get_node(), } } else { - connect_and_check(addr, params.clone(), None).await + connect_and_check( + addr, + params.clone(), + None, + RefreshConnectionType::OnlyUserConnection, + None, + ) + .await + .get_node() + } +} + +fn warn_mismatch_ip(addr: &str, new_ip: Option, prev_ip: Option) { + warn!( + "New IP was found for node {:?}: + new connection IP = {:?}, previous connection IP = {:?}", + addr, new_ip, prev_ip + ); +} + +fn warn_management_conn_faild(addr: &str, err: &RedisError) { + warn!( + "Failed to create management connection for node {:?}. Error: {:?}", + addr, err + ); +} + +fn create_async_node( + user_conn: C, + management_conn: Option, + ip: Option, +) -> AsyncClusterNode +where + C: ConnectionLike + Connect + Send + Sync + 'static + Clone, +{ + let user_conn = async { user_conn }.boxed().shared(); + let management_conn = + management_conn.map(|management_conn| async { management_conn }.boxed().shared()); + + AsyncClusterNode::new(user_conn, management_conn, ip) +} + +pub(crate) async fn connect_and_check_all_connections( + addr: &str, + params: ClusterParams, + socket_addr: Option, +) -> ConnectAndCheckResult +where + C: ConnectionLike + Connect + Send + Sync + 'static + Clone, +{ + match future::join( + create_connection(addr, params.clone(), socket_addr), + create_connection(addr, params.clone(), socket_addr), + ) + .await + { + (Ok(conn_1), Ok(conn_2)) => { + // Both connections were successfully established + let (mut user_conn, mut user_ip): (C, Option) = conn_1; + let (mut management_conn, management_ip): (C, Option) = conn_2; + if user_ip == management_ip { + // Set up both connections + if let Err(err) = setup_user_connection(&mut user_conn, params).await { + return err.into(); + } + match setup_management_connection(&mut management_conn).await { + Ok(_) => ConnectAndCheckResult::Success(create_async_node( + user_conn, + Some(management_conn), + user_ip, + )), + Err(err) => ConnectAndCheckResult::ManagementConnectionFailed { + node: create_async_node(user_conn, None, user_ip), + err, + }, + } + } else { + // Use only the connection with the latest IP address + warn_mismatch_ip(addr, user_ip, management_ip); + if has_dns_changed(addr, &user_ip.unwrap()).await { + // The user_ip is incorrect. Use the created `management_conn` for the user connection + user_conn = management_conn; + user_ip = management_ip; + } + match setup_user_connection(&mut user_conn, params).await { + Ok(_) => ConnectAndCheckResult::ManagementConnectionFailed { + node: create_async_node(user_conn, None, user_ip), + err: (ErrorKind::IoError, "mismatched IP").into(), + }, + Err(err) => err.into(), + } + } + } + (Ok(conn), Err(err)) | (Err(err), Ok(conn)) => { + // Only a single connection was successfully established. Use it for the user connection + warn_management_conn_faild(addr, &err); + let (mut user_conn, user_ip): (C, Option) = conn; + match setup_user_connection(&mut user_conn, params).await { + Ok(_) => ConnectAndCheckResult::ManagementConnectionFailed { + node: create_async_node(user_conn, None, user_ip), + err, + }, + Err(err) => err.into(), + } + } + (Err(err_1), Err(err_2)) => { + // Neither of the connections succeeded. + RedisError::from(( + ErrorKind::IoError, + "Failed to refresh both connections", + format!( + "Node: {:?} received errors: `{:?}`, `{:?}`", + addr, err_1, err_2 + ), + )) + .into() + } } } +async fn create_management_connection( + addr: &str, + params: ClusterParams, + socket_addr: Option, +) -> RedisResult<(C, Option)> +where + C: ConnectionLike + Connect + Send + Sync + 'static + Clone, +{ + let res = create_connection(addr, params.clone(), socket_addr).await; + if let Err(err) = res.as_ref() { + warn_management_conn_faild(addr, err); + }; + res +} + +async fn connect_and_check_only_management_conn( + addr: &str, + params: ClusterParams, + socket_addr: Option, + prev_node: AsyncClusterNode, +) -> ConnectAndCheckResult +where + C: ConnectionLike + Connect + Send + Sync + 'static + Clone, +{ + let (mut new_conn, new_ip) = + match create_management_connection(addr, params.clone(), socket_addr).await { + Ok(tuple) => tuple, + Err(err) => { + let node = ClusterNode { + management_connection: None, + ..prev_node + }; + return ConnectAndCheckResult::ManagementConnectionFailed { node, err }; + } + }; + + let (user_connection, mut management_connection) = if new_ip != prev_node.ip { + // An IP mismatch was detected. Attempt to establish a new connection to replace both the management and user connections. + // Use the successfully established connection for the user, then proceed to create a new one for management. + warn_mismatch_ip(addr, new_ip, prev_node.ip); + if let Err(err) = setup_user_connection(&mut new_conn, params.clone()).await { + return ConnectAndCheckResult::Failed(err); + } + let user_connection = async { new_conn }.boxed().shared(); + let management_connection = create_management_connection(addr, params.clone(), socket_addr) + .await + .map(|(conn, _ip)| conn) + .ok(); + (user_connection, management_connection) + } else { + (prev_node.user_connection, Some(new_conn)) + }; + + if let Some(new_conn) = management_connection.as_mut() { + // The new IP matches the existing one. Use this connection for the management connection. + // TODO - should we fail here even if a new user connection was set up? or in that situation, should we return the new connection? + if let Err(err) = setup_management_connection(new_conn).await { + return ConnectAndCheckResult::ManagementConnectionFailed { + node: ClusterNode { + user_connection, + ip: new_ip, + management_connection: None, + }, + err, + }; + }; + }; + ConnectAndCheckResult::Success(ClusterNode { + user_connection, + ip: new_ip, + management_connection: management_connection.map(|conn| async { conn }.boxed().shared()), + }) +} + #[doc(hidden)] +pub enum ConnectAndCheckResult { + Success(AsyncClusterNode), + ManagementConnectionFailed { + node: AsyncClusterNode, + err: RedisError, + }, + Failed(RedisError), +} + +impl ConnectAndCheckResult { + pub fn get_node(self) -> RedisResult> { + match self { + ConnectAndCheckResult::Success(node) => Ok(node), + ConnectAndCheckResult::ManagementConnectionFailed { node, .. } => Ok(node), + ConnectAndCheckResult::Failed(err) => Err(err), + } + } + + pub fn get_error(self) -> Option { + match self { + ConnectAndCheckResult::Success(_) => None, + ConnectAndCheckResult::ManagementConnectionFailed { err, .. } => Some(err), + ConnectAndCheckResult::Failed(err) => Some(err), + } + } +} + +impl From for ConnectAndCheckResult { + fn from(value: RedisError) -> Self { + ConnectAndCheckResult::Failed(value) + } +} + +impl From> for ConnectAndCheckResult { + fn from(value: AsyncClusterNode) -> Self { + ConnectAndCheckResult::Success(value) + } +} + +impl From>> for ConnectAndCheckResult { + fn from(value: RedisResult>) -> Self { + match value { + Ok(value) => value.into(), + Err(err) => err.into(), + } + } +} + +#[doc(hidden)] +#[must_use] pub async fn connect_and_check( + addr: &str, + params: ClusterParams, + socket_addr: Option, + conn_type: RefreshConnectionType, + node: Option>, +) -> ConnectAndCheckResult +where + C: ConnectionLike + Connect + Send + Sync + 'static + Clone, +{ + match conn_type { + RefreshConnectionType::OnlyUserConnection => { + let (user_conn, ip) = + match create_user_connection(addr, params.clone(), socket_addr).await { + Ok(tuple) => tuple, + Err(err) => return err.into(), + }; + if let Some(node) = node { + let mut management_conn = match node.management_connection { + Some(ref conn) => Some(conn.clone().await), + None => None, + }; + if ip != node.ip { + // New IP was found, refresh the management connection too + management_conn = + create_and_setup_management_connection(addr, params, socket_addr) + .await + .ok() + .map(|(conn, _ip): (C, Option)| conn); + } + create_async_node(user_conn, management_conn, ip).into() + } else { + create_async_node(user_conn, None, ip).into() + } + } + RefreshConnectionType::OnlyManagementConnection => { + // Refreshing only the management connection requires the node to exist alongside a user connection. Otherwise, refresh all connections. + match node { + Some(node) => { + connect_and_check_only_management_conn(addr, params, socket_addr, node).await + } + None => connect_and_check_all_connections(addr, params, socket_addr).await, + } + } + RefreshConnectionType::AllConnections => { + connect_and_check_all_connections(addr, params, socket_addr).await + } + } +} + +async fn create_user_connection( + node: &str, + params: ClusterParams, + socket_addr: Option, +) -> RedisResult<(C, Option)> +where + C: ConnectionLike + Connect + Send + 'static, +{ + let (mut conn, ip): (C, Option) = + create_connection(node, params.clone(), socket_addr).await?; + setup_user_connection(&mut conn, params).await?; + Ok((conn, ip)) +} + +async fn create_and_setup_management_connection( node: &str, params: ClusterParams, socket_addr: Option, ) -> RedisResult<(C, Option)> +where + C: ConnectionLike + Connect + Send + 'static, +{ + let (mut conn, ip): (C, Option) = + create_connection(node, params.clone(), socket_addr).await?; + setup_management_connection(&mut conn).await?; + Ok((conn, ip)) +} + +async fn setup_user_connection(conn: &mut C, params: ClusterParams) -> RedisResult<()> where C: ConnectionLike + Connect + Send + 'static, { let read_from_replicas = params.read_from_replicas != crate::cluster_slotmap::ReadFromReplicaStrategy::AlwaysFromPrimary; - let connection_timeout = params.connection_timeout; - let response_timeout = params.response_timeout; - let info = get_connection_info(node, params)?; - let (mut conn, ip) = - C::connect(info, response_timeout, connection_timeout, socket_addr).await?; - check_connection(&mut conn, connection_timeout.into()).await?; + let connection_timeout = params.connection_timeout.into(); + check_connection(conn, connection_timeout).await?; if read_from_replicas { // If READONLY is sent to primary nodes, it will have no effect - crate::cmd("READONLY").query_async(&mut conn).await?; + crate::cmd("READONLY").query_async(conn).await?; } - Ok((conn, ip)) + Ok(()) +} + +#[doc(hidden)] +pub const MANAGEMENT_CONN_NAME: &str = "glide_management_connection"; + +async fn setup_management_connection(conn: &mut C) -> RedisResult<()> +where + C: ConnectionLike + Connect + Send + 'static, +{ + crate::cmd("CLIENT") + .arg(&["SETNAME", MANAGEMENT_CONN_NAME]) + .query_async(conn) + .await?; + Ok(()) +} + +async fn create_connection( + node: &str, + params: ClusterParams, + socket_addr: Option, +) -> RedisResult<(C, Option)> +where + C: ConnectionLike + Connect + Send + 'static, +{ + let connection_timeout = params.connection_timeout; + let response_timeout = params.response_timeout; + let info = get_connection_info(node, params)?; + C::connect(info, response_timeout, connection_timeout, socket_addr).await } /// The function returns None if the checked connection/s are healthy. Otherwise, it returns the type of the unhealthy connection/s. diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index dcd56e6d0e..b8c0f8a33c 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -46,9 +46,8 @@ use std::{ use crate::{ aio::{get_socket_addrs, ConnectionLike, MultiplexedConnection}, cluster::slot_cmd, - cluster_async::{ - connections_container::ClusterNode, - connections_logic::{get_host_and_port_from_addr, get_or_create_conn, ConnectionFuture}, + cluster_async::connections_logic::{ + get_host_and_port_from_addr, get_or_create_conn, ConnectionFuture, RefreshConnectionType, }, cluster_client::{ClusterParams, RetryParams}, cluster_routing::{ @@ -668,18 +667,21 @@ where .map(|(node_addr, socket_addr)| { let params: ClusterParams = params.clone(); async move { - let result = connect_and_check(&node_addr, params, socket_addr).await; + let result = connect_and_check( + &node_addr, + params, + socket_addr, + RefreshConnectionType::OnlyUserConnection, + None, + ) + .await + .get_node(); let node_identifier = if let Some(socket_addr) = socket_addr { socket_addr.to_string() } else { node_addr }; - result.map(|(conn, ip)| { - ( - node_identifier, - ClusterNode::new(async { conn }.boxed().shared(), None, ip), - ) - }) + result.map(|node| (node_identifier, node)) } }) .buffer_unordered(initial_nodes.len()) @@ -753,12 +755,10 @@ where let addr_option = connections_container.address_for_identifier(&identifier); let node_option = connections_container.remove_node(&identifier); if let Some(addr) = addr_option { - let conn = get_or_create_conn(&addr, node_option, cluster_params).await; - if let Ok((conn, ip)) = conn { - connections_container.replace_or_add_connection_for_address( - addr, - ClusterNode::new(async { conn }.boxed().shared(), None, ip), - ); + let node = get_or_create_conn(&addr, node_option, cluster_params).await; + if let Ok(node) = node { + connections_container + .replace_or_add_connection_for_address(addr, node); } } connections_container @@ -996,11 +996,8 @@ where ConnectionsMap(HashMap::with_capacity(nodes_len)), |mut connections, (addr, connection)| async { let conn = get_or_create_conn(addr, connection, &inner.cluster_params).await; - if let Ok((conn, ip)) = conn { - connections.0.insert( - addr.into(), - ClusterNode::new(async { conn }.boxed().shared(), None, ip), - ); + if let Ok(node) = conn { + connections.0.insert(addr.into(), node); } connections }, @@ -1242,20 +1239,23 @@ where let (identifier, mut conn) = match conn_check { ConnectionCheck::Found((identifier, connection)) => (identifier, connection.await), ConnectionCheck::OnlyAddress(addr) => { - match connect_and_check::(&addr, core.cluster_params.clone(), None).await { - Ok((connection, ip)) => { - let connection_clone = connection.clone(); + match connect_and_check::( + &addr, + core.cluster_params.clone(), + None, + RefreshConnectionType::OnlyUserConnection, + None, + ) + .await + .get_node() + { + Ok(node) => { + let connection_clone = node.user_connection.clone().await; let mut connections = core.conn_lock.write().await; - let identifier = connections.replace_or_add_connection_for_address( - addr, - ClusterNode::new( - async move { connection_clone.clone() }.boxed().shared(), - None, - ip, - ), - ); + let identifier = + connections.replace_or_add_connection_for_address(addr, node); drop(connections); - (identifier, connection) + (identifier, connection_clone) } Err(err) => { return Err(err); diff --git a/redis/tests/test_async_cluster_connections_logic.rs b/redis/tests/test_async_cluster_connections_logic.rs index 5199b64b10..00e1f36ae4 100644 --- a/redis/tests/test_async_cluster_connections_logic.rs +++ b/redis/tests/test_async_cluster_connections_logic.rs @@ -15,8 +15,34 @@ use support::{ }; mod test_connect_and_check { + use std::sync::atomic::AtomicUsize; + + use crate::support::{get_mock_connection_handler, ShouldReturnConnectionError}; + use super::*; - use redis::cluster_async::testing::connect_and_check; + use redis::cluster_async::testing::{connect_and_check, ConnectAndCheckResult}; + + fn assert_partial_result( + result: ConnectAndCheckResult, + ) -> (AsyncClusterNode, redis::RedisError) { + match result { + ConnectAndCheckResult::ManagementConnectionFailed { node, err } => (node, err), + ConnectAndCheckResult::Success(_) => panic!("full success"), + ConnectAndCheckResult::Failed(_) => panic!("failed"), + } + } + + fn assert_full_success( + result: ConnectAndCheckResult, + ) -> AsyncClusterNode { + match result { + ConnectAndCheckResult::Success(node) => node, + ConnectAndCheckResult::ManagementConnectionFailed { .. } => { + panic!("partial success") + } + ConnectAndCheckResult::Failed(_) => panic!("failed"), + } + } #[tokio::test] async fn test_connect_and_check_connect_successfully() { @@ -32,19 +58,369 @@ mod test_connect_and_check { }), ); - let expected_ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); + let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); modify_mock_connection_behavior(name, |behavior| { - behavior.returned_ip_type = ConnectionIPReturnType::Specified(expected_ip) + behavior.returned_ip_type = ConnectionIPReturnType::Specified(ip) }); - let (_conn, ip) = connect_and_check::( + let result = connect_and_check::( &format!("{name}:6379"), ClusterParams::default(), None, + RefreshConnectionType::AllConnections, + None, ) - .await - .unwrap(); - assert_eq!(ip, Some(expected_ip)); + .await; + let node = assert_full_success(result); + assert!(node.management_connection.is_some()); + assert_eq!(node.ip, Some(ip)); + } + + #[tokio::test] + async fn test_connect_and_check_all_connections_one_connection_err_returns_only_user_conn() { + // Test that upon refreshing all connections, if only one of the new connections fail, + // the other successful connection will be used as the user connection, as a partial success. + let name = "all_connections_one_connection_err"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + modify_mock_connection_behavior(name, |behavior| { + // The second connection will fail + behavior.return_connection_err = + ShouldReturnConnectionError::OnOddIdx(AtomicUsize::new(0)) + }); + + let params = ClusterParams::default(); + + let result = connect_and_check::( + &format!("{name}:6379"), + params.clone(), + None, + RefreshConnectionType::AllConnections, + None, + ) + .await; + let (node, _) = assert_partial_result(result); + assert!(node.management_connection.is_none()); + assert_eq!(node.ip, None); + + modify_mock_connection_behavior(name, |behavior| { + // The first connection will fail + behavior.return_connection_err = + ShouldReturnConnectionError::OnOddIdx(AtomicUsize::new(1)); + }); + + let result = connect_and_check::( + &format!("{name}:6379"), + params, + None, + RefreshConnectionType::AllConnections, + None, + ) + .await; + let (node, _) = assert_partial_result(result); + assert!(node.management_connection.is_none()); + assert_eq!(node.ip, None); + } + + #[tokio::test] + async fn test_connect_and_check_all_connections_different_ip_returns_only_user_conn() { + // Test that upon refreshing all connections, if the IPs of the new connections differ, + // the function selects only the connection with the correct IP as the user connection. + let name = "all_connections_different_ip"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + modify_mock_connection_behavior(name, |behavior| { + behavior.returned_ip_type = ConnectionIPReturnType::Different(AtomicUsize::new(0)); + }); + + // The first connection will have 0.0.0.0 IP + let result = connect_and_check::( + &format!("{name}:6379"), + ClusterParams::default(), + None, + RefreshConnectionType::AllConnections, + None, + ) + .await; + let (node, _) = assert_partial_result(result); + assert!(node.management_connection.is_none()); + assert_eq!(node.ip, Some(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))); + } + + #[tokio::test] + async fn test_connect_and_check_all_connections_both_conn_error_returns_err() { + // Test that when trying to refresh all connections and both connections fail, the function returns with an error + let name = "both_conn_error_returns_err"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + modify_mock_connection_behavior(name, |behavior| { + behavior.return_connection_err = ShouldReturnConnectionError::Yes + }); + + let result = connect_and_check::( + &format!("{name}:6379"), + ClusterParams::default(), + None, + RefreshConnectionType::AllConnections, + None, + ) + .await; + let err = result.get_error().unwrap(); + assert!( + err.to_string() + .contains("Failed to refresh both connections") + && err.kind() == ErrorKind::IoError + ); + } + + #[tokio::test] + async fn test_connect_and_check_only_management_same_ip() { + // Test that when we refresh only the management connection and the new connection returned with the same IP as the user's, + // the returned node contains a new management connection and the user connection remains unchanged + let name = "only_management_same_ip"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + + let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); + modify_mock_connection_behavior(name, |behavior| { + behavior.returned_ip_type = ConnectionIPReturnType::Specified(ip) + }); + + let user_conn_id: usize = 1000; + let user_conn = MockConnection { + id: user_conn_id, + handler: get_mock_connection_handler(name), + port: 6379, + }; + let node = AsyncClusterNode::new(async { user_conn }.boxed().shared(), None, Some(ip)); + + let result = connect_and_check::( + &format!("{name}:6379"), + ClusterParams::default(), + None, + RefreshConnectionType::OnlyManagementConnection, + Some(node), + ) + .await; + let node = assert_full_success(result); + assert!(node.management_connection.is_some()); + // Confirm that the user connection remains unchanged + assert_eq!(node.user_connection.await.id, user_conn_id); + } + + #[tokio::test] + async fn test_connect_and_check_only_management_different_ip_reconnects_both_connections() { + // Test that when we try the refresh only the management connection and a new IP is found, both connections are being replaced + let name = "only_management_different_ip"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + let new_ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); + modify_mock_connection_behavior(name, |behavior| { + behavior.returned_ip_type = ConnectionIPReturnType::Specified(new_ip) + }); + let user_conn_id: usize = 1000; + let user_conn = MockConnection { + id: user_conn_id, + handler: get_mock_connection_handler(name), + port: 6379, + }; + let prev_ip = IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)); + let node = AsyncClusterNode::new(async { user_conn }.boxed().shared(), None, Some(prev_ip)); + + let result = connect_and_check::( + &format!("{name}:6379"), + ClusterParams::default(), + None, + RefreshConnectionType::OnlyManagementConnection, + Some(node), + ) + .await; + let node = assert_full_success(result); + assert!(node.management_connection.is_some()); + // Confirm that the user connection was changed + assert_ne!(node.user_connection.await.id, user_conn_id); + assert!(node.ip.is_some()); + assert_eq!(node.ip.unwrap().to_string(), *"1.2.3.4"); + assert_ne!(node.ip, Some(prev_ip)); + } + + #[tokio::test] + async fn test_connect_and_check_only_management_connection_err() { + // Test that when we try the refresh only the management connection and it fails, we receive a partial success with the same node. + let name = "only_management_connection_err"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + modify_mock_connection_behavior(name, |behavior| { + behavior.return_connection_err = ShouldReturnConnectionError::Yes; + }); + + let user_conn_id: usize = 1000; + let user_conn = MockConnection { + id: user_conn_id, + handler: get_mock_connection_handler(name), + port: 6379, + }; + let prev_ip = Some(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1))); + let node = AsyncClusterNode::new(async { user_conn }.boxed().shared(), None, prev_ip); + + let result = connect_and_check::( + &format!("{name}:6379"), + ClusterParams::default(), + None, + RefreshConnectionType::OnlyManagementConnection, + Some(node), + ) + .await; + let (node, _) = assert_partial_result(result); + assert!(node.management_connection.is_none()); + // Confirm that the user connection was changed + assert_eq!(node.user_connection.await.id, user_conn_id); + assert_eq!(node.ip, prev_ip); + } + + #[tokio::test] + async fn test_connect_and_check_only_user_connection_same_ip() { + // Test that upon refreshing only the user connection, if the newly created connection share the same IP as the existing management connection, + // the managament connection remains unchanged + let name = "only_user_connection_same_ip"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + + let prev_ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); + modify_mock_connection_behavior(name, |behavior| { + behavior.returned_ip_type = ConnectionIPReturnType::Specified(prev_ip); + }); + let old_user_conn_id: usize = 1000; + let management_conn_id: usize = 2000; + let old_user_conn = MockConnection { + id: old_user_conn_id, + handler: get_mock_connection_handler(name), + port: 6379, + }; + let management_conn = MockConnection { + id: management_conn_id, + handler: get_mock_connection_handler(name), + port: 6379, + }; + + let node = AsyncClusterNode::new( + async { old_user_conn }.boxed().shared(), + Some(async { management_conn }.boxed().shared()), + Some(prev_ip), + ); + + let result = connect_and_check::( + &format!("{name}:6379"), + ClusterParams::default(), + None, + RefreshConnectionType::OnlyUserConnection, + Some(node), + ) + .await; + let node = assert_full_success(result); + // Confirm that a new user connection was created + assert_ne!(node.user_connection.await.id, old_user_conn_id); + // Confirm that the management connection remains unchanged + assert_eq!( + node.management_connection.unwrap().await.id, + management_conn_id + ); + } + + #[tokio::test] + async fn test_connect_and_check_only_user_connection_new_ip_refreshing_both_connections() { + // Test that upon refreshing only the user connection, if the newly created connection has a different IP from the existing one, + // the managament connection is being refreshed too + let name = "only_user_connection_new_ip"; + + let _handle = MockConnectionBehavior::register_new( + name, + Arc::new(|cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + modify_mock_connection_behavior(name, |behavior| { + behavior.returned_ip_type = ConnectionIPReturnType::Different(AtomicUsize::new(0)); + }); + + let old_user_conn_id: usize = 1000; + let management_conn_id: usize = 2000; + let old_user_conn = MockConnection { + id: old_user_conn_id, + handler: get_mock_connection_handler(name), + port: 6379, + }; + let management_conn = MockConnection { + id: management_conn_id, + handler: get_mock_connection_handler(name), + port: 6379, + }; + let prev_ip = Some(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1))); + let node = AsyncClusterNode::new( + async { old_user_conn }.boxed().shared(), + Some(async { management_conn }.boxed().shared()), + prev_ip, + ); + + let result = connect_and_check::( + &format!("{name}:6379"), + ClusterParams::default(), + None, + RefreshConnectionType::OnlyUserConnection, + Some(node), + ) + .await; + let node = assert_full_success(result); + // Confirm that a new user connection was created + assert_ne!(node.user_connection.await.id, old_user_conn_id); + // Confirm that a new management connection was created + assert_ne!( + node.management_connection.unwrap().await.id, + management_conn_id + ); } }