From 1625ee3c48d65165b8dfb8694f40bbb9fd6d511d Mon Sep 17 00:00:00 2001 From: Nikolay Komarevskiy Date: Mon, 24 Feb 2025 10:11:04 +0100 Subject: [PATCH] chore: renaming and improve docs --- .../snapshot/latency_based_routing.rs | 127 ++++++++++-------- 1 file changed, 69 insertions(+), 58 deletions(-) diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs index a34c6d7c..e40951f9 100644 --- a/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs +++ b/ic-agent/src/agent/route_provider/dynamic_routing/snapshot/latency_based_routing.rs @@ -30,25 +30,30 @@ fn generate_exp_decaying_weights(n: usize, lambda: f64) -> Vec { weights } -// A node, which is selected to participate in the routing. -// The choice for selection is based on node's ranking (score). +/// A node candidate eligible for final routing selection based on its score. +/// +/// # Overview +/// This struct represents a node that has passed initial pre-selection criteria and is part of the +/// routing candidate pool. The selection process happens in two phases: +/// 1. Pre-selection: depending on the settings, either the k-top nodes or all healthy nodes are chosen +/// 2. Final selection: a node is probabilistically selected from the candidate pool based on its score #[derive(Clone, Debug)] -struct RoutingNode { +struct RoutingCandidateNode { node: Node, score: f64, } -impl RoutingNode { +impl RoutingCandidateNode { fn new(node: Node, score: f64) -> Self { Self { node, score } } } // Stores node's meta information and metrics (latencies, availabilities). -// Routing URLs are generated based on the score field. +// Routing nodes are probabilistically selected based on the score field. #[derive(Clone, Debug)] struct NodeMetrics { - // Size of the sliding window used for store latencies and availabilities of the node. 
+ // Size of the sliding window used to store latencies and availabilities of the node. window_size: usize, /// Reflects the status of the most recent health check. It should be the same as the last element in `availabilities`. is_healthy: bool, @@ -56,7 +61,7 @@ struct NodeMetrics { latencies: VecDeque, /// Sliding window with availability measurements. availabilities: VecDeque, - /// Overall score of the node. Calculated based on latencies and availabilities arrays. This score is used in `next_n_nodes()` and `next_node()` methods. + /// Overall score of the node. Calculated based on latencies and availabilities arrays. This score is used in `next_n_nodes()` method for the final nodes selection. score: f64, } @@ -170,15 +175,15 @@ fn compute_score( /// # Latency-based dynamic routing /// /// This module implements a routing strategy that uses weighted random selection of nodes based on their historical data (latencies and availabilities). -/// The main features of this strategy are: /// +/// Summary of the routing strategy: /// - Uses sliding windows for storing latencies and availabilities of each node -/// - The overall score of each node is computed as a product of latency and availability scores, score = score_l * score_a -/// - Latency and availability scores are computed from sliding windows using an additional array of weights, allowing prioritization of more recent observations. By default, exponentially decaying weights are used. -/// - Uses weighted random selection of nodes for load balancing +/// - Latency and availability scores are first computed separately from the sliding windows using an additional array of weights, allowing prioritization of more recent observations. By default, exponentially decaying weights are used. 
+/// - The final overall score of each node is computed as a product of latency and availability scores, namely score = score_l * score_a +/// - Nodes pre-selection phase: only healthy nodes are considered; if k-top-nodes setting is enabled, then only the k healthy nodes with highest scores are filtered into the routing candidate pool, otherwise all healthy nodes are taken +/// - Final nodes selection for routing from the candidate pool is probabilistic and proportional to score /// /// ## Configuration Options -/// /// - `k_top_nodes`: Limit routing to only the top K nodes with highest score /// - `use_availability_penalty`: Whether to penalize nodes for being unavailable /// - Custom window weights can be provided for specialized decay functions @@ -188,8 +193,8 @@ pub struct LatencyRoutingSnapshot { k_top_nodes: Option, // Stores all existing nodes in the topology along with their historical data (latencies and availabilities) existing_nodes: HashMap, - // Snapshot of selected nodes, which are participating in routing. Snapshot is published via publish_routing_nodes() when either: topology changes or a health check of some node is received. - routing_nodes: Arc>>, + // Snapshot of nodes, which are pre-selected as candidates for routing. Snapshot is published via publish_routing_candidates() when either: topology changes or a health check of some node is received. + routing_candidates: Arc>>, // Weights used to compute the availability score of a node. window_weights: Vec, // Pre-computed weights sum, passed for efficiency purpose as this sum doesn't change. 
@@ -210,7 +215,7 @@ impl LatencyRoutingSnapshot { Self { k_top_nodes: None, existing_nodes: HashMap::new(), - routing_nodes: Arc::new(ArcSwap::new(vec![].into())), + routing_candidates: Arc::new(ArcSwap::new(vec![].into())), use_availability_penalty: true, window_weights, window_weights_sum, @@ -240,36 +245,36 @@ impl LatencyRoutingSnapshot { self } - /// Atomically updates the routing_nodes - fn publish_routing_nodes(&self) { - let mut routing_nodes: Vec = self + /// Atomically updates the routing_candidates + fn publish_routing_candidates(&self) { + let mut routing_candidates: Vec = self .existing_nodes .iter() .filter(|(_, v)| v.is_healthy) - .map(|(k, v)| RoutingNode::new(k.clone(), v.score)) + .map(|(k, v)| RoutingCandidateNode::new(k.clone(), v.score)) .collect(); - // In case requests are routed to only k top nodes, select these top nodes + // In case requests are routed to only k-top nodes, pre-select these candidates if let Some(k_top) = self.k_top_nodes { - routing_nodes.sort_by(|a, b| { + routing_candidates.sort_by(|a, b| { b.score .partial_cmp(&a.score) .unwrap_or(std::cmp::Ordering::Equal) }); - if routing_nodes.len() > k_top { - routing_nodes.truncate(k_top); + if routing_candidates.len() > k_top { + routing_candidates.truncate(k_top); } } - // Atomically update the routing table - self.routing_nodes.store(Arc::new(routing_nodes)); + // Atomically update the table of routing candidates + self.routing_candidates.store(Arc::new(routing_candidates)); } } /// Helper function to sample nodes based on their weights. 
/// Node index is selected based on the input number in range [0.0, 1.0] #[inline(always)] -fn weighted_sample(weighted_nodes: &[RoutingNode], number: f64) -> Option { +fn weighted_sample(weighted_nodes: &[RoutingCandidateNode], number: f64) -> Option { if !(0.0..=1.0).contains(&number) || weighted_nodes.is_empty() { return None; } @@ -293,7 +298,7 @@ fn weighted_sample(weighted_nodes: &[RoutingNode], number: f64) -> Option impl RoutingSnapshot for LatencyRoutingSnapshot { fn has_nodes(&self) -> bool { - !self.routing_nodes.load().is_empty() + !self.routing_candidates.load().is_empty() } fn next_node(&self) -> Option { @@ -306,17 +311,18 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { return Vec::new(); } - let mut routing_nodes: Vec = self.routing_nodes.load().as_ref().clone(); + let mut routing_candidates: Vec = + self.routing_candidates.load().as_ref().clone(); // Limit the number of returned nodes to the number of available nodes - let n = std::cmp::min(n, routing_nodes.len()); + let n = std::cmp::min(n, routing_candidates.len()); let mut nodes = Vec::with_capacity(n); let mut rng = rand::thread_rng(); for _ in 0..n { let rand_num = rng.gen::(); - if let Some(idx) = weighted_sample(routing_nodes.as_slice(), rand_num) { - let removed_node = routing_nodes.swap_remove(idx); + if let Some(idx) = weighted_sample(routing_candidates.as_slice(), rand_num) { + let removed_node = routing_candidates.swap_remove(idx); nodes.push(removed_node.node); } } @@ -347,7 +353,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { } if has_changes { - self.publish_routing_nodes(); + self.publish_routing_candidates(); } has_changes @@ -370,7 +376,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { self.use_availability_penalty, ); - self.publish_routing_nodes(); + self.publish_routing_candidates(); true } @@ -378,7 +384,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { fn routes_stats(&self) -> RoutesStats { RoutesStats::new( self.existing_nodes.len(), - 
Some(self.routing_nodes.load().len()), + Some(self.routing_candidates.load().len()), ) } } @@ -397,7 +403,7 @@ mod tests { snapshot::{ latency_based_routing::{ compute_score, weighted_sample, LatencyRoutingSnapshot, NodeMetrics, - RoutingNode, + RoutingCandidateNode, }, routing_snapshot::RoutingSnapshot, }, @@ -441,15 +447,13 @@ mod tests { .set_availability_penalty(false); let node = Node::new("api1.com").unwrap(); let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); - snapshot - .existing_nodes - .insert(node.clone(), NodeMetrics::new(2)); + snapshot.sync_nodes(&[node.clone()]); assert_eq!(snapshot.routes_stats(), RoutesStats::new(1, Some(0))); // Check first update let is_updated = snapshot.update_node(&node, health); assert!(is_updated); assert!(snapshot.has_nodes()); - let (_, metrics) = snapshot.existing_nodes.iter().next().unwrap(); + let metrics = snapshot.existing_nodes.get(&node).unwrap(); assert_eq!(metrics.score, (2.0 / 1.0) / 2.0); assert_eq!(snapshot.next_node().unwrap(), node); assert_eq!(snapshot.routes_stats(), RoutesStats::new(1, Some(1))); @@ -457,24 +461,24 @@ mod tests { let health = HealthCheckStatus::new(Some(Duration::from_secs(2))); let is_updated = snapshot.update_node(&node, health); assert!(is_updated); - let (_, metrics) = snapshot.existing_nodes.iter().next().unwrap(); + let metrics = snapshot.existing_nodes.get(&node).unwrap(); assert_eq!(metrics.score, (2.0 / 2.0 + 1.0 / 1.0) / 3.0); - // Check third update - let health = HealthCheckStatus::new(Some(Duration::from_secs(3))); - let is_updated = snapshot.update_node(&node, health); - assert!(is_updated); - let (_, metrics) = snapshot.existing_nodes.iter().next().unwrap(); - assert_eq!(metrics.score, (2.0 / 3.0 + 1.0 / 2.0) / 3.0); - // Check forth update with none + // Check third update with none let health = HealthCheckStatus::new(None); let is_updated = snapshot.update_node(&node, health); assert!(is_updated); - let (_, metrics) = 
snapshot.existing_nodes.iter().next().unwrap(); - assert_eq!(metrics.score, (2.0 / 3.0 + 1.0 / 2.0) / 3.0); + let metrics = snapshot.existing_nodes.get(&node).unwrap(); + assert_eq!(metrics.score, (2.0 / 2.0 + 1.0 / 1.0) / 3.0); assert!(!snapshot.has_nodes()); assert_eq!(snapshot.existing_nodes.len(), 1); assert!(snapshot.next_node().is_none()); assert_eq!(snapshot.routes_stats(), RoutesStats::new(1, Some(0))); + // Check fourth update + let health = HealthCheckStatus::new(Some(Duration::from_secs(3))); + let is_updated = snapshot.update_node(&node, health); + assert!(is_updated); + let metrics = snapshot.existing_nodes.get(&node).unwrap(); + assert_eq!(metrics.score, (2.0 / 3.0 + 1.0 / 2.0) / 3.0); } #[test] @@ -511,6 +515,13 @@ mod tests { keys.sort_by(|a, b| a.domain().cmp(b.domain())); assert_eq!(keys, vec![&node_2, &node_3]); assert!(!snapshot.has_nodes()); + // Sync with [node_2, node_3] again + let nodes_changed = snapshot.sync_nodes(&[node_3.clone(), node_2.clone()]); + assert!(!nodes_changed); + let mut keys = snapshot.existing_nodes.keys().collect::>(); + keys.sort_by(|a, b| a.domain().cmp(b.domain())); + assert_eq!(keys, vec![&node_2, &node_3]); + assert!(!snapshot.has_nodes()); // Sync with [] let nodes_changed = snapshot.sync_nodes(&[]); assert!(nodes_changed); @@ -526,11 +537,11 @@ mod tests { fn test_weighted_sample() { let node = Node::new("api1.com").unwrap(); // Case 1: empty array - let arr: &[RoutingNode] = &[]; + let arr: &[RoutingCandidateNode] = &[]; let idx = weighted_sample(arr, 0.5); assert_eq!(idx, None); // Case 2: single element in array - let arr = &[RoutingNode::new(node.clone(), 1.0)]; + let arr = &[RoutingCandidateNode::new(node.clone(), 1.0)]; let idx = weighted_sample(arr, 0.0); assert_eq!(idx, Some(0)); let idx = weighted_sample(arr, 1.0); @@ -542,8 +553,8 @@ mod tests { assert_eq!(idx, None); // Case 3: two elements in array (second element has twice the weight of the first) let arr = &[ - RoutingNode::new(node.clone(), 1.0), - 
RoutingNode::new(node.clone(), 2.0), + RoutingCandidateNode::new(node.clone(), 1.0), + RoutingCandidateNode::new(node.clone(), 2.0), ]; // prefixed_sum = [1.0, 3.0] let idx = weighted_sample(arr, 0.0); // 0.0 * 3.0 < 1.0 assert_eq!(idx, Some(0)); @@ -560,10 +571,10 @@ mod tests { assert_eq!(idx, None); // Case 4: four elements in array let arr = &[ - RoutingNode::new(node.clone(), 1.0), - RoutingNode::new(node.clone(), 2.0), - RoutingNode::new(node.clone(), 1.5), - RoutingNode::new(node.clone(), 2.5), + RoutingCandidateNode::new(node.clone(), 1.0), + RoutingCandidateNode::new(node.clone(), 2.0), + RoutingCandidateNode::new(node.clone(), 1.5), + RoutingCandidateNode::new(node.clone(), 2.5), ]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0] let idx = weighted_sample(arr, 0.14); // 0.14 * 7 < 1.0 assert_eq!(idx, Some(0)); // probability ~0.14 @@ -689,7 +700,7 @@ mod tests { (node_3, metrics_3), (node_4, metrics_4), ]); - snapshot.publish_routing_nodes(); + snapshot.publish_routing_candidates(); let mut stats = HashMap::new(); let experiments = 30; let select_nodes_count = 1;