Skip to content

Commit

Permalink
feat: Upstream Spice AI changes to datafusion-federation (#51)
Browse files Browse the repository at this point in the history
Co-authored-by: sgrebnov <[email protected]>
Co-authored-by: yfu <[email protected]>
Co-authored-by: Qianqian <[email protected]>
Co-authored-by: Michiel De Backker <[email protected]>
Co-authored-by: Suriya Kandaswamy <[email protected]>
Co-authored-by: Suriya Kandaswamy <[email protected]>
Co-authored-by: hozan23 <[email protected]>
Co-authored-by: hozan23 <[email protected]>
  • Loading branch information
9 people authored Sep 2, 2024
1 parent c8fa466 commit af09abe
Show file tree
Hide file tree
Showing 17 changed files with 2,264 additions and 31 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
/node_modules
package-lock.json
package.json
.DS_Store
213 changes: 213 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in library 'datafusion_federation'",
"cargo": {
"args": [
"test",
"--no-run",
"--lib",
"--package=datafusion-federation"
],
"filter": {
"name": "datafusion_federation",
"kind": "lib"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug example 'sqlite'",
"cargo": {
"args": [
"build",
"--example=sqlite",
"--package=datafusion-federation-examples"
],
"filter": {
"name": "sqlite",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in example 'sqlite'",
"cargo": {
"args": [
"test",
"--no-run",
"--example=sqlite",
"--package=datafusion-federation-examples"
],
"filter": {
"name": "sqlite",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug example 'flight-sql'",
"cargo": {
"args": [
"build",
"--example=flight-sql",
"--package=datafusion-federation-examples"
],
"filter": {
"name": "flight-sql",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in example 'flight-sql'",
"cargo": {
"args": [
"test",
"--no-run",
"--example=flight-sql",
"--package=datafusion-federation-examples"
],
"filter": {
"name": "flight-sql",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug example 'postgres-partial'",
"cargo": {
"args": [
"build",
"--example=postgres-partial",
"--package=datafusion-federation-examples"
],
"filter": {
"name": "postgres-partial",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in example 'postgres-partial'",
"cargo": {
"args": [
"test",
"--no-run",
"--example=postgres-partial",
"--package=datafusion-federation-examples"
],
"filter": {
"name": "postgres-partial",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug example 'sqlite-partial'",
"cargo": {
"args": [
"build",
"--example=sqlite-partial",
"--package=datafusion-federation-examples"
],
"filter": {
"name": "sqlite-partial",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in example 'sqlite-partial'",
"cargo": {
"args": [
"test",
"--no-run",
"--example=sqlite-partial",
"--package=datafusion-federation-examples"
],
"filter": {
"name": "sqlite-partial",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in library 'datafusion_federation_flight_sql'",
"cargo": {
"args": [
"test",
"--no-run",
"--lib",
"--package=datafusion-federation-flight-sql"
],
"filter": {
"name": "datafusion_federation_flight_sql",
"kind": "lib"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in library 'datafusion_federation_sql'",
"cargo": {
"args": [
"test",
"--no-run",
"--lib",
"--package=datafusion-federation-sql"
],
"filter": {
"name": "datafusion_federation_sql",
"kind": "lib"
}
},
"args": [],
"cwd": "${workspaceFolder}"
}
]
}
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ members = [
]

[workspace.package]
version = "0.1.3"
version = "0.1.6"
edition = "2021"
license = "MIT"
license = "Apache-2.0"
readme = "README.md"

[workspace.dependencies]
async-trait = "0.1.81"
async-stream = "0.3.5"
futures = "0.3.30"
datafusion = "41.0.0"
datafusion-substrait = "41.0.0"
arrow-json = "52.2.0"
8 changes: 6 additions & 2 deletions datafusion-federation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ all-features = true
no-default-features = true

[features]
sql = ["futures"]
sql = []

[dependencies]
futures.workspace = true
async-trait.workspace = true
datafusion.workspace = true
async-stream.workspace = true
arrow-json.workspace = true

futures = { version = "0.3.30", optional = true }

[dev-dependencies]
tokio = { version = "1.39.3", features = ["full"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing = "0.1.40"

[[example]]
name = "df-csv"
Expand Down
4 changes: 2 additions & 2 deletions datafusion-federation/examples/df-csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use datafusion::{
options::CsvReadOptions,
},
physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream},
sql::sqlparser::dialect::{Dialect, GenericDialect},
sql::unparser::dialect::{DefaultDialect, Dialect},
};
use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLSchemaProvider};
use futures::TryStreamExt;
Expand Down Expand Up @@ -106,7 +106,7 @@ impl SQLExecutor for InMemorySQLExecutor {
}

fn dialect(&self) -> Arc<dyn Dialect> {
Arc::new(GenericDialect {})
Arc::new(DefaultDialect {})
}
}

Expand Down
8 changes: 7 additions & 1 deletion datafusion-federation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ use datafusion::{
};

pub use optimizer::{get_table_source, FederationOptimizerRule};
pub use plan_node::{FederatedPlanNode, FederatedQueryPlanner, FederationPlanner};
pub use plan_node::{
FederatedPlanNode, FederatedPlanner, FederatedQueryPlanner, FederationPlanner,
};
pub use table_provider::{FederatedTableProviderAdaptor, FederatedTableSource};

// TODO clean up this
// TODO move schema_cast.rs to schema_cast directory
pub mod schema_cast;

pub fn default_session_state() -> SessionState {
let rules = default_optimizer_rules();
SessionStateBuilder::new()
Expand Down
6 changes: 3 additions & 3 deletions datafusion-federation/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl UserDefinedLogicalNodeCore for FederatedPlanNode {
}

fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Federated\n {:?}", self.plan)
write!(f, "Federated\n {}", self.plan)
}

fn with_exprs_and_inputs(&self, exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
Expand Down Expand Up @@ -126,7 +126,7 @@ impl Hash for FederatedPlanNode {
}

#[derive(Default)]
struct FederatedPlanner {}
pub struct FederatedPlanner {}

impl FederatedPlanner {
pub fn new() -> Self {
Expand All @@ -152,7 +152,7 @@ impl ExtensionPlanner for FederatedPlanner {
));
}

let fed_planner = fed_node.planner.clone();
let fed_planner = Arc::clone(&fed_node.planner);
let exec_plan = fed_planner.plan_federation(fed_node, session_state).await?;
return Ok(Some(exec_plan));
}
Expand Down
Loading

0 comments on commit af09abe

Please sign in to comment.