Skip to content

Commit

Permalink
split up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
adriangb committed Jan 31, 2025
1 parent fa0793c commit c574335
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 267 deletions.
34 changes: 34 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,11 @@ pub trait FieldExt {
///
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn to_system_column(self) -> Self;

/// Mark this field as a non system column by removing the `datafusion.system_column` key from the field's metadata.
///
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn to_non_system_column(self) -> Self;
}

/// See [`FieldExt`].
Expand All @@ -1121,6 +1126,35 @@ impl FieldExt for Field {
self.set_metadata(metadata);
self
}

/// Mark this field as a non system column by removing the `datafusion.system_column` key from the field's metadata.
/// See [`FieldExt::to_non_system_column`] for more information on what a system column is.
fn to_non_system_column(mut self) -> Self {
let mut metadata = self.metadata().clone();
metadata.remove("datafusion.system_column");
self.set_metadata(metadata);
self
}
}

impl FieldExt for Arc<Field> {
/// Check if this field is a system column.
/// See [`FieldExt::is_system_column`] for more information on what a system column is.
fn is_system_column(&self) -> bool {
FieldExt::is_system_column(self.as_ref())
}

/// Mark this field as a system column.
/// See [`FieldExt::to_system_column`] for more information on what a system column is.
fn to_system_column(self) -> Self {
Arc::new(FieldExt::to_system_column(Arc::unwrap_or_clone(self)))
}

/// Mark this field as a non system column by removing the `datafusion.system_column` key from the field's metadata.
/// See [`FieldExt::to_non_system_column`] for more information on what a system column is.
fn to_non_system_column(self) -> Self {
Arc::new(FieldExt::to_non_system_column(Arc::unwrap_or_clone(self)))
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub mod joins;
mod path_partition;
pub mod select;
mod sql_api;
pub mod system_columns;

async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
let testdata = test_util::arrow_test_data();
Expand Down
267 changes: 1 addition & 266 deletions datafusion/core/tests/sql/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use super::*;
use datafusion_common::{FieldExt, ScalarValue};
use datafusion_common::ScalarValue;

#[tokio::test]
async fn test_list_query_parameters() -> Result<()> {
Expand Down Expand Up @@ -350,268 +350,3 @@ async fn test_version_function() {

assert_eq!(version.value(0), expected_version);
}

#[tokio::test]
async fn test_select_system_column() {
let batch = record_batch!(
("id", UInt8, [1, 2, 3]),
("bank_account", UInt64, [9000, 100, 1000]),
("_rowid", UInt32, [0, 1, 2]),
("_file", Utf8, ["file-0", "file-1", "file-2"])
)
.unwrap();
let batch = batch
.with_schema(Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt8, true),
Field::new("bank_account", DataType::UInt64, true),
Field::new("_rowid", DataType::UInt32, true).to_system_column(),
Field::new("_file", DataType::Utf8, true).to_system_column(),
])))
.unwrap();

let ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);
let _ = ctx.register_batch("test", batch);

let select0 = "SELECT * FROM test order by id";
let df = ctx.sql(select0).await.unwrap();
let batchs = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+--------------+",
"| id | bank_account |",
"+----+--------------+",
"| 1 | 9000 |",
"| 2 | 100 |",
"| 3 | 1000 |",
"+----+--------------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select1 = "SELECT _rowid FROM test order by _rowid";
let df = ctx.sql(select1).await.unwrap();
let batchs = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+--------+",
"| _rowid |",
"+--------+",
"| 0 |",
"| 1 |",
"| 2 |",
"+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select2 = "SELECT _rowid, id FROM test order by _rowid";
let df = ctx.sql(select2).await.unwrap();
let batchs = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+--------+----+",
"| _rowid | id |",
"+--------+----+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"+--------+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select3 = "SELECT _rowid, id FROM test WHERE _rowid = 0";
let df = ctx.sql(select3).await.unwrap();
let batchs = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+--------+----+",
"| _rowid | id |",
"+--------+----+",
"| 0 | 1 |",
"+--------+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select4 = "SELECT _rowid FROM test LIMIT 1";
let df = ctx.sql(select4).await.unwrap();
let batchs = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+--------+",
"| _rowid |",
"+--------+",
"| 0 |",
"+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select5 = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1";
let df = ctx.sql(select5).await.unwrap();
let batchs = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+--------+----+",
"| _rowid | id |",
"+--------+----+",
"| 1 | 2 |",
"+--------+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select6 = "SELECT _rowid, _file FROM test order by _rowid";
let df = ctx.sql(select6).await.unwrap();
let batchs = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+--------+--------+",
"| _rowid | _file |",
"+--------+--------+",
"| 0 | file-0 |",
"| 1 | file-1 |",
"| 2 | file-2 |",
"+--------+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select6 = "SELECT id FROM test order by _rowid asc";
let df = ctx.sql(select6).await.unwrap();
let batchs = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+",
"| id |",
"+----+",
"| 1 |",
"| 2 |",
"| 3 |",
"+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let show_columns = "show columns from test;";
let df_columns = ctx.sql(show_columns).await.unwrap();
let batchs = df_columns
.select(vec![col("column_name"), col("data_type")])
.unwrap()
.collect()
.await
.unwrap();
let expected = [
"+--------------+-----------+",
"| column_name | data_type |",
"+--------------+-----------+",
"| id | UInt8 |",
"| bank_account | UInt64 |",
"+--------------+-----------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let batch = record_batch!(
("other_id", UInt8, [1, 2, 3]),
("bank_account", UInt64, [9, 10, 11]),
("_rowid", UInt32, [10, 11, 12]) // not a system column!
)
.unwrap();
let _ = ctx.register_batch("test2", batch);

