Skip to content

Commit

Permalink
test: add struct_list to json test
Browse files Browse the repository at this point in the history
  • Loading branch information
kysshsy committed Dec 30, 2024
1 parent c97511d commit 035f247
Showing 1 changed file with 113 additions and 3 deletions.
116 changes: 113 additions & 3 deletions tests/tests/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ mod fixtures;

use anyhow::Result;
use datafusion::arrow::array::{
ArrowPrimitiveType, BooleanBuilder, LargeStringArray, LargeStringBuilder, ListArray,
ListBuilder, PrimitiveBuilder, StringArray, StringBuilder,
ArrayBuilder, ArrowPrimitiveType, BooleanBuilder, LargeStringArray, LargeStringBuilder,
ListArray, ListBuilder, PrimitiveBuilder, StringArray, StringBuilder, StructBuilder,
};
use datafusion::arrow::datatypes::{
DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type, Schema,
DataType, Field, Fields, Int16Type, Int32Type, Int64Type, Int8Type, Schema,
};
use datafusion::{arrow::record_batch::RecordBatch, parquet::arrow::ArrowWriter};
use rstest::*;
Expand Down Expand Up @@ -88,6 +88,7 @@ fn primitive_list_array<T: ArrowPrimitiveType<Native: From<V>>, V>(

list_builder.finish()
}

fn string_list_array(values: Vec<Vec<Option<&str>>>) -> ListArray {
let builder = StringBuilder::new();
let mut list_builder = ListBuilder::new(builder);
Expand Down Expand Up @@ -195,6 +196,78 @@ pub fn json_list_record_batch() -> Result<RecordBatch> {
)?)
}

pub fn struct_list_record_batch() -> Result<RecordBatch> {
let struct_fileds = vec![
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int32, false),
];
let fields = vec![Field::new(
"struct_array",
DataType::List(Arc::new(Field::new(
"item",
DataType::Struct(Fields::from(struct_fileds.clone())),
true,
))),
false,
)];

let schema = Arc::new(Schema::new(fields));

let struct_values = vec![
vec![
Some(("joe", 12)),
None,
Some(("jane", 13)),
Some(("jim", 14)),
],
vec![Some(("joe", 12))],
];

let struct_array = {
let mut struct_list_builder = ListBuilder::new(StructBuilder::new(
struct_fileds,
vec![
Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
Box::new(PrimitiveBuilder::<Int32Type>::new()) as Box<dyn ArrayBuilder>,
],
));

for sublist in struct_values {
for value in sublist {
if let Some((name, age)) = value {
struct_list_builder.values().append(true);
struct_list_builder
.values()
.field_builder::<StringBuilder>(0)
.unwrap()
.append_value(name);
struct_list_builder
.values()
.field_builder::<PrimitiveBuilder<Int32Type>>(1)
.unwrap()
.append_value(age);
} else {
struct_list_builder.values().append(false);
struct_list_builder
.values()
.field_builder::<StringBuilder>(0)
.unwrap()
.append_null();
struct_list_builder
.values()
.field_builder::<PrimitiveBuilder<Int32Type>>(1)
.unwrap()
.append_null();
}
}
struct_list_builder.append(true);
}
struct_list_builder.finish()
};

Ok(RecordBatch::try_new(schema, vec![Arc::new(struct_array)])?)
}

#[rstest]
async fn test_json_cast_from_string(mut conn: PgConnection, tempdir: TempDir) -> Result<()> {
let stored_batch = json_string_record_batch()?;
Expand Down Expand Up @@ -281,3 +354,40 @@ fn test_json_cast_from_list(mut conn: PgConnection, tempdir: TempDir) -> Result<

Ok(())
}

#[rstest]
fn test_json_cast_from_struct_list(mut conn: PgConnection, tempdir: TempDir) -> Result<()> {
let stored_batch = struct_list_record_batch()?;
let parquet_path = tempdir
.path()
.join("test_json_cast_from_struct_list.parquet");
let parquet_file = File::create(&parquet_path)?;

let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap();
writer.write(&stored_batch)?;
writer.close()?;

primitive_create_foreign_data_wrapper(
"parquet_wrapper",
"parquet_fdw_handler",
"parquet_fdw_validator",
)
.execute(&mut conn);
primitive_create_server("parquet_server", "parquet_wrapper").execute(&mut conn);
format!(
"CREATE FOREIGN TABLE json_table ()
SERVER parquet_server OPTIONS (files '{}')",
parquet_path.to_str().unwrap()
)
.execute(&mut conn);

let r = "SELECT * FROM json_table".execute_result(&mut conn);
assert!(r.is_ok(), "error in query:'{}'", r.unwrap_err());

let row: (Json<JsonValue>,) =
"SELECT struct_array FROM json_table where struct_array = '[{\"name\": \"joe\", \"age\": 12}]'"
.fetch_one(&mut conn);
assert_eq!(row.0, Json::from(json!([{"name": "joe", "age": 12}])));

Ok(())
}

0 comments on commit 035f247

Please sign in to comment.