Skip to content

Commit

Permalink
refactor objectstore so we can test it; add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Jun 26, 2024
1 parent 3c27ae0 commit fe18047
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 67 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ tracing = "0.1.37"
tracing-log = "0.1.3"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
url = "2.4.1"
async-trait = "0.1.80"

[dependencies.kube]
version = "0.85.0"
Expand Down
19 changes: 4 additions & 15 deletions cli/export.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use anyhow::bail;
use bytes::Bytes;
use object_store::PutPayload;
use reqwest::Url;
use simkube::prelude::*;
use simkube::store::external_storage::{
object_store_for_scheme,
ObjectStoreScheme,
ObjectStoreWrapper,
SkObjectStore,
};
use simkube::time::duration_to_ts;

Expand Down Expand Up @@ -76,20 +73,12 @@ pub async fn cmd(args: &Args) -> EmptyResult {
// didn't work from the tracer pod, so this will handle that case as well.
let data = res.bytes().await?;
if !data.is_empty() {
write_output(data, &args.output_path).await?;
let object_store = SkObjectStore::new(&args.output_path)?;
object_store.put(data).await?;
}
println!("Trace data exported to {}", args.output_path);
},
res => bail!("Received {} response; could not export trace data:\n\n{}", res.status(), res.text().await?),
};
Ok(())
}

async fn write_output(data: Bytes, output_path: &str) -> EmptyResult {
let url = Url::parse(output_path)?;
let (scheme, path) = ObjectStoreScheme::parse(&url)?;
let store = object_store_for_scheme(&scheme, output_path)?;
let payload = PutPayload::from_bytes(data);
store.put(&path, payload).await?;
Ok(())
}
11 changes: 4 additions & 7 deletions driver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::time::Duration;

