diff --git a/datafusion-examples/examples/custom_file_format.rs b/datafusion-examples/examples/custom_file_format.rs index 165d82627061..daa8ff6dc124 100644 --- a/datafusion-examples/examples/custom_file_format.rs +++ b/datafusion-examples/examples/custom_file_format.rs @@ -96,6 +96,10 @@ impl FileFormat for TSVFileFormat { .await } + async fn transform_schema(&self, schema: SchemaRef) -> Result { + Ok(schema) + } + async fn infer_stats( &self, state: &dyn Session, diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index 6f61c164f41d..7481579dfcfe 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -59,7 +59,8 @@ use tempfile::tempdir; #[tokio::main] async fn main() -> Result<()> { // The SessionContext is the main high level API for interacting with DataFusion - let ctx = SessionContext::new(); + let config = SessionConfig::new().with_parquet_force_view_metadata(false); + let ctx = SessionContext::new_with_config(config); read_parquet(&ctx).await?; read_csv(&ctx).await?; read_memory(&ctx).await?; diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 3614b788af90..10a94a7256fd 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -158,6 +158,10 @@ impl FileFormat for ArrowFormat { Ok(Arc::new(merged_schema)) } + async fn transform_schema(&self, schema: SchemaRef) -> Result { + Ok(schema) + } + async fn infer_stats( &self, _state: &dyn Session, diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index e7314e839bf2..a299c22d1a82 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -136,6 +136,10 @@ impl FileFormat for AvroFormat { Ok(Arc::new(merged_schema)) } + async fn 
transform_schema(&self, schema: SchemaRef) -> Result { + Ok(schema) + } + async fn infer_stats( &self, _state: &dyn Session, diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 45ad3e8c1c30..5dba0222d63a 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -400,6 +400,10 @@ impl FileFormat for CsvFormat { Ok(Arc::new(merged_schema)) } + async fn transform_schema(&self, schema: SchemaRef) -> Result { + Ok(schema) + } + async fn infer_stats( &self, _state: &dyn Session, diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 1a2aaf3af8be..47adcc681824 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -246,6 +246,10 @@ impl FileFormat for JsonFormat { Ok(Statistics::new_unknown(&table_schema)) } + async fn transform_schema(&self, schema: SchemaRef) -> Result { + Ok(schema) + } + async fn create_physical_plan( &self, _state: &dyn Session, diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 4a24871aeef7..0368dd83af4a 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -377,6 +377,21 @@ impl FileFormat for ParquetFormat { Ok(Arc::new(schema)) } + async fn transform_schema(&self, schema: SchemaRef) -> Result { + let schema = if self.binary_as_string() { + Arc::new(transform_binary_to_string(schema.as_ref())) + } else { + schema + }; + + let schema = if self.force_view_types() { + Arc::new(transform_schema_to_view(schema.as_ref())) + } else { + schema + }; + Ok(schema) + } + async fn infer_stats( &self, _state: &dyn Session, diff --git a/datafusion/core/src/datasource/listing_table_factory.rs 
b/datafusion/core/src/datasource/listing_table_factory.rs index 636d1623c5e9..bfaa449df37c 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -143,8 +143,13 @@ impl TableProviderFactory for ListingTableFactory { schema } - Some(s) => s, + Some(s) => { + // A user-provided schema must be transformed as well, + // just as an inferred schema is transformed when no schema is provided + options.format.transform_schema(s).await? + } }; + let config = ListingTableConfig::new(table_path) .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone())) .with_schema(resolved_schema); diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index b134ec54b13d..e306d3b6e6b7 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2400,7 +2400,8 @@ async fn write_json_with_order() -> Result<()> { #[tokio::test] async fn write_table_with_order() -> Result<()> { let tmp_dir = TempDir::new()?; - let ctx = SessionContext::new(); + let config = SessionConfig::new().with_parquet_force_view_metadata(false); + let ctx = SessionContext::new_with_config(config); let location = tmp_dir.path().join("test_table/"); let mut write_df = ctx diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs index aa0338fab71d..77488ad5f2e7 100644 --- a/datafusion/datasource/src/file_format.rs +++ b/datafusion/datasource/src/file_format.rs @@ -73,6 +73,12 @@ pub trait FileFormat: Send + Sync + fmt::Debug { objects: &[ObjectMeta], ) -> Result; + /// Transform the schema of the provided object. For example for parquet files: + /// 1. Transform a schema so that any binary types are strings + /// 2. 
Transform a schema to use view types for Utf8 and Binary + /// Other file formats may define their own transformations; currently only Parquet does + async fn transform_schema(&self, schema: SchemaRef) -> Result; + /// Infer the statistics for the provided object. The cost and accuracy of the /// estimated statistics might vary greatly between file formats. /// diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 53646dc5b468..e177529dfc4f 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -441,6 +441,14 @@ impl SessionConfig { self } + pub fn with_parquet_force_view_metadata( + mut self, + schema_force_view_types: bool, + ) -> Self { + self.options.execution.parquet.schema_force_view_types = schema_force_view_types; + self + } + /// Returns true if the joins will be enforced to output batches of the configured size pub fn enforce_batch_size_in_joins(&self) -> bool { self.options.execution.enforce_batch_size_in_joins diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 24982dfc28a7..344c49fa169e 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -456,13 +456,16 @@ explain insert into table_without_values select c1 from aggregate_test_100 order ---- logical_plan 01)Dml: op=[Insert Into] table=[table_without_values] -02)--Projection: aggregate_test_100.c1 AS c1 +02)--Projection: CAST(aggregate_test_100.c1 AS Utf8View) AS c1 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST 04)------TableScan: aggregate_test_100 projection=[c1] physical_plan 01)DataSinkExec: sink=ParquetSink(file_groups=[]) -02)--SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[false] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
+02)--CoalescePartitionsExec +03)----ProjectionExec: expr=[CAST(c1@0 AS Utf8View) as c1] +04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +05)--------SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[false] +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true query I insert into table_without_values select c1 from aggregate_test_100 order by c1; diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 758113b70835..0f206f88125d 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -144,7 +144,7 @@ EXPLAIN select b from t_pushdown where a = 'bar' order by b; ---- logical_plan 01)Sort: t_pushdown.b ASC NULLS LAST -02)--TableScan: t_pushdown projection=[b], full_filters=[t_pushdown.a = Utf8("bar")] +02)--TableScan: t_pushdown projection=[b], full_filters=[t_pushdown.a = Utf8View("bar")] physical_plan 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST] 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]