Test for reading statistics from parquet files without statistics, and for boolean & struct data types #10608

Merged (9 commits) on May 22, 2024
93 changes: 84 additions & 9 deletions datafusion/core/tests/parquet/arrow_statistics.rs
@@ -22,8 +22,9 @@ use std::fs::File;
use std::sync::Arc;

use arrow_array::{
-    make_array, Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, Float64Array,
-    Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array,
+    make_array, Array, ArrayRef, BooleanArray, Decimal128Array, FixedSizeBinaryArray,
+    Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch,
+    StringArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::{
@@ -33,7 +34,7 @@ use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{EnabledStatistics, WriterProperties};

-use crate::parquet::Scenario;
+use crate::parquet::{struct_array, Scenario};

use super::make_test_file_rg;

@@ -73,6 +74,28 @@ pub fn parquet_file_one_column(
no_null_values_start: i64,
no_null_values_end: i64,
row_per_group: usize,
) -> ParquetRecordBatchReaderBuilder<File> {
parquet_file_one_column_stats(
num_null,
no_null_values_start,
no_null_values_end,
row_per_group,
EnabledStatistics::Chunk,
)
}

// Create a parquet file with one column of data type i64.
// The data in the file is laid out as follows:
// . The number of null rows is the given num_null
// . Non-null values cover the half-open range [no_null_values_start, no_null_values_end), one value per row
//   (e.g. num_null = 0, start = 4, end = 7 yields the three rows 4, 5, 6)
// . The file is divided into row groups of size row_per_group
// . Statistics are enabled/disabled according to the given enable_stats
pub fn parquet_file_one_column_stats(
    num_null: usize,
    no_null_values_start: i64,
    no_null_values_end: i64,
    row_per_group: usize,
    enable_stats: EnabledStatistics,
) -> ParquetRecordBatchReaderBuilder<File> {

Contributor:

    Can we please add some doc comments about what this function does / how it
    is different? It is clear from this PR that it adds enable_stats, but that
    might not be obvious in the future.

    While we are on this topic, I personally find a struct with named parameters
    easier to read than something like the following, where you have to look up
    parquet_file_one_column_stats to know what the 0, 4 and 7 mean:

        let reader =
            parquet_file_one_column_stats(0, 4, 7, row_per_group, EnabledStatistics::None);

    I wonder if we could change the code to something like this to make it
    easier to read 🤔

        let reader = TestFile {
            num_null: 0,
            no_null_values_start: 4,
            no_null_values_end: 7,
            row_per_group,
            enable_stats: EnabledStatistics::None,
        }
        .build();

Contributor Author:

    If we want to do this, we need to think about parquet_file_many_columns, too.
    I have to sign off now. How about we do this in a follow-up PR?
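
For reference, a minimal sketch of the named-parameter struct the reviewer is suggesting; the TestFile name, its fields, and build are hypothetical (nothing in this PR defines them), and build would simply forward to the existing helper:

    // Hypothetical wrapper type giving call sites named fields instead of
    // positional arguments.
    struct TestFile {
        num_null: usize,
        no_null_values_start: i64,
        no_null_values_end: i64,
        row_per_group: usize,
        enable_stats: EnabledStatistics,
    }

    impl TestFile {
        // Forwards to the existing free function, so behavior is unchanged.
        fn build(self) -> ParquetRecordBatchReaderBuilder<File> {
            parquet_file_one_column_stats(
                self.num_null,
                self.no_null_values_start,
                self.no_null_values_end,
                self.row_per_group,
                self.enable_stats,
            )
        }
    }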
let mut output_file = tempfile::Builder::new()
.prefix("parquert_statistics_test")
@@ -82,7 +105,7 @@ pub fn parquet_file_one_column(

let props = WriterProperties::builder()
.set_max_row_group_size(row_per_group)
-    .set_statistics_enabled(EnabledStatistics::Chunk)
+    .set_statistics_enabled(enable_stats)
.build();

let batches = vec![make_int64_batches_with_null(
@@ -203,9 +226,7 @@ impl Test {
// TESTS
//
// Remaining cases
-// - Create parquet files / metadata with missing statistic values
-// - Create parquet files / metadata with different data types -- included but not all data types yet
-// - Create parquet files / metadata with different row group sizes -- done
+// f64::NAN
// - Using truncated statistics ("exact min value" and "exact max value" https://docs.rs/parquet/latest/parquet/file/statistics/enum.Statistics.html#method.max_is_exact)
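
On the truncated-statistics case above: a small sketch of how the linked max_is_exact accessor could be consulted, using only existing parquet crate metadata APIs (the helper function itself is illustrative, not part of this PR):

    use parquet::file::metadata::RowGroupMetaData;

    // For each column chunk in a row group, report whether the stored max is
    // exact; a truncated max is only an upper bound, so any pruning logic has
    // to treat it conservatively.
    fn max_stats_are_exact(rg: &RowGroupMetaData) -> Vec<bool> {
        rg.columns()
            .iter()
            .map(|col| {
                col.statistics()
                    .map(|s| s.max_is_exact())
                    .unwrap_or(false) // no statistics at all => not exact
            })
            .collect()
    }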

#[tokio::test]
@@ -898,8 +919,62 @@ async fn test_period_in_column_names() {
.run("service.name");
}

-// TODO:
-// WITHOUT Stats
-// Boolean
#[tokio::test]
async fn test_boolean() {
    let row_per_group = 5;
    // This creates a parquet file with one column named "bool".
    // The file is created from 2 record batches, each with 5 rows --> 2 row groups.
    // The second batch contains only false values, so its min and max are both false.
    let reader = parquet_file_many_columns(Scenario::Boolean, row_per_group).await;

    Test {
        reader,
        expected_min: Arc::new(BooleanArray::from(vec![false, false])),
        expected_max: Arc::new(BooleanArray::from(vec![true, false])),
        expected_null_counts: UInt64Array::from(vec![1, 0]),
        expected_row_counts: UInt64Array::from(vec![5, 5]),
    }
    .run("bool");
}
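
As a sanity check on the expected values above: for booleans, a row group's min is the AND of its non-null values and its max is the OR. A sketch using arrow's aggregate kernels (illustrative only, not part of the PR):

    use arrow::compute::{bool_and, bool_or};
    use arrow_array::BooleanArray;

    // Boolean min/max reduce to AND/OR over the non-null values; both kernels
    // return None for empty or all-null input, matching a missing statistic.
    fn bool_min_max(values: &BooleanArray) -> (Option<bool>, Option<bool>) {
        (bool_and(values), bool_or(values))
    }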

// struct array
// BUG: https://github.com/apache/datafusion/issues/10609
// Note: since I have not worked on struct arrays before, the bug may be in this
// test code rather than in the statistics code itself.
#[ignore]
#[tokio::test]
async fn test_struct() {
    let row_per_group = 5;
    // This creates a parquet file with one column named "struct".
    // The file is created from 1 record batch with 3 rows in the struct array.
    let reader = parquet_file_many_columns(Scenario::StructArray, row_per_group).await;

    Test {
        reader,
        expected_min: Arc::new(struct_array(vec![(Some(1), Some(6.0), Some(12.0))])),
        expected_max: Arc::new(struct_array(vec![(Some(2), Some(8.5), Some(14.0))])),
        expected_null_counts: UInt64Array::from(vec![0]),
        expected_row_counts: UInt64Array::from(vec![3]),
    }
    .run("struct");
}

////// Files with missing statistics ///////

#[tokio::test]
async fn test_missing_statistics() {
    let row_per_group = 5;
    let reader =
        parquet_file_one_column_stats(0, 4, 7, row_per_group, EnabledStatistics::None);

    Test {
        reader,
        expected_min: Arc::new(Int64Array::from(vec![None])),
        expected_max: Arc::new(Int64Array::from(vec![None])),
        expected_null_counts: UInt64Array::from(vec![None]),
        expected_row_counts: UInt64Array::from(vec![3]), // still has row count statistics
    }
    .run("i64");
}
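
The absence of statistics can also be confirmed directly on the row-group metadata; a minimal sketch using the parquet crate's existing accessors (the assertion helper is an assumption, not code from this PR):

    use std::fs::File;

    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    // With EnabledStatistics::None, no column chunk carries min/max/null-count
    // statistics, but the per-row-group row count is always recorded.
    fn assert_no_column_statistics(reader: &ParquetRecordBatchReaderBuilder<File>) {
        for rg in reader.metadata().row_groups() {
            assert!(rg.column(0).statistics().is_none()); // no stats written
            assert!(rg.num_rows() > 0); // row count still present
        }
    }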

/////// NEGATIVE TESTS ///////
// column not found
70 changes: 69 additions & 1 deletion datafusion/core/tests/parquet/mod.rs
@@ -28,7 +28,7 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
-use arrow_array::make_array;
+use arrow_array::{make_array, BooleanArray, Float32Array, StructArray};
use chrono::{Datelike, Duration, TimeDelta};
use datafusion::{
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
@@ -65,6 +65,7 @@ fn init() {

/// What data to use
enum Scenario {
Boolean,
Timestamps,
Dates,
Int,
@@ -81,6 +82,7 @@ enum Scenario {
PeriodsInColumnNames,
WithNullValues,
WithNullValuesPageLevel,
StructArray,
}

enum Unit {
@@ -312,6 +314,16 @@ impl ContextWithParquet {
}
}

fn make_boolean_batch(v: Vec<Option<bool>>) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new(
"bool",
DataType::Boolean,
true,
)]));
let array = Arc::new(BooleanArray::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

/// Return record batch with a few rows of data for all of the supported timestamp types
/// values with the specified offset
///
@@ -699,6 +711,24 @@ fn make_int_batches_with_null(

fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
match scenario {
Scenario::Boolean => {
vec![
make_boolean_batch(vec![
Some(true),
Some(false),
Some(true),
Some(false),
None,
]),
make_boolean_batch(vec![
Some(false),
Some(false),
Some(false),
Some(false),
Some(false),
]),
]
}
Scenario::Timestamps => {
vec![
make_timestamp_batch(TimeDelta::try_seconds(0).unwrap()),
@@ -881,6 +911,20 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_int_batches_with_null(5, 1, 6),
]
}
Scenario::StructArray => {
let struct_array_data = struct_array(vec![
(Some(1), Some(6.0), Some(12.0)),
(Some(2), Some(8.5), None),
(None, Some(8.5), Some(14.0)),
]);

let schema = Arc::new(Schema::new(vec![Field::new(
"struct",
struct_array_data.data_type().clone(),
true,
)]));
vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()]
}
}
}

@@ -936,3 +980,27 @@ async fn make_test_file_page(scenario: Scenario, row_per_page: usize) -> NamedTempFile {
writer.close().unwrap();
output_file
}

// Returns a struct array with columns "int32_col", "float32_col", and "float64_col" built from the specified values.
fn struct_array(input: Vec<(Option<i32>, Option<f32>, Option<f64>)>) -> ArrayRef {
let int_32: Int32Array = input.iter().map(|(i, _, _)| i).collect();
let float_32: Float32Array = input.iter().map(|(_, f, _)| f).collect();
let float_64: Float64Array = input.iter().map(|(_, _, f)| f).collect();

let nullable = true;
let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("int32_col", DataType::Int32, nullable)),
Arc::new(int_32) as ArrayRef,
),
(
Arc::new(Field::new("float32_col", DataType::Float32, nullable)),
Arc::new(float_32) as ArrayRef,
),
(
Arc::new(Field::new("float64_col", DataType::Float64, nullable)),
Arc::new(float_64) as ArrayRef,
),
]);
Arc::new(struct_array)
}