Skip to content

Commit

Permalink
Use the node's host if the host is missing from the slots response
Browse files Browse the repository at this point in the history
  • Loading branch information
nihohit committed Jan 9, 2024
1 parent d317ab7 commit 86c7ef8
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 77 deletions.
10 changes: 7 additions & 3 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,15 +373,19 @@ where
let mut connections = self.connections.borrow_mut();
let mut rng = thread_rng();
let len = connections.len();
let mut samples = connections.values_mut().choose_multiple(&mut rng, len);
let samples = connections.iter_mut().choose_multiple(&mut rng, len);
let mut result = Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
"didn't get any slots from server".to_string(),
)));
for conn in samples.iter_mut() {
for (addr, conn) in samples {
let value = conn.req_command(&slot_cmd())?;
match parse_and_count_slots(&value, self.cluster_params.tls).map(|slots_data| {
let addr = addr.split(':').next().ok_or(RedisError::from((
ErrorKind::ClientError,
"can't parse node address",
)))?;
match parse_and_count_slots(&value, self.cluster_params.tls, addr).map(|slots_data| {
SlotMap::new(slots_data.1, self.cluster_params.read_from_replicas)
}) {
Ok(new_slots) => {
Expand Down
89 changes: 53 additions & 36 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -870,25 +870,13 @@ where
// When we no longer need to support Rust versions < 1.67, remove fast_math and transition to the ilog2 function.
let num_of_nodes_to_query =
std::cmp::max(fast_math::log2_raw(num_of_nodes as f32) as usize, 1);
let requested_nodes =
read_guard.random_connections(num_of_nodes_to_query, ConnectionType::User);
let topology_join_results =
futures::future::join_all(requested_nodes.map(|conn| async move {
let mut conn: C = conn.1.await;
conn.req_packed_command(&slot_cmd()).await
}))
.await;
let topology_values: Vec<_> = topology_join_results
.into_iter()
.filter_map(|r| r.ok())
.collect();
let (_, found_topology_hash) = calculate_topology(
topology_values,
DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
inner.cluster_params.tls,
let (_, found_topology_hash) = calculate_topology_from_random_nodes(
&inner,
num_of_nodes_to_query,
inner.cluster_params.read_from_replicas,
)?;
&read_guard,
DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
)
.await?;
let change_found = read_guard.get_current_topology_hash() != found_topology_hash;
Ok(change_found)
}
Expand All @@ -914,25 +902,13 @@ where
let num_of_nodes = read_guard.len();
const MAX_REQUESTED_NODES: usize = 50;
let num_of_nodes_to_query = std::cmp::min(num_of_nodes, MAX_REQUESTED_NODES);
let requested_nodes =
read_guard.random_connections(num_of_nodes_to_query, ConnectionType::User);
let topology_join_results =
futures::future::join_all(requested_nodes.map(|conn| async move {
let mut conn: C = conn.1.await;
conn.req_packed_command(&slot_cmd()).await
}))
.await;
let topology_values: Vec<_> = topology_join_results
.into_iter()
.filter_map(|r| r.ok())
.collect();
let (new_slots, topology_hash) = calculate_topology(
topology_values,
curr_retry,
inner.cluster_params.tls,
let (new_slots, topology_hash) = calculate_topology_from_random_nodes(
&inner,
num_of_nodes_to_query,
inner.cluster_params.read_from_replicas,
)?;
&read_guard,
curr_retry,
)
.await?;
info!("Found slot map: {new_slots}");
let connections = &*read_guard;
// Create a new connection vector of the found nodes
Expand Down Expand Up @@ -1562,6 +1538,47 @@ where
}
}

/// Queries `CLUSTER SLOTS` on a random sample of up to `num_of_nodes_to_query`
/// user connections and derives a slot map and topology hash from the
/// responses via `calculate_topology`.
///
/// Sampled connections whose identifier cannot be resolved to an address are
/// silently dropped from the sample, as are nodes whose `CLUSTER SLOTS`
/// request returns an error. Each successful response is paired with the host
/// of the node that produced it, so that slot entries with empty hostnames
/// can be attributed to the answering node downstream.
///
/// `curr_retry` is forwarded unchanged to `calculate_topology`
/// (presumably it influences how strict the agreement check is — confirm in
/// `cluster_topology.rs`).
async fn calculate_topology_from_random_nodes<'a, C>(
    inner: &Core<C>,
    num_of_nodes_to_query: usize,
    read_guard: &tokio::sync::RwLockReadGuard<'a, ConnectionsContainer<C>>,
    curr_retry: usize,
) -> RedisResult<(
    crate::cluster_slotmap::SlotMap,
    crate::cluster_topology::TopologyHash,
)>
where
    C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
{
    // Pair each sampled connection with its resolvable address; entries
    // without a known address are filtered out.
    let requested_nodes = read_guard
        .random_connections(num_of_nodes_to_query, ConnectionType::User)
        .filter_map(|(identifier, conn)| {
            read_guard
                .address_for_identifier(&identifier)
                .map(|addr| (addr, conn))
        });
    // Issue CLUSTER SLOTS to all sampled nodes concurrently, tagging each
    // result with the address it was received from.
    let topology_join_results =
        futures::future::join_all(requested_nodes.map(|(host, conn)| async move {
            let mut conn: C = conn.await;
            conn.req_packed_command(&slot_cmd())
                .await
                .map(|res| (host, res))
        }))
        .await;
    // Keep only successful responses, and strip the port from each address so
    // that only the host is handed to the topology calculation.
    let topology_values = topology_join_results.iter().filter_map(|r| {
        r.as_ref().ok().and_then(|(addr, value)| {
            get_host_and_port_from_addr(addr).map(|(host, _)| (host, value))
        })
    });
    calculate_topology(
        topology_values,
        curr_retry,
        inner.cluster_params.tls,
        num_of_nodes_to_query,
        inner.cluster_params.read_from_replicas,
    )
}

impl<C> ConnectionLike for ClusterConnection<C>
where
C: ConnectionLike + Send + Clone + Unpin + Sync + Connect + 'static,
Expand Down
5 changes: 5 additions & 0 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,11 @@ impl Slot {
pub fn end(&self) -> u16 {
self.end
}

/// Returns the address of the master node owning this slot range.
#[allow(dead_code)] // referenced only from tests
pub(crate) fn master(&self) -> &str {
    &self.master
}
}

/// What type of node should a request be routed to, assuming read from replica is enabled.
Expand Down
124 changes: 86 additions & 38 deletions redis/src/cluster_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub fn get_slot(key: &[u8]) -> u16 {
pub(crate) fn parse_and_count_slots(
raw_slot_resp: &Value,
tls: Option<TlsMode>,
// The DNS address of the node from which `raw_slot_resp` was received.
addr_of_answering_node: &str,
) -> RedisResult<(u16, Vec<Slot>)> {
// Parse response.
let mut slots = Vec::with_capacity(2);
Expand Down Expand Up @@ -102,12 +104,17 @@ pub(crate) fn parse_and_count_slots(
return None;
}

let ip = if let Value::BulkString(ref ip) = node[0] {
String::from_utf8_lossy(ip)
let hostname = if let Value::BulkString(ref ip) = node[0] {
let hostname = String::from_utf8_lossy(ip);
if hostname.is_empty() {
addr_of_answering_node.into()
} else {
hostname
}
} else {
return None;
};
if ip.is_empty() {
if hostname.is_empty() {
return None;
}

Expand All @@ -116,7 +123,9 @@ pub(crate) fn parse_and_count_slots(
} else {
return None;
};
Some(get_connection_addr(ip.into_owned(), port, tls, None).to_string())
Some(
get_connection_addr(hostname.into_owned(), port, tls, None).to_string(),
)
} else {
None
}
Expand Down Expand Up @@ -148,22 +157,16 @@ fn calculate_hash<T: Hash>(t: &T) -> u64 {
s.finish()
}

pub(crate) fn calculate_topology(
topology_views: Vec<Value>,
pub(crate) fn calculate_topology<'a>(
topology_views: impl Iterator<Item = (&'a str, &'a Value)>,
curr_retry: usize,
tls_mode: Option<TlsMode>,
num_of_queried_nodes: usize,
read_from_replica: ReadFromReplicaStrategy,
) -> Result<(SlotMap, TopologyHash), RedisError> {
if topology_views.is_empty() {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error: All CLUSTER SLOTS results are errors",
)));
}
) -> RedisResult<(SlotMap, TopologyHash)> {
let mut hash_view_map = HashMap::new();
for view in topology_views {
if let Ok(slots_and_count) = parse_and_count_slots(&view, tls_mode) {
for (host, view) in topology_views {
if let Ok(slots_and_count) = parse_and_count_slots(view, tls_mode, host) {
let hash_value = calculate_hash(&slots_and_count);
let topology_entry = hash_view_map.entry(hash_value).or_insert(TopologyView {
hash_value,
Expand Down Expand Up @@ -279,8 +282,8 @@ mod tests {
slot_value(4001, 8000, "node1", 6380),
]);

let res1 = parse_and_count_slots(&view1, None).unwrap();
let res2 = parse_and_count_slots(&view2, None).unwrap();
let res1 = parse_and_count_slots(&view1, None, "foo").unwrap();
let res2 = parse_and_count_slots(&view2, None, "foo").unwrap();
assert_eq!(res1.0, res2.0);
assert_eq!(res1.1.len(), res2.1.len());
let check =
Expand All @@ -290,28 +293,72 @@ mod tests {
assert!(check);
}

#[test]
fn parse_slots_returns_slots_with_host_name_if_missing() {
    // A slot entry whose node reports an empty hostname must inherit the
    // address of the node that served the CLUSTER SLOTS reply.
    let response = Value::Array(vec![slot_value(0, 4000, "", 6379)]);

    let parsed = parse_and_count_slots(&response, None, "node").unwrap();
    assert_eq!(parsed.0, 4000);
    assert_eq!(parsed.1[0].master(), "node:6379");
}

#[test]
fn should_parse_and_hash_regardless_of_missing_host_name_and_order() {
    // Two views of the same topology: each omits a different hostname
    // (filled in from the answering node's address) and lists the slot
    // ranges in a different order. Parsing must yield the same hash,
    // slot count, and slot boundaries for both.
    let first_view = Value::Array(vec![
        slot_value(0, 4000, "", 6379),
        slot_value(4001, 8000, "node2", 6380),
        slot_value(8001, 16383, "node3", 6379),
    ]);

    let second_view = Value::Array(vec![
        slot_value(8001, 16383, "", 6379),
        slot_value(0, 4000, "node1", 6379),
        slot_value(4001, 8000, "node2", 6380),
    ]);

    let first = parse_and_count_slots(&first_view, None, "node1").unwrap();
    let second = parse_and_count_slots(&second_view, None, "node3").unwrap();

    assert_eq!(calculate_hash(&first), calculate_hash(&second));
    assert_eq!(first.0, second.0);
    assert_eq!(first.1.len(), second.1.len());
    assert!(first
        .1
        .into_iter()
        .zip(second.1)
        .all(|(a, b)| a.start() == b.start() && a.end() == b.end()));
}

// Test fixture: the shapes of CLUSTER SLOTS views exercised by the
// topology-calculation tests below. "FullCoverage" views span all 16384
// slots; "MissingSlots" views leave gaps in the slot range.
enum ViewType {
    SingleNodeViewFullCoverage,
    SingleNodeViewMissingSlots,
    TwoNodesViewFullCoverage,
    TwoNodesViewMissingSlots,
}
fn get_view(view_type: &ViewType) -> Value {
fn get_view(view_type: &ViewType) -> (&str, Value) {
match view_type {
ViewType::SingleNodeViewFullCoverage => {
Value::Array(vec![slot_value(0, 16383, "node1", 6379)])
}
ViewType::SingleNodeViewMissingSlots => {
Value::Array(vec![slot_value(0, 4000, "node1", 6379)])
}
ViewType::TwoNodesViewFullCoverage => Value::Array(vec![
slot_value(0, 4000, "node1", 6379),
slot_value(4001, 16383, "node2", 6380),
]),
ViewType::TwoNodesViewMissingSlots => Value::Array(vec![
slot_value(0, 3000, "node3", 6381),
slot_value(4001, 16383, "node4", 6382),
]),
ViewType::SingleNodeViewFullCoverage => (
"first",
Value::Array(vec![slot_value(0, 16383, "node1", 6379)]),
),
ViewType::SingleNodeViewMissingSlots => (
"second",
Value::Array(vec![slot_value(0, 4000, "node1", 6379)]),
),
ViewType::TwoNodesViewFullCoverage => (
"third",
Value::Array(vec![
slot_value(0, 4000, "node1", 6379),
slot_value(4001, 16383, "node2", 6380),
]),
),
ViewType::TwoNodesViewMissingSlots => (
"fourth",
Value::Array(vec![
slot_value(0, 3000, "node3", 6381),
slot_value(4001, 16383, "node4", 6382),
]),
),
}
}

Expand All @@ -328,8 +375,9 @@ mod tests {
get_view(&ViewType::SingleNodeViewFullCoverage),
get_view(&ViewType::TwoNodesViewFullCoverage),
];

let (topology_view, _) = calculate_topology(
topology_results,
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
Expand All @@ -352,7 +400,7 @@ mod tests {
get_view(&ViewType::TwoNodesViewMissingSlots),
];
let topology_view = calculate_topology(
topology_results,
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
Expand All @@ -371,7 +419,7 @@ mod tests {
get_view(&ViewType::TwoNodesViewMissingSlots),
];
let (topology_view, _) = calculate_topology(
topology_results,
topology_results.iter().map(|(addr, value)| (*addr, value)),
3,
None,
queried_nodes,
Expand All @@ -394,7 +442,7 @@ mod tests {
get_view(&ViewType::TwoNodesViewMissingSlots),
];
let (topology_view, _) = calculate_topology(
topology_results,
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
Expand All @@ -418,7 +466,7 @@ mod tests {
get_view(&ViewType::TwoNodesViewMissingSlots),
];
let (topology_view, _) = calculate_topology(
topology_results,
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
Expand All @@ -442,7 +490,7 @@ mod tests {
get_view(&ViewType::SingleNodeViewMissingSlots),
];
let (topology_view, _) = calculate_topology(
topology_results,
topology_results.iter().map(|(addr, value)| (*addr, value)),
1,
None,
queried_nodes,
Expand Down
Loading

0 comments on commit 86c7ef8

Please sign in to comment.