Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
alpeb committed Dec 29, 2023
1 parent d3de530 commit 17e300c
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 25 deletions.
136 changes: 117 additions & 19 deletions cni-repair-controller/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use futures::{Stream, StreamExt};
use k8s_openapi::api::core::v1::{ObjectReference, Pod};
use kube::{
api::DeleteParams,
runtime::{
events::{Event, EventType, Recorder, Reporter},
watcher,
},
Client, Error, Resource, ResourceExt,
};
use kubert::Runtime;
use prometheus_client::{metrics::counter::Counter, registry::Registry};
use prometheus_client::{
metrics::{counter::Counter, histogram::Histogram},
registry::Registry,
};
use tokio::sync::mpsc::{self, error::TrySendError, Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, Instant};

// ERRNO 95: Operation not supported
const UNSUCCESSFUL_EXIT_CODE: i32 = 95;
Expand All @@ -26,7 +31,13 @@ const EVENT_REASON: &str = "LinkerdCNINotConfigured";
/// Prometheus metrics recorded by the CNI repair controller.
///
/// The individual handles are created and registered with a registry by
/// `Metrics::register`.
#[derive(Clone, Debug)]
pub struct Metrics {
/// Incremented whenever the event processing queue overflows.
queue_overflow: Counter<u64>,
// NOTE(review): `deleted_pods` and `pods_deleted` are both listed here; this
// looks like the before/after pair of a rename captured from a diff view --
// confirm which one the actual file keeps.
deleted_pods: Counter<u64>,
/// Number of pods deleted by the controller.
pods_deleted: Counter<u64>,
/// Pod deletion latency distribution, in seconds.
pods_delete_latency_seconds: Histogram,
/// Incremented whenever the pod deletion call errors out.
pods_delete_errors: Counter<u64>,
/// Incremented whenever the pod deletion call times out.
pods_delete_timeouts: Counter<u64>,
/// Event publishing latency distribution, in seconds.
events_publish_latency_seconds: Histogram,
/// Incremented whenever the event publishing call errors out.
events_publish_errors: Counter<u64>,
/// Incremented whenever the event publishing call times out.
events_publish_timeouts: Counter<u64>,
}

