Skip to content

Commit

Permalink
chore: renaming and improve docs
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy committed Feb 24, 2025
1 parent 4d421b5 commit 1625ee3
Showing 1 changed file with 69 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,38 @@ fn generate_exp_decaying_weights(n: usize, lambda: f64) -> Vec<f64> {
weights
}

// A node, which is selected to participate in the routing.
// The choice for selection is based on node's ranking (score).
/// A node candidate eligible for final routing selection based on its score.
///
/// # Overview
/// This struct represents a node that has passed initial pre-selection criteria and is part of the
/// routing candidate pool. The selection process happens in two phases:
/// 1. Pre-selection: depending on the settings, either the k-top nodes or all healthy nodes are chosen
/// 2. Final selection: a node is probabilistically selected from the candidate pool based on its score
#[derive(Clone, Debug)]
struct RoutingNode {
struct RoutingCandidateNode {
node: Node,
score: f64,
}

impl RoutingNode {
impl RoutingCandidateNode {
fn new(node: Node, score: f64) -> Self {
Self { node, score }
}
}

// Stores node's meta information and metrics (latencies, availabilities).
// Routing URLs are generated based on the score field.
// Routing nodes are probabilistically selected based on the score field.
#[derive(Clone, Debug)]
struct NodeMetrics {
// Size of the sliding window used for store latencies and availabilities of the node.
// Size of the sliding window used to store latencies and availabilities of the node.
window_size: usize,
/// Reflects the status of the most recent health check. It should be the same as the last element in `availabilities`.
is_healthy: bool,
/// Sliding window with latency measurements.
latencies: VecDeque<f64>,
/// Sliding window with availability measurements.
availabilities: VecDeque<bool>,
/// Overall score of the node. Calculated based on latencies and availabilities arrays. This score is used in `next_n_nodes()` and `next_node()` methods.
/// Overall score of the node, calculated from the `latencies` and `availabilities` sliding windows. This score is used in the `next_n_nodes()` method for the final node selection.
score: f64,
}

Expand Down Expand Up @@ -170,15 +175,15 @@ fn compute_score(
/// # Latency-based dynamic routing
///
/// This module implements a routing strategy that uses weighted random selection of nodes based on their historical data (latencies and availabilities).
/// The main features of this strategy are:
///
/// Summary of the routing strategy:
/// - Uses sliding windows for storing latencies and availabilities of each node
/// - The overall score of each node is computed as a product of latency and availability scores, score = score_l * score_a
/// - Latency and availability scores are computed from sliding windows using an additional array of weights, allowing prioritization of more recent observations. By default, exponentially decaying weights are used.
/// - Uses weighted random selection of nodes for load balancing
/// - Latency and availability scores are first computed separately from the sliding windows using an additional array of weights, allowing prioritization of more recent observations. By default, exponentially decaying weights are used.
/// - The final overall score of each node is computed as a product of latency and availability scores, namely score = score_l * score_a
/// - Nodes pre-selection phase: only healthy nodes are considered; if the k-top-nodes setting is enabled, only the k healthy nodes with the highest scores are placed into the routing candidate pool, otherwise all healthy nodes are taken
/// - Final nodes selection for routing from the candidate pool is probabilistic and proportional to score
///
/// ## Configuration Options
///
/// - `k_top_nodes`: Limit routing to only the top K nodes with highest score
/// - `use_availability_penalty`: Whether to penalize nodes for being unavailable
/// - Custom window weights can be provided for specialized decay functions
Expand All @@ -188,8 +193,8 @@ pub struct LatencyRoutingSnapshot {
k_top_nodes: Option<usize>,
// Stores all existing nodes in the topology along with their historical data (latencies and availabilities)
existing_nodes: HashMap<Node, NodeMetrics>,
// Snapshot of selected nodes, which are participating in routing. Snapshot is published via publish_routing_nodes() when either: topology changes or a health check of some node is received.
routing_nodes: Arc<ArcSwap<Vec<RoutingNode>>>,
// Snapshot of nodes that are pre-selected as candidates for routing. The snapshot is published via publish_routing_candidates() whenever the topology changes or a health check of some node is received.
routing_candidates: Arc<ArcSwap<Vec<RoutingCandidateNode>>>,
// Weights applied to the sliding windows when computing the latency and availability scores of a node.
window_weights: Vec<f64>,
// Pre-computed weights sum, passed for efficiency purpose as this sum doesn't change.
Expand All @@ -210,7 +215,7 @@ impl LatencyRoutingSnapshot {
Self {
k_top_nodes: None,
existing_nodes: HashMap::new(),
routing_nodes: Arc::new(ArcSwap::new(vec![].into())),
routing_candidates: Arc::new(ArcSwap::new(vec![].into())),
use_availability_penalty: true,
window_weights,
window_weights_sum,
Expand Down Expand Up @@ -240,36 +245,36 @@ impl LatencyRoutingSnapshot {
self
}

/// Atomically updates the routing_nodes
fn publish_routing_nodes(&self) {
let mut routing_nodes: Vec<RoutingNode> = self
/// Atomically updates the routing_candidates
fn publish_routing_candidates(&self) {
let mut routing_candidates: Vec<RoutingCandidateNode> = self
.existing_nodes
.iter()
.filter(|(_, v)| v.is_healthy)
.map(|(k, v)| RoutingNode::new(k.clone(), v.score))
.map(|(k, v)| RoutingCandidateNode::new(k.clone(), v.score))
.collect();

// In case requests are routed to only k top nodes, select these top nodes
// In case requests are routed to only k-top nodes, pre-select these candidates
if let Some(k_top) = self.k_top_nodes {
routing_nodes.sort_by(|a, b| {
routing_candidates.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});

if routing_nodes.len() > k_top {
routing_nodes.truncate(k_top);
if routing_candidates.len() > k_top {
routing_candidates.truncate(k_top);
}
}
// Atomically update the routing table
self.routing_nodes.store(Arc::new(routing_nodes));
// Atomically update the table of routing candidates
self.routing_candidates.store(Arc::new(routing_candidates));
}
}

/// Helper function to sample nodes based on their weights.
/// Node index is selected based on the input number in range [0.0, 1.0]
#[inline(always)]
fn weighted_sample(weighted_nodes: &[RoutingNode], number: f64) -> Option<usize> {
fn weighted_sample(weighted_nodes: &[RoutingCandidateNode], number: f64) -> Option<usize> {
if !(0.0..=1.0).contains(&number) || weighted_nodes.is_empty() {
return None;
}
Expand All @@ -293,7 +298,7 @@ fn weighted_sample(weighted_nodes: &[RoutingNode], number: f64) -> Option<usize>

impl RoutingSnapshot for LatencyRoutingSnapshot {
fn has_nodes(&self) -> bool {
!self.routing_nodes.load().is_empty()
!self.routing_candidates.load().is_empty()
}

fn next_node(&self) -> Option<Node> {
Expand All @@ -306,17 +311,18 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
return Vec::new();
}

let mut routing_nodes: Vec<RoutingNode> = self.routing_nodes.load().as_ref().clone();
let mut routing_candidates: Vec<RoutingCandidateNode> =
self.routing_candidates.load().as_ref().clone();

// Limit the number of returned nodes to the number of available nodes
let n = std::cmp::min(n, routing_nodes.len());
let n = std::cmp::min(n, routing_candidates.len());
let mut nodes = Vec::with_capacity(n);
let mut rng = rand::thread_rng();

for _ in 0..n {
let rand_num = rng.gen::<f64>();
if let Some(idx) = weighted_sample(routing_nodes.as_slice(), rand_num) {
let removed_node = routing_nodes.swap_remove(idx);
if let Some(idx) = weighted_sample(routing_candidates.as_slice(), rand_num) {
let removed_node = routing_candidates.swap_remove(idx);
nodes.push(removed_node.node);
}
}
Expand Down Expand Up @@ -347,7 +353,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
}

if has_changes {
self.publish_routing_nodes();
self.publish_routing_candidates();
}

has_changes
Expand All @@ -370,15 +376,15 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
self.use_availability_penalty,
);

self.publish_routing_nodes();
self.publish_routing_candidates();

true
}

fn routes_stats(&self) -> RoutesStats {
RoutesStats::new(
self.existing_nodes.len(),
Some(self.routing_nodes.load().len()),
Some(self.routing_candidates.load().len()),
)
}
}
Expand All @@ -397,7 +403,7 @@ mod tests {
snapshot::{
latency_based_routing::{
compute_score, weighted_sample, LatencyRoutingSnapshot, NodeMetrics,
RoutingNode,
RoutingCandidateNode,
},
routing_snapshot::RoutingSnapshot,
},
Expand Down Expand Up @@ -441,40 +447,38 @@ mod tests {
.set_availability_penalty(false);
let node = Node::new("api1.com").unwrap();
let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
snapshot
.existing_nodes
.insert(node.clone(), NodeMetrics::new(2));
snapshot.sync_nodes(&[node.clone()]);
assert_eq!(snapshot.routes_stats(), RoutesStats::new(1, Some(0)));
// Check first update
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
assert!(snapshot.has_nodes());
let (_, metrics) = snapshot.existing_nodes.iter().next().unwrap();
let metrics = snapshot.existing_nodes.get(&node).unwrap();
assert_eq!(metrics.score, (2.0 / 1.0) / 2.0);
assert_eq!(snapshot.next_node().unwrap(), node);
assert_eq!(snapshot.routes_stats(), RoutesStats::new(1, Some(1)));
// Check second update
let health = HealthCheckStatus::new(Some(Duration::from_secs(2)));
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let (_, metrics) = snapshot.existing_nodes.iter().next().unwrap();
let metrics = snapshot.existing_nodes.get(&node).unwrap();
assert_eq!(metrics.score, (2.0 / 2.0 + 1.0 / 1.0) / 3.0);
// Check third update
let health = HealthCheckStatus::new(Some(Duration::from_secs(3)));
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let (_, metrics) = snapshot.existing_nodes.iter().next().unwrap();
assert_eq!(metrics.score, (2.0 / 3.0 + 1.0 / 2.0) / 3.0);
// Check forth update with none
// Check third update with none
let health = HealthCheckStatus::new(None);
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let (_, metrics) = snapshot.existing_nodes.iter().next().unwrap();
assert_eq!(metrics.score, (2.0 / 3.0 + 1.0 / 2.0) / 3.0);
let metrics = snapshot.existing_nodes.get(&node).unwrap();
assert_eq!(metrics.score, (2.0 / 2.0 + 1.0 / 1.0) / 3.0);
assert!(!snapshot.has_nodes());
assert_eq!(snapshot.existing_nodes.len(), 1);
assert!(snapshot.next_node().is_none());
assert_eq!(snapshot.routes_stats(), RoutesStats::new(1, Some(0)));
// Check fourth update
let health = HealthCheckStatus::new(Some(Duration::from_secs(3)));
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let metrics = snapshot.existing_nodes.get(&node).unwrap();
assert_eq!(metrics.score, (2.0 / 3.0 + 1.0 / 2.0) / 3.0);
}

#[test]
Expand Down Expand Up @@ -511,6 +515,13 @@ mod tests {
keys.sort_by(|a, b| a.domain().cmp(b.domain()));
assert_eq!(keys, vec![&node_2, &node_3]);
assert!(!snapshot.has_nodes());
// Sync with [node_2, node_3] again
let nodes_changed = snapshot.sync_nodes(&[node_3.clone(), node_2.clone()]);
assert!(!nodes_changed);
let mut keys = snapshot.existing_nodes.keys().collect::<Vec<_>>();
keys.sort_by(|a, b| a.domain().cmp(b.domain()));
assert_eq!(keys, vec![&node_2, &node_3]);
assert!(!snapshot.has_nodes());
// Sync with []
let nodes_changed = snapshot.sync_nodes(&[]);
assert!(nodes_changed);
Expand All @@ -526,11 +537,11 @@ mod tests {
fn test_weighted_sample() {
let node = Node::new("api1.com").unwrap();
// Case 1: empty array
let arr: &[RoutingNode] = &[];
let arr: &[RoutingCandidateNode] = &[];
let idx = weighted_sample(arr, 0.5);
assert_eq!(idx, None);
// Case 2: single element in array
let arr = &[RoutingNode::new(node.clone(), 1.0)];
let arr = &[RoutingCandidateNode::new(node.clone(), 1.0)];
let idx = weighted_sample(arr, 0.0);
assert_eq!(idx, Some(0));
let idx = weighted_sample(arr, 1.0);
Expand All @@ -542,8 +553,8 @@ mod tests {
assert_eq!(idx, None);
// Case 3: two elements in array (second element has twice the weight of the first)
let arr = &[
RoutingNode::new(node.clone(), 1.0),
RoutingNode::new(node.clone(), 2.0),
RoutingCandidateNode::new(node.clone(), 1.0),
RoutingCandidateNode::new(node.clone(), 2.0),
]; // prefixed_sum = [1.0, 3.0]
let idx = weighted_sample(arr, 0.0); // 0.0 * 3.0 < 1.0
assert_eq!(idx, Some(0));
Expand All @@ -560,10 +571,10 @@ mod tests {
assert_eq!(idx, None);
// Case 4: four elements in array
let arr = &[
RoutingNode::new(node.clone(), 1.0),
RoutingNode::new(node.clone(), 2.0),
RoutingNode::new(node.clone(), 1.5),
RoutingNode::new(node.clone(), 2.5),
RoutingCandidateNode::new(node.clone(), 1.0),
RoutingCandidateNode::new(node.clone(), 2.0),
RoutingCandidateNode::new(node.clone(), 1.5),
RoutingCandidateNode::new(node.clone(), 2.5),
]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0]
let idx = weighted_sample(arr, 0.14); // 0.14 * 7 < 1.0
assert_eq!(idx, Some(0)); // probability ~0.14
Expand Down Expand Up @@ -689,7 +700,7 @@ mod tests {
(node_3, metrics_3),
(node_4, metrics_4),
]);
snapshot.publish_routing_nodes();
snapshot.publish_routing_candidates();
let mut stats = HashMap::new();
let experiments = 30;
let select_nodes_count = 1;
Expand Down

0 comments on commit 1625ee3

Please sign in to comment.