// Normally _rowid would be a name conflict and throw an error during planning.
// But when it's a conflict between a system column and a non system column,
// the non system column should be used.
let select7 =
"SELECT id, other_id, _rowid FROM test INNER JOIN test2 ON id = other_id";
let df = ctx.sql(select7).await.unwrap();
let batchs = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----------+---------+",
"| id | other_id | _rowid |",
"+----+----------+---------+",
"| 1 | 1 | 10 |",
"| 2 | 2 | 11 |",
"| 3 | 3 | 12 |",
"+----+----------+---------+",
];
assert_batches_sorted_eq!(expected, &batchs);

// Sanity check: for other columns we do get a conflict
let select7 =
"SELECT id, other_id, bank_account FROM test INNER JOIN test2 ON id = other_id";
assert!(ctx.sql(select7).await.is_err());

// Demonstrate that we can join on _rowid
let batch = record_batch!(
("other_id", UInt8, [2, 3, 4]),
("_rowid", UInt32, [2, 3, 4])
)
.unwrap();
let batch = batch
.with_schema(Arc::new(Schema::new(vec![
Field::new("other_id", DataType::UInt8, true),
Field::new("_rowid", DataType::UInt32, true).to_system_column(),
])))
.unwrap();
let _ = ctx.register_batch("test2", batch);

let select8 = "SELECT id, other_id, _rowid FROM test JOIN test2 ON _rowid = _rowid";
let df = ctx.sql(select8).await.unwrap();
let batches = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----------+---------+",
"| id | other_id | _rowid |",
"+----+----------+---------+",
"| 2 | 2 | 2 |",
"+----+----------+---------+",
];
assert_batches_sorted_eq!(expected, &batches);

// Once passed through a projection, system columns are no longer available
let select9 = r"
WITH cte AS (SELECT * FROM test)
SELECT * FROM cte
";
let df = ctx.sql(select9).await.unwrap();
let batches = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----------+---------+",
"| id | other_id | _rowid |",
"+----+----------+---------+",
"| 2 | 2 | 2 |",
"+----+----------+---------+",
];
assert_batches_sorted_eq!(expected, &batches);
let select10 = r"
WITH cte AS (SELECT * FROM test)
SELECT _rowid FROM cte
";
let df = ctx.sql(select10).await.unwrap();
let batches = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----------+---------+",
"| id | other_id | _rowid |",
"+----+----------+---------+",
"| 2 | 2 | 2 |",
"+----+----------+---------+",
];
assert_batches_sorted_eq!(expected, &batches);

// And if passed explicitly selected and passed through a projection
// they are no longer system columns.
let select11 = r"
WITH cte AS (SELECT id, _rowid FROM test)
SELECT * FROM cte
";
let df = ctx.sql(select11).await.unwrap();
let batches = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+---------+",
"| id | _rowid |",
"+----+---------+",
"| 2 | 2 |",
"+----+---------+",
];
assert_batches_sorted_eq!(expected, &batches);
}
Loading

0 comments on commit c574335

Please sign in to comment.