From 4ebc0d4f6e69e9b2282d6ddca5a0e3be433d0ee9 Mon Sep 17 00:00:00 2001 From: David Morrison Date: Tue, 9 Apr 2024 12:39:48 -0700 Subject: [PATCH] adding new tests for lease --- ctrl/controller.rs | 5 +- ctrl/tests/controller_test.rs | 151 ++++++++++++++----------- driver/runner.rs | 2 +- lib/k8s/lease.rs | 49 ++++---- lib/k8s/mod.rs | 3 + lib/k8s/tests/lease_test.rs | 203 ++++++++++++++++++++++++++++++++++ lib/k8s/tests/mod.rs | 1 + lib/testutils/mod.rs | 5 + lib/testutils/sim.rs | 32 ++++++ 9 files changed, 357 insertions(+), 94 deletions(-) create mode 100644 lib/k8s/tests/lease_test.rs create mode 100644 lib/testutils/sim.rs diff --git a/ctrl/controller.rs b/ctrl/controller.rs index dbf3394c..45e6c286 100644 --- a/ctrl/controller.rs +++ b/ctrl/controller.rs @@ -18,7 +18,6 @@ use kube::api::{ }; use kube::error::ErrorResponse; use kube::runtime::controller::Action; -use kube::Error::Api; use kube::ResourceExt; use serde_json::json; use simkube::api::v1::build_simulation_root; @@ -95,7 +94,7 @@ pub(super) async fn setup_driver( bail!(SkControllerError::namespace_not_found(&sim.metrics_ns())); }; - match try_claim_lease(ctx.client.clone(), sim, metaroot, ctrl_ns, &UtcClock).await? { + match try_claim_lease(ctx.client.clone(), sim, metaroot, ctrl_ns, Box::new(UtcClock)).await? { LeaseState::Claimed => (), LeaseState::WaitingForClaim(t) => { info!("sleeping for {t} seconds"); @@ -197,7 +196,7 @@ pub(super) async fn cleanup(ctx: &SimulationContext, sim: &Simulation) { info!("cleaning up prometheus resources"); if let Err(e) = prom_api.delete(&ctx.prometheus_name, &Default::default()).await { - if matches!(e, Api(ErrorResponse { code: 404, .. })) { + if matches!(e, kube::Error::Api(ErrorResponse { code: 404, .. })) { warn!("prometheus object not found; maybe already cleaned up?"); } else { error!("Error cleaning up Prometheus: {e:?}"); diff --git a/ctrl/tests/controller_test.rs b/ctrl/tests/controller_test.rs index c9d4bf53..70f6abd6 100644 --- a/ctrl/tests/controller_test.rs +++ b/ctrl/tests/controller_test.rs @@ -10,36 +10,6 @@ use super::controller::*; use super::*; use crate::objects::*; -#[fixture] -fn sim() -> Simulation { - Simulation { - metadata: metav1::ObjectMeta { - name: Some(TEST_SIM_NAME.into()), - uid: Some("1234-asdf".into()), - ..Default::default() - }, - spec: SimulationSpec { - driver_namespace: TEST_NAMESPACE.into(), - trace_path: "file:///foo/bar".into(), - metrics_config: Some(Default::default()), - ..Default::default() - }, - status: Default::default(), - } -} - -#[fixture] -fn root() -> SimulationRoot { - SimulationRoot { - metadata: metav1::ObjectMeta { - name: Some(format!("sk-{TEST_SIM_NAME}-root")), - uid: Some("qwerty-5678".into()), - ..Default::default() - }, - spec: SimulationRootSpec {}, - } -} - #[fixture] fn opts() -> Options { Options { @@ -53,9 +23,9 @@ fn opts() -> Options { #[rstest] #[tokio::test] -async fn test_fetch_driver_status_no_driver(sim: Simulation, opts: Options) { +async fn test_fetch_driver_status_no_driver(test_sim: Simulation, opts: Options) { let (mut fake_apiserver, client) = make_fake_apiserver(); - let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); let driver_name = ctx.driver_name.clone(); fake_apiserver @@ -67,9 +37,9 @@ async fn test_fetch_driver_status_no_driver(sim: Simulation, opts: Options) { #[rstest] #[tokio::test] -async fn test_fetch_driver_status_driver_no_status(sim: Simulation, opts: Options) { +async fn test_fetch_driver_status_driver_no_status(test_sim: Simulation, opts: Options) { let (mut fake_apiserver, client) = make_fake_apiserver(); - let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); let driver_name = ctx.driver_name.clone(); fake_apiserver @@ -84,9 +54,9 @@ async fn test_fetch_driver_status_driver_no_status(sim: Simulation, opts: Option #[rstest] #[tokio::test] -async fn test_fetch_driver_status_driver_running(sim: Simulation, opts: Options) { +async fn test_fetch_driver_status_driver_running(test_sim: Simulation, opts: Options) { let (mut fake_apiserver, client) = make_fake_apiserver(); - let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); let driver_name = ctx.driver_name.clone(); fake_apiserver @@ -107,7 +77,7 @@ async fn test_fetch_driver_status_driver_running(sim: Simulation, opts: Options) #[case::complete(JOB_STATUS_CONDITION_COMPLETE)] #[case::failed(JOB_STATUS_CONDITION_FAILED)] #[tokio::test] -async fn test_fetch_driver_status_driver_finished(sim: Simulation, opts: Options, #[case] status: &'static str) { +async fn test_fetch_driver_status_driver_finished(test_sim: Simulation, opts: Options, #[case] status: &'static str) { let expected_state = if status == JOB_STATUS_CONDITION_COMPLETE { SimulationState::Finished } else { @@ -115,7 +85,7 @@ async fn test_fetch_driver_status_driver_finished(sim: Simulation, opts: Options }; let (mut fake_apiserver, client) = make_fake_apiserver(); - let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); let driver_name = ctx.driver_name.clone(); fake_apiserver @@ -133,36 +103,78 @@ async fn test_fetch_driver_status_driver_finished(sim: Simulation, opts: Options } #[rstest] +#[traced_test] #[tokio::test] -async fn test_setup_driver_no_ns(sim: Simulation, root: SimulationRoot, opts: Options) { +async fn test_setup_driver_no_ns(test_sim: Simulation, test_sim_root: SimulationRoot, opts: Options) { let (mut fake_apiserver, client) = make_fake_apiserver(); - let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); fake_apiserver .handle_not_found(format!("/api/v1/namespaces/{DEFAULT_METRICS_NS}")) .build(); assert!(matches!( - setup_driver(&ctx, &sim, &root, TEST_CTRL_NAMESPACE) + setup_driver(&ctx, &test_sim, &test_sim_root, TEST_CTRL_NAMESPACE) .await .unwrap_err() .downcast::() .unwrap(), SkControllerError::NamespaceNotFound(_) - )) + )); + fake_apiserver.assert(); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn test_setup_driver_lease_claim_fails(test_sim: Simulation, test_sim_root: SimulationRoot, opts: Options) { + let (mut fake_apiserver, client) = make_fake_apiserver(); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); + fake_apiserver + .handle(|when, then| { + when.method(GET).path(format!("/api/v1/namespaces/{DEFAULT_METRICS_NS}")); + then.json_body(json!({ + "kind": "Namespace", + })); + }) + .handle_not_found(format!( + "/apis/coordination.k8s.io/v1/namespaces/{TEST_CTRL_NAMESPACE}/leases/{SK_LEASE_NAME}" + )) + .handle(move |when, then| { + when.method(POST) + .path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_CTRL_NAMESPACE}/leases")); + then.status(409).json_body(json!({ + "kind": "Status", + "apiVersion": "v1", + "metadata": {}, + "message": "the object has been modified; please apply your changes to the latest version and try again", + "status": "Failure", + "reason": "Conflict", + "code": 409 + })); + }) + .build(); + + let err = setup_driver(&ctx, &test_sim, &test_sim_root, TEST_CTRL_NAMESPACE) + .await + .unwrap_err() + .downcast::() + .unwrap(); + assert!(matches!(err, kube::api::entry::CommitError::Save(..))); + fake_apiserver.assert(); } #[rstest] #[traced_test] #[tokio::test] -async fn test_setup_driver_create_prom(sim: Simulation, root: SimulationRoot, opts: Options) { +async fn test_setup_driver_create_prom(test_sim: Simulation, test_sim_root: SimulationRoot, opts: Options) { let (mut fake_apiserver, client) = make_fake_apiserver(); - let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); - let lease_obj = build_lease(&sim, &root, TEST_CTRL_NAMESPACE, UtcClock.now()); + let lease_obj = build_lease(&test_sim, &test_sim_root, TEST_CTRL_NAMESPACE, UtcClock.now()); let driver_ns = ctx.driver_ns.clone(); let prom_name = ctx.prometheus_name.clone(); - let driver_ns_obj = build_driver_namespace(&ctx, &sim); - let prom_obj = build_prometheus(&ctx.prometheus_name, &sim, &sim.spec.metrics_config.clone().unwrap()); + let driver_ns_obj = build_driver_namespace(&ctx, &test_sim); + let prom_obj = build_prometheus(&ctx.prometheus_name, &test_sim, &test_sim.spec.metrics_config.clone().unwrap()); fake_apiserver .handle(|when, then| { @@ -189,7 +201,9 @@ async fn test_setup_driver_create_prom(sim: Simulation, root: SimulationRoot, op }) .build(); assert_eq!( - setup_driver(&ctx, &sim, &root, TEST_CTRL_NAMESPACE).await.unwrap(), + setup_driver(&ctx, &test_sim, &test_sim_root, TEST_CTRL_NAMESPACE) + .await + .unwrap(), Action::requeue(REQUEUE_DURATION) ); fake_apiserver.assert(); @@ -202,15 +216,15 @@ async fn test_setup_driver_create_prom(sim: Simulation, root: SimulationRoot, op #[traced_test] #[tokio::test] async fn test_setup_driver_wait_prom( - mut sim: Simulation, - root: SimulationRoot, + mut test_sim: Simulation, + test_sim_root: SimulationRoot, opts: Options, #[case] ready: bool, #[case] disabled: bool, ) { env::set_var("POD_SVC_ACCOUNT", "asdf"); let (mut fake_apiserver, client) = make_fake_apiserver(); - let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); let driver_ns = ctx.driver_ns.clone(); let prom_name = ctx.prometheus_name.clone(); @@ -218,11 +232,11 @@ async fn test_setup_driver_wait_prom( let webhook_name = ctx.webhook_name.clone(); let driver_name = ctx.driver_name.clone(); - let lease_obj = build_lease(&sim, &root, TEST_CTRL_NAMESPACE, UtcClock.now()); - let driver_ns_obj = build_driver_namespace(&ctx, &sim); - let driver_svc_obj = build_driver_service(&ctx, &root); - let webhook_obj = build_mutating_webhook(&ctx, &root); - let driver_obj = build_driver_job(&ctx, &sim, "".into(), TEST_CTRL_NAMESPACE).unwrap(); + let lease_obj = build_lease(&test_sim, &test_sim_root, TEST_CTRL_NAMESPACE, UtcClock.now()); + let driver_ns_obj = build_driver_namespace(&ctx, &test_sim); + let driver_svc_obj = build_driver_service(&ctx, &test_sim_root); + let webhook_obj = build_mutating_webhook(&ctx, &test_sim_root); + let driver_obj = build_driver_job(&ctx, &test_sim, "".into(), TEST_CTRL_NAMESPACE).unwrap(); fake_apiserver .handle(|when, then| { @@ -242,9 +256,10 @@ async fn test_setup_driver_wait_prom( }); if disabled { - sim.spec.metrics_config = None; + test_sim.spec.metrics_config = None; } else { - let prom_obj = build_prometheus(&ctx.prometheus_name, &sim, &sim.spec.metrics_config.clone().unwrap()); + let prom_obj = + build_prometheus(&ctx.prometheus_name, &test_sim, &test_sim.spec.metrics_config.clone().unwrap()); fake_apiserver.handle(move |when, then| { when.method(GET) .path(format!("/apis/monitoring.coreos.com/v1/namespaces/monitoring/prometheuses/{prom_name}")); @@ -288,7 +303,9 @@ async fn test_setup_driver_wait_prom( }); } fake_apiserver.build(); - let res = setup_driver(&ctx, &sim, &root, TEST_CTRL_NAMESPACE).await.unwrap(); + let res = setup_driver(&ctx, &test_sim, &test_sim_root, TEST_CTRL_NAMESPACE) + .await + .unwrap(); if ready { assert_eq!(res, Action::await_change()); } else { @@ -301,9 +318,9 @@ async fn test_setup_driver_wait_prom( #[rstest] #[traced_test] #[tokio::test] -async fn test_cleanup(sim: Simulation, opts: Options) { +async fn test_cleanup(test_sim: Simulation, opts: Options) { let (mut fake_apiserver, client) = make_fake_apiserver(); - let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); let root = ctx.metaroot_name.clone(); let prom = ctx.prometheus_name.clone(); @@ -316,9 +333,9 @@ async fn test_cleanup(sim: Simulation, opts: Options) { .handle(move |when, then| { when.path(format!("/apis/monitoring.coreos.com/v1/namespaces/monitoring/prometheuses/{prom}")); then.json_body(status_ok()); - }); - fake_apiserver.build(); - cleanup(&ctx, &sim).await; + }) + .build(); + cleanup(&ctx, &test_sim).await; assert!(!logs_contain("ERROR")); fake_apiserver.assert(); @@ -329,9 +346,9 @@ async fn test_cleanup(sim: Simulation, opts: Options) { #[rstest] #[traced_test] #[tokio::test] -async fn test_cleanup_not_found(sim: Simulation, opts: Options) { +async fn test_cleanup_not_found(test_sim: Simulation, opts: Options) { let (mut fake_apiserver, client) = make_fake_apiserver(); - let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&sim); + let ctx = Arc::new(SimulationContext::new(client, opts)).with_sim(&test_sim); let root = ctx.metaroot_name.clone(); let prom = ctx.prometheus_name.clone(); @@ -343,7 +360,7 @@ async fn test_cleanup_not_found(sim: Simulation, opts: Options) { }) .handle_not_found(format!("/apis/monitoring.coreos.com/v1/namespaces/monitoring/prometheuses/{prom}")) .build(); - cleanup(&ctx, &sim).await; + cleanup(&ctx, &test_sim).await; assert!(logs_contain("WARN")); fake_apiserver.assert(); diff --git a/driver/runner.rs b/driver/runner.rs index 25832d4c..096526be 100644 --- a/driver/runner.rs +++ b/driver/runner.rs @@ -95,7 +95,7 @@ pub async fn run_trace(ctx: DriverContext, client: kube::Client) -> EmptyResult let sim_end_ts = ctx.store.end_ts().ok_or(anyhow!("no trace data"))?; let sim_duration = sim_end_ts - sim_ts; - try_update_lease(client.clone(), &ctx.sim, &ctx.ctrl_ns, sim_duration, &UtcClock).await?; + try_update_lease(client.clone(), &ctx.sim, &ctx.ctrl_ns, sim_duration, Box::new(UtcClock)).await?; for (evt, maybe_next_ts) in ctx.store.iter() { // We're currently assuming that all tracked objects are namespace-scoped, diff --git a/lib/k8s/lease.rs b/lib/k8s/lease.rs index 809f6563..f257cc57 100644 --- a/lib/k8s/lease.rs +++ b/lib/k8s/lease.rs @@ -1,4 +1,3 @@ -use anyhow::bail; use chrono::{ DateTime, Utc, @@ -8,9 +7,13 @@ use kube::api::Patch; use kube::ResourceExt; use serde_json::json; -use crate::k8s::build_object_meta; +use crate::k8s::{ + build_object_meta, + KubernetesError, +}; use crate::prelude::*; +#[derive(Debug, Eq, PartialEq)] pub enum LeaseState { Unknown, Claimed, @@ -31,30 +34,12 @@ pub fn build_lease(sim: &Simulation, metaroot: &SimulationRoot, ns: &str, now: D } } -fn compute_remaining_lease_time( - maybe_duration_seconds: &Option, - maybe_renew_time: &Option, - now_ts: i64, -) -> i64 { - let duration_seconds = maybe_duration_seconds.map_or(0, |secs| secs as i64) + RETRY_DELAY_SECONDS; - let renew_time = maybe_renew_time - .clone() - .map(|microtime| microtime.0.timestamp()) - .unwrap_or(now_ts); - let sleep_time = renew_time + duration_seconds - now_ts; - if sleep_time <= 0 { - warn!("exceeded the lease time but something hasn't released it; trying again"); - return RETRY_DELAY_SECONDS; - } - sleep_time -} - pub async fn try_claim_lease( client: kube::Client, sim: &Simulation, metaroot: &SimulationRoot, lease_ns: &str, - clock: &(dyn Clockable + Send + Sync), + clock: Box, ) -> anyhow::Result { // Try to claim the lease -- leases are namespaced, so we create the lease in the same // namespace as the controller. You could hypothetically work around this by running two @@ -106,12 +91,12 @@ pub async fn try_update_lease( sim: &Simulation, lease_ns: &str, lease_duration: i64, - clock: &(dyn Clockable + Send + Sync), + clock: Box, ) -> EmptyResult { let lease_api = kube::Api::::namespaced(client.clone(), lease_ns); match lease_api.get(SK_LEASE_NAME).await?.spec { Some(coordinationv1::LeaseSpec { holder_identity: Some(holder), .. }) if holder != sim.name_any() => { - bail!("lease not owned by current sim: {holder} != {}", sim.name_any()) + return Err(KubernetesError::lease_held_by_other(&holder)); }, _ => (), } @@ -130,3 +115,21 @@ pub async fn try_update_lease( .await?; Ok(()) } + +pub(super) fn compute_remaining_lease_time( + maybe_duration_seconds: &Option, + maybe_renew_time: &Option, + now_ts: i64, +) -> i64 { + let duration_seconds = maybe_duration_seconds.map_or(0, |secs| secs as i64) + RETRY_DELAY_SECONDS; + let renew_time = maybe_renew_time + .clone() + .map(|microtime| microtime.0.timestamp()) + .unwrap_or(now_ts); + let sleep_time = renew_time + duration_seconds - now_ts; + if sleep_time <= 0 { + warn!("exceeded the lease time but something hasn't released it; trying again"); + return RETRY_DELAY_SECONDS; + } + sleep_time +} diff --git a/lib/k8s/mod.rs b/lib/k8s/mod.rs index c53cb745..2a65b36b 100644 --- a/lib/k8s/mod.rs +++ b/lib/k8s/mod.rs @@ -28,6 +28,9 @@ err_impl! {KubernetesError, #[error("field not found in struct: {0}")] FieldNotFound(String), + #[error("lease has different owner: {0}")] + LeaseHeldByOther(String), + #[error("malformed container status: {0:?}")] MalformedContainerState(corev1::ContainerState), diff --git a/lib/k8s/tests/lease_test.rs b/lib/k8s/tests/lease_test.rs new file mode 100644 index 00000000..1523e0e5 --- /dev/null +++ b/lib/k8s/tests/lease_test.rs @@ -0,0 +1,203 @@ +use chrono::DateTime; +use httpmock::Method::*; +use k8s_openapi::api::coordination::v1 as coordinationv1; +use kube::error::ErrorResponse; +use serde_json::json; + +use super::*; + +const NOW: i64 = 15; +const TEST_LEASE_NS: &str = "simlease-ns"; +const TEST_LEASE_DURATION: i64 = 10; + +#[fixture] +fn lease_other_holder() -> coordinationv1::Lease { + let holder = "some-other-sim"; + coordinationv1::Lease { + spec: Some(coordinationv1::LeaseSpec { + holder_identity: Some(holder.into()), + ..Default::default() + }), + ..Default::default() + } +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn test_try_claim_lease_already_owned_by_us(test_sim: Simulation, test_sim_root: SimulationRoot) { + let clock = MockUtcClock::new(NOW); + let (mut fake_apiserver, client) = make_fake_apiserver(); + let lease_obj = build_lease(&test_sim, &test_sim_root, TEST_CTRL_NAMESPACE, clock.now()); + fake_apiserver + .handle(move |when, then| { + when.method(GET) + .path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases/{SK_LEASE_NAME}")); + then.json_body_obj(&lease_obj); + }) + .build(); + let res = try_claim_lease(client, &test_sim, &test_sim_root, TEST_LEASE_NS, clock) + .await + .unwrap(); + fake_apiserver.assert(); + assert_eq!(res, LeaseState::Claimed); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn test_try_claim_lease_other_lease_unowned(test_sim: Simulation, test_sim_root: SimulationRoot) { + let clock = MockUtcClock::new(NOW); + let (mut fake_apiserver, client) = make_fake_apiserver(); + let other_lease: coordinationv1::Lease = Default::default(); + let lease_obj = build_lease(&test_sim, &test_sim_root, TEST_CTRL_NAMESPACE, clock.now()); + fake_apiserver + .handle(move |when, then| { + when.method(GET) + .path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases/{SK_LEASE_NAME}")); + then.json_body_obj(&other_lease); + }) + .handle(move |when, then| { + when.method(PUT) + .path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases/{SK_LEASE_NAME}")); + then.json_body_obj(&lease_obj); + }) + .build(); + let res = try_claim_lease(client, &test_sim, &test_sim_root, TEST_LEASE_NS, clock) + .await + .unwrap(); + fake_apiserver.assert(); + assert_eq!(res, LeaseState::Claimed); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn test_try_claim_lease_already_owned_by_other( + test_sim: Simulation, + test_sim_root: SimulationRoot, + lease_other_holder: coordinationv1::Lease, +) { + let clock = MockUtcClock::new(NOW); + let (mut fake_apiserver, client) = make_fake_apiserver(); + fake_apiserver + .handle(move |when, then| { + when.method(GET) + .path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases/{SK_LEASE_NAME}")); + then.json_body_obj(&lease_other_holder); + }) + .build(); + let res = try_claim_lease(client, &test_sim, &test_sim_root, TEST_LEASE_NS, clock) + .await + .unwrap(); + fake_apiserver.assert(); + assert!(matches!(res, LeaseState::WaitingForClaim(..))); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn test_try_claim_lease(test_sim: Simulation, test_sim_root: SimulationRoot) { + let clock = MockUtcClock::new(NOW); + let (mut fake_apiserver, client) = make_fake_apiserver(); + let lease_obj = build_lease(&test_sim, &test_sim_root, TEST_CTRL_NAMESPACE, clock.now()); + fake_apiserver + .handle_not_found(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases/{SK_LEASE_NAME}")) + .handle(move |when, then| { + when.method(POST) + .path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases")); + then.json_body_obj(&lease_obj); + }) + .build(); + let res = try_claim_lease(client, &test_sim, &test_sim_root, TEST_LEASE_NS, clock) + .await + .unwrap(); + fake_apiserver.assert(); + assert_eq!(res, LeaseState::Claimed); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn test_try_update_lease_no_lease_found(test_sim: Simulation) { + let clock = MockUtcClock::new(NOW); + let (mut fake_apiserver, client) = make_fake_apiserver(); + fake_apiserver + .handle_not_found(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases/{SK_LEASE_NAME}")) + .build(); + let res = try_update_lease(client, &test_sim, TEST_LEASE_NS, 10, clock).await.unwrap_err(); + let err = res.downcast::().unwrap(); + fake_apiserver.assert(); + assert!(matches!(err, kube::Error::Api(ErrorResponse { code: 404, .. }))); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn test_try_update_lease_wrong_owner(test_sim: Simulation, lease_other_holder: coordinationv1::Lease) { + let clock = MockUtcClock::new(NOW); + let (mut fake_apiserver, client) = make_fake_apiserver(); + fake_apiserver + .handle(move |when, then| { + when.method(GET) + .path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases/{SK_LEASE_NAME}")); + then.json_body_obj(&lease_other_holder); + }) + .build(); + let res = try_update_lease(client, &test_sim, TEST_LEASE_NS, 10, clock).await.unwrap_err(); + let err = res.downcast::().unwrap(); + fake_apiserver.assert(); + assert!(matches!(err, KubernetesError::LeaseHeldByOther(..))); +} + +#[rstest] +#[traced_test] +#[tokio::test] +async fn test_try_update_lease(test_sim: Simulation, test_sim_root: SimulationRoot) { + let mut clock = MockUtcClock::new(NOW); + let (mut fake_apiserver, client) = make_fake_apiserver(); + let lease_obj = build_lease(&test_sim, &test_sim_root, TEST_CTRL_NAMESPACE, clock.now()); + let mut patched_lease_obj = build_lease(&test_sim, &test_sim_root, TEST_CTRL_NAMESPACE, clock.now()); + + clock.advance(5); + let renew_time = metav1::MicroTime(clock.now()); + patched_lease_obj.spec.as_mut().unwrap().lease_duration_seconds = Some(TEST_LEASE_DURATION as i32); + patched_lease_obj.spec.as_mut().unwrap().renew_time = Some(renew_time.clone()); + + fake_apiserver + .handle(move |when, then| { + when.method(GET) + .path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases/{SK_LEASE_NAME}")); + then.json_body_obj(&lease_obj); + }) + .handle(move |when, then| { + when.method(PATCH) + .path(format!("/apis/coordination.k8s.io/v1/namespaces/{TEST_LEASE_NS}/leases/{SK_LEASE_NAME}")) + .json_body(json!({ + "spec": { + "leaseDurationSeconds": TEST_LEASE_DURATION, + "renewTime": renew_time, + } + })); + then.json_body_obj(&patched_lease_obj); + }) + .build(); + assert_eq!((), try_update_lease(client, &test_sim, TEST_LEASE_NS, 10, clock).await.unwrap()); + fake_apiserver.assert(); +} + +#[rstest] +#[case::no_data(None, None, RETRY_DELAY_SECONDS)] +#[case::no_renew_time(Some(TEST_LEASE_DURATION), None, TEST_LEASE_DURATION + RETRY_DELAY_SECONDS)] +#[case::no_duration_seconds(None, Some(13), 13 + RETRY_DELAY_SECONDS - NOW)] +#[case::valid(Some(TEST_LEASE_DURATION), Some(13), 23 + RETRY_DELAY_SECONDS - NOW)] +#[case::negative(Some(5), Some(2), RETRY_DELAY_SECONDS)] +fn test_compute_remaining_lease_time_no_data( + #[case] maybe_duration_seconds_64: Option, + #[case] maybe_renew_ts: Option, + #[case] expected: i64, +) { + let maybe_renew_time = maybe_renew_ts.map(|ts| metav1::MicroTime(DateTime::from_timestamp(ts, 0).unwrap())); + let maybe_duration_seconds = maybe_duration_seconds_64.map(|secs| secs as i32); + assert_eq!(compute_remaining_lease_time(&maybe_duration_seconds, &maybe_renew_time, NOW), expected); +} diff --git a/lib/k8s/tests/mod.rs b/lib/k8s/tests/mod.rs index f40deb8c..241e8516 100644 --- a/lib/k8s/tests/mod.rs +++ b/lib/k8s/tests/mod.rs @@ -1,4 +1,5 @@ mod container_state_test; +mod lease_test; mod owners_test; mod pod_lifecycle_test; mod util_test; diff --git a/lib/testutils/mod.rs b/lib/testutils/mod.rs index b2e4e1f8..21166569 100644 --- a/lib/testutils/mod.rs +++ b/lib/testutils/mod.rs @@ -1,6 +1,7 @@ pub mod clock; pub mod fake; pub mod pods; +pub mod sim; pub mod store; pub const EMPTY_OBJ_HASH: u64 = 15130871412783076140; @@ -23,4 +24,8 @@ pub use fake::{ }; pub use pods::test_pod; use rstest::*; +pub use sim::{ + test_sim, + test_sim_root, +}; pub use store::MockTraceStore; diff --git a/lib/testutils/sim.rs b/lib/testutils/sim.rs new file mode 100644 index 00000000..be1f0c14 --- /dev/null +++ b/lib/testutils/sim.rs @@ -0,0 +1,32 @@ +use super::*; +use crate::prelude::*; + +#[fixture] +pub fn test_sim() -> Simulation { + Simulation { + metadata: metav1::ObjectMeta { + name: Some(TEST_SIM_NAME.into()), + uid: Some("1234-asdf".into()), + ..Default::default() + }, + spec: SimulationSpec { + driver_namespace: TEST_NAMESPACE.into(), + trace_path: "file:///foo/bar".into(), + metrics_config: Some(Default::default()), + ..Default::default() + }, + status: Default::default(), + } +} + +#[fixture] +pub fn test_sim_root() -> SimulationRoot { + SimulationRoot { + metadata: metav1::ObjectMeta { + name: Some(format!("sk-{TEST_SIM_NAME}-root")), + uid: Some("qwerty-5678".into()), + ..Default::default() + }, + spec: SimulationRootSpec {}, + } +}