Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Pull actual data from the server instead of directly return in benchmark #3

Merged
merged 2 commits into from
Apr 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,78 @@
use istziio_client::client_api::{StorageRequest, StorageClient};
use istziio_client::client_api::{StorageClient, StorageRequest};
use std::error::Error;
use std::path::PathBuf;
use std::thread::sleep;
use std::time::Instant;
use tokio::sync::mpsc;
use std::time::{Duration, SystemTime};
use std::thread::sleep;
use std::collections::VecDeque;
use std::error::Error;
use csv;
use tokio::sync::mpsc;

// This scans the bench_files dir to figure out which test files are present,
// then builds a map of TableId -> filename to init storage client(only when catalog is not available)
// and also generates workload based on table ids. Finally it runs the workload



pub struct TraceEntry {
pub timestamp: u64,
pub request: StorageRequest
pub request: StorageRequest,
}

pub enum ClientType {
Client1(),
Client2()
Client2(),
}

pub fn parse_trace(trace_path: PathBuf) -> Result<VecDeque<TraceEntry>, Box<dyn Error>> {
let mut trace: VecDeque<TraceEntry> = VecDeque::new();
pub fn parse_trace(trace_path: PathBuf) -> Result<Vec<TraceEntry>, Box<dyn Error>> {
let mut rdr = csv::Reader::from_path(trace_path)?;
let mut traces = Vec::new();
for result in rdr.records() {
// The iterator yields Result<StringRecord, Error>, so we check the
// error here.
let record = result?;
trace.push_back(TraceEntry{timestamp: record.get(0).unwrap().parse().unwrap(), request: StorageRequest::Table(record.get(1).unwrap().parse().unwrap())});
traces.push(TraceEntry {
timestamp: record.get(0).unwrap().parse().unwrap(),
request: StorageRequest::Table(record.get(1).unwrap().parse().unwrap()),
});
}
Ok(trace)
Ok(traces)
}
pub async fn run_trace(mut trace: VecDeque<TraceEntry>, client_builder: &dyn Fn() -> Box<dyn StorageClient>) {

pub async fn run_trace(
traces: Vec<TraceEntry>,
client_builder: &dyn Fn() -> Box<dyn StorageClient>,
) {
let start_time = SystemTime::now();
let request_num = trace.len();
let request_num = traces.len();
let (tx, mut rx) = mpsc::channel(32);
while !trace.is_empty() {
let next_entry = trace.pop_front().unwrap();
if let Some(diff) = Duration::from_millis(next_entry.timestamp).checked_sub(start_time.elapsed().unwrap()) {
for (i, trace) in traces.into_iter().enumerate() {
if let Some(diff) =
Duration::from_millis(trace.timestamp).checked_sub(start_time.elapsed().unwrap())
{
sleep(diff);
}
println!("next trace: {}", next_entry.timestamp);
let tx = tx.clone();
let client = client_builder();
tokio::spawn(async move {
let table_id = match next_entry.request {
let table_id = match trace.request {
StorageRequest::Table(id) => id,
_ => panic!("Invalid request type"),
};
println!("start thread reading {}", table_id);
println!(
"Trace {} sends request for table {} at timestamp {}",
i, table_id, trace.timestamp
);
let client_start = Instant::now();
let req = next_entry.request;

let res = client.request_data_sync(req.clone()).await;
if let Err(e) = res {
println!("Error: {}", e);
let res = client.request_data(trace.request).await;
if res.is_err() {
println!("Error: {}", res.as_ref().err().unwrap());
}
let mut rx = res.unwrap();
let mut total_num_rows = 0;
while let Some(rb) = rx.recv().await {
total_num_rows += rb.num_rows();
}
let client_duration = client_start.elapsed();
println!(
"Trace {} gets {} rows from the client, latency is {:?}",
i, total_num_rows, client_duration
);
tx.send(client_duration).await.unwrap();
});
}
Expand All @@ -69,7 +81,6 @@ pub async fn run_trace(mut trace: VecDeque<TraceEntry>, client_builder: &dyn Fn(
let mut duration_sum = Duration::new(0, 0);
for _ in 0..request_num {
let client_duration = rx.recv().await.unwrap();
println!("Client latency: {:?}", client_duration);
duration_sum += client_duration;
}

Expand Down
Loading