Skip to content

Commit

Permalink
wip: cluster error
Browse files Browse the repository at this point in the history
  • Loading branch information
ElaBosak233 committed Nov 25, 2024
1 parent 20c5be0 commit 034ce68
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 22 deletions.
67 changes: 47 additions & 20 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
pub(crate)mod traits;

use std::{collections::BTreeMap, process, sync::OnceLock, time::Duration};

use axum::extract::ws::WebSocket;
use k8s_openapi::api::core::v1::{
Container as K8sContainer, ContainerPort, EnvVar, Pod, PodSpec, Service, ServicePort,
ServiceSpec,
use k8s_openapi::{
api::core::v1::{
Container as K8sContainer, ContainerPort, EnvVar, Namespace, Pod, PodSpec, Service,
ServicePort, ServiceSpec,
},
apimachinery::pkg::apis::meta::v1::ObjectMeta,
};
use kube::{
api::{Api, DeleteParams, ListParams, PostParams, ResourceExt},
config::Kubeconfig,
runtime::wait::conditions,
runtime::{reflector::Lookup, wait::conditions},
Client as K8sClient, Config,
};
use once_cell::sync::OnceCell;
use tokio_util::codec::Framed;
use tracing::{error, info};
use crate::cluster::traits::ClusterError;

static K8S_CLIENT: OnceCell<K8sClient> = OnceCell::new();

pub fn get_k8s_client() -> &'static K8sClient {
K8S_CLIENT.get().unwrap()
pub fn get_k8s_client() -> K8sClient {
K8S_CLIENT.get().unwrap().clone()
}

pub async fn init() {
Expand All @@ -38,25 +44,41 @@ pub async fn init() {
}
let _ = K8S_CLIENT.set(client);
info!("Kubernetes client initialized successfully.");

let namespace_api: Api<Namespace> = Api::all(get_k8s_client().clone());
let namespaces = namespace_api.list(&ListParams::default()).await.unwrap();
if !namespaces.items.iter().any(|namespace| {
namespace.metadata.name == Some(crate::env::get_env().clone().cluster.namespace)
}) {
let namespace = Namespace {
metadata: ObjectMeta {
name: Some(crate::env::get_env().clone().cluster.namespace),
..Default::default()
},
..Default::default()
};
let _ = namespace_api
.create(&PostParams::default(), &namespace)
.await;
info!("Namespace is created successfully.")
}
}

pub async fn create(
name: String, challenge: crate::db::entity::challenge::Model,
injected_flag: crate::db::entity::challenge::Flag,
) -> Result<Vec<crate::db::entity::pod::Nat>, anyhow::Error> {
let client = get_k8s_client().clone();

let metadata = k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
) -> Result<Vec<crate::db::entity::pod::Nat>, ClusterError> {
let metadata = ObjectMeta {
name: Some(name.clone()),
labels: Some(BTreeMap::from([(
String::from("cds/resource_id"),
name.clone(),
)])),
labels: Some(BTreeMap::from([
(String::from("cds/app"), String::from("challenges")),
(String::from("cds/resource_id"), name.clone()),
])),
..Default::default()
};

let pod_api: Api<Pod> = Api::namespaced(
client.clone(),
get_k8s_client(),
crate::env::get_env().cluster.namespace.as_str(),
);

Expand Down Expand Up @@ -118,10 +140,10 @@ pub async fn create(
entry: None,
});
}
},
}
false => {
let service_api: Api<Service> = Api::namespaced(
client.clone(),
get_k8s_client(),
crate::env::get_env().cluster.namespace.as_str(),
);
let service_ports: Vec<ServicePort> = challenge
Expand Down Expand Up @@ -178,15 +200,20 @@ pub async fn create(

pub async fn delete(name: String) {
let pod_api: Api<Pod> = Api::namespaced(
get_k8s_client().clone(),
get_k8s_client(),
crate::env::get_env().cluster.namespace.as_str(),
);
let _ = pod_api.delete(&name, &DeleteParams::default()).await;
let service_api: Api<Service> = Api::namespaced(
get_k8s_client(),
crate::env::get_env().cluster.namespace.as_str(),
);
let _ = service_api.delete(&name, &DeleteParams::default()).await;
}

pub async fn wsrx(name: String, port: u16, ws: WebSocket) -> Result<(), anyhow::Error> {
pub async fn wsrx(name: String, port: u16, ws: WebSocket) -> Result<(), ClusterError> {
let pod_api: Api<Pod> = Api::namespaced(
get_k8s_client().clone(),
get_k8s_client(),
crate::env::get_env().cluster.namespace.as_str(),
);
let mut pf = pod_api.portforward(&name, &[port]).await?;
Expand Down
15 changes: 15 additions & 0 deletions src/cluster/traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum ClusterError {
#[error("kube error: {0}")]
KubeError(#[from] kube::Error),
#[error("failed to infer config: {0}")]
InferConfigError(#[from] kube::config::InferConfigError),
#[error("failed to load kube config: {0}")]
KubeConfigError(#[from] kube::config::KubeconfigError),
#[error("kube runtime wait error: {0}")]
KubeRuntimeWaitError(#[from] kube::runtime::wait::Error),
#[error("proxy error: {0}")]
ProxyError(#[from] wsrx::Error),
}
3 changes: 1 addition & 2 deletions src/web/router/api/pod/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ pub async fn create(
crate::db::entity::challenge::Model::from(challenge.clone()),
injected_flag.clone(),
)
.await
.map_err(|err| WebError::OtherError(anyhow!("{:?}", err)))?;
.await?;

let pod = crate::db::entity::pod::ActiveModel {
name: Set(ctn_name),
Expand Down
3 changes: 3 additions & 0 deletions src/web/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ pub enum WebError {
MediaError(#[from] crate::media::traits::MediaError),
#[error("queue error: {0}")]
QueueError(#[from] crate::queue::traits::QueueError),
#[error("cluster error: {0}")]
ClusterError(#[from] crate::cluster::traits::ClusterError),
#[error(transparent)]
OtherError(#[from] anyhow::Error),
}
Expand All @@ -92,6 +94,7 @@ impl IntoResponse for WebError {
},
Self::MediaError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
Self::QueueError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
Self::ClusterError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
Self::OtherError(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
};

Expand Down

0 comments on commit 034ce68

Please sign in to comment.