Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
remove query table
Browse files Browse the repository at this point in the history
  • Loading branch information
aidan-smith committed May 2, 2024
1 parent 43228d7 commit 72d54a9
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 64 deletions.
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ mod mock_optimizer;
pub mod parser;
pub mod profiling;
mod query_graph;
mod query_table;
mod queue;
pub mod server;
mod task;
Expand Down
56 changes: 0 additions & 56 deletions src/query_table.rs

This file was deleted.

19 changes: 19 additions & 0 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use crate::task::{
Task,
TaskStatus::{self, *},
};
use crate::SchedulerError;
use std::collections::{BTreeSet, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use datafusion_proto::bytes::physical_plan_to_bytes;
use tokio::sync::{Mutex, Notify};

// Must implement here since generated TaskId does not derive Hash.
Expand Down Expand Up @@ -39,6 +41,7 @@ pub struct Queue {
start_ts: SystemTime,
// Structure that maps query IDs to query keys.
query_map: HashMap<u64, (Arc<Mutex<QueryKey>>, Arc<Mutex<QueryGraph>>)>,
// table: DashMap<u64, RwLock<QueryGraph>>,
// List of currently running tasks.
running_task_map: HashMap<TaskId, Task>,
// Notify primitive that signals when new tasks are ready.
Expand Down Expand Up @@ -191,6 +194,22 @@ impl Queue {
self.query_map.remove(&qid);
}
}

pub async fn get_plan_bytes(
&self,
query_id: u64,
stage_id: u64,
) -> Result<Vec<u8>, SchedulerError> {
let t = &self.query_map;
if let Some((_, graph)) = t.get(&query_id) {
let plan = Arc::clone(&graph.lock().await.stages[stage_id as usize].plan);
Ok(physical_plan_to_bytes(plan)
.expect("Failed to serialize physical plan")
.to_vec())
} else {
Err(SchedulerError::Error("Graph not found.".to_string()))
}
}
}

#[cfg(test)]
Expand Down
11 changes: 4 additions & 7 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::intermediate_results::{get_results, TaskKey};
use crate::mock_catalog::load_catalog;
use crate::parser::ExecutionPlanParser;
use crate::query_graph::{QueryGraph, StageStatus};
use crate::query_table::QueryTable;
use crate::queue::Queue;
use crate::SchedulerError;
use datafusion::arrow::util::pretty::print_batches;
Expand All @@ -23,7 +22,6 @@ use tonic::transport::Server;
use tonic::{Request, Response, Status};

pub struct SchedulerService {
query_table: Arc<QueryTable>,
queue: Arc<Mutex<Queue>>,
ctx: Arc<SessionContext>, // If we support changing the catalog at runtime, this should be a RwLock.
query_id_counter: AtomicU64,
Expand All @@ -34,8 +32,8 @@ impl fmt::Debug for SchedulerService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"SchedulerService {{ query_table: {:?}, queue: {:?} }}",
self.query_table, self.queue,
"SchedulerService {{ queue: {:?} }}",
self.queue,
)
}
}
Expand All @@ -44,7 +42,6 @@ impl SchedulerService {
pub async fn new(catalog_path: &str) -> Self {
let avail = Arc::new(Notify::new());
Self {
query_table: Arc::new(QueryTable::new().await),
queue: Arc::new(Mutex::new(Queue::new(Arc::clone(&avail)))),
ctx: load_catalog(catalog_path).await,
query_id_counter: AtomicU64::new(0),
Expand All @@ -69,8 +66,7 @@ impl SchedulerService {
loop {
let mut queue = self.queue.lock().await;
if let Some(new_task_id) = queue.next_task().await {
let stage = self
.query_table
let stage = queue
.get_plan_bytes(new_task_id.query_id, new_task_id.stage_id)
.await?;
return Ok((new_task_id, stage));
Expand Down Expand Up @@ -105,6 +101,7 @@ impl SchedulerApi for SchedulerService {
let query = QueryGraph::new(qid, plan).await;
self.queue.lock().await.add_query(qid, Arc::new(Mutex::new(query))).await;


let response = ScheduleQueryRet { query_id: qid };
Ok(Response::new(response))
}
Expand Down

0 comments on commit 72d54a9

Please sign in to comment.