use anyhow::anyhow;
use clap::Parser;
use reqwest::Url;
use rocket::config::TlsConfig;
use simkube::k8s::{
ApiSet,
Expand All @@ -20,8 +19,8 @@ use simkube::k8s::{
use simkube::prelude::*;
use simkube::sim::hooks;
use simkube::store::external_storage::{
object_store_for_scheme,
ObjectStoreScheme,
ObjectStoreWrapper,
SkObjectStore,
};
use simkube::store::{
TraceStorable,
Expand Down Expand Up @@ -84,10 +83,8 @@ async fn run(opts: Options) -> EmptyResult {

let root_name = format!("{name}-root");

let url = Url::parse(&opts.trace_path)?;
let (scheme, path) = ObjectStoreScheme::parse(&url)?;
let store = object_store_for_scheme(&scheme, &opts.trace_path)?;
let trace_data = store.get(&path).await?.bytes().await?.to_vec();
let object_store = SkObjectStore::new(&opts.trace_path)?;
let trace_data = object_store.get().await?.to_vec();

let store = Arc::new(TraceStore::import(trace_data, &sim.spec.duration)?);

Expand Down
89 changes: 67 additions & 22 deletions lib/store/external_storage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use async_trait::async_trait;
use bytes::Bytes;
use object_store::path::Path;
use object_store::DynObjectStore;
use object_store::{
DynObjectStore,
PutPayload,
};
use reqwest::Url;

use crate::errors::*;
Expand Down Expand Up @@ -33,7 +38,7 @@ use crate::errors::*;
// in that library. This code can all be deleted if/once https://github.com/apache/arrow-rs/pull/5912
// is merged.

#[derive(Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ObjectStoreScheme {
Local,
Memory,
Expand Down Expand Up @@ -81,24 +86,64 @@ impl ObjectStoreScheme {

// End copy-pasta'ed code

pub fn object_store_for_scheme(scheme: &ObjectStoreScheme, path_str: &str) -> anyhow::Result<Box<DynObjectStore>> {
let store: Box<DynObjectStore> = match &scheme {
ObjectStoreScheme::Local => Box::new(object_store::local::LocalFileSystem::new()),
ObjectStoreScheme::Memory => Box::new(object_store::memory::InMemory::new()),
ObjectStoreScheme::AmazonS3 => {
Box::new(object_store::aws::AmazonS3Builder::from_env().with_url(path_str).build()?)
},
ObjectStoreScheme::MicrosoftAzure => Box::new(
object_store::azure::MicrosoftAzureBuilder::from_env()
.with_url(path_str)
.build()?,
),
ObjectStoreScheme::GoogleCloudStorage => Box::new(
object_store::gcp::GoogleCloudStorageBuilder::from_env()
.with_url(path_str)
.build()?,
),
ObjectStoreScheme::Http => Box::new(object_store::http::HttpBuilder::new().with_url(path_str).build()?),
};
Ok(store)
#[cfg(feature = "testutils")]
use mockall::automock;

#[cfg_attr(feature = "testutils", automock)]
#[async_trait]
pub trait ObjectStoreWrapper {
fn scheme(&self) -> ObjectStoreScheme;
async fn put(&self, data: Bytes) -> EmptyResult;
async fn get(&self) -> anyhow::Result<Bytes>;
}

#[derive(Debug)]
pub struct SkObjectStore {
scheme: ObjectStoreScheme,
store: Box<DynObjectStore>,
path: Path,
}

impl SkObjectStore {
pub fn new(path_str: &str) -> anyhow::Result<SkObjectStore> {
let url = Url::parse(path_str)?;
let (scheme, path) = ObjectStoreScheme::parse(&url)?;
let store: Box<DynObjectStore> = match scheme {
ObjectStoreScheme::Local => Box::new(object_store::local::LocalFileSystem::new()),
ObjectStoreScheme::Memory => Box::new(object_store::memory::InMemory::new()),
ObjectStoreScheme::AmazonS3 => {
Box::new(object_store::aws::AmazonS3Builder::from_env().with_url(path_str).build()?)
},
ObjectStoreScheme::MicrosoftAzure => Box::new(
object_store::azure::MicrosoftAzureBuilder::from_env()
.with_url(path_str)
.build()?,
),
ObjectStoreScheme::GoogleCloudStorage => Box::new(
object_store::gcp::GoogleCloudStorageBuilder::from_env()
.with_url(path_str)
.build()?,
),
ObjectStoreScheme::Http => Box::new(object_store::http::HttpBuilder::new().with_url(path_str).build()?),
};

Ok(SkObjectStore { scheme, store, path })
}
}

#[async_trait]
impl ObjectStoreWrapper for SkObjectStore {
fn scheme(&self) -> ObjectStoreScheme {
self.scheme.clone()
}

async fn put(&self, data: Bytes) -> EmptyResult {
let payload = PutPayload::from_bytes(data);
self.store.put(&self.path, payload).await?;
Ok(())
}

async fn get(&self) -> anyhow::Result<Bytes> {
Ok(self.store.get(&self.path).await?.bytes().await?)
}
}
12 changes: 12 additions & 0 deletions lib/store/tests/external_storage_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use super::*;

#[rstest]
fn test_new_sk_object_store_invalid() {
let _ = SkObjectStore::new("oracle3://foo/bar").unwrap_err();
}

#[rstest]
fn test_new_sk_object_store() {
let store = SkObjectStore::new("s3://foo/bar").unwrap();
assert_eq!(store.scheme(), ObjectStoreScheme::AmazonS3);
}
6 changes: 6 additions & 0 deletions lib/store/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
mod external_storage_test;
mod import_export_test;
mod pod_owners_map_test;
mod trace_store_test;

use rstest::*;
use tracing_test::traced_test;

use super::external_storage::{
ObjectStoreScheme,
ObjectStoreWrapper,
SkObjectStore,
};
use super::pod_owners_map::filter_lifecycles_map;
use super::*;
use crate::testutils::*;
8 changes: 2 additions & 6 deletions lib/testutils/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use httpmock::{
use serde_json::json;

pub struct MockServerBuilder {
server: MockServer,
pub server: MockServer,
handlers: Vec<Box<dyn Fn(When, Then)>>,
mock_ids: Vec<usize>,
}
Expand Down Expand Up @@ -61,16 +61,12 @@ impl MockServerBuilder {
when.matches(print_req);
});
}

pub fn url(&self) -> http::Uri {
http::Uri::try_from(self.server.url("/")).unwrap()
}
}

pub fn make_fake_apiserver() -> (MockServerBuilder, kube::Client) {
let builder = MockServerBuilder::new();
let config = kube::Config {
cluster_url: builder.url(),
cluster_url: http::Uri::try_from(builder.server.url("/")).unwrap(),
default_namespace: "default".into(),
root_cert: None,
connect_timeout: None,
Expand Down
1 change: 1 addition & 0 deletions lib/testutils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use fake::{
make_fake_apiserver,
status_not_found,
status_ok,
MockServerBuilder,
};
pub use pods::test_pod;
use rstest::*;
Expand Down
31 changes: 17 additions & 14 deletions tracer/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
mod errors;

use std::ops::Deref;
use std::sync::{
Arc,
Expand All @@ -9,15 +8,14 @@ use std::sync::{
use bytes::Bytes;
use clap::Parser;
use kube::Client;
use object_store::PutPayload;
use reqwest::Url;
use rocket::serde::json::Json;
use simkube::api::v1::ExportRequest;
use simkube::k8s::ApiSet;
use simkube::prelude::*;
use simkube::store::external_storage::{
object_store_for_scheme,
ObjectStoreScheme,
ObjectStoreWrapper,
SkObjectStore,
};
use simkube::store::TraceStore;
use simkube::watch::{
Expand All @@ -39,18 +37,18 @@ struct Options {
verbosity: String,
}

async fn export_helper(req: &ExportRequest, store: &Arc<Mutex<TraceStore>>) -> anyhow::Result<Vec<u8>> {
let trace_data = store.lock().unwrap().export(req.start_ts, req.end_ts, &req.filters)?;
async fn export_helper(
req: &ExportRequest,
trace_store: &Arc<Mutex<TraceStore>>,
object_store: &(dyn ObjectStoreWrapper + Sync),
) -> anyhow::Result<Vec<u8>> {
let trace_data = trace_store.lock().unwrap().export(req.start_ts, req.end_ts, &req.filters)?;

let url = Url::parse(&req.export_path)?;
let (scheme, path) = ObjectStoreScheme::parse(&url)?;
match scheme {
match object_store.scheme() {
// If we're writing to a cloud provider, we want to write from the location that the
// tracer's running from, ostensibly to minimize transport costs.
ObjectStoreScheme::AmazonS3 | ObjectStoreScheme::GoogleCloudStorage | ObjectStoreScheme::MicrosoftAzure => {
let store = object_store_for_scheme(&scheme, &req.export_path)?;
let payload = PutPayload::from_bytes(Bytes::from(trace_data));
store.put(&path, payload).await?;
object_store.put(Bytes::from(trace_data)).await?;
Ok(vec![])
},

Expand All @@ -64,10 +62,12 @@ async fn export_helper(req: &ExportRequest, store: &Arc<Mutex<TraceStore>>) -> a
#[rocket::post("/export", data = "<req>")]
async fn export(
req: Json<ExportRequest>,
store: &rocket::State<Arc<Mutex<TraceStore>>>,
trace_store: &rocket::State<Arc<Mutex<TraceStore>>>,
) -> Result<Vec<u8>, ExportResponseError> {
info!("export called with {:?}", req);
let res = export_helper(req.deref(), store).await;

let object_store = SkObjectStore::new(&req.export_path)?;
let res = export_helper(req.deref(), trace_store, &object_store).await;

// anyhow::Error Debug implementation prints the entire chain of errors, but once this gets
// sucked up into rocket it no longer knows anything about that, so here we print the full
Expand Down Expand Up @@ -110,3 +110,6 @@ async fn main() -> EmptyResult {
logging::setup(&args.verbosity);
run(args).await
}

#[cfg(test)]
mod tests;
5 changes: 5 additions & 0 deletions tracer/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod tracer_test;

use rstest::*;

use super::*;
Loading

0 comments on commit fe18047

Please sign in to comment.