diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index d90ec3333cb9..b34af6f05966 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -65,6 +65,7 @@ datafusion = { workspace = true, default-features = true, features = ["avro"] } datafusion-proto = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } +itertools = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } diff --git a/datafusion-examples/examples/system_columns.rs b/datafusion-examples/examples/system_columns.rs new file mode 100644 index 000000000000..2385d1ae60cb --- /dev/null +++ b/datafusion-examples/examples/system_columns.rs @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::array::record_batch; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; + +use datafusion::common::FieldExt; +use datafusion::{assert_batches_eq, prelude::*}; + +/// This example shows how to mark fields as system columns. +/// System columns are columns which meant to be semi-public stores of the internal details of the table. +/// For example, `ctid` in Postgres would be considered a metadata column +/// (Postgres calls these "system columns", see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples. +/// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)). +/// +/// DataFusion allows fields to be declared as metadata columns by setting the `datafusion.system_column` key in the field's metadata +/// to `true`. +/// +/// As an example of how this works in practice, if you have the following Postgres table: +/// +/// ```sql +/// CREATE TABLE t (x int); +/// INSERT INTO t VALUES (1); +/// ``` +/// +/// And you do a `SELECT * FROM t`, you would get the following schema: +/// +/// ```text +/// +---+ +/// | x | +/// +---+ +/// | 1 | +/// +---+ +/// ``` +/// +/// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example): +/// +/// ```text +/// +-----+---+ +/// | ctid| x | +/// +-----+---+ +/// | 0 | 1 | +/// +-----+---+ +/// ``` +#[tokio::main] +async fn main() { + let batch = record_batch!( + ("a", Int32, [1, 2, 3]), + ("b", Utf8, ["foo", "bar", "baz"]), + ("_row_num", UInt32, [1, 2, 3]) + ) + .unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + Field::new("_row_num", DataType::UInt32, true).to_system_column(), + ]))) + .unwrap(); + + let ctx = SessionContext::new(); + let _ = ctx.register_batch("t", batch); + + let res = ctx + .sql("SELECT a, b FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+", + "| a | b |", + "+---+-----+", + "| 1 | foo |", + "| 2 | bar |", + "| 3 | baz |", + "+---+-----+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx + .sql("SELECT _row_num FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+----------+", + "| _row_num |", + "+----------+", + "| 1 |", + "| 2 |", + "| 3 |", + "+----------+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx + .sql("SELECT * FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); + // does not include _row_num + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+", + "| a | b |", + "+---+-----+", + "| 1 | foo |", + "| 2 | bar |", + "| 3 | baz |", + "+---+-----+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx + .sql("SELECT *, _row_num FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+----------+", + "| a | b | _row_num |", + "+---+-----+----------+", + "| 1 | foo | 1 |", + "| 2 | bar | 2 |", + "| 3 | baz | 3 |", + "+---+-----+----------+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx + .sql("SELECT t._row_num FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+----------+", + "| _row_num |", + "+----------+", + "| 1 |", + "| 2 |", + "| 3 |", + "+----------+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx + .sql("SELECT t.* FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); + // does not include _row_num + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+", + "| a | b |", + "+---+-----+", + "| 1 | foo |", + "| 2 | bar |", + "| 3 | baz |", + "+---+-----+", + ]; + assert_batches_eq!(expected, &res); + + let res = ctx + .sql("SELECT t.*, _row_num FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); + #[rustfmt::skip] + let expected: Vec<&str> = vec![ + "+---+-----+----------+", + "| a | b | _row_num |", + "+---+-----+----------+", + "| 1 | foo | 1 |", + "| 2 | bar | 2 |", + "| 3 | baz | 3 |", + "+---+-----+----------+", + ]; + assert_batches_eq!(expected, &res); +} diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index e68e636989f8..d46ea37d3943 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -31,6 +31,7 @@ use async_trait::async_trait; use datafusion_common::config::{ConfigEntry, ConfigOptions}; use datafusion_common::error::Result; use datafusion_common::DataFusionError; +use datafusion_common::FieldExt; use datafusion_execution::TaskContext; use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF}; use datafusion_expr::{TableType, Volatility}; @@ -190,6 +191,9 @@ impl InformationSchemaConfig { for (field_position, field) in table.schema().fields().iter().enumerate() { + if field.is_system_column() { + continue; + } builder.add_column( &catalog_name, &schema_name, diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 302d515e027e..4d2a19ab76a8 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -494,6 +494,29 @@ impl DFSchema { // Project a.id as id TableScan b id // In this case, there isn't `ambiguous name` problem. When `matches` just contains // one field without qualifier, we should return it. + // Another scenario where we can disambiguate is when we have a conflict between + // a system column and a non system column. + // In this case we return the non system column. + let mut non_system_columns = HashSet::new(); + for (_, f) in matches.iter() { + if !f.is_system_column() { + non_system_columns.insert(f.name().to_string()); + } + } + let matches_filtered = matches + .iter() + .filter_map(|(q, f)| { + if f.is_system_column() && non_system_columns.contains(f.name()) { + None + } else { + Some((q, f)) + } + }) + .collect::>(); + if matches_filtered.len() == 1 { + let (q, f) = matches_filtered[0]; + return Ok((*q, *f)); + } let fields_without_qualifier = matches .iter() .filter(|(q, _)| q.is_none()) @@ -1056,6 +1079,107 @@ pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String } } +/// Extension trait to manage DataFusion specific metadata on Arrow fields. +pub trait FieldExt { + /// Check if this field is a system columns. + /// + /// System columns are columns which meant to be semi-public stores of the internal details of the table. + /// For example, `ctid` in Postgres would be considered a metadata column + /// (Postgres calls these "system columns", see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples. + /// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)). + /// + /// DataFusion allows fields to be declared as metadata columns by setting the `datafusion.system_column` key in the field's metadata + /// to `true`. + /// + /// As an example of how this works in practice, if you have the following Postgres table: + /// + /// ```sql + /// CREATE TABLE t (x int); + /// INSERT INTO t VALUES (1); + /// ``` + /// + /// And you do a `SELECT * FROM t`, you would get the following schema: + /// + /// ```text + /// +---+ + /// | x | + /// +---+ + /// | 1 | + /// +---+ + /// ``` + /// + /// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example): + /// + /// ```text + /// +-----+---+ + /// | ctid| x | + /// +-----+---+ + /// | 0 | 1 | + /// +-----+---+ + /// ``` + fn is_system_column(&self) -> bool; + + /// Mark this field as a system column. + /// + /// See [`FieldExt::is_system_column`] for more information on what a system column is. + fn to_system_column(self) -> Self; + + /// Mark this field as a non system column by removing the `datafusion.system_column` key from the field's metadata. + /// + /// See [`FieldExt::is_system_column`] for more information on what a system column is. + fn to_non_system_column(self) -> Self; +} + +/// See [`FieldExt`]. +impl FieldExt for Field { + /// Check if this field is a system column. + /// See [`FieldExt::is_system_column`] for more information on what a system column is. + fn is_system_column(&self) -> bool { + self.metadata() + .get("datafusion.system_column") + .map(|v| v.to_lowercase().starts_with("t")) + .unwrap_or(false) + } + + /// Mark this field as a system column. + /// See [`FieldExt::to_system_column`] for more information on what a system column is. + fn to_system_column(mut self) -> Self { + let mut metadata = self.metadata().clone(); + metadata.insert("datafusion.system_column".to_string(), "true".to_string()); + self.set_metadata(metadata); + self + } + + /// Mark this field as a non system column by removing the `datafusion.system_column` key from the field's metadata. + /// See [`FieldExt::to_non_system_column`] for more information on what a system column is. + fn to_non_system_column(mut self) -> Self { + let mut metadata = self.metadata().clone(); + metadata.remove("datafusion.system_column"); + self.set_metadata(metadata); + self + } +} + +impl FieldExt for Arc { + /// Check if this field is a system column. + /// See [`FieldExt::is_system_column`] for more information on what a system column is. + fn is_system_column(&self) -> bool { + FieldExt::is_system_column(self.as_ref()) + } + + /// Mark this field as a system column. + /// See [`FieldExt::to_system_column`] for more information on what a system column is. + fn to_system_column(self) -> Self { + Arc::new(FieldExt::to_system_column(Arc::unwrap_or_clone(self))) + } + + /// Mark this field as a non system column by removing the `datafusion.system_column` key from the field's metadata. + /// See [`FieldExt::to_non_system_column`] for more information on what a system column is. + fn to_non_system_column(self) -> Self { + Arc::new(FieldExt::to_non_system_column(Arc::unwrap_or_clone(self))) + } +} + #[cfg(test)] mod tests { use crate::assert_contains; diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 77e8cd60ede2..f83807303d43 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -52,7 +52,7 @@ pub mod utils; pub use arrow; pub use column::Column; pub use dfschema::{ - qualified_name, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema, + qualified_name, DFSchema, DFSchemaRef, ExprSchema, FieldExt, SchemaExt, ToDFSchema, }; pub use error::{ field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError, diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 03c4ad7c013e..1e4858bc8dd0 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -64,6 +64,7 @@ pub mod joins; mod path_partition; pub mod select; mod sql_api; +pub mod system_columns; async fn register_aggregate_csv_by_sql(ctx: &SessionContext) { let testdata = test_util::arrow_test_data(); diff --git a/datafusion/core/tests/sql/system_columns.rs b/datafusion/core/tests/sql/system_columns.rs new file mode 100644 index 000000000000..13002745b5e5 --- /dev/null +++ b/datafusion/core/tests/sql/system_columns.rs @@ -0,0 +1,400 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::*; +use datafusion_common::FieldExt; + +#[tokio::test] +async fn test_system_column_select() { + let ctx = setup_test_context().await; + + // System columns are not included in wildcard select + let select = "SELECT * FROM test order by id"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------------+", + "| id | bank_account |", + "+----+--------------+", + "| 1 | 9000 |", + "| 2 | 100 |", + "| 3 | 1000 |", + "+----+--------------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // But they are if explicitly selected + let select = "SELECT _rowid FROM test order by _rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+", + "| _rowid |", + "+--------+", + "| 0 |", + "| 1 |", + "| 2 |", + "+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // They can be selected alongside regular columns + let select = "SELECT _rowid, id FROM test order by _rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // As well as wildcard select + let select = "SELECT *, _rowid FROM test order by _rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------------+--------+", + "| id | bank_account | _rowid |", + "+----+--------------+--------+", + "| 1 | 9000 | 0 |", + "| 2 | 100 | 1 |", + "| 3 | 1000 | 2 |", + "+----+--------------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +#[tokio::test] +async fn test_system_column_filtering() { + let ctx = setup_test_context().await; + + // Filter by exact _rowid + let select = "SELECT _rowid, id FROM test WHERE _rowid = 0"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 0 | 1 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // Filter by _rowid with and operator + let select = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+--------+----+", + "| _rowid | id |", + "+--------+----+", + "| 1 | 2 |", + "+--------+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // Filter without selecting + let select = "SELECT id FROM test WHERE _rowid = 0"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+", + "| id |", + "+----+", + "| 1 |", + "+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +#[tokio::test] +async fn test_system_column_ordering() { + let ctx = setup_test_context().await; + + let select = "SELECT id FROM test order by _rowid asc"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+", + "| id |", + "+----+", + "| 1 |", + "| 2 |", + "| 3 |", + "+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +#[tokio::test] +async fn test_system_column_joins() { + let ctx = setup_test_context().await; + + let batch = record_batch!( + ("other_id", UInt8, [1, 2, 3]), + ("bank_account", UInt64, [9, 10, 11]), + ("_rowid", UInt32, [0, 1, 3]) + ) + .unwrap(); + let _ = ctx.register_batch("test2", batch); + + let batch = record_batch!( + ("other_id", UInt8, [1, 2, 3]), + ("bank_account", UInt64, [9, 10, 11]), + ("_rowid", UInt32, [0, 1, 3]) + ) + .unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("other_id", DataType::UInt8, true), + Field::new("bank_account", DataType::UInt64, true), + Field::new("_rowid", DataType::UInt32, true).to_system_column(), + ]))) + .unwrap(); + let _ = ctx.register_batch("test2sys", batch); + + let select = "SELECT id, other_id FROM test INNER JOIN test2 USING (_rowid)"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+", + "| id | other_id |", + "+----+----------+", + "| 1 | 1 |", + "| 2 | 2 |", + "+----+----------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let select = "SELECT id, other_id, _rowid FROM test INNER JOIN test2 USING (_rowid)"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+--------+", + "| id | other_id | _rowid |", + "+----+----------+--------+", + "| 1 | 1 | 0 |", + "| 2 | 2 | 1 |", + "+----+----------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let select = + "SELECT id, other_id FROM test LEFT JOIN test2 ON test._rowid = test2._rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+", + "| id | other_id |", + "+----+----------+", + "| 1 | 1 |", + "| 2 | 2 |", + "| 3 | |", + "+----+----------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + let select = + "SELECT id, other_id, _rowid FROM test LEFT JOIN test2 ON test._rowid = test2._rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+--------+", + "| id | other_id | _rowid |", + "+----+----------+--------+", + "| 1 | 1 | 0 |", + "| 2 | 2 | 1 |", + "| 3 | | |", + "+----+----------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + let select = + "SELECT id, other_id FROM test JOIN test2 ON test._rowid = test2._rowid % 2"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+", + "| id | other_id |", + "+----+----------+", + "| 1 | 1 |", + "| 2 | 2 |", + "| 2 | 3 |", + "+----+----------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // Join with non-system _rowid column and select _row_id + // Normally this would result in an AmbiguousReference error because both tables have a _rowid column + // But when this conflict is between a system column and a regular column, the regular column is chosen + // to resolve the conflict without error. + let select = + "SELECT id, other_id, _rowid FROM test INNER JOIN test2 ON id = other_id"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+--------+", + "| id | other_id | _rowid |", + "+----+----------+--------+", + "| 1 | 1 | 0 |", + "| 2 | 2 | 1 |", + "| 3 | 3 | 3 |", + "+----+----------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + // but if it's a conflict between two system columns we do get an error + let select = + "SELECT id, other_id, _rowid FROM test INNER JOIN test2sys ON id = other_id"; + assert!(ctx.sql(select).await.is_err()); + // Same in this case, `test._rowid` is discarded because it is a system column + let select = + "SELECT test.*, test2._rowid FROM test INNER JOIN test2 ON id = other_id"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------------+--------+", + "| id | bank_account | _rowid |", + "+----+--------------+--------+", + "| 1 | 9000 | 0 |", + "| 2 | 100 | 1 |", + "| 3 | 1000 | 3 |", + "+----+--------------+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // Join on system _rowid columns + let select = + "SELECT id, other_id FROM test JOIN test2sys ON test._rowid = test2sys._rowid"; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----------+", + "| id | other_id |", + "+----+----------+", + "| 1 | 1 |", + "| 2 | 2 |", + "+----+----------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + // there should be an ambiguity error since two system columns are joined with the same name + let select = "SELECT id, other_id FROM test JOIN test2sys ON _rowid = _rowid"; + assert!(ctx.sql(select).await.is_err()); +} + +#[tokio::test] +async fn test_system_column_with_cte() { + let ctx = setup_test_context().await; + + // System columns not available after CTE + let select = r" + WITH cte AS (SELECT * FROM test) + SELECT _rowid FROM cte + "; + assert!(ctx.sql(select).await.is_err()); + + // Explicitly selected system columns become regular columns + let select = r" + WITH cte AS (SELECT id, _rowid FROM test) + SELECT * FROM cte + "; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------+", + "| id | _rowid |", + "+----+--------+", + "| 1 | 0 |", + "| 2 | 1 |", + "| 3 | 2 |", + "+----+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +#[tokio::test] +async fn test_system_column_in_subquery() { + let ctx = setup_test_context().await; + + // System columns not available in subquery + let select = r" + SELECT _rowid FROM (SELECT * FROM test) + "; + assert!(ctx.sql(select).await.is_err()); + + // Explicitly selected system columns become regular columns + let select = r" + SELECT * FROM (SELECT id, _rowid FROM test) + "; + let df = ctx.sql(select).await.unwrap(); + let batches = df.collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+--------+", + "| id | _rowid |", + "+----+--------+", + "| 1 | 0 |", + "| 2 | 1 |", + "| 3 | 2 |", + "+----+--------+", + ]; + assert_batches_sorted_eq!(expected, &batches); +} + +async fn setup_test_context() -> SessionContext { + let batch = record_batch!( + ("id", UInt8, [1, 2, 3]), + ("bank_account", UInt64, [9000, 100, 1000]), + ("_rowid", UInt32, [0, 1, 2]), + ("_file", Utf8, ["file-0", "file-1", "file-2"]) + ) + .unwrap(); + let batch = batch + .with_schema(Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, true), + Field::new("bank_account", DataType::UInt64, true), + Field::new("_rowid", DataType::UInt32, true).to_system_column(), + Field::new("_file", DataType::Utf8, true).to_system_column(), + ]))) + .unwrap(); + + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_information_schema(true), + ); + let _ = ctx.register_batch("test", batch); + ctx +} diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 049926fb0bcd..3c655527bd04 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -35,8 +35,8 @@ use datafusion_common::tree_node::{ }; use datafusion_common::utils::get_at_indices; use datafusion_common::{ - internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, HashMap, - Result, TableReference, + internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, FieldExt, + HashMap, Result, TableReference, }; use indexmap::IndexSet; @@ -357,6 +357,68 @@ fn get_excluded_columns( Ok(result) } +/// Find system columns in the schema, if any. +/// +/// System columns are columns which meant to be semi-public stores of the internal details of the table. +/// For example, `ctid` in Postgres would be considered a metadata column +/// (Postgres calls these "system columns", see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples. +/// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)). +/// +/// DataFusion allows fields to be declared as metadata columns by setting the `datafusion.system_column` key in the field's metadata +/// to `true`. +/// +/// As an example of how this works in practice, if you have the following Postgres table: +/// +/// ```sql +/// CREATE TABLE t (x int); +/// INSERT INTO t VALUES (1); +/// ``` +/// +/// And you do a `SELECT * FROM t`, you would get the following schema: +/// +/// ```text +/// +---+ +/// | x | +/// +---+ +/// | 1 | +/// +---+ +/// ``` +/// +/// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example): +/// +/// ```text +/// +-----+---+ +/// | ctid| x | +/// +-----+---+ +/// | 0 | 1 | +/// +-----+---+ +/// ``` +/// +/// Returns: A list of `Column`s that are system columns. +fn get_system_columns( + schema: &DFSchema, + qualifier: Option<&TableReference>, +) -> Result> { + let mut result = vec![]; + // exclude columns with `datafusion.system_column` metadata set to true + if let Some(qualifier) = qualifier { + for field in schema.fields_with_qualified(qualifier) { + if field.is_system_column() { + result.push(Column::new(Some(qualifier.clone()), field.name())); + } + } + } else { + for field in schema.fields() { + if field.is_system_column() { + let (qualifier, field) = + schema.qualified_field_with_unqualified_name(field.name())?; + result.push(Column::new(qualifier.cloned(), field.name())); + } + } + } + Ok(result) +} + /// Returns all `Expr`s in the schema, except the `Column`s in the `columns_to_skip` fn get_exprs_except_skipped( schema: &DFSchema, @@ -413,6 +475,7 @@ pub fn expand_wildcard( wildcard_options: Option<&WildcardOptions>, ) -> Result> { let mut columns_to_skip = exclude_using_columns(plan)?; + columns_to_skip.extend(get_system_columns(schema, None)?); let excluded_columns = if let Some(WildcardOptions { exclude: opt_exclude, except: opt_except, @@ -467,6 +530,7 @@ pub fn expand_qualified_wildcard( }; // Add each excluded `Column` to columns_to_skip let mut columns_to_skip = HashSet::new(); + columns_to_skip.extend(get_system_columns(schema, Some(qualifier))?); columns_to_skip.extend(excluded_columns); Ok(get_exprs_except_skipped( &qualified_dfschema, @@ -718,6 +782,7 @@ pub fn exprlist_to_fields<'a>( wildcard_schema, None, )?); + excluded.extend(get_system_columns(wildcard_schema, None)?); Ok(wildcard_schema .iter() .filter(|(q, f)| { @@ -727,7 +792,7 @@ pub fn exprlist_to_fields<'a>( .collect::>()) } Some(qualifier) => { - let excluded: Vec = get_excluded_columns( + let mut excluded: Vec = get_excluded_columns( options.exclude.as_ref(), options.except.as_ref(), wildcard_schema, @@ -736,11 +801,18 @@ pub fn exprlist_to_fields<'a>( .into_iter() .map(|c| c.flat_name()) .collect(); + excluded.extend( + get_system_columns(wildcard_schema, None)? + .into_iter() + .map(|c| c.flat_name()), + ); Ok(wildcard_schema .fields_with_qualified(qualifier) .into_iter() .filter_map(|field| { - let flat_name = format!("{}.{}", qualifier, field.name()); + let flat_name = + Column::new(Some(qualifier.clone()), field.name()) + .flat_name(); if excluded.contains(&flat_name) { None } else { @@ -758,7 +830,10 @@ pub fn exprlist_to_fields<'a>( .collect::>>()? .into_iter() .flatten() - .collect(); + // After a projection any system columns that are included in the result cease to be system columns + .map(|(q, f)| (q, f.to_non_system_column())) + .collect::>(); + Ok(result) }