diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index f26f73eb44..ead7a425a3 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -28,6 +28,7 @@ edition = "2021"
 rust-version = "1.72"
 
 [dependencies]
+async-trait = { workspace = true }
 ballista-core = { path = "../core", version = "0.12.0" }
 ballista-executor = { path = "../executor", version = "0.12.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.12.0", optional = true }
@@ -40,8 +41,12 @@ sqlparser = { workspace = true }
 tempfile = { workspace = true }
 tokio = { workspace = true }
 
+[dev-dependencies]
+ctor = { version = "0.2" }
+env_logger = { workspace = true }
+
 [features]
 azure = ["ballista-core/azure"]
-default = []
+default = ["standalone"]
 s3 = ["ballista-core/s3"]
 standalone = ["ballista-executor", "ballista-scheduler"]
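Note: with `default = ["standalone"]`, a plain `cargo build` of the client now compiles ballista-executor and ballista-scheduler as well. A downstream crate that only talks to a remote scheduler can still opt out; a minimal sketch of such a dependency entry (the version number is illustrative):

    [dependencies]
    ballista = { version = "0.12", default-features = false }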
diff --git a/ballista/client/src/extension.rs b/ballista/client/src/extension.rs
new file mode 100644
index 0000000000..e231ce8c58
--- /dev/null
+++ b/ballista/client/src/extension.rs
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista_core::{
+    config::BallistaConfig,
+    serde::protobuf::{
+        scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams, KeyValuePair,
+    },
+    utils::{create_df_ctx_with_ballista_query_planner, create_grpc_client_connection},
+};
+use datafusion::{error::DataFusionError, prelude::SessionContext};
+use datafusion_proto::protobuf::LogicalPlanNode;
+
+#[async_trait::async_trait]
+pub trait SessionContextExt {
+    #[cfg(feature = "standalone")]
+    async fn standalone(
+        config: &BallistaConfig,
+    ) -> datafusion::error::Result<SessionContext>;
+    // To be added at a later stage
+    // #[cfg(feature = "standalone")]
+    // async fn standalone_with_state(
+    //     config: &BallistaConfig,
+    //     session_state: SessionState,
+    // ) -> datafusion::error::Result<SessionContext>;
+
+    async fn remote(
+        host: &str,
+        port: u16,
+        config: &BallistaConfig,
+    ) -> datafusion::error::Result<SessionContext>;
+    // To be added at a later stage
+    // async fn remote_with_state(
+    //     host: &str,
+    //     port: u16,
+    //     config: &BallistaConfig,
+    //     session_state: SessionState,
+    // ) -> datafusion::error::Result<SessionContext>;
+}
+
+#[async_trait::async_trait]
+impl SessionContextExt for SessionContext {
+    async fn remote(
+        host: &str,
+        port: u16,
+        config: &BallistaConfig,
+    ) -> datafusion::error::Result<SessionContext> {
+        let scheduler_url = format!("http://{}:{}", &host, port);
+        log::info!(
+            "Connecting to Ballista scheduler at {}",
+            scheduler_url.clone()
+        );
+        let connection = create_grpc_client_connection(scheduler_url.clone())
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
+
+        let limit = config.default_grpc_client_max_message_size();
+        let mut scheduler = SchedulerGrpcClient::new(connection)
+            .max_encoding_message_size(limit)
+            .max_decoding_message_size(limit);
+
+        let remote_session_id = scheduler
+            .create_session(CreateSessionParams {
+                settings: config
+                    .settings()
+                    .iter()
+                    .map(|(k, v)| KeyValuePair {
+                        key: k.to_owned(),
+                        value: v.to_owned(),
+                    })
+                    .collect::<Vec<_>>(),
+            })
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
+            .into_inner()
+            .session_id;
+
+        log::info!(
+            "Server side SessionContext created with session id: {}",
+            remote_session_id
+        );
+
+        let ctx = {
+            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
+                scheduler_url,
+                remote_session_id,
+                &config,
+            )
+        };
+
+        Ok(ctx)
+    }
+
+    #[cfg(feature = "standalone")]
+    async fn standalone(
+        config: &BallistaConfig,
+    ) -> datafusion::error::Result<SessionContext> {
+        use ballista_core::serde::BallistaCodec;
+        use datafusion_proto::protobuf::PhysicalPlanNode;
+
+        log::info!("Running in local mode. Scheduler will be run in-proc");
+
+        let addr = ballista_scheduler::standalone::new_standalone_scheduler()
+            .await
+            .map_err(|e| DataFusionError::Configuration(e.to_string()))?;
+
+        let scheduler_url = format!("http://localhost:{}", addr.port());
+        let mut scheduler = loop {
+            match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
+                Err(_) => {
+                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+                    log::info!("Attempting to connect to in-proc scheduler...");
+                }
+                Ok(scheduler) => break scheduler,
+            }
+        };
+
+        let remote_session_id = scheduler
+            .create_session(CreateSessionParams {
+                settings: config
+                    .settings()
+                    .iter()
+                    .map(|(k, v)| KeyValuePair {
+                        key: k.to_owned(),
+                        value: v.to_owned(),
+                    })
+                    .collect::<Vec<_>>(),
+            })
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
+            .into_inner()
+            .session_id;
+
+        log::info!(
+            "Server side SessionContext created with session id: {}",
+            remote_session_id
+        );
+
+        let ctx = {
+            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
+                scheduler_url,
+                remote_session_id,
+                &config,
+            )
+        };
+
+        let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
+            BallistaCodec::default();
+
+        let parallelism = std::thread::available_parallelism()
+            .map(|v| v.get())
+            .unwrap_or(2);
+
+        ballista_executor::new_standalone_executor(scheduler, parallelism, default_codec)
+            .await
+            .map_err(|e| DataFusionError::Configuration(e.to_string()))?;
+
+        Ok(ctx)
+    }
+}
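For context, `SessionContextExt` is what makes a Ballista-backed context look like a plain DataFusion one. A minimal usage sketch, assuming the `standalone` feature is enabled and using an illustrative parquet path (50050 is Ballista's default scheduler port):

    use ballista::extension::SessionContextExt;
    use ballista::prelude::*;
    use datafusion::error::DataFusionError;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let config = BallistaConfig::new()
            .map_err(|e| DataFusionError::Configuration(e.to_string()))?;

        // Spin up an in-process scheduler and executor
        // (requires the `standalone` feature) ...
        let ctx = SessionContext::standalone(&config).await?;
        // ... or attach to an already running scheduler instead:
        // let ctx = SessionContext::remote("localhost", 50050, &config).await?;

        ctx.register_parquet("test", "testdata/alltypes_plain.parquet", Default::default())
            .await?;
        ctx.sql("SELECT count(*) FROM test").await?.show().await?;
        Ok(())
    }

From that point on the context behaves like any other `SessionContext`, which is exactly what the integration tests further down exercise.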
diff --git a/ballista/client/src/lib.rs b/ballista/client/src/lib.rs
index e61dfef281..76bd0c940b 100644
--- a/ballista/client/src/lib.rs
+++ b/ballista/client/src/lib.rs
@@ -18,4 +18,5 @@
 #![doc = include_str!("../README.md")]
 
 pub mod context;
+pub mod extension;
 pub mod prelude;
diff --git a/ballista/client/tests/common/mod.rs b/ballista/client/tests/common/mod.rs
new file mode 100644
index 0000000000..8618fb6733
--- /dev/null
+++ b/ballista/client/tests/common/mod.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::env;
+use std::error::Error;
+use std::path::PathBuf;
+
+/// Returns the parquet test data directory, which by default is
+/// stored in a git submodule rooted at `testdata`.
+///
+/// The default can be overridden with the optional environment variable
+/// `EXAMPLES_TEST_DATA`.
+///
+/// Panics if the directory cannot be found.
+///
+/// Example:
+/// ```
+/// let testdata = common::example_test_data();
+/// let filename = format!("{testdata}/aggregate_test_100.csv");
+/// assert!(std::path::PathBuf::from(filename).exists());
+/// ```
+pub fn example_test_data() -> String {
+    match get_data_dir("EXAMPLES_TEST_DATA", "testdata") {
+        Ok(pb) => pb.display().to_string(),
+        Err(err) => panic!("failed to get examples test data dir: {err}"),
+    }
+}
+
+/// Returns a directory path for finding test data.
+///
+/// udf_env: name of an environment variable
+///
+/// submodule_data: fallback path (relative to CARGO_MANIFEST_DIR)
+///
+/// Returns either:
+/// The path referred to in `udf_env` if that variable is set and refers to a directory
+/// The submodule_data directory relative to CARGO_MANIFEST_DIR
+fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn Error>> {
+    // Try user defined env.
+    if let Ok(dir) = env::var(udf_env) {
+        let trimmed = dir.trim().to_string();
+        if !trimmed.is_empty() {
+            let pb = PathBuf::from(trimmed);
+            if pb.is_dir() {
+                return Ok(pb);
+            } else {
+                return Err(format!(
+                    "the data dir `{}` defined by env {udf_env} not found",
+                    pb.display()
+                )
+                .into());
+            }
+        }
+    }
+
+    // The env is undefined or its value is trimmed to empty, let's try the default dir.
+
+    // env "CARGO_MANIFEST_DIR" is "the directory containing the manifest of your package",
+    // set by `cargo run` or `cargo test`, see:
+    // https://doc.rust-lang.org/cargo/reference/environment-variables.html
+    let dir = env!("CARGO_MANIFEST_DIR");
+
+    let pb = PathBuf::from(dir).join(submodule_data);
+    if pb.is_dir() {
+        Ok(pb)
+    } else {
+        Err(format!(
+            "env `{udf_env}` is undefined or has empty value, and the pre-defined data dir `{}` not found\n\
+             HINT: try running `git submodule update --init`",
+            pb.display(),
+        ).into())
+    }
+}
+
+#[ctor::ctor]
+fn init() {
+    // Enable RUST_LOG logging configuration for tests
+    let _ = env_logger::builder()
+        .filter_level(log::LevelFilter::Info)
+        .parse_filters("ballista=debug,ballista_scheduler=debug,ballista_executor=debug,datafusion=debug")
+        .is_test(true)
+        .try_init();
+}
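The helper above mirrors DataFusion's own test setup: `EXAMPLES_TEST_DATA` wins if set, otherwise the lookup falls back to `testdata` under the crate's `CARGO_MANIFEST_DIR` (here that would be `ballista/client`). Pointing the suite at another checkout is just a matter of the environment variable, e.g. something like `EXAMPLES_TEST_DATA=/path/to/testdata cargo test -p ballista` (the path is, of course, illustrative).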
diff --git a/ballista/client/tests/standalone.rs b/ballista/client/tests/standalone.rs
new file mode 100644
index 0000000000..2260b7e15e
--- /dev/null
+++ b/ballista/client/tests/standalone.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod common;
+
+#[cfg(test)]
+#[cfg(feature = "standalone")]
+mod standalone {
+    use ballista::{extension::SessionContextExt, prelude::*};
+    use datafusion::prelude::*;
+    use datafusion::{
+        assert_batches_eq, error::DataFusionError, prelude::SessionContext,
+    };
+
+    #[tokio::test]
+    async fn should_execute_sql_show() -> datafusion::error::Result<()> {
+        let test_data = crate::common::example_test_data();
+
+        let config = BallistaConfig::new()
+            .map_err(|e| DataFusionError::Configuration(e.to_string()))?;
+
+        let ctx: SessionContext = SessionContext::standalone(&config).await?;
+        ctx.register_parquet(
+            "test",
+            &format!("{test_data}/alltypes_plain.parquet"),
+            Default::default(),
+        )
+        .await?;
+
+        let result = ctx
+            .sql("select string_col, timestamp_col from test where id > 4")
+            .await?
+            .collect()
+            .await?;
+        let expected = vec![
+            "+------------+---------------------+",
+            "| string_col | timestamp_col       |",
+            "+------------+---------------------+",
+            "| 31         | 2009-03-01T00:01:00 |",
+            "| 30         | 2009-04-01T00:00:00 |",
+            "| 31         | 2009-04-01T00:01:00 |",
+            "+------------+---------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn should_execute_sql_create_table() -> datafusion::error::Result<()> {
+        let test_data = crate::common::example_test_data();
+
+        let config = BallistaConfig::new()
+            .map_err(|e| DataFusionError::Configuration(e.to_string()))?;
+
+        let ctx: SessionContext = SessionContext::standalone(&config).await?;
+        ctx.sql(&format!(
+            "CREATE EXTERNAL TABLE tbl_test STORED AS PARQUET LOCATION '{}/alltypes_plain.parquet'",
+            test_data,
+        ))
+        .await?
+        .show()
+        .await?;
+
+        let result = ctx
+            .sql("select id, string_col, timestamp_col from tbl_test where id > 4")
+            .await?
+            .collect()
+            .await?;
+        let expected = vec![
+            "+----+------------+---------------------+",
+            "| id | string_col | timestamp_col       |",
+            "+----+------------+---------------------+",
+            "| 5  | 31         | 2009-03-01T00:01:00 |",
+            "| 6  | 30         | 2009-04-01T00:00:00 |",
+            "| 7  | 31         | 2009-04-01T00:01:00 |",
+            "+----+------------+---------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn should_execute_dataframe() -> datafusion::error::Result<()> {
+        let test_data = crate::common::example_test_data();
+
+        let config = BallistaConfig::new()
+            .map_err(|e| DataFusionError::Configuration(e.to_string()))?;
+
+        let ctx: SessionContext = SessionContext::standalone(&config).await?;
+
+        let df = ctx
+            .read_parquet(
+                &format!("{test_data}/alltypes_plain.parquet"),
+                Default::default(),
+            )
+            .await?
+            .select_columns(&["id", "bool_col", "timestamp_col"])?
+            .filter(col("id").gt(lit(5)))?;
+
+        let result = df.collect().await?;
+
+        let expected = vec![
+            "+----+----------+---------------------+",
+            "| id | bool_col | timestamp_col       |",
+            "+----+----------+---------------------+",
+            "| 6  | true     | 2009-04-01T00:00:00 |",
+            "| 7  | false    | 2009-04-01T00:01:00 |",
+            "+----+----------+---------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[ignore = "at the moment it does not write correctly"]
+    async fn should_execute_sql_write() -> datafusion::error::Result<()> {
+        let test_data = crate::common::example_test_data();
+
+        let config = BallistaConfig::new()
+            .map_err(|e| DataFusionError::Configuration(e.to_string()))?;
+
+        let ctx: SessionContext = SessionContext::standalone(&config).await?;
+        ctx.register_parquet(
+            "test",
+            &format!("{test_data}/alltypes_plain.parquet"),
+            Default::default(),
+        )
+        .await?;
+        let write_dir = tempfile::tempdir().expect("temporary directory to be created");
+        let write_dir_path = write_dir
+            .path()
+            .to_str()
+            .expect("path to be converted to str");
+
+        ctx.sql("select * from test")
+            .await?
+            .write_parquet(&write_dir_path, Default::default(), Default::default())
+            .await?;
+        // There is a discrepancy between the logical plan as encoded and as decoded:
+        // for some reason the decoded format is csv instead of parquet.
+        //
+        // client encoded:
+        // CopyTo: format=parquet output_url=/var/folders/82/9qj_ms4d4cx01xzxjcdf1_f80000gn/T/.tmpNl0TDp options: ()
+        //   TableScan: test projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col]
+        //
+        // scheduler decoded:
+        // CopyTo: format=csv output_url=/var/folders/82/9qj_ms4d4cx01xzxjcdf1_f80000gn/T/.tmpNl0TDp options: ()
+        //   TableScan: test projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col]
+        //
+        // on the scheduler side the file type is decoded as:
+        // file_type Some(DefaultFileType { file_format_factory: CsvFormatFactory { options: Some(CsvOptions { has_header: None, delimiter: 44, quote: 34, terminator: None, escape: None, double_quote: None, newlines_in_values: None, compression: GZIP, schema_infer_max_rec: 0, date_format: None, datetime_format: None, timestamp_format: None, timestamp_tz_format: None, time_format: None, null_value: None, comment: None }) } })
+        ctx.register_parquet("written_table", &write_dir_path, Default::default())
+            .await?;
+
+        let result = ctx
+            .sql("select id, string_col, timestamp_col from written_table where id > 4")
+            .await?
+            .collect()
+            .await?;
+        let expected = vec![
+            "+----+------------+---------------------+",
+            "| id | string_col | timestamp_col       |",
+            "+----+------------+---------------------+",
+            "| 5  | 31         | 2009-03-01T00:01:00 |",
+            "| 6  | 30         | 2009-04-01T00:00:00 |",
+            "| 7  | 31         | 2009-04-01T00:01:00 |",
+            "+----+------------+---------------------+",
+        ];
+
+        assert_batches_eq!(expected, &result);
+        Ok(())
+    }
+}
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index 746a0be92a..6841f5b0b7 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -25,7 +25,10 @@ use std::result;
 
 use crate::error::{BallistaError, Result};
 
-use datafusion::arrow::datatypes::DataType;
+use datafusion::{arrow::datatypes::DataType, common::config_err};
+
+// TODO: to be revisited, do we need all of them or
+// can we reuse datafusion properties?
 
 pub const BALLISTA_JOB_NAME: &str = "ballista.job.name";
 pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions";
@@ -303,6 +306,48 @@ impl BallistaConfig {
     }
 }
 
+impl datafusion::config::ExtensionOptions for BallistaConfig {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
+        self
+    }
+
+    fn cloned(&self) -> Box<dyn datafusion::config::ExtensionOptions> {
+        Box::new(self.clone())
+    }
+
+    fn set(&mut self, key: &str, value: &str) -> datafusion::error::Result<()> {
+        // TODO: this is just temporary until we figure out
+        // what to do with it
+        let entries = Self::valid_entries();
+
+        if entries.contains_key(key) {
+            self.settings.insert(key.to_string(), value.to_string());
+            Ok(())
+        } else {
+            config_err!("configuration key `{}` does not exist", key)
+        }
+    }
+
+    fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
+        self.settings
+            .iter()
+            .map(|(key, value)| datafusion::config::ConfigEntry {
+                key: key.clone(),
+                value: Some(value.clone()),
+                description: "",
+            })
+            .collect()
+    }
+}
+
+impl datafusion::config::ConfigExtension for BallistaConfig {
+    const PREFIX: &'static str = "ballista";
+}
+
 // an enum used to configure the scheduler policy
 // needs to be visible to code generated by configure_me
 #[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)]
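Implementing `ExtensionOptions`/`ConfigExtension` is what lets `BallistaConfig` ride along inside a DataFusion `SessionConfig` (see the `with_option_extension` call in utils.rs below). A sketch of what this enables, reusing the key constant defined above:

    fn main() {
        use ballista_core::config::{BallistaConfig, BALLISTA_JOB_NAME};
        use datafusion::config::ConfigExtension;
        use datafusion::prelude::SessionConfig;

        let config = BallistaConfig::new().expect("default BallistaConfig");
        // Attach the ballista settings to a DataFusion session config.
        let _session_config = SessionConfig::new().with_option_extension(config);

        // Keys resolve under the extension's prefix, i.e. "ballista.*".
        assert_eq!(BallistaConfig::PREFIX, "ballista");
        assert!(BALLISTA_JOB_NAME.starts_with(BallistaConfig::PREFIX));
    }

The TODO in `create_df_ctx_with_ballista_query_planner` points at the eventual goal: once the configuration lives in the session state, statements like `SET ballista.job.name = '...'` should work through plain SQL.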
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 7e88ffaf33..71a5e50ec4 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -248,12 +248,18 @@ pub fn create_df_ctx_with_ballista_query_planner<T: 'static + AsLogicalPlan>(
     session_id: String,
     config: &BallistaConfig,
 ) -> SessionContext {
+    // TODO: put the ballista configuration into the session state;
+    // the planner can get it from there.
+    // This would make it changeable at run time
+    // using the SQL SET statement
     let planner: Arc<BallistaQueryPlanner<T>> =
         Arc::new(BallistaQueryPlanner::new(scheduler_url, config.clone()));
 
     let session_config = SessionConfig::new()
         .with_target_partitions(config.default_shuffle_partitions())
-        .with_information_schema(true);
+        .with_information_schema(true)
+        .with_option_extension(config.clone());
+
     let session_state = SessionStateBuilder::new()
         .with_default_features()
         .with_config(session_config)
@@ -320,19 +326,30 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
         logical_plan: &LogicalPlan,
         session_state: &SessionState,
     ) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        log::debug!(
+            "create_physical_plan - called for a plan: {:?}",
+            logical_plan
+        );
         match logical_plan {
-            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_)) => {
-                // table state is managed locally in the BallistaContext, not in the scheduler
+            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_t)) => {
+                log::debug!("create_physical_plan - handling ddl statement");
+                Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))
+            }
+            LogicalPlan::EmptyRelation(_) => {
+                log::debug!("create_physical_plan - handling empty exec");
                 Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))
             }
-            _ => Ok(Arc::new(DistributedQueryExec::with_repr(
-                self.scheduler_url.clone(),
-                self.config.clone(),
-                logical_plan.clone(),
-                self.extension_codec.clone(),
-                self.plan_repr,
-                session_state.session_id().to_string(),
-            ))),
+            _ => {
+                log::debug!("create_physical_plan - handling general statement");
+                Ok(Arc::new(DistributedQueryExec::with_repr(
+                    self.scheduler_url.clone(),
+                    self.config.clone(),
+                    logical_plan.clone(),
+                    self.extension_codec.clone(),
+                    self.plan_repr,
+                    session_state.session_id().to_string(),
+                )))
+            }
         }
     }
 }
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 0d7d5e3662..653bda8347 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -512,7 +512,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             }
         };
 
-        debug!("Received plan for execution: {:?}", plan);
+        debug!(
+            "Decoded logical plan for execution:\n{}",
+            plan.display_indent()
+        );
 
         let job_id = self.state.task_manager.generate_job_id();
         let job_name = query_settings