diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index ce7d66cd..4de192f5 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -1,24 +1,30 @@ +pub(crate)mod traits; + use std::{collections::BTreeMap, process, sync::OnceLock, time::Duration}; use axum::extract::ws::WebSocket; -use k8s_openapi::api::core::v1::{ - Container as K8sContainer, ContainerPort, EnvVar, Pod, PodSpec, Service, ServicePort, - ServiceSpec, +use k8s_openapi::{ + api::core::v1::{ + Container as K8sContainer, ContainerPort, EnvVar, Namespace, Pod, PodSpec, Service, + ServicePort, ServiceSpec, + }, + apimachinery::pkg::apis::meta::v1::ObjectMeta, }; use kube::{ api::{Api, DeleteParams, ListParams, PostParams, ResourceExt}, config::Kubeconfig, - runtime::wait::conditions, + runtime::{reflector::Lookup, wait::conditions}, Client as K8sClient, Config, }; use once_cell::sync::OnceCell; use tokio_util::codec::Framed; use tracing::{error, info}; +use crate::cluster::traits::ClusterError; static K8S_CLIENT: OnceCell = OnceCell::new(); -pub fn get_k8s_client() -> &'static K8sClient { - K8S_CLIENT.get().unwrap() +pub fn get_k8s_client() -> K8sClient { + K8S_CLIENT.get().unwrap().clone() } pub async fn init() { @@ -38,25 +44,41 @@ pub async fn init() { } let _ = K8S_CLIENT.set(client); info!("Kubernetes client initialized successfully."); + + let namespace_api: Api = Api::all(get_k8s_client().clone()); + let namespaces = namespace_api.list(&ListParams::default()).await.unwrap(); + if !namespaces.items.iter().any(|namespace| { + namespace.metadata.name == Some(crate::env::get_env().clone().cluster.namespace) + }) { + let namespace = Namespace { + metadata: ObjectMeta { + name: Some(crate::env::get_env().clone().cluster.namespace), + ..Default::default() + }, + ..Default::default() + }; + let _ = namespace_api + .create(&PostParams::default(), &namespace) + .await; + info!("Namespace is created successfully.") + } } pub async fn create( name: String, challenge: crate::db::entity::challenge::Model, injected_flag: crate::db::entity::challenge::Flag, -) -> Result, anyhow::Error> { - let client = get_k8s_client().clone(); - - let metadata = k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta { +) -> Result, ClusterError> { + let metadata = ObjectMeta { name: Some(name.clone()), - labels: Some(BTreeMap::from([( - String::from("cds/resource_id"), - name.clone(), - )])), + labels: Some(BTreeMap::from([ + (String::from("cds/app"), String::from("challenges")), + (String::from("cds/resource_id"), name.clone()), + ])), ..Default::default() }; let pod_api: Api = Api::namespaced( - client.clone(), + get_k8s_client(), crate::env::get_env().cluster.namespace.as_str(), ); @@ -118,10 +140,10 @@ pub async fn create( entry: None, }); } - }, + } false => { let service_api: Api = Api::namespaced( - client.clone(), + get_k8s_client(), crate::env::get_env().cluster.namespace.as_str(), ); let service_ports: Vec = challenge @@ -178,15 +200,20 @@ pub async fn create( pub async fn delete(name: String) { let pod_api: Api = Api::namespaced( - get_k8s_client().clone(), + get_k8s_client(), crate::env::get_env().cluster.namespace.as_str(), ); let _ = pod_api.delete(&name, &DeleteParams::default()).await; + let service_api: Api = Api::namespaced( + get_k8s_client(), + crate::env::get_env().cluster.namespace.as_str(), + ); + let _ = service_api.delete(&name, &DeleteParams::default()).await; } -pub async fn wsrx(name: String, port: u16, ws: WebSocket) -> Result<(), anyhow::Error> { +pub async fn wsrx(name: String, port: u16, ws: WebSocket) -> Result<(), ClusterError> { let pod_api: Api = Api::namespaced( - get_k8s_client().clone(), + get_k8s_client(), crate::env::get_env().cluster.namespace.as_str(), ); let mut pf = pod_api.portforward(&name, &[port]).await?; diff --git a/src/cluster/traits.rs b/src/cluster/traits.rs new file mode 100644 index 00000000..53d336a9 --- /dev/null +++ b/src/cluster/traits.rs @@ -0,0 +1,15 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ClusterError { + #[error("kube error: {0}")] + KubeError(#[from] kube::Error), + #[error("failed to infer config: {0}")] + InferConfigError(#[from] kube::config::InferConfigError), + #[error("failed to load kube config: {0}")] + KubeConfigError(#[from] kube::config::KubeconfigError), + #[error("kube runtime wait error: {0}")] + KubeRuntimeWaitError(#[from] kube::runtime::wait::Error), + #[error("proxy error: {0}")] + ProxyError(#[from] wsrx::Error), +} diff --git a/src/web/router/api/pod/mod.rs b/src/web/router/api/pod/mod.rs index f8bf52b5..073d2644 100644 --- a/src/web/router/api/pod/mod.rs +++ b/src/web/router/api/pod/mod.rs @@ -114,8 +114,7 @@ pub async fn create( crate::db::entity::challenge::Model::from(challenge.clone()), injected_flag.clone(), ) - .await - .map_err(|err| WebError::OtherError(anyhow!("{:?}", err)))?; + .await?; let pod = crate::db::entity::pod::ActiveModel { name: Set(ctn_name), diff --git a/src/web/traits.rs b/src/web/traits.rs index b4e2a0e5..2cc4d024 100644 --- a/src/web/traits.rs +++ b/src/web/traits.rs @@ -72,6 +72,8 @@ pub enum WebError { MediaError(#[from] crate::media::traits::MediaError), #[error("queue error: {0}")] QueueError(#[from] crate::queue::traits::QueueError), + #[error("cluster error: {0}")] + ClusterError(#[from] crate::cluster::traits::ClusterError), #[error(transparent)] OtherError(#[from] anyhow::Error), } @@ -92,6 +94,7 @@ impl IntoResponse for WebError { }, Self::MediaError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()), Self::QueueError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()), + Self::ClusterError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()), Self::OtherError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()), };