Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
changed sleep loop to notify
Browse files Browse the repository at this point in the history
  • Loading branch information
Makoto-Tomokiyo committed Apr 29, 2024
1 parent c389d2f commit b0f2c46
Show file tree
Hide file tree
Showing 19 changed files with 129 additions and 86,923 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ Cargo.lock
.DS_Store

**/*.parquet

# Test data files
**/*.tbl
160 changes: 80 additions & 80 deletions job_summary.json

Large diffs are not rendered by default.

22 changes: 12 additions & 10 deletions src/bin/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@
//! - **Polling Interval**: The frequency of status checks in ongoing tasks is configurable
//!
use clap::{App, Arg, SubCommand};
use datafusion::error::DataFusionError;
use futures::TryFutureExt;
use prost::Message;
use scheduler2::composable_database::QueryStatus::{Done, InProgress};
use scheduler2::frontend::JobInfo;
use scheduler2::integration_test::IntegrationTest;
use scheduler2::parser::ExecutionPlanParser;
use scheduler2::composable_database::QueryStatus::{Done, InProgress};
use scheduler2::profiling;
use scheduler2::SchedulerError;
use clap::{App, Arg, SubCommand};
use datafusion::error::DataFusionError;
use futures::TryFutureExt;
use prost::Message;
use std::collections::HashMap;
use std::io::{self, Write};
use std::path::Path;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use tokio::io::AsyncWriteExt;

#[tokio::main]
Expand Down Expand Up @@ -197,6 +197,7 @@ pub async fn file_mode(file_paths: Vec<&str>, verify_correctness: bool) -> HashM
let mut query_ids = Vec::new();

// for each file selected...
let start = SystemTime::now();
for file_path in file_paths {
println!("Executing tests from file: {:?}", file_path);

Expand Down Expand Up @@ -236,12 +237,14 @@ pub async fn file_mode(file_paths: Vec<&str>, verify_correctness: bool) -> HashM
break;
}
}
let time = SystemTime::now().duration_since(start).unwrap();
println!("Execution time: {:?}ms", time.as_millis());

// Collect and print all job results
let jobs_map = tester.frontend.lock().await.get_all_jobs();
for (job_id, job_info) in jobs_map.iter() {
println!("Query ID: {}, Info: {}", job_id, job_info);
}
// for (job_id, job_info) in jobs_map.iter() {
// println!("Query ID: {}, Info: {}", job_id, job_info);
// }

if verify_correctness {
for (i, query_id) in query_ids.iter().enumerate() {
Expand Down Expand Up @@ -330,4 +333,3 @@ mod tests {
}
}
}

8 changes: 3 additions & 5 deletions src/executor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@
//! To adapt the client to different execution models or to integrate a custom executor,
//! refer to `mock_executor.rs` and implement the required changes there.
use crate::composable_database::scheduler_api_client::SchedulerApiClient;
use crate::composable_database::QueryStatus::InProgress;
use crate::composable_database::{NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus, TaskId};
use crate::frontend::JobInfo;
use crate::intermediate_results::{insert_results, rewrite_query, TaskKey};
use crate::mock_catalog::load_catalog;
use crate::mock_executor::MockExecutor;
use crate::composable_database::scheduler_api_client::SchedulerApiClient;
use crate::composable_database::QueryStatus::InProgress;
use crate::composable_database::{
NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus, TaskId,
};
use chrono::Utc;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::ExecutionPlan;
Expand Down
6 changes: 3 additions & 3 deletions src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ use tonic::transport::Channel;

use crate::composable_database::scheduler_api_client::SchedulerApiClient;

use crate::mock_catalog::load_catalog;
use crate::mock_optimizer::Optimizer;
use crate::parser::ExecutionPlanParser;
use crate::composable_database::QueryJobStatusArgs;
use crate::composable_database::QueryStatus;
use crate::composable_database::QueryStatus::InProgress;
use crate::mock_catalog::load_catalog;
use crate::mock_optimizer::Optimizer;
use crate::parser::ExecutionPlanParser;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;
Expand Down
2 changes: 1 addition & 1 deletion src/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::composable_database::scheduler_api_server::SchedulerApiServer;
use crate::executor_client::ExecutorClient;
use crate::frontend::MockFrontend;
use crate::mock_catalog::Config;
use crate::mock_catalog::{load_catalog, read_config};
use crate::parser::ExecutionPlanParser;
use crate::composable_database::scheduler_api_server::SchedulerApiServer;
use crate::server::SchedulerService;
use datafusion::arrow::array::RecordBatch;
use datafusion::error::DataFusionError;
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,3 @@ pub enum SchedulerError {
Error(String),
DfError(DataFusionError),
}

2 changes: 1 addition & 1 deletion src/profiling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ pub async fn write_jobs_to_json(jobs: Vec<JobInfo>, path: &Path) -> Result<(), s

#[cfg(test)]
mod tests {
use crate::composable_database::QueryStatus;
use crate::frontend::JobInfo;
use crate::profiling::{append_job_to_json_file, write_jobs_to_json};
use crate::composable_database::QueryStatus;
use chrono::{TimeZone, Utc};
use std::path::Path;
use tokio::fs;
Expand Down
2 changes: 1 addition & 1 deletion src/query_table.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::query_graph::QueryGraph;
use crate::composable_database::QueryStatus;
use crate::query_graph::QueryGraph;
use crate::SchedulerError;
use datafusion_proto::bytes::physical_plan_to_bytes;
use std::collections::HashMap;
Expand Down
15 changes: 11 additions & 4 deletions src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::query_graph::{QueryGraph, QueryQueueStatus, StageStatus};
use crate::composable_database::{QueryStatus, TaskId};
use crate::query_graph::{QueryGraph, QueryQueueStatus, StageStatus};
use crate::task::{
Task,
TaskStatus::{self, *},
Expand All @@ -8,7 +8,7 @@ use std::collections::{BTreeSet, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, Notify};

// Must implement here since generated TaskId does not derive Hash.
impl Hash for TaskId {
Expand Down Expand Up @@ -43,15 +43,19 @@ pub struct Queue {
query_map: HashMap<u64, (Arc<Mutex<QueryKey>>, Arc<Mutex<QueryGraph>>)>,
// List of currently running tasks.
running_task_map: HashMap<TaskId, Task>,
// Notify primitive that signals when new tasks are ready.
avail: Arc<Notify>,
}

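// The `avail` Notify handle is shared with the scheduler service, which awaits it
// before retrying task dispatch.
// NOTE(review): `notify_waiters()` stores no permit, so only tasks already waiting
// observe a signal — confirm the dispatch loop cannot miss one between releasing
// the queue lock and calling `notified()`.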
impl Queue {
pub fn new() -> Self {
pub fn new(avail: Arc<Notify>) -> Self {
Self {
queue: BTreeSet::new(),
start_ts: SystemTime::now(),
query_map: HashMap::new(),
running_task_map: HashMap::new(),
avail,
}
}

Expand All @@ -70,6 +74,7 @@ impl Queue {
// TODO: only do this if the query key was changed?
let _ = self.queue.remove(&key);

// If the graph still has tasks available, re-insert the query key and wake any waiters.
if graph
.lock()
.await
Expand All @@ -79,6 +84,7 @@ impl Queue {
== QueryQueueStatus::Available
{
self.queue.insert(key);
self.avail.notify_waiters();
}
}

Expand Down Expand Up @@ -114,6 +120,7 @@ impl Queue {
self.query_map
.insert(qid, (Arc::new(Mutex::new(key)), Arc::clone(&graph)));
self.queue.insert(key);
self.avail.notify_waiters();
}

/*
Expand Down Expand Up @@ -183,9 +190,9 @@ mod tests {

use crate::parser::ExecutionPlanParser;
use crate::{
composable_database::TaskId,
query_graph::{QueryGraph, StageStatus},
queue::{QueryKey, Queue},
composable_database::TaskId,
};
use std::{
cmp::min,
Expand Down
26 changes: 14 additions & 12 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
use crate::composable_database::scheduler_api_server::{SchedulerApi, SchedulerApiServer};
use crate::composable_database::{
AbortQueryArgs, AbortQueryRet, NotifyTaskStateArgs, NotifyTaskStateRet, QueryInfo,
QueryJobStatusArgs, QueryJobStatusRet, QueryStatus, ScheduleQueryArgs, ScheduleQueryRet,
TaskId,
};
use crate::intermediate_results::{get_results, TaskKey};
use crate::mock_catalog::load_catalog;
use crate::parser::ExecutionPlanParser;
use crate::query_graph::{QueryGraph, StageStatus};
use crate::query_table::QueryTable;
use crate::queue::Queue;
use crate::SchedulerError;
use crate::composable_database::scheduler_api_server::{SchedulerApi, SchedulerApiServer};
use crate::composable_database::{
AbortQueryArgs, AbortQueryRet, NotifyTaskStateArgs, NotifyTaskStateRet, QueryInfo,
QueryJobStatusArgs, QueryJobStatusRet, QueryStatus, ScheduleQueryArgs, ScheduleQueryRet,
TaskId,
};
use datafusion::arrow::util::pretty::print_batches;
use datafusion::execution::context::SessionContext;
use datafusion_proto::bytes::physical_plan_from_bytes;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, Notify};
use tokio::time::{sleep, Duration};
use tonic::transport::Server;
use tonic::{Request, Response, Status};
Expand All @@ -27,6 +27,7 @@ pub struct SchedulerService {
queue: Arc<Mutex<Queue>>,
ctx: Arc<SessionContext>, // If we support changing the catalog at runtime, this should be a RwLock.
query_id_counter: AtomicU64,
avail: Arc<Notify>,
}

impl fmt::Debug for SchedulerService {
Expand All @@ -41,11 +42,13 @@ impl fmt::Debug for SchedulerService {

impl SchedulerService {
pub async fn new(catalog_path: &str) -> Self {
let avail = Arc::new(Notify::new());
Self {
query_table: Arc::new(QueryTable::new().await),
queue: Arc::new(Mutex::new(Queue::new())),
queue: Arc::new(Mutex::new(Queue::new(Arc::clone(&avail)))),
ctx: load_catalog(catalog_path).await,
query_id_counter: AtomicU64::new(0),
avail,
}
}

Expand All @@ -72,9 +75,8 @@ impl SchedulerService {
.await?;
return Ok((new_task_id, stage));
}
// TODO: add notify
drop(queue);
sleep(Duration::from_secs(1)).await;
self.avail.notified().await;
}
}
}
Expand Down Expand Up @@ -121,7 +123,7 @@ impl SchedulerApi for SchedulerService {
let final_result_opt = get_results(&TaskKey { stage_id, query_id }).await;
let final_result =
final_result_opt.expect("api.rs: query is done but no results in table");
print_batches(&final_result).unwrap();
// print_batches(&final_result).unwrap();

// ****************** BEGIN CHANGES FROM INTEGRATION TESTING ***************//
let final_result_bytes = ExecutionPlanParser::serialize_record_batches(final_result)
Expand Down Expand Up @@ -196,13 +198,13 @@ impl SchedulerApi for SchedulerService {
#[cfg(test)]
#[allow(unused_imports)]
mod tests {
use crate::parser::ExecutionPlanParser;
use crate::composable_database::scheduler_api_server::SchedulerApi;
use crate::composable_database::{
AbortQueryArgs, AbortQueryRet, NotifyTaskStateArgs, NotifyTaskStateRet, QueryInfo,
QueryJobStatusArgs, QueryJobStatusRet, QueryStatus, ScheduleQueryArgs, ScheduleQueryRet,
TaskId,
};
use crate::parser::ExecutionPlanParser;
use crate::server::SchedulerService;
use tonic::Request;

Expand Down
Loading

0 comments on commit b0f2c46

Please sign in to comment.