Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run one simulation at a time #109

Merged
merged 4 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cli/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::sync::{
Mutex,
};

use chrono::Utc;
use simkube::k8s::ApiSet;
use simkube::prelude::*;
use simkube::store::TraceStore;
Expand Down Expand Up @@ -67,7 +66,7 @@ pub async fn cmd(args: &Args) -> EmptyResult {

println!("Exporting snapshot data from store...");
let filters = ExportFilters::new(args.excluded_namespaces.clone(), vec![], true);
let start_ts = Utc::now().timestamp();
let start_ts = UtcClock.now_ts();
let end_ts = start_ts + 1;
let data = store.lock().unwrap().export(start_ts, end_ts, &filters)?;

Expand Down
3 changes: 1 addition & 2 deletions ctrl/cert_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use serde::{
use simkube::k8s::build_object_meta;
use simkube::macros::*;
use simkube::prelude::*;
use tracing::*;

use super::*;

Expand Down Expand Up @@ -87,7 +86,7 @@ pub(super) async fn create_certificate_if_not_present(
DRIVER_CERT_NAME, ctx.opts.cert_manager_issuer,
);
let obj = PartialCertificate {
metadata: build_object_meta(&ctx.driver_ns, DRIVER_CERT_NAME, &ctx.name, owner)?,
metadata: build_object_meta(&ctx.driver_ns, DRIVER_CERT_NAME, &ctx.name, owner),
spec: PartialCertificateSpec {
secret_name: DRIVER_CERT_NAME.into(),
secret_template: Some(CertificateSecretTemplate {
Expand Down
40 changes: 27 additions & 13 deletions ctrl/controller.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::env;
use std::ops::Deref;
use std::sync::Arc;

Expand All @@ -17,11 +18,14 @@ use kube::api::{
};
use kube::error::ErrorResponse;
use kube::runtime::controller::Action;
use kube::Error::Api;
use kube::ResourceExt;
use serde_json::json;
use simkube::api::v1::build_simulation_root;
use simkube::errors::*;
use simkube::k8s::{
try_claim_lease,
LeaseState,
};
use simkube::metrics::api::*;
use simkube::prelude::*;
use tokio::runtime::Handle;
Expand All @@ -31,8 +35,8 @@ use tokio::time::Duration;
use crate::objects::*;
use crate::*;

pub(super) const REQUEUE_DURATION: Duration = Duration::from_secs(5);
const REQUEUE_ERROR_DURATION: Duration = Duration::from_secs(30);
pub(super) const REQUEUE_DURATION: Duration = Duration::from_secs(RETRY_DELAY_SECONDS as u64);
const REQUEUE_ERROR_DURATION: Duration = Duration::from_secs(ERROR_RETRY_DELAY_SECONDS as u64);
pub(super) const JOB_STATUS_CONDITION_COMPLETE: &str = "Complete";
pub(super) const JOB_STATUS_CONDITION_FAILED: &str = "Failed";

Expand All @@ -41,7 +45,7 @@ async fn setup_sim_metaroot(ctx: &SimulationContext, sim: &Simulation) -> anyhow
match roots_api.get_opt(&ctx.metaroot_name).await? {
None => {
info!("creating Simulation MetaRoot");
let metaroot = build_simulation_root(&ctx.metaroot_name, sim)?;
let metaroot = build_simulation_root(&ctx.metaroot_name, sim);
roots_api.create(&Default::default(), &metaroot).await.map_err(|e| e.into())
},
Some(metaroot) => Ok(metaroot),
Expand All @@ -51,8 +55,6 @@ async fn setup_sim_metaroot(ctx: &SimulationContext, sim: &Simulation) -> anyhow
pub(super) async fn fetch_driver_status(
ctx: &SimulationContext,
) -> anyhow::Result<(SimulationState, Option<DateTime<Utc>>, Option<DateTime<Utc>>)> {
// TODO should check if there are any other simulations running and block/wait until
// they're done before proceeding
let jobs_api = kube::Api::<batchv1::Job>::namespaced(ctx.client.clone(), &ctx.driver_ns);
let (mut state, mut start_time, mut end_time) = (SimulationState::Initializing, None, None);

Expand Down Expand Up @@ -82,6 +84,7 @@ pub(super) async fn setup_driver(
ctx: &SimulationContext,
sim: &Simulation,
metaroot: &SimulationRoot,
ctrl_ns: &str,
) -> anyhow::Result<Action> {
info!("setting up simulation driver");

Expand All @@ -91,13 +94,23 @@ pub(super) async fn setup_driver(
bail!(SkControllerError::namespace_not_found(&sim.metrics_ns()));
};

match try_claim_lease(ctx.client.clone(), sim, metaroot, ctrl_ns, Box::new(UtcClock)).await? {
LeaseState::Claimed => (),
LeaseState::WaitingForClaim(t) => {
info!("sleeping for {t} seconds");
return Ok(Action::requeue(Duration::from_secs(t as u64)));
},
LeaseState::Unknown => bail!("unknown lease state"),
}

// Create the namespaces
if ns_api.get_opt(&ctx.driver_ns).await?.is_none() {
info!("creating driver namespace {}", ctx.driver_ns);
let obj = build_driver_namespace(ctx, sim)?;
let obj = build_driver_namespace(ctx, sim);
ns_api.create(&Default::default(), &obj).await?;
};

// Set up the metrics collector
let mut prom_ready = false;
match &sim.spec.metrics_config {
Some(mc) => {
Expand All @@ -108,7 +121,7 @@ pub(super) async fn setup_driver(
match prom_api.get_opt(&ctx.prometheus_name).await? {
None => {
info!("creating Prometheus object {}/{}", sim.metrics_ns(), ctx.prometheus_name);
let obj = build_prometheus(&ctx.prometheus_name, sim, mc)?;
let obj = build_prometheus(&ctx.prometheus_name, sim, mc);
prom_api.create(&Default::default(), &obj).await?;
},
Some(prom) => {
Expand All @@ -130,7 +143,7 @@ pub(super) async fn setup_driver(
let driver_svc_api = kube::Api::<corev1::Service>::namespaced(ctx.client.clone(), &ctx.driver_ns);
if driver_svc_api.get_opt(&ctx.driver_svc).await?.is_none() {
info!("creating driver service {}", &ctx.driver_svc);
let obj = build_driver_service(ctx, metaroot)?;
let obj = build_driver_service(ctx, metaroot);
driver_svc_api.create(&Default::default(), &obj).await?;
}

Expand All @@ -157,15 +170,15 @@ pub(super) async fn setup_driver(
let webhook_api = kube::Api::<admissionv1::MutatingWebhookConfiguration>::all(ctx.client.clone());
if webhook_api.get_opt(&ctx.webhook_name).await?.is_none() {
info!("creating mutating webhook configuration {}", ctx.webhook_name);
let obj = build_mutating_webhook(ctx, metaroot)?;
let obj = build_mutating_webhook(ctx, metaroot);
webhook_api.create(&Default::default(), &obj).await?;
};

// Create the actual driver
let jobs_api = kube::Api::<batchv1::Job>::namespaced(ctx.client.clone(), &ctx.driver_ns);
if jobs_api.get_opt(&ctx.driver_name).await?.is_none() {
info!("creating simulation driver {}", ctx.driver_name);
let obj = build_driver_job(ctx, sim, &driver_cert_secret_name)?;
let obj = build_driver_job(ctx, sim, &driver_cert_secret_name, ctrl_ns)?;
jobs_api.create(&Default::default(), &obj).await?;
}

Expand All @@ -183,7 +196,7 @@ pub(super) async fn cleanup(ctx: &SimulationContext, sim: &Simulation) {

info!("cleaning up prometheus resources");
if let Err(e) = prom_api.delete(&ctx.prometheus_name, &Default::default()).await {
if matches!(e, Api(ErrorResponse { code: 404, .. })) {
if matches!(e, kube::Error::Api(ErrorResponse { code: 404, .. })) {
warn!("prometheus object not found; maybe already cleaned up?");
} else {
error!("Error cleaning up Prometheus: {e:?}");
Expand Down Expand Up @@ -215,8 +228,9 @@ pub(crate) async fn reconcile(sim: Arc<Simulation>, ctx: Arc<SimulationContext>)
.await
.map_err(|e| anyhow!(e))?;

let ctrl_ns = env::var(CTRL_NS_ENV_VAR).map_err(|e| anyhow!(e))?;
match driver_state {
SimulationState::Initializing => setup_driver(&ctx, sim, &root).await.map_err(|e| e.into()),
SimulationState::Initializing => setup_driver(&ctx, sim, &root, &ctrl_ns).await.map_err(|e| e.into()),
SimulationState::Running => Ok(Action::await_change()),
SimulationState::Finished | SimulationState::Failed => {
cleanup(&ctx, sim).await;
Expand Down
1 change: 1 addition & 0 deletions ctrl/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl SimulationContext {
SimulationContext {
client,
opts,

name: String::new(),
metaroot_name: String::new(),
driver_ns: String::new(),
Expand Down
53 changes: 27 additions & 26 deletions ctrl/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,15 @@ const PROM_COMPONENT_LABEL: &str = "prometheus";
const WEBHOOK_NAME: &str = "mutatepods.simkube.io";
const DRIVER_CERT_VOLUME: &str = "driver-cert";

pub(super) fn build_driver_namespace(ctx: &SimulationContext, sim: &Simulation) -> anyhow::Result<corev1::Namespace> {
pub(super) fn build_driver_namespace(ctx: &SimulationContext, sim: &Simulation) -> corev1::Namespace {
let owner = sim;
Ok(corev1::Namespace {
metadata: build_global_object_meta(&ctx.driver_ns, &ctx.name, owner)?,
corev1::Namespace {
metadata: build_global_object_meta(&ctx.driver_ns, &ctx.name, owner),
..Default::default()
})
}
}

pub(super) fn build_prometheus(
name: &str,
sim: &Simulation,
mc: &SimulationMetricsConfig,
) -> anyhow::Result<Prometheus> {
pub(super) fn build_prometheus(name: &str, sim: &Simulation, mc: &SimulationMetricsConfig) -> Prometheus {
// Configure the remote write endpoints; the user _can_ override these, but we set up some
// sane defaults so that they don't have to.
let mut rw_cfgs = mc.remote_write_configs.clone();
Expand Down Expand Up @@ -83,8 +79,8 @@ pub(super) fn build_prometheus(
);

let owner = sim;
Ok(Prometheus {
metadata: build_object_meta(&sim.metrics_ns(), name, &sim.name_any(), owner)?,
Prometheus {
metadata: build_object_meta(&sim.metrics_ns(), name, &sim.name_any(), owner),
spec: PrometheusSpec {
image: Some(format!("quay.io/prometheus/prometheus:v{}", PROM_VERSION)),
pod_metadata: Some(PrometheusPodMetadata {
Expand All @@ -106,23 +102,23 @@ pub(super) fn build_prometheus(
..Default::default()
},
status: Default::default(),
})
}
}

pub(super) fn build_mutating_webhook(
ctx: &SimulationContext,
metaroot: &SimulationRoot,
) -> anyhow::Result<admissionv1::MutatingWebhookConfiguration> {
) -> admissionv1::MutatingWebhookConfiguration {
let owner = metaroot;
let mut metadata = build_global_object_meta(&ctx.webhook_name, &ctx.name, owner)?;
let mut metadata = build_global_object_meta(&ctx.webhook_name, &ctx.name, owner);
if ctx.opts.use_cert_manager {
metadata
.annotations
.get_or_insert(BTreeMap::new())
.insert("cert-manager.io/inject-ca-from".into(), format!("{}/{}", ctx.driver_ns, DRIVER_CERT_NAME));
}

Ok(admissionv1::MutatingWebhookConfiguration {
admissionv1::MutatingWebhookConfiguration {
metadata,
webhooks: Some(vec![admissionv1::MutatingWebhook {
admission_review_versions: vec!["v1".into()],
Expand All @@ -147,16 +143,13 @@ pub(super) fn build_mutating_webhook(
}]),
..Default::default()
}]),
})
}
}

pub(super) fn build_driver_service(
ctx: &SimulationContext,
metaroot: &SimulationRoot,
) -> anyhow::Result<corev1::Service> {
pub(super) fn build_driver_service(ctx: &SimulationContext, metaroot: &SimulationRoot) -> corev1::Service {
let owner = metaroot;
Ok(corev1::Service {
metadata: build_object_meta(&ctx.driver_ns, &ctx.driver_svc, &ctx.name, owner)?,
corev1::Service {
metadata: build_object_meta(&ctx.driver_ns, &ctx.driver_svc, &ctx.name, owner),
spec: Some(corev1::ServiceSpec {
ports: Some(vec![corev1::ServicePort {
port: ctx.opts.driver_port,
Expand All @@ -167,13 +160,14 @@ pub(super) fn build_driver_service(
..Default::default()
}),
..Default::default()
})
}
}

pub(super) fn build_driver_job(
ctx: &SimulationContext,
sim: &Simulation,
cert_secret_name: &str,
ctrl_ns: &str,
) -> anyhow::Result<batchv1::Job> {
let trace_url = Url::parse(&sim.spec.trace_path)?;
let (trace_vm, trace_volume, trace_mount_path) = match storage::get_scheme(&trace_url)? {
Expand All @@ -185,15 +179,15 @@ pub(super) fn build_driver_job(
let service_account = Some(env::var(POD_SVC_ACCOUNT_ENV_VAR)?);

Ok(batchv1::Job {
metadata: build_object_meta(&ctx.driver_ns, &ctx.driver_name, &ctx.name, sim)?,
metadata: build_object_meta(&ctx.driver_ns, &ctx.driver_name, &ctx.name, sim),
spec: Some(batchv1::JobSpec {
backoff_limit: Some(0),
template: corev1::PodTemplateSpec {
spec: Some(corev1::PodSpec {
containers: vec![corev1::Container {
name: "driver".into(),
command: Some(vec!["/sk-driver".into()]),
args: Some(build_driver_args(ctx, cert_mount_path, trace_mount_path)),
args: Some(build_driver_args(ctx, cert_mount_path, trace_mount_path, ctrl_ns.into())),
image: Some(ctx.opts.driver_image.clone()),
env: Some(vec![
corev1::EnvVar {
Expand Down Expand Up @@ -251,7 +245,12 @@ fn build_certificate_volumes(cert_secret_name: &str) -> (corev1::VolumeMount, co
)
}

fn build_driver_args(ctx: &SimulationContext, cert_mount_path: String, trace_mount_path: String) -> Vec<String> {
fn build_driver_args(
ctx: &SimulationContext,
cert_mount_path: String,
trace_mount_path: String,
ctrl_ns: String,
) -> Vec<String> {
vec![
"--cert-path".into(),
format!("{cert_mount_path}/tls.crt"),
Expand All @@ -265,5 +264,7 @@ fn build_driver_args(ctx: &SimulationContext, cert_mount_path: String, trace_mou
ctx.name.clone(),
"--verbosity".into(),
ctx.opts.verbosity.clone(),
"--controller-ns".into(),
ctrl_ns,
]
}
Loading
Loading