diff --git a/src/benchmark.rs b/src/benchmark.rs index 565c957..09896f3 100644 --- a/src/benchmark.rs +++ b/src/benchmark.rs @@ -1,66 +1,78 @@ -use istziio_client::client_api::{StorageRequest, StorageClient}; +use istziio_client::client_api::{StorageClient, StorageRequest}; +use std::error::Error; use std::path::PathBuf; +use std::thread::sleep; use std::time::Instant; -use tokio::sync::mpsc; use std::time::{Duration, SystemTime}; -use std::thread::sleep; -use std::collections::VecDeque; -use std::error::Error; -use csv; +use tokio::sync::mpsc; // This scans the bench_files dir to figure out which test files are present, // then builds a map of TableId -> filename to init storage client(only when catalog is not available) // and also generates workload based on table ids. Finally it runs the workload - - pub struct TraceEntry { pub timestamp: u64, - pub request: StorageRequest + pub request: StorageRequest, } pub enum ClientType { Client1(), - Client2() + Client2(), } -pub fn parse_trace(trace_path: PathBuf) -> Result, Box> { - let mut trace: VecDeque = VecDeque::new(); +pub fn parse_trace(trace_path: PathBuf) -> Result, Box> { let mut rdr = csv::Reader::from_path(trace_path)?; + let mut traces = Vec::new(); for result in rdr.records() { - // The iterator yields Result, so we check the - // error here. let record = result?; - trace.push_back(TraceEntry{timestamp: record.get(0).unwrap().parse().unwrap(), request: StorageRequest::Table(record.get(1).unwrap().parse().unwrap())}); + traces.push(TraceEntry { + timestamp: record.get(0).unwrap().parse().unwrap(), + request: StorageRequest::Table(record.get(1).unwrap().parse().unwrap()), + }); } - Ok(trace) + Ok(traces) } -pub async fn run_trace(mut trace: VecDeque, client_builder: &dyn Fn() -> Box) { + +pub async fn run_trace( + traces: Vec, + client_builder: &dyn Fn() -> Box, +) { let start_time = SystemTime::now(); - let request_num = trace.len(); + let request_num = traces.len(); let (tx, mut rx) = mpsc::channel(32); - while !trace.is_empty() { - let next_entry = trace.pop_front().unwrap(); - if let Some(diff) = Duration::from_millis(next_entry.timestamp).checked_sub(start_time.elapsed().unwrap()) { + for (i, trace) in traces.into_iter().enumerate() { + if let Some(diff) = + Duration::from_millis(trace.timestamp).checked_sub(start_time.elapsed().unwrap()) + { sleep(diff); } - println!("next trace: {}", next_entry.timestamp); let tx = tx.clone(); let client = client_builder(); tokio::spawn(async move { - let table_id = match next_entry.request { + let table_id = match trace.request { StorageRequest::Table(id) => id, _ => panic!("Invalid request type"), }; - println!("start thread reading {}", table_id); + println!( + "Trace {} sends request for table {} at timestamp {}", + i, table_id, trace.timestamp + ); let client_start = Instant::now(); - let req = next_entry.request; - let res = client.request_data_sync(req.clone()).await; - if let Err(e) = res { - println!("Error: {}", e); + let res = client.request_data(trace.request).await; + if res.is_err() { + println!("Error: {}", res.as_ref().err().unwrap()); + } + let mut rx = res.unwrap(); + let mut total_num_rows = 0; + while let Some(rb) = rx.recv().await { + total_num_rows += rb.num_rows(); } let client_duration = client_start.elapsed(); + println!( + "Trace {} gets {} rows from the client, latency is {:?}", + i, total_num_rows, client_duration + ); tx.send(client_duration).await.unwrap(); }); } @@ -69,7 +81,6 @@ pub async fn run_trace(mut trace: VecDeque, client_builder: &dyn Fn( let mut duration_sum = Duration::new(0, 0); for _ in 0..request_num { let client_duration = rx.recv().await.unwrap(); - println!("Client latency: {:?}", client_duration); duration_sum += client_duration; }