diff --git a/redis/Cargo.toml b/redis/Cargo.toml index 71b03d59e..569aaf1d4 100644 --- a/redis/Cargo.toml +++ b/redis/Cargo.toml @@ -162,10 +162,6 @@ required-features = ["json", "serde/derive"] name = "test_cluster_async" required-features = ["cluster-async"] -[[test]] -name = "test_async_cluster_connections_logic" -required-features = ["cluster-async"] - [[bench]] name = "bench_basic" harness = false diff --git a/redis/src/cluster_async/connections_logic.rs b/redis/src/cluster_async/connections_logic.rs index 490ca30a4..5d17a05a9 100644 --- a/redis/src/cluster_async/connections_logic.rs +++ b/redis/src/cluster_async/connections_logic.rs @@ -1,3 +1,4 @@ +#![cfg(feature = "cluster-async")] use super::{AsyncClusterNode, Connect}; use crate::{ aio::{get_socket_addrs, ConnectionLike}, @@ -53,8 +54,7 @@ where } } -#[doc(hidden)] -pub async fn connect_and_check( +pub(crate) async fn connect_and_check( node: &str, params: ClusterParams, socket_addr: Option, @@ -100,3 +100,48 @@ pub(crate) fn get_host_and_port_from_addr(addr: &str) -> Option<(&str, u16)> { let port = parts.get(1).unwrap(); port.parse::().ok().map(|port| (*host, port)) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + cluster_async::connections_logic::connect_and_check, + testing::mock_connection::{ + modify_mock_connection_behavior, respond_startup, ConnectionIPReturnType, + MockConnection, MockConnectionBehavior, + }, + }; + use std::{ + net::{IpAddr, Ipv4Addr}, + sync::Arc, + }; + + #[tokio::test] + async fn test_connect_and_check_connect_successfully() { + // Test that upon refreshing all connections, if both connections were successful, + // the returned node contains both user and management connection + let name = "test_connect_and_check_connect_successfully"; + + let _handler = MockConnectionBehavior::register_new( + name, + Arc::new(move |cmd, _| { + respond_startup(name, cmd)?; + Ok(()) + }), + ); + + let expected_ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); + modify_mock_connection_behavior(name, |behavior| { + behavior.returned_ip_type = ConnectionIPReturnType::Specified(expected_ip) + }); + + let (_conn, ip) = connect_and_check::( + &format!("{name}:6379"), + ClusterParams::default(), + None, + ) + .await + .unwrap(); + assert_eq!(ip, Some(expected_ip)); + } +} diff --git a/redis/src/cluster_client.rs b/redis/src/cluster_client.rs index ed8285963..1e6e5df6f 100644 --- a/redis/src/cluster_client.rs +++ b/redis/src/cluster_client.rs @@ -77,8 +77,7 @@ impl RetryParams { /// Redis cluster specific parameters. #[derive(Default, Clone)] -#[doc(hidden)] -pub struct ClusterParams { +pub(crate) struct ClusterParams { pub(crate) password: Option, pub(crate) username: Option, pub(crate) read_from_replicas: ReadFromReplicaStrategy, diff --git a/redis/src/lib.rs b/redis/src/lib.rs index 51a3819b9..1a1d2ba8c 100644 --- a/redis/src/lib.rs +++ b/redis/src/lib.rs @@ -448,9 +448,8 @@ mod cluster_slotmap; #[cfg(feature = "cluster")] mod cluster_client; -// for testing purposes -#[cfg(feature = "cluster")] -pub use crate::cluster_client::ClusterParams; +/// Used exclusively for testing. This won't be stable, don't take a dependency on this module. +pub mod testing; #[cfg(feature = "cluster")] mod cluster_pipeline; diff --git a/redis/src/testing/mock_connection.rs b/redis/src/testing/mock_connection.rs new file mode 100644 index 000000000..1ee591cb9 --- /dev/null +++ b/redis/src/testing/mock_connection.rs @@ -0,0 +1,321 @@ +#![cfg(any(feature = "cluster", feature = "cluster_async"))] +#![allow(missing_docs)] +use crate::{ + aio, cluster, ErrorKind, FromRedisValue, IntoConnectionInfo, PushKind, RedisError, RedisResult, + Value, +}; + +use std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::OnceLock, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, RwLock, + }, + time::Duration, +}; + +#[cfg(feature = "cluster-async")] +use crate::{cluster_async, RedisFuture}; + +#[cfg(feature = "cluster-async")] +use futures::future; + +pub type Handler = Arc Result<(), RedisResult> + Send + Sync>; + +pub fn contains_slice(xs: &[u8], ys: &[u8]) -> bool { + for i in 0..xs.len() { + if xs[i..].starts_with(ys) { + return true; + } + } + false +} + +pub fn respond_startup(name: &str, cmd: &[u8]) -> Result<(), RedisResult> { + if contains_slice(cmd, b"PING") || contains_slice(cmd, b"SETNAME") { + Err(Ok(Value::SimpleString("OK".into()))) + } else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + Err(Ok(Value::Array(vec![Value::Array(vec![ + Value::Int(0), + Value::Int(16383), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), + Value::Int(6379), + ]), + ])]))) + } else if contains_slice(cmd, b"READONLY") { + Err(Ok(Value::SimpleString("OK".into()))) + } else { + Ok(()) + } +} + +pub struct RemoveHandler(String); + +impl Drop for RemoveHandler { + fn drop(&mut self) { + MockConnectionBehavior::remove_mock(&self.0); + } +} + +pub struct MockConnectionBehavior { + pub id: String, + pub handler: Handler, + pub connection_id_provider: AtomicUsize, + pub returned_ip_type: ConnectionIPReturnType, + pub return_connection_err: ShouldReturnConnectionError, +} + +impl MockConnectionBehavior { + fn new(id: &str, handler: Handler) -> Self { + Self { + id: id.to_string(), + handler, + connection_id_provider: AtomicUsize::new(0), + returned_ip_type: ConnectionIPReturnType::default(), + return_connection_err: ShouldReturnConnectionError::default(), + } + } + + #[must_use] + pub fn register_new(id: &str, handler: Handler) -> RemoveHandler { + get_behaviors().insert(id.to_string(), Self::new(id, handler)); + RemoveHandler(id.to_string()) + } + + pub fn remove_mock(id: &str) { + get_behaviors().remove(id); + } + + fn get_handler(&self) -> Handler { + self.handler.clone() + } +} + +pub fn modify_mock_connection_behavior(name: &str, func: impl FnOnce(&mut MockConnectionBehavior)) { + func( + get_behaviors() + .get_mut(name) + .expect("Handler `{name}` was not installed"), + ); +} + +pub fn get_mock_connection_handler(name: &str) -> Handler { + get_behaviors() + .get(name) + .expect("Handler `{name}` was not installed") + .get_handler() +} + +static MOCK_CONN_BEHAVIORS: OnceLock>> = + OnceLock::new(); + +fn get_behaviors() -> std::sync::RwLockWriteGuard<'static, HashMap> +{ + MOCK_CONN_BEHAVIORS + .get_or_init(Default::default) + .write() + .unwrap() +} + +#[derive(Default)] +pub enum ConnectionIPReturnType { + /// New connections' IP will be returned as None + #[default] + None, + /// Creates connections with the specified IP + Specified(IpAddr), + /// Each new connection will be created with a different IP based on the passed atomic integer + Different(AtomicUsize), +} + +#[derive(Default)] +pub enum ShouldReturnConnectionError { + /// Don't return a connection error + #[default] + No, + /// Always return a connection error + Yes, + /// Return connection error when the internal index is an odd number + OnOddIdx(AtomicUsize), +} + +#[derive(Clone)] +pub struct MockConnection { + pub id: usize, + pub handler: Handler, + pub port: u16, +} + +#[cfg(feature = "cluster-async")] +impl cluster_async::Connect for MockConnection { + fn connect<'a, T>( + info: T, + _response_timeout: Duration, + _connection_timeout: Duration, + _socket_addr: Option, + ) -> RedisFuture<'a, (Self, Option)> + where + T: IntoConnectionInfo + Send + 'a, + { + let info = info.into_connection_info().unwrap(); + + let (name, port) = match &info.addr { + crate::ConnectionAddr::Tcp(addr, port) => (addr, *port), + _ => unreachable!(), + }; + let binding = get_behaviors(); + let conn_utils = binding + .get(name) + .unwrap_or_else(|| panic!("MockConnectionUtils for `{name}` were not installed")); + let conn_err = Box::pin(future::err(RedisError::from(std::io::Error::new( + std::io::ErrorKind::ConnectionReset, + "mock-io-error", + )))); + match &conn_utils.return_connection_err { + ShouldReturnConnectionError::No => {} + ShouldReturnConnectionError::Yes => return conn_err, + ShouldReturnConnectionError::OnOddIdx(curr_idx) => { + if curr_idx.fetch_add(1, Ordering::SeqCst) % 2 != 0 { + // raise an error on each odd number + return conn_err; + } + } + } + + let ip = match &conn_utils.returned_ip_type { + ConnectionIPReturnType::Specified(ip) => Some(*ip), + ConnectionIPReturnType::Different(ip_getter) => { + let first_ip_num = ip_getter.fetch_add(1, Ordering::SeqCst) as u8; + Some(IpAddr::V4(Ipv4Addr::new(first_ip_num, 0, 0, 0))) + } + ConnectionIPReturnType::None => None, + }; + + Box::pin(future::ok(( + MockConnection { + id: conn_utils + .connection_id_provider + .fetch_add(1, Ordering::SeqCst), + handler: conn_utils.get_handler(), + port, + }, + ip, + ))) + } +} + +impl cluster::Connect for MockConnection { + fn connect<'a, T>(info: T, _timeout: Option) -> RedisResult + where + T: IntoConnectionInfo, + { + let info = info.into_connection_info().unwrap(); + + let (name, port) = match &info.addr { + crate::ConnectionAddr::Tcp(addr, port) => (addr, *port), + _ => unreachable!(), + }; + let binding = get_behaviors(); + let conn_utils = binding + .get(name) + .unwrap_or_else(|| panic!("MockConnectionUtils for `{name}` were not installed")); + Ok(MockConnection { + id: conn_utils + .connection_id_provider + .fetch_add(1, Ordering::SeqCst), + handler: conn_utils.get_handler(), + port, + }) + } + + fn send_packed_command(&mut self, _cmd: &[u8]) -> RedisResult<()> { + Ok(()) + } + + fn set_write_timeout(&self, _dur: Option) -> RedisResult<()> { + Ok(()) + } + + fn set_read_timeout(&self, _dur: Option) -> RedisResult<()> { + Ok(()) + } + + fn recv_response(&mut self) -> RedisResult { + Ok(Value::Nil) + } +} + +#[cfg(feature = "cluster-async")] +impl aio::ConnectionLike for MockConnection { + fn req_packed_command<'a>(&'a mut self, cmd: &'a crate::Cmd) -> RedisFuture<'a, Value> { + Box::pin(future::ready( + (self.handler)(&cmd.get_packed_command(), self.port) + .expect_err("Handler did not specify a response"), + )) + } + + fn req_packed_commands<'a>( + &'a mut self, + _pipeline: &'a crate::Pipeline, + _offset: usize, + _count: usize, + ) -> RedisFuture<'a, Vec> { + Box::pin(future::ok(vec![])) + } + + fn get_db(&self) -> i64 { + 0 + } +} + +impl crate::ConnectionLike for MockConnection { + fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult { + (self.handler)(cmd, self.port).expect_err("Handler did not specify a response") + } + + fn req_packed_commands( + &mut self, + cmd: &[u8], + offset: usize, + _count: usize, + ) -> RedisResult> { + let res = (self.handler)(cmd, self.port).expect_err("Handler did not specify a response"); + match res { + Err(err) => Err(err), + Ok(res) => { + if let Value::Array(results) = res { + match results.into_iter().nth(offset) { + Some(Value::Array(res)) => Ok(res), + _ => Err((ErrorKind::ResponseError, "non-array response").into()), + } + } else { + Err(( + ErrorKind::ResponseError, + "non-array response", + String::from_redis_value(&res).unwrap(), + ) + .into()) + } + } + } + } + + fn get_db(&self) -> i64 { + 0 + } + + fn check_connection(&mut self) -> bool { + true + } + + fn is_open(&self) -> bool { + true + } + + fn execute_push_message(&mut self, _kind: PushKind, _data: Vec) { + // TODO - implement handling RESP3 push messages + } +} diff --git a/redis/src/testing/mod.rs b/redis/src/testing/mod.rs new file mode 100644 index 000000000..6c0f50c60 --- /dev/null +++ b/redis/src/testing/mod.rs @@ -0,0 +1 @@ +pub mod mock_connection; diff --git a/redis/tests/support/mock_cluster.rs b/redis/tests/support/mock_cluster.rs index 0f782662f..a18b8fe94 100644 --- a/redis/tests/support/mock_cluster.rs +++ b/redis/tests/support/mock_cluster.rs @@ -1,244 +1,17 @@ +#![cfg(test)] use redis::{ - cluster::{self, ClusterClient, ClusterClientBuilder}, - ErrorKind, FromRedisValue, RedisError, -}; - -use std::{ - collections::HashMap, - net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, RwLock, + cluster::{ClusterClient, ClusterClientBuilder}, + testing::mock_connection::{ + contains_slice, MockConnection, MockConnectionBehavior, RemoveHandler, }, - time::Duration, -}; - -use { - once_cell::sync::Lazy, - redis::{IntoConnectionInfo, RedisResult, Value}, + RedisResult, Value, }; -#[cfg(feature = "cluster-async")] -use redis::{aio, cluster_async, RedisFuture}; - -#[cfg(feature = "cluster-async")] -use futures::future; +use std::sync::Arc; -use redis::PushKind; #[cfg(feature = "cluster-async")] use tokio::runtime::Runtime; -type Handler = Arc Result<(), RedisResult> + Send + Sync>; - -pub struct MockConnectionBehavior { - pub id: String, - pub handler: Handler, - pub connection_id_provider: AtomicUsize, - pub returned_ip_type: ConnectionIPReturnType, - pub return_connection_err: ShouldReturnConnectionError, -} - -impl MockConnectionBehavior { - pub fn new(id: &str, handler: Handler) -> Self { - Self { - id: id.to_string(), - handler, - connection_id_provider: AtomicUsize::new(0), - returned_ip_type: ConnectionIPReturnType::default(), - return_connection_err: ShouldReturnConnectionError::default(), - } - } - - fn get_handler(&self) -> Handler { - self.handler.clone() - } -} - -pub fn add_new_mock_connection_behavior(name: &str, handler: Handler) { - MOCK_CONN_BEHAVIORS - .write() - .unwrap() - .insert(name.to_string(), MockConnectionBehavior::new(name, handler)); -} - -pub fn modify_mock_connection_behavior(name: &str, func: impl FnOnce(&mut MockConnectionBehavior)) { - func( - MOCK_CONN_BEHAVIORS - .write() - .unwrap() - .get_mut(name) - .expect("Handler `{name}` was not installed"), - ); -} - -pub fn get_mock_connection_handler(name: &str) -> Handler { - MOCK_CONN_BEHAVIORS - .read() - .unwrap() - .get(name) - .expect("Handler `{name}` was not installed") - .get_handler() -} - -static MOCK_CONN_BEHAVIORS: Lazy>> = - Lazy::new(Default::default); - -#[derive(Default)] -pub enum ConnectionIPReturnType { - /// New connections' IP will be returned as None - #[default] - None, - /// Creates connections with the specified IP - Specified(IpAddr), - /// Each new connection will be created with a different IP based on the passed atomic integer - Different(AtomicUsize), -} - -#[derive(Default)] -pub enum ShouldReturnConnectionError { - /// Don't return a connection error - #[default] - No, - /// Always return a connection error - Yes, - /// Return connection error when the internal index is an odd number - OnOddIdx(AtomicUsize), -} - -#[derive(Clone)] -pub struct MockConnection { - pub id: usize, - pub handler: Handler, - pub port: u16, -} - -#[cfg(feature = "cluster-async")] -impl cluster_async::Connect for MockConnection { - fn connect<'a, T>( - info: T, - _response_timeout: Duration, - _connection_timeout: Duration, - _socket_addr: Option, - ) -> RedisFuture<'a, (Self, Option)> - where - T: IntoConnectionInfo + Send + 'a, - { - let info = info.into_connection_info().unwrap(); - - let (name, port) = match &info.addr { - redis::ConnectionAddr::Tcp(addr, port) => (addr, *port), - _ => unreachable!(), - }; - let binding = MOCK_CONN_BEHAVIORS.read().unwrap(); - let conn_utils = binding - .get(name) - .unwrap_or_else(|| panic!("MockConnectionUtils for `{name}` were not installed")); - let conn_err = Box::pin(future::err(RedisError::from(std::io::Error::new( - std::io::ErrorKind::ConnectionReset, - "mock-io-error", - )))); - match &conn_utils.return_connection_err { - ShouldReturnConnectionError::No => {} - ShouldReturnConnectionError::Yes => return conn_err, - ShouldReturnConnectionError::OnOddIdx(curr_idx) => { - if curr_idx.fetch_add(1, Ordering::SeqCst) % 2 != 0 { - // raise an error on each odd number - return conn_err; - } - } - } - - let ip = match &conn_utils.returned_ip_type { - ConnectionIPReturnType::Specified(ip) => Some(*ip), - ConnectionIPReturnType::Different(ip_getter) => { - let first_ip_num = ip_getter.fetch_add(1, Ordering::SeqCst) as u8; - Some(IpAddr::V4(Ipv4Addr::new(first_ip_num, 0, 0, 0))) - } - ConnectionIPReturnType::None => None, - }; - - Box::pin(future::ok(( - MockConnection { - id: conn_utils - .connection_id_provider - .fetch_add(1, Ordering::SeqCst), - handler: conn_utils.get_handler(), - port, - }, - ip, - ))) - } -} - -impl cluster::Connect for MockConnection { - fn connect<'a, T>(info: T, _timeout: Option) -> RedisResult - where - T: IntoConnectionInfo, - { - let info = info.into_connection_info().unwrap(); - - let (name, port) = match &info.addr { - redis::ConnectionAddr::Tcp(addr, port) => (addr, *port), - _ => unreachable!(), - }; - let binding = MOCK_CONN_BEHAVIORS.read().unwrap(); - let conn_utils = binding - .get(name) - .unwrap_or_else(|| panic!("MockConnectionUtils for `{name}` were not installed")); - Ok(MockConnection { - id: conn_utils - .connection_id_provider - .fetch_add(1, Ordering::SeqCst), - handler: conn_utils.get_handler(), - port, - }) - } - - fn send_packed_command(&mut self, _cmd: &[u8]) -> RedisResult<()> { - Ok(()) - } - - fn set_write_timeout(&self, _dur: Option) -> RedisResult<()> { - Ok(()) - } - - fn set_read_timeout(&self, _dur: Option) -> RedisResult<()> { - Ok(()) - } - - fn recv_response(&mut self) -> RedisResult { - Ok(Value::Nil) - } -} - -pub fn contains_slice(xs: &[u8], ys: &[u8]) -> bool { - for i in 0..xs.len() { - if xs[i..].starts_with(ys) { - return true; - } - } - false -} - -pub fn respond_startup(name: &str, cmd: &[u8]) -> Result<(), RedisResult> { - if contains_slice(cmd, b"PING") || contains_slice(cmd, b"SETNAME") { - Err(Ok(Value::SimpleString("OK".into()))) - } else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { - Err(Ok(Value::Array(vec![Value::Array(vec![ - Value::Int(0), - Value::Int(16383), - Value::Array(vec![ - Value::BulkString(name.as_bytes().to_vec()), - Value::Int(6379), - ]), - ])]))) - } else if contains_slice(cmd, b"READONLY") { - Err(Ok(Value::SimpleString("OK".into()))) - } else { - Ok(()) - } -} - #[derive(Clone, Debug)] pub struct MockSlotRange { pub primary_port: u16, @@ -322,78 +95,6 @@ pub fn respond_startup_with_replica_using_config( } } -#[cfg(feature = "cluster-async")] -impl aio::ConnectionLike for MockConnection { - fn req_packed_command<'a>(&'a mut self, cmd: &'a redis::Cmd) -> RedisFuture<'a, Value> { - Box::pin(future::ready( - (self.handler)(&cmd.get_packed_command(), self.port) - .expect_err("Handler did not specify a response"), - )) - } - - fn req_packed_commands<'a>( - &'a mut self, - _pipeline: &'a redis::Pipeline, - _offset: usize, - _count: usize, - ) -> RedisFuture<'a, Vec> { - Box::pin(future::ok(vec![])) - } - - fn get_db(&self) -> i64 { - 0 - } -} - -impl redis::ConnectionLike for MockConnection { - fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult { - (self.handler)(cmd, self.port).expect_err("Handler did not specify a response") - } - - fn req_packed_commands( - &mut self, - cmd: &[u8], - offset: usize, - _count: usize, - ) -> RedisResult> { - let res = (self.handler)(cmd, self.port).expect_err("Handler did not specify a response"); - match res { - Err(err) => Err(err), - Ok(res) => { - if let Value::Array(results) = res { - match results.into_iter().nth(offset) { - Some(Value::Array(res)) => Ok(res), - _ => Err((ErrorKind::ResponseError, "non-array response").into()), - } - } else { - Err(( - ErrorKind::ResponseError, - "non-array response", - String::from_redis_value(&res).unwrap(), - ) - .into()) - } - } - } - } - - fn get_db(&self) -> i64 { - 0 - } - - fn check_connection(&mut self) -> bool { - true - } - - fn is_open(&self) -> bool { - true - } - - fn execute_push_message(&mut self, _kind: PushKind, _data: Vec) { - // TODO - implement handling RESP3 push messages - } -} - pub struct MockEnv { #[cfg(feature = "cluster-async")] pub runtime: Runtime, @@ -405,16 +106,6 @@ pub struct MockEnv { pub handler: RemoveHandler, } -pub struct RemoveHandler(Vec); - -impl Drop for RemoveHandler { - fn drop(&mut self) { - for id in &self.0 { - MOCK_CONN_BEHAVIORS.write().unwrap().remove(id); - } - } -} - impl MockEnv { pub fn new( id: &str, @@ -440,7 +131,10 @@ impl MockEnv { .unwrap(); let id = id.to_string(); - add_new_mock_connection_behavior(&id, Arc::new(move |cmd, port| handler(cmd, port))); + let remove_handler = MockConnectionBehavior::register_new( + &id, + Arc::new(move |cmd, port| handler(cmd, port)), + ); let client = client_builder.build().unwrap(); let connection = client.get_generic_connection().unwrap(); #[cfg(feature = "cluster-async")] @@ -454,7 +148,7 @@ impl MockEnv { connection, #[cfg(feature = "cluster-async")] async_connection, - handler: RemoveHandler(vec![id]), + handler: remove_handler, } } } diff --git a/redis/tests/test_async_cluster_connections_logic.rs b/redis/tests/test_async_cluster_connections_logic.rs deleted file mode 100644 index 99d519c7e..000000000 --- a/redis/tests/test_async_cluster_connections_logic.rs +++ /dev/null @@ -1,39 +0,0 @@ -#![cfg(feature = "cluster-async")] -mod support; - -mod async_cluster_connection_logic { - use super::*; - use redis::{cluster_async::connections_logic::connect_and_check, ClusterParams}; - use std::net::{IpAddr, Ipv4Addr}; - use support::{respond_startup, ConnectionIPReturnType, MockEnv}; - - use crate::support::{modify_mock_connection_behavior, MockConnection}; - - #[test] - fn test_connect_and_check_connect_successfully() { - // Test that upon refreshing all connections, if both connections were successful, - // the returned node contains both user and management connection - let name = "test_connect_and_check_connect_successfully"; - - let mock_env = MockEnv::new(name, move |cmd, _| { - respond_startup(name, cmd)?; - Ok(()) - }); - - let expected_ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); - modify_mock_connection_behavior(name, |behavior| { - behavior.returned_ip_type = ConnectionIPReturnType::Specified(expected_ip) - }); - - mock_env.runtime.block_on(async { - let (_conn, ip) = connect_and_check::( - &format!("{name}:6379"), - ClusterParams::default(), - None, - ) - .await - .unwrap(); - assert_eq!(ip, Some(expected_ip)); - }) - } -} diff --git a/redis/tests/test_cluster.rs b/redis/tests/test_cluster.rs index bb4d00347..2b4661779 100644 --- a/redis/tests/test_cluster.rs +++ b/redis/tests/test_cluster.rs @@ -8,8 +8,9 @@ use std::sync::{ use crate::support::*; use redis::{ cluster::{cluster_pipe, ClusterClient}, - cmd, parse_redis_value, Commands, ConnectionLike, ErrorKind, ProtocolVersion, RedisError, - Value, + cmd, parse_redis_value, + testing::mock_connection::{contains_slice, respond_startup}, + Commands, ConnectionLike, ErrorKind, ProtocolVersion, RedisError, Value, }; #[test] diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index 1f1beaa18..b0ad0f739 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -16,6 +16,9 @@ use once_cell::sync::Lazy; use redis::cluster_routing::Route; use redis::cluster_routing::SingleNodeRoutingInfo; use redis::cluster_routing::SlotAddr; +use redis::testing::mock_connection::{ + contains_slice, respond_startup, MockConnection, MockConnectionBehavior, +}; use redis::ProtocolVersion; use redis::{ @@ -593,7 +596,7 @@ fn test_cluster_async_cannot_connect_to_server_with_unknown_host_name() { }; let client_builder = ClusterClient::builder(vec![&*format!("redis://{name}")]); let client = client_builder.build().unwrap(); - add_new_mock_connection_behavior(name, Arc::new(handler)); + let _handle = MockConnectionBehavior::register_new(name, Arc::new(handler)); let connection = client.get_generic_connection::(); assert!(connection.is_err()); let err = connection.err().unwrap();