pub fn run(
Expand Down Expand Up @@ -83,7 +94,10 @@ async fn process_events(
match tx.try_send(object_ref) {
Ok(_) => {}
Err(TrySendError::Full(_)) => {
tracing::warn!(%namespace, %name, "Dropped event (channel full)");
// If a pod is in a failed state, it should continually be
// reported as being in CrashLoopBackOff, so it is safe to drop
// this event: the pod will naturally be retried.
tracing::debug!(%namespace, %name, "Dropped event (channel full)");
metrics.queue_overflow.inc();
}
Err(TrySendError::Closed(_)) => return,
Expand All @@ -103,21 +117,50 @@ async fn process_pods(
let namespace = object_ref.namespace.clone().unwrap_or_default();
let name = object_ref.name.clone().unwrap_or_default();
let pods = kube::Api::<Pod>::namespaced(client.clone(), &namespace);
let delete_res = pods.delete(&name, &Default::default()).await;
match delete_res {
Ok(_) => {
tracing::info!(%namespace, %name, "Deleting pod");
metrics.deleted_pods.inc();
if let Err(err) =
publish_k8s_event(client.clone(), controller_pod_name.clone(), object_ref).await
{
tracing::warn!(%err, %namespace, %name, "Error publishing event");
}
let delete_params = DeleteParams {
grace_period_seconds: Some(0),
..Default::default()
};

let t0 = Instant::now();
let deleted = tokio::select! {
res = pods.delete(&name, &delete_params) => res,
_ = time::sleep(Duration::from_secs(1)) => {
tracing::warn!(%namespace, %name, "Pod deletion timed out");
metrics.pods_delete_timeouts.inc();
continue;
}
Err(err) => {
tracing::warn!(%err, %namespace, %name, "Error deleting pod")
};
if let Err(err) = deleted {
tracing::warn!(%err, %namespace, %name, "Error deleting pod");
metrics.pods_delete_errors.inc();
continue;
}
let latency = time::Instant::now() - t0;
tracing::info!(%namespace, %name, "Deleted pod");
metrics.pods_deleted.inc();
metrics
.pods_delete_latency_seconds
.observe(latency.as_secs_f64());

let t0 = Instant::now();
let event = tokio::select! {
res = publish_k8s_event(client.clone(), controller_pod_name.clone(), object_ref) => res,
_ = time::sleep(Duration::from_secs(1)) => {
tracing::warn!(%namespace, %name, "Event publishing timed out");
metrics.events_publish_timeouts.inc();
continue;
}
};
if let Err(err) = event {
tracing::warn!(%err, %namespace, %name, "Error publishing event");
metrics.events_publish_errors.inc();
continue;
}
let latency = time::Instant::now() - t0;
metrics
.events_publish_latency_seconds
.observe(latency.as_secs_f64());
}
}

Expand All @@ -144,22 +187,77 @@ async fn publish_k8s_event(

impl Metrics {
/// Creates every controller metric, registers each one on `prom` under its
/// metric name and help text, and returns the handles bundled in a `Metrics`.
///
/// A clone of each metric is handed to the registry while the original is
/// kept in the returned struct.
pub fn register(prom: &mut Registry) -> Self {
// Default values from go client
// (https://github.com/prometheus/client_golang/blob/5d584e2717ef525673736d72cd1d12e304f243d7/prometheus/histogram.go#L68)
// These are the bucket boundaries, in seconds, shared by both latency
// histograms below.
let histogram_buckets = [
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];

let queue_overflow = Counter::<u64>::default();
prom.register(
"queue_overflow",
"Incremented whenever the event processing queue overflows",
queue_overflow.clone(),
);
// NOTE(review): the `deleted_pods` binding, the "deleted" metric name and the
// `deleted_pods.clone()` argument below sit next to their `pods_deleted`
// counterparts; they look like the pre-rename lines of a diff view -- confirm
// they are absent from the actual file.
let deleted_pods = Counter::<u64>::default();

let pods_deleted = Counter::<u64>::default();
prom.register(
"deleted",
"pods_deleted",
"Number of pods deleted by the controller",
deleted_pods.clone(),
pods_deleted.clone(),
);

let pods_delete_latency_seconds = Histogram::new(histogram_buckets.iter().cloned());
prom.register(
"pods_delete_latency_seconds",
"Pod deletion latency distribution",
pods_delete_latency_seconds.clone(),
);

let pods_delete_errors = Counter::<u64>::default();
prom.register(
"pods_delete_errors",
"Incremented whenever the pod deletion call errors out",
pods_delete_errors.clone(),
);

let pods_delete_timeouts = Counter::<u64>::default();
// NOTE(review): registered as "pods_delete_timeout" (singular) although the
// field is `pods_delete_timeouts` and the events counterpart is registered as
// "events_publish_timeouts" -- confirm whether the singular name is intended
// before changing it, since renaming alters the exported metric.
prom.register(
"pods_delete_timeout",
"Incremented whenever the pod deletion call times out",
pods_delete_timeouts.clone(),
);

let events_publish_latency_seconds = Histogram::new(histogram_buckets.iter().cloned());
prom.register(
"events_publish_latency_seconds",
"Events publish latency distribution",
events_publish_latency_seconds.clone(),
);

let events_publish_errors = Counter::<u64>::default();
prom.register(
"events_publish_errors",
"Incremented whenever the event publishing call errors out",
events_publish_errors.clone(),
);

let events_publish_timeouts = Counter::<u64>::default();
prom.register(
"events_publish_timeouts",
"Incremented whenever the event publishing call times out",
events_publish_timeouts.clone(),
);

Self {
queue_overflow,
deleted_pods,
pods_deleted,
pods_delete_latency_seconds,
pods_delete_errors,
pods_delete_timeouts,
events_publish_latency_seconds,
events_publish_errors,
events_publish_timeouts,
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions cni-repair-controller/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{bail, Result};
use anyhow::Result;
use clap::Parser;
use kubert::Runtime;
use linkerd_cni_repair_controller::Metrics;
Expand Down Expand Up @@ -60,11 +60,8 @@ async fn main() -> Result<()> {
let handle =
linkerd_cni_repair_controller::run(&mut rt, node_name, controller_pod_name, metrics);

// Block the main thread on the shutdown signal. Once it fires, wait for the background tasks to
// complete before exiting.
if rt.run().await.is_err() {
bail!("aborted");
}
// Block the main thread on the shutdown signal
rt.run().await?;

handle.abort();
Ok(())
Expand Down

0 comments on commit 17e300c

Please sign in to comment.