BUG: schema_force_view_type configuration not working for CREATE EXTERNAL TABLE #14922

Open: wants to merge 9 commits into base: main
4 changes: 4 additions & 0 deletions datafusion-examples/examples/custom_file_format.rs
@@ -96,6 +96,10 @@ impl FileFormat for TSVFileFormat {
        .await
    }

    async fn transform_schema(&self, schema: SchemaRef) -> Result<SchemaRef> {
        Ok(schema)
    }

    async fn infer_stats(
        &self,
        state: &dyn Session,
3 changes: 2 additions & 1 deletion datafusion-examples/examples/dataframe.rs
@@ -59,7 +59,8 @@ use tempfile::tempdir;
#[tokio::main]
async fn main() -> Result<()> {
    // The SessionContext is the main high level API for interacting with DataFusion
    let ctx = SessionContext::new();
    let config = SessionConfig::new().with_parquet_force_view_metadata(false);
Contributor Author:
Same here. We need to keep the datatype consistent for the insert-into case.

    let ctx = SessionContext::new_with_config(config);
    read_parquet(&ctx).await?;
    read_csv(&ctx).await?;
    read_memory(&ctx).await?;
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -158,6 +158,10 @@ impl FileFormat for ArrowFormat {
        Ok(Arc::new(merged_schema))
    }

    async fn transform_schema(&self, schema: SchemaRef) -> Result<SchemaRef> {
        Ok(schema)
    }

    async fn infer_stats(
        &self,
        _state: &dyn Session,
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -136,6 +136,10 @@ impl FileFormat for AvroFormat {
        Ok(Arc::new(merged_schema))
    }

    async fn transform_schema(&self, schema: SchemaRef) -> Result<SchemaRef> {
        Ok(schema)
    }

    async fn infer_stats(
        &self,
        _state: &dyn Session,
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -400,6 +400,10 @@ impl FileFormat for CsvFormat {
        Ok(Arc::new(merged_schema))
    }

    async fn transform_schema(&self, schema: SchemaRef) -> Result<SchemaRef> {
        Ok(schema)
    }

    async fn infer_stats(
        &self,
        _state: &dyn Session,
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -246,6 +246,10 @@ impl FileFormat for JsonFormat {
        Ok(Statistics::new_unknown(&table_schema))
    }

    async fn transform_schema(&self, schema: SchemaRef) -> Result<SchemaRef> {
        Ok(schema)
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
15 changes: 15 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -377,6 +377,21 @@ impl FileFormat for ParquetFormat {
        Ok(Arc::new(schema))
    }

    async fn transform_schema(&self, schema: SchemaRef) -> Result<SchemaRef> {
Contributor:
Is this transformation rule adapted from an existing transformation? There is an additional rule here, transform_binary_to_string, so I made such a guess.
If so, perhaps we should extract the common logic to keep it consistent, or at least have them reference each other.

Contributor Author:
Right, we call it at the end of infer_schema, but it is not limited to infer_schema:

    let schema = if self.binary_as_string() {
        transform_binary_to_string(&schema)
    } else {
        schema
    };

    let schema = if self.force_view_types() {
        transform_schema_to_view(&schema)
    } else {
        schema
    };

I will try to extract the common logic to make it consistent.
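
A minimal sketch of what that extraction could look like (the helper name apply_schema_transformations is hypothetical; both infer_schema and the new transform_schema would delegate to it):

    // Hypothetical private helper on ParquetFormat that collects the two
    // existing rewrites so infer_schema and transform_schema stay in sync.
    fn apply_schema_transformations(&self, schema: SchemaRef) -> SchemaRef {
        let schema = if self.binary_as_string() {
            Arc::new(transform_binary_to_string(schema.as_ref()))
        } else {
            schema
        };
        if self.force_view_types() {
            Arc::new(transform_schema_to_view(schema.as_ref()))
        } else {
            schema
        }
    }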

        let schema = if self.binary_as_string() {
            Arc::new(transform_binary_to_string(schema.as_ref()))
        } else {
            schema
        };

        let schema = if self.force_view_types() {
            Arc::new(transform_schema_to_view(schema.as_ref()))
        } else {
            schema
        };
        Ok(schema)
    }

    async fn infer_stats(
        &self,
        _state: &dyn Session,
7 changes: 6 additions & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
@@ -143,8 +143,13 @@ impl TableProviderFactory for ListingTableFactory {

                schema
            }
            Some(s) => s,
            Some(s) => {
                // A provided schema also needs to be transformed, just as an
                // inferred schema is when none is provided
                options.format.transform_schema(s).await?
            }
        };

        let config = ListingTableConfig::new(table_path)
            .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
            .with_schema(resolved_schema);
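
With this change, a schema supplied explicitly to CREATE EXTERNAL TABLE goes through the same transformation as an inferred one. A hypothetical sqllogictest illustration (the table name and location are made up, and it assumes schema_force_view_types is enabled):

statement ok
CREATE EXTERNAL TABLE t_view(a varchar) STORED AS PARQUET
LOCATION 'test_files/scratch/example/';

query TTT
describe t_view;
----
a Utf8View YES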
3 changes: 2 additions & 1 deletion datafusion/core/tests/dataframe/mod.rs
@@ -2400,7 +2400,8 @@ async fn write_json_with_order() -> Result<()> {
#[tokio::test]
async fn write_table_with_order() -> Result<()> {
    let tmp_dir = TempDir::new()?;
    let ctx = SessionContext::new();
    let config = SessionConfig::new().with_parquet_force_view_metadata(false);
Contributor Author:
Why do we need to disable this? I can explain:

  1. We select from a datasource or memory table whose types are not forced to Utf8View, but when force_view is enabled we create the external table with a Utf8View field.
  2. For insert-into cases, we check the insert data type against the table data type, so we should keep the data types consistent.

Related ticket to support casting to Utf8View automatically:
#13408 (comment)

statement ok
create table t(a varchar) as values ('1'), ('2');

query T
select arrow_typeof(a) from t;
----
Utf8
Utf8

statement ok
drop table t

    let ctx = SessionContext::new_with_config(config);
    let location = tmp_dir.path().join("test_table/");

    let mut write_df = ctx
6 changes: 6 additions & 0 deletions datafusion/datasource/src/file_format.rs
@@ -73,6 +73,12 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef>;

    /// Transform the provided schema. For example, for Parquet files:
    /// 1. Transform the schema so that any binary types become strings
    /// 2. Transform the schema to use view types for Utf8 and Binary
    ///
    /// Other file formats may apply their own transformations, but currently
    /// only Parquet does.
    async fn transform_schema(&self, schema: SchemaRef) -> Result<SchemaRef>;

    /// Infer the statistics for the provided object. The cost and accuracy of the
    /// estimated statistics might vary greatly between file formats.
    ///
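
For intuition, a minimal sketch of the view-type rewrite the Parquet implementation applies (the function name to_view_types is made up; the real transform_schema_to_view also preserves field metadata, which is omitted here):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};

    // Map string/binary fields to their view-type equivalents and
    // leave every other field untouched.
    fn to_view_types(schema: &Schema) -> Schema {
        let fields: Vec<_> = schema
            .fields()
            .iter()
            .map(|f| match f.data_type() {
                DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
                    f.name(),
                    DataType::Utf8View,
                    f.is_nullable(),
                )),
                DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
                    f.name(),
                    DataType::BinaryView,
                    f.is_nullable(),
                )),
                _ => Arc::clone(f),
            })
            .collect();
        Schema::new(fields)
    }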
8 changes: 8 additions & 0 deletions datafusion/execution/src/config.rs
@@ -441,6 +441,14 @@ impl SessionConfig {
        self
    }

    /// Sets whether Parquet schema handling should force view types
    /// (the `datafusion.execution.parquet.schema_force_view_types` option)
    pub fn with_parquet_force_view_metadata(
        mut self,
        schema_force_view_types: bool,
    ) -> Self {
        self.options.execution.parquet.schema_force_view_types = schema_force_view_types;
        self
    }

    /// Returns true if the joins will be enforced to output batches of the configured size
    pub fn enforce_batch_size_in_joins(&self) -> bool {
        self.options.execution.enforce_batch_size_in_joins
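
For reference, a minimal usage sketch of the new builder, mirroring the call sites in the updated example and test; it sets the same option as the datafusion.execution.parquet.schema_force_view_types config key:

    let config = SessionConfig::new().with_parquet_force_view_metadata(false);
    let ctx = SessionContext::new_with_config(config);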
9 changes: 6 additions & 3 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -456,13 +456,16 @@ explain insert into table_without_values select c1 from aggregate_test_100 order
----
logical_plan
01)Dml: op=[Insert Into] table=[table_without_values]
02)--Projection: aggregate_test_100.c1 AS c1
02)--Projection: CAST(aggregate_test_100.c1 AS Utf8View) AS c1
Contributor Author:
This is also expected: for SQL-style insert into, we automatically cast the type to match the table schema. See the code here:

    .cast_to(target_field.data_type(), source.schema())?
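
For the runtime effect, a standalone sketch (not code from this PR; it assumes an arrow-rs version where Utf8 to Utf8View casts are supported):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, StringArray};
    use arrow::compute::cast;
    use arrow::datatypes::DataType;
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // Utf8 source data is cast to the table's Utf8View column type,
        // mirroring the CAST the planner inserts for insert-into.
        let source: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
        let casted = cast(&source, &DataType::Utf8View)?;
        assert_eq!(casted.data_type(), &DataType::Utf8View);
        Ok(())
    }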

03)----Sort: aggregate_test_100.c1 ASC NULLS LAST
04)------TableScan: aggregate_test_100 projection=[c1]
physical_plan
01)DataSinkExec: sink=ParquetSink(file_groups=[])
02)--SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[false]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
02)--CoalescePartitionsExec
Contributor Author (@zhuqi-lucas, Mar 2, 2025):
This is not related to this PR, but I am confused about why adding CAST() to the logical plan causes the physical plan to change to add:

02)--CoalescePartitionsExec
03)----ProjectionExec: expr=[CAST(c1@0 AS Utf8View) as c1]
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1

03)----ProjectionExec: expr=[CAST(c1@0 AS Utf8View) as c1]
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
05)--------SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[false]
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true

query I
insert into table_without_values select c1 from aggregate_test_100 order by c1;
@@ -144,7 +144,7 @@ EXPLAIN select b from t_pushdown where a = 'bar' order by b;
----
logical_plan
01)Sort: t_pushdown.b ASC NULLS LAST
02)--TableScan: t_pushdown projection=[b], full_filters=[t_pushdown.a = Utf8("bar")]
02)--TableScan: t_pushdown projection=[b], full_filters=[t_pushdown.a = Utf8View("bar")]
Contributor Author (@zhuqi-lucas, Mar 2, 2025):

## Create table without pushdown
statement ok
CREATE EXTERNAL TABLE t_pushdown(a varchar, b int, c float) STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/';

This is expected: with this PR, external table creation now uses Utf8View, so the pushed-down filter literal is Utf8View("bar") instead of Utf8("bar").

physical_plan
01)SortPreservingMergeExec: [b@0 ASC NULLS LAST]
02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]