Skip to content

Commit

Permalink
driver updates lease times
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Apr 9, 2024
1 parent 7733678 commit 508f42d
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 101 deletions.
17 changes: 10 additions & 7 deletions ctrl/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,22 @@ pub(super) async fn setup_driver(
bail!(SkControllerError::namespace_not_found(&sim.metrics_ns()));
};

match try_claim_lease(ctx.client.clone(), sim, metaroot, ctrl_ns, &UtcClock).await? {
LeaseState::Claimed => (),
LeaseState::WaitingForClaim(t) => {
info!("sleeping for {t} seconds");
return Ok(Action::requeue(Duration::from_secs(t as u64)));
},
LeaseState::Unknown => bail!("unknown lease state"),
}

// Create the namespaces
if ns_api.get_opt(&ctx.driver_ns).await?.is_none() {
info!("creating driver namespace {}", ctx.driver_ns);
let obj = build_driver_namespace(ctx, sim);
ns_api.create(&Default::default(), &obj).await?;
};

match try_claim_lease(ctx.client.clone(), sim, metaroot, ctrl_ns, &UtcClock).await? {
LeaseState::Claimed => (),
LeaseState::WaitingForClaim(t) => return Ok(Action::requeue(Duration::from_secs(t as u64))),
LeaseState::Unknown => bail!("unknown lease state"),
}

