Skip to content

Commit

Permalink
Metrics (#11)
Browse files Browse the repository at this point in the history
* add metrics and kube events

* fix chrono

* fix outdated

* add helm for metrics
  • Loading branch information
arkadiuszspiewak authored May 17, 2023
1 parent f0428a2 commit 0759fd6
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 96 deletions.
27 changes: 25 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[[bin]]
name = "kyotu-project-operator"
path = "src/main.rs"

[lib]
name = "controller"
path = "src/lib.rs"

[dependencies]
git2 = "0.17.1"
tera = "1.18.1"
Expand All @@ -27,9 +35,11 @@ serde_yaml = "0.9.21"
actix-web = "4.3.1"
dotenv = "0.15.0"
tracing-actix-web = "0.7.4"
reqwest = { version = "0.11.17", features = ["json"] }
reqwest = { version = "0.11.18", features = ["json"] }
mockito = "1.0.2"
base64 = "0.21.0"
validator = { version="0.16.0", features=["derive"] }
lazy_static = "1.4.0"
regex = "1.5.4"
prometheus = "0.13.3"
chrono = { version = "0.4.19", default-features = false, features = ["serde"] }
11 changes: 8 additions & 3 deletions charts/kyotu-project-operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ spec:
{{- include "kyotu-project-operator.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.podAnnotations }}
{{ toYaml .Values.podAnnotations | nindent 8 }}
{{- end }}
{{- if .Values.config.metrics.enabled }}
prometheus.io/scrape: "true"
prometheus.io/port: "{{ .Values.config.metrics.port }}"
prometheus.io/path: "{{ .Values.config.metrics.path }}"
{{- end }}
labels:
{{- include "kyotu-project-operator.selectorLabels" . | nindent 8 }}
spec:
Expand Down
5 changes: 5 additions & 0 deletions charts/kyotu-project-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ config:
sshKeySecretKey: id_rsa
logLevel: debug

metrics:
enabled: true
port: 8080
path: /metrics

service:
type: ClusterIP
port: 8080
Expand Down
173 changes: 145 additions & 28 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
use kube::Resource;
use kube::{client::Client, runtime::controller::Action};
use chrono::{DateTime, Utc};
use futures::StreamExt;
use kube::{
api::Api,
client::Client,
runtime::{
controller::{Action, Controller},
events::{Event, EventType},
events::{Recorder, Reporter},
watcher::Config,
},
Resource,
};
use serde::Serialize;
use std::path::Path;
use std::sync::Arc;
use tokio::time::Duration;
use tokio::{sync::RwLock, time::Duration};
use tracing::info;

use crate::finalizer;
use crate::gitlab::Gitlab;
Expand All @@ -11,17 +24,16 @@ use crate::project::{create_project, delete_project};
use crate::project_crd::Project;
use crate::rbacs::{add_rbacs, remove_rbacs};
use crate::secret::{create_secret, delete_secret};
use crate::{Error, Metrics, Result};

pub struct ContextData {
#[derive(Clone)]
pub struct Context {
pub client: Client,
pub gitlab: Gitlab,
}

impl ContextData {
#[allow(dead_code)]
pub fn new(client: Client, gitlab: Gitlab) -> Self {
Self { client, gitlab }
}
/// Diagnostics read by the web server
pub diagnostics: Arc<RwLock<Diagnostics>>,
/// Prometheus metrics
pub metrics: Metrics,
}

enum ProjectAction {
Expand All @@ -30,7 +42,10 @@ enum ProjectAction {
NoOp,
}

pub async fn reconcile(project: Arc<Project>, context: Arc<ContextData>) -> Result<Action, Error> {
pub async fn reconcile(project: Arc<Project>, context: Arc<Context>) -> Result<Action> {
let _timer = context.metrics.count_and_measure();
context.diagnostics.write().await.last_event = Utc::now();

let client: Client = context.client.clone();
let gitlab = context.gitlab.clone();

Expand All @@ -56,12 +71,18 @@ pub async fn reconcile(project: Arc<Project>, context: Arc<ContextData>) -> Resu
#[allow(clippy::needless_return)]
return match determine_action(&project) {
ProjectAction::Create => {
let recorder = context
.diagnostics
.read()
.await
.recorder(client.clone(), &project);
finalizer::add(
client.clone(),
project.metadata.name.as_ref().unwrap(),
&namespace,
)
.await?;
.await
.unwrap();

match create_namespace(client.clone(), &project_name).await {
Ok(_) => {}
Expand Down Expand Up @@ -103,9 +124,24 @@ pub async fn reconcile(project: Arc<Project>, context: Arc<ContextData>) -> Resu
.await
.unwrap();

recorder
.publish(Event {
type_: EventType::Normal,
reason: "Create".into(),
note: Some(format!("Creating `{project_name}`")),
action: "Creating".into(),
secondary: None,
})
.await
.map_err(Error::KubeError)?;
Ok(Action::requeue(Duration::from_secs(10)))
}
ProjectAction::Delete => {
let recorder = context
.diagnostics
.read()
.await
.recorder(context.client.clone(), &project);
remove_rbacs(&project_name, flux_root, &google_group)
.await
.unwrap();
Expand All @@ -119,13 +155,53 @@ pub async fn reconcile(project: Arc<Project>, context: Arc<ContextData>) -> Resu
delete_namespace(client.clone(), &project_name)
.await
.unwrap();
finalizer::delete(client, project.metadata.name.as_ref().unwrap(), &namespace).await?;
finalizer::delete(client, project.metadata.name.as_ref().unwrap(), &namespace)
.await
.unwrap();

recorder
.publish(Event {
type_: EventType::Normal,
reason: "DeleteRequested".into(),
note: Some(format!("Delete `{}`", project_name)),
action: "Deleting".into(),
secondary: None,
})
.await
.map_err(Error::KubeError)?;
Ok(Action::await_change())
}
ProjectAction::NoOp => Ok(Action::requeue(Duration::from_secs(10))),
};
}

/// Builds the Kubernetes client and the GitLab handle, then drives the
/// `Project` controller until its reconciliation stream ends.
///
/// Each reconciliation outcome is reported as it arrives: successes through
/// `tracing`, failures on stderr.
///
/// # Panics
///
/// Panics if the default Kubernetes client cannot be created, or if the
/// `GITLAB_URL` or `GITLAB_TOKEN` environment variable cannot be read.
pub async fn run(state: State) {
    let client = Client::try_default()
        .await
        .expect("Failed to create client");

    let crd_api: Api<Project> = Api::all(client.clone());

    let gitlab_url = std::env::var("GITLAB_URL").expect("GITLAB_URL not set");
    let gitlab_token = std::env::var("GITLAB_TOKEN").expect("GITLAB_TOKEN not set");

    let gitlab = Gitlab::new(gitlab_url, gitlab_token);

    // `crd_api` is not used again, so it is moved into the controller rather than cloned.
    Controller::new(crd_api, Config::default().any_semantic())
        .run(reconcile, on_error, state.to_context(client, gitlab))
        .for_each(|reconciliation_result| async move {
            match reconciliation_result {
                Ok(resource) => {
                    info!("Reconciliation successful. Resource: {:?}", resource);
                }
                Err(reconciliation_err) => {
                    eprintln!("Reconciliation error: {:?}", reconciliation_err)
                }
            }
        })
        .await;
}

#[allow(clippy::needless_return)]
// Determine the action to take based on the state of the Project CRD
fn determine_action(project: &Project) -> ProjectAction {
Expand All @@ -150,21 +226,62 @@ fn determine_action(project: &Project) -> ProjectAction {
}

//error handling
pub fn on_error(echo: Arc<Project>, error: &Error, _context: Arc<ContextData>) -> Action {
eprintln!("Reconciliation error:\n{:?}.\n{:?}", error, echo);
pub fn on_error(proj: Arc<Project>, error: &Error, context: Arc<Context>) -> Action {
eprintln!("Reconciliation error:\n{:?}.\n{:?}", error, proj);
context.metrics.reconcile_failure(&proj, error);
Action::requeue(Duration::from_secs(5))
}

//error enum
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Any error originating from the `kube-rs` crate
#[error("Kubernetes reported error: {source}")]
KubeError {
#[from]
source: kube::Error,
},
/// Error in user input or Echo resource definition, typically missing fields.
#[error("Invalid Project CRD: {0}")]
UserInputError(String),
/// State shared between the controller and the web server
#[derive(Clone, Default)]
pub struct State {
/// Diagnostics populated by the reconciler
diagnostics: Arc<RwLock<Diagnostics>>,
/// Metrics registry
registry: prometheus::Registry,
}

/// State wrapper around the controller outputs for the web server
impl State {
/// Metrics getter
pub fn metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
self.registry.gather()
}

/// State getter
pub async fn diagnostics(&self) -> Diagnostics {
self.diagnostics.read().await.clone()
}

// Create a Controller Context that can update State
pub fn to_context(&self, client: Client, gitlab: Gitlab) -> Arc<Context> {
Arc::new(Context {
client,
gitlab,
metrics: Metrics::default().register(&self.registry).unwrap(),
diagnostics: self.diagnostics.clone(),
})
}
}

/// Diagnostics to be exposed by the web server
///
/// Only `Serialize` is derived, so no deserialization hooks are declared on
/// the fields.
#[derive(Clone, Serialize)]
pub struct Diagnostics {
    /// Timestamp of the most recent reconciliation; `reconcile` sets it to
    /// `Utc::now()` on entry.
    pub last_event: DateTime<Utc>,
    /// Reporter identity used when building event `Recorder`s; skipped during
    /// serialization.
    #[serde(skip)]
    pub reporter: Reporter,
}
impl Default for Diagnostics {
fn default() -> Self {
Self {
last_event: Utc::now(),
reporter: "kyotu-project-operator".into(),
}
}
}
impl Diagnostics {
    /// Builds a `Recorder` from `client`, a clone of this reporter, and the
    /// object reference of `proj`.
    fn recorder(&self, client: Client, proj: &Project) -> Recorder {
        let reporter = self.reporter.clone();
        let object_reference = proj.object_ref(&());
        Recorder::new(client, reporter, object_reference)
    }
}
9 changes: 1 addition & 8 deletions src/gitlab.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use reqwest::Client;
use serde_json::json;

#[derive(Clone)]
pub struct Gitlab {
pub client: Client,
pub gitlab_addr: String,
Expand All @@ -25,14 +26,6 @@ impl Gitlab {
}
}

pub fn clone(&self) -> Self {
Self {
client: self.client.clone(),
gitlab_addr: self.gitlab_addr.clone(),
token: self.token.clone(),
}
}

pub async fn get_group_by_name(&self, name: &str) -> Result<Option<u64>, reqwest::Error> {
let url = format!("{}/api/v4/groups", &self.gitlab_addr);
let res = self
Expand Down
Loading

0 comments on commit 0759fd6

Please sign in to comment.