Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' of github.com:cmu-db/15721-s24-scheduler2 into main
Browse files Browse the repository at this point in the history
  • Loading branch information
Makoto-Tomokiyo committed Apr 29, 2024
2 parents b0f2c46 + 855b79e commit 37a20e6
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Cargo.lock
**/.vscode
.DS_Store

test_data/
**/*.parquet

# Test data files
Expand Down
27 changes: 19 additions & 8 deletions src/executor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::bytes::physical_plan_from_bytes;
use std::path::Path;
use std::path::PathBuf;
use tokio::fs::remove_file;
use tonic::transport::Channel;

pub struct ExecutorClient {
id: i32,
ctx: SessionContext,
scheduler: Option<SchedulerApiClient<Channel>>, // api client for the scheduler
executor: MockExecutor,
log_path: Option<String>,
log_directory_path: Option<String>,
}

impl ExecutorClient {
Expand All @@ -58,15 +60,24 @@ impl ExecutorClient {

let log_file_path = match log_path {
None => None,
Some(path_str) => Some(format!("{}/{}.json", path_str.trim_end_matches('/'), id)),
Some(path_str) => {
let full_log_path = format!("{}/{}.json", path_str.trim_end_matches('/'), id);
let full_path = PathBuf::from(full_log_path.clone());
if full_path.exists() {
if let Err(e) = remove_file(&full_path).await {
panic!("Executor {}: failed to remove existing file: {}", id, e);
}
}
Some(full_log_path)
}
};

Self {
id,
ctx: (*ctx).clone(),
scheduler: None,
executor: MockExecutor::new(catalog_path).await,
log_path: log_file_path,
log_directory_path: log_file_path,
}
}

Expand Down Expand Up @@ -118,11 +129,11 @@ impl ExecutorClient {
QueryStatus::Failed
};

// if let Some(ref log_path) = self.log_path {
// crate::profiling::append_job_to_json_file(&cur_job, Path::new(log_path))
// .await
// .expect("Failed to log job info");
// }
if let Some(ref log_path) = self.log_directory_path {
crate::profiling::append_job_to_json_file(&cur_job, Path::new(log_path))
.await
.expect("Failed to log job info");
}

if execution_success {
let result = execution_result.unwrap();
Expand Down
10 changes: 7 additions & 3 deletions src/profiling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use datafusion::common::Result;
use serde_json;
use std::io::SeekFrom;
use std::path::Path;
use tokio::fs;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};

Expand All @@ -16,6 +17,12 @@ use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
/// # Returns
/// A `Result<(), serde_json::Error>` indicating success or failure.
pub async fn append_job_to_json_file(job: &JobInfo, path: &Path) -> Result<(), serde_json::Error> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)
.await
.expect("Unable to create directory");
}

let mut file = OpenOptions::new()
.create(true)
.read(true)
Expand Down Expand Up @@ -48,7 +55,6 @@ pub async fn append_job_to_json_file(job: &JobInfo, path: &Path) -> Result<(), s
.await
.expect("Unable to write data to file");
} else {
// Move the cursor back to overwrite the last "]" character
buf_writer
.seek(SeekFrom::End(-1))
.await
Expand All @@ -70,10 +76,8 @@ pub async fn append_job_to_json_file(job: &JobInfo, path: &Path) -> Result<(), s
.expect("Unable to write data to file");
}

// Flush and sync the buffer to ensure all data is written to disk
buf_writer.flush().await.expect("Failed to flush data");
file.sync_all().await.expect("Failed to sync file");

Ok(())
}

Expand Down

0 comments on commit 37a20e6

Please sign in to comment.