// Set up the metrics collector
let mut prom_ready = false;
match &sim.spec.metrics_config {
Expand Down Expand Up @@ -176,7 +179,7 @@ pub(super) async fn setup_driver(
let jobs_api = kube::Api::<batchv1::Job>::namespaced(ctx.client.clone(), &ctx.driver_ns);
if jobs_api.get_opt(&ctx.driver_name).await?.is_none() {
info!("creating simulation driver {}", ctx.driver_name);
let obj = build_driver_job(ctx, sim, &driver_cert_secret_name)?;
let obj = build_driver_job(ctx, sim, &driver_cert_secret_name, ctrl_ns)?;
jobs_api.create(&Default::default(), &obj).await?;
}

Expand Down
12 changes: 10 additions & 2 deletions ctrl/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ pub(super) fn build_driver_job(
ctx: &SimulationContext,
sim: &Simulation,
cert_secret_name: &str,
ctrl_ns: &str,
) -> anyhow::Result<batchv1::Job> {
let trace_url = Url::parse(&sim.spec.trace_path)?;
let (trace_vm, trace_volume, trace_mount_path) = match storage::get_scheme(&trace_url)? {
Expand All @@ -186,7 +187,7 @@ pub(super) fn build_driver_job(
containers: vec![corev1::Container {
name: "driver".into(),
command: Some(vec!["/sk-driver".into()]),
args: Some(build_driver_args(ctx, cert_mount_path, trace_mount_path)),
args: Some(build_driver_args(ctx, cert_mount_path, trace_mount_path, ctrl_ns.into())),
image: Some(ctx.opts.driver_image.clone()),
env: Some(vec![
corev1::EnvVar {
Expand Down Expand Up @@ -244,7 +245,12 @@ fn build_certificate_volumes(cert_secret_name: &str) -> (corev1::VolumeMount, co
)
}

fn build_driver_args(ctx: &SimulationContext, cert_mount_path: String, trace_mount_path: String) -> Vec<String> {
fn build_driver_args(
ctx: &SimulationContext,
cert_mount_path: String,
trace_mount_path: String,
ctrl_ns: String,
) -> Vec<String> {
vec![
"--cert-path".into(),
format!("{cert_mount_path}/tls.crt"),
Expand All @@ -258,5 +264,7 @@ fn build_driver_args(ctx: &SimulationContext, cert_mount_path: String, trace_mou
ctx.name.clone(),
"--verbosity".into(),
ctx.opts.verbosity.clone(),
"--controller-ns".into(),
ctrl_ns,
]
}
21 changes: 11 additions & 10 deletions ctrl/tests/controller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use super::controller::*;
use super::*;
use crate::objects::*;

const CTRL_NAMESPACE: &str = "ctrl-ns";

#[fixture]
fn sim() -> Simulation {
Simulation {
Expand Down Expand Up @@ -144,7 +142,7 @@ async fn test_setup_driver_no_ns(sim: Simulation, root: SimulationRoot, opts: Op
.build();

assert!(matches!(
setup_driver(&ctx, &sim, &root, CTRL_NAMESPACE)
setup_driver(&ctx, &sim, &root, TEST_CTRL_NAMESPACE)
.await
.unwrap_err()
.downcast::<SkControllerError>()
Expand All @@ -160,7 +158,7 @@ async fn test_setup_driver_create_prom(sim: Simulation, root: SimulationRoot, op
let (mut fake_apiserver, client) = make_fake_apiserver();
let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim);

let lease_obj = build_lease(&sim, &root, CTRL_NAMESPACE, UtcClock.now());
let lease_obj = build_lease(&sim, &root, TEST_CTRL_NAMESPACE, UtcClock.now());
let driver_ns = ctx.driver_ns.clone();
let prom_name = ctx.prometheus_name.clone();
let driver_ns_obj = build_driver_namespace(&ctx, &sim);
Expand All @@ -175,7 +173,7 @@ async fn test_setup_driver_create_prom(sim: Simulation, root: SimulationRoot, op
})
.handle(move |when, then| {
when.method(GET)
.path(format!("/apis/coordination.k8s.io/v1/namespaces/{CTRL_NAMESPACE}/leases/sk-lease"));
.path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_CTRL_NAMESPACE}/leases/{SK_LEASE_NAME}"));
then.json_body_obj(&lease_obj);
})
.handle_not_found(format!("/api/v1/namespaces/{driver_ns}"))
Expand All @@ -190,7 +188,10 @@ async fn test_setup_driver_create_prom(sim: Simulation, root: SimulationRoot, op
then.json_body_obj(&prom_obj);
})
.build();
assert_eq!(setup_driver(&ctx, &sim, &root, CTRL_NAMESPACE).await.unwrap(), Action::requeue(REQUEUE_DURATION));
assert_eq!(
setup_driver(&ctx, &sim, &root, TEST_CTRL_NAMESPACE).await.unwrap(),
Action::requeue(REQUEUE_DURATION)
);
fake_apiserver.assert();
}

Expand All @@ -217,11 +218,11 @@ async fn test_setup_driver_wait_prom(
let webhook_name = ctx.webhook_name.clone();
let driver_name = ctx.driver_name.clone();

let lease_obj = build_lease(&sim, &root, CTRL_NAMESPACE, UtcClock.now());
let lease_obj = build_lease(&sim, &root, TEST_CTRL_NAMESPACE, UtcClock.now());
let driver_ns_obj = build_driver_namespace(&ctx, &sim);
let driver_svc_obj = build_driver_service(&ctx, &root);
let webhook_obj = build_mutating_webhook(&ctx, &root);
let driver_obj = build_driver_job(&ctx, &sim, "".into()).unwrap();
let driver_obj = build_driver_job(&ctx, &sim, "".into(), TEST_CTRL_NAMESPACE).unwrap();

fake_apiserver
.handle(|when, then| {
Expand All @@ -232,7 +233,7 @@ async fn test_setup_driver_wait_prom(
})
.handle(move |when, then| {
when.method(GET)
.path(format!("/apis/coordination.k8s.io/v1/namespaces/{CTRL_NAMESPACE}/leases/sk-lease"));
.path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_CTRL_NAMESPACE}/leases/{SK_LEASE_NAME}"));
then.json_body_obj(&lease_obj);
})
.handle(move |when, then| {
Expand Down Expand Up @@ -287,7 +288,7 @@ async fn test_setup_driver_wait_prom(
});
}
fake_apiserver.build();
let res = setup_driver(&ctx, &sim, &root, CTRL_NAMESPACE).await.unwrap();
let res = setup_driver(&ctx, &sim, &root, TEST_CTRL_NAMESPACE).await.unwrap();
if ready {
assert_eq!(res, Action::await_change());
} else {
Expand Down
11 changes: 7 additions & 4 deletions driver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ use tokio::sync::Mutex;
use tokio::time::sleep;

use crate::mutation::MutationData;
use crate::runner::TraceRunner;
use crate::runner::run_trace;

#[derive(Clone, Debug, Parser)]
struct Options {
#[arg(long)]
sim_name: String,

#[arg(long)]
controller_ns: String,

#[arg(long)]
virtual_ns_prefix: String,

Expand Down Expand Up @@ -62,6 +65,7 @@ pub struct DriverContext {
name: String,
root_name: String,
sim: Simulation,
ctrl_ns: String,
virtual_ns_prefix: String,
owners_cache: Arc<Mutex<OwnersCache>>,
store: Arc<dyn TraceStorable + Send + Sync>,
Expand All @@ -86,6 +90,7 @@ async fn run(opts: Options) -> EmptyResult {
name,
root_name,
sim,
ctrl_ns: opts.controller_ns.clone(),
virtual_ns_prefix: opts.virtual_ns_prefix.clone(),
owners_cache,
store,
Expand All @@ -107,11 +112,9 @@ async fn run(opts: Options) -> EmptyResult {
// Give the mutation handler a bit of time to come online before starting the sim
sleep(Duration::from_secs(5)).await;

let runner = TraceRunner::new(client).await?;

tokio::select! {
res = server_task => Err(anyhow!("server terminated: {res:#?}")),
res = tokio::spawn(runner.run(ctx.clone())) => {
res = tokio::spawn(run_trace(ctx.clone(), client)) => {
match res {
Ok(r) => r,
Err(err) => Err(err.into()),
Expand Down
138 changes: 66 additions & 72 deletions driver/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use simkube::jsonutils;
use simkube::k8s::{
add_common_metadata,
build_global_object_meta,
try_update_lease,
ApiSet,
GVK,
};
Expand Down Expand Up @@ -76,87 +77,80 @@ pub(super) fn build_virtual_obj(
Ok(vobj)
}

pub struct TraceRunner {
roots_api: kube::Api<SimulationRoot>,
ns_api: kube::Api<corev1::Namespace>,
apiset: ApiSet,
}
#[instrument(parent=None, skip_all, fields(simulation=ctx.name))]
pub async fn run_trace(ctx: DriverContext, client: kube::Client) -> EmptyResult {
let roots_api: kube::Api<SimulationRoot> = kube::Api::all(client.clone());
let ns_api: kube::Api<corev1::Namespace> = kube::Api::all(client.clone());
let mut apiset = ApiSet::new(client.clone());

let root_obj = if let Some(root) = roots_api.get_opt(&ctx.root_name).await? {
warn!("Driver root {} already exists; continuing...", ctx.root_name);
root
} else {
let root_obj = build_simulation_root(&ctx.root_name, &ctx.sim);
roots_api.create(&Default::default(), &root_obj).await?
};

impl TraceRunner {
pub async fn new(client: kube::Client) -> anyhow::Result<TraceRunner> {
Ok(TraceRunner {
roots_api: kube::Api::all(client.clone()),
ns_api: kube::Api::all(client.clone()),
apiset: ApiSet::new(client.clone()),
})
}
let mut sim_ts = ctx.store.start_ts().ok_or(anyhow!("no trace data"))?;
let sim_end_ts = ctx.store.end_ts().ok_or(anyhow!("no trace data"))?;
let sim_duration = sim_end_ts - sim_ts;

#[instrument(parent=None, skip_all, fields(simulation=ctx.name))]
pub async fn run(mut self, ctx: DriverContext) -> EmptyResult {
let root_obj = if let Some(root) = self.roots_api.get_opt(&ctx.root_name).await? {
warn!("Driver root {} already exists; continuing...", ctx.root_name);
root
} else {
let root_obj = build_simulation_root(&ctx.root_name, &ctx.sim);
self.roots_api.create(&Default::default(), &root_obj).await?
};

let mut sim_ts = ctx.store.start_ts().ok_or(anyhow!("no trace data"))?;
for (evt, maybe_next_ts) in ctx.store.iter() {
// We're currently assuming that all tracked objects are namespace-scoped,
// this will panic/fail if that is not true.
for obj in &evt.applied_objs {
let gvk = GVK::from_dynamic_obj(obj)?;
let original_ns = obj.namespace().unwrap();
let virtual_ns = format!("{}-{}", ctx.virtual_ns_prefix, original_ns);

if self.ns_api.get_opt(&virtual_ns).await?.is_none() {
info!("creating virtual namespace: {virtual_ns}");
let vns = build_virtual_ns(&ctx, &root_obj, &virtual_ns);
self.ns_api.create(&Default::default(), &vns).await?;
}

let pod_spec_template_path = ctx
.store
.config()
.pod_spec_template_path(&gvk)
.ok_or(anyhow!("unknown simulated object: {:?}", gvk))?;
let vobj = build_virtual_obj(&ctx, &root_obj, &original_ns, &virtual_ns, obj, pod_spec_template_path)?;

info!("applying object {}", vobj.namespaced_name());
self.apiset
.namespaced_api_for(&gvk, virtual_ns)
.await?
.patch(&vobj.name_any(), &PatchParams::apply("simkube"), &Patch::Apply(&vobj))
.await?;
}
try_update_lease(client.clone(), &ctx.sim, &ctx.ctrl_ns, sim_duration, &UtcClock).await?;

for obj in &evt.deleted_objs {
info!("deleting object {}", obj.namespaced_name());
let gvk = GVK::from_dynamic_obj(obj)?;
let virtual_ns = format!("{}-{}", ctx.virtual_ns_prefix, obj.namespace().unwrap());
self.apiset
.namespaced_api_for(&gvk, virtual_ns)
.await?
.delete(&obj.name_any(), &Default::default())
.await?;
}
for (evt, maybe_next_ts) in ctx.store.iter() {
// We're currently assuming that all tracked objects are namespace-scoped,
// this will panic/fail if that is not true.
for obj in &evt.applied_objs {
let gvk = GVK::from_dynamic_obj(obj)?;
let original_ns = obj.namespace().unwrap();
let virtual_ns = format!("{}-{}", ctx.virtual_ns_prefix, original_ns);

if let Some(next_ts) = maybe_next_ts {
let sleep_duration = max(0, next_ts - sim_ts);
if ns_api.get_opt(&virtual_ns).await?.is_none() {
info!("creating virtual namespace: {virtual_ns}");
let vns = build_virtual_ns(&ctx, &root_obj, &virtual_ns);
ns_api.create(&Default::default(), &vns).await?;
}

info!("next event happens in {sleep_duration} seconds, sleeping");
debug!("current sim ts = {sim_ts}, next sim ts = {next_ts}");
let pod_spec_template_path = ctx
.store
.config()
.pod_spec_template_path(&gvk)
.ok_or(anyhow!("unknown simulated object: {:?}", gvk))?;
let vobj = build_virtual_obj(&ctx, &root_obj, &original_ns, &virtual_ns, obj, pod_spec_template_path)?;

info!("applying object {}", vobj.namespaced_name());
apiset
.namespaced_api_for(&gvk, virtual_ns)
.await?
.patch(&vobj.name_any(), &PatchParams::apply("simkube"), &Patch::Apply(&vobj))
.await?;
}

sim_ts = next_ts;
sleep(Duration::from_secs(sleep_duration as u64)).await;
}
for obj in &evt.deleted_objs {
info!("deleting object {}", obj.namespaced_name());
let gvk = GVK::from_dynamic_obj(obj)?;
let virtual_ns = format!("{}-{}", ctx.virtual_ns_prefix, obj.namespace().unwrap());
apiset
.namespaced_api_for(&gvk, virtual_ns)
.await?
.delete(&obj.name_any(), &Default::default())
.await?;
}

if let Err(e) = self.roots_api.delete(&ctx.root_name, &Default::default()).await {
error!("could not delete driver root {}: {e}", ctx.root_name);
if let Some(next_ts) = maybe_next_ts {
let sleep_duration = max(0, next_ts - sim_ts);

info!("next event happens in {sleep_duration} seconds, sleeping");
debug!("current sim ts = {sim_ts}, next sim ts = {next_ts}");

sim_ts = next_ts;
sleep(Duration::from_secs(sleep_duration as u64)).await;
}
}

Ok(())
if let Err(e) = roots_api.delete(&ctx.root_name, &Default::default()).await {
error!("could not delete driver root {}: {e}", ctx.root_name);
}

Ok(())
}
1 change: 1 addition & 0 deletions driver/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub fn build_driver_context(
name: TEST_DRIVER_NAME.into(),
root_name: TEST_DRIVER_ROOT_NAME.into(),
sim: Simulation::new(TEST_SIM_NAME, Default::default()),
ctrl_ns: TEST_CTRL_NAMESPACE.into(),
virtual_ns_prefix: TEST_VIRT_NS_PREFIX.into(),
owners_cache,
store,
Expand Down
Loading

0 comments on commit 508f42d

Please sign in to comment.