Strip table qualifiers from schema in UNION ALL #10707

Closed
46 changes: 35 additions & 11 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -1369,33 +1369,32 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalP
}

// create union schema
- let union_qualified_fields =
+ let union_fields =
zip(left_plan.schema().iter(), right_plan.schema().iter())
.map(
- |((left_qualifier, left_field), (_right_qualifier, right_field))| {
+ |((_, left_field), (_, right_field))| {
Contributor Author

A test fails because of this change. Looking into the history, the qualifier was kept to avoid the following error:

#5410

SchemaError(DuplicateUnqualifiedField { name: "a" })

So it seems this is intentional 🤔. If this only affects the Unparser, I could try stripping the qualifier from the Sort node only when its input node is a Union, but that seems a bit hacky to me.
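
To make the concern concrete, here is a minimal, std-only sketch of how stripping qualifiers could trigger this kind of error. It assumes a simplified schema model in which two fields may share a name only if their qualifiers differ. The names `QField`, `check_names`, and `strip_qualifiers` are hypothetical and are not DataFusion's `DFSchema` API; only the error variant name is taken from the message above.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a schema field: optional table qualifier plus column name.
#[derive(Debug, Clone, PartialEq)]
struct QField {
    qualifier: Option<String>,
    name: String,
}

/// Simplified error type; the variant names mirror the error quoted above.
#[derive(Debug, PartialEq)]
enum SchemaError {
    DuplicateQualifiedField { qualifier: String, name: String },
    DuplicateUnqualifiedField { name: String },
}

/// Assumed rule: a (qualifier, name) pair may appear at most once in a schema.
fn check_names(fields: &[QField]) -> Result<(), SchemaError> {
    let mut seen = HashSet::new();
    for f in fields {
        // `insert` returns false when the pair was already present.
        if !seen.insert((f.qualifier.as_deref(), f.name.as_str())) {
            return Err(match &f.qualifier {
                Some(q) => SchemaError::DuplicateQualifiedField {
                    qualifier: q.clone(),
                    name: f.name.clone(),
                },
                None => SchemaError::DuplicateUnqualifiedField {
                    name: f.name.clone(),
                },
            });
        }
    }
    Ok(())
}

/// Drops every qualifier while keeping the column names.
fn strip_qualifiers(fields: &[QField]) -> Vec<QField> {
    fields
        .iter()
        .map(|f| QField {
            qualifier: None,
            name: f.name.clone(),
        })
        .collect()
}
```

Under this model, a schema holding `t1.a` and `t2.a` passes `check_names`, but the same schema after `strip_qualifiers` holds two unqualified `a` fields and is rejected with `DuplicateUnqualifiedField { name: "a" }`.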

Contributor Author

Further discussion in #2943 seems to indicate that removing the qualifier is the correct behavior.

I'll dig into whether there is a way around the DuplicateUnqualifiedField error.

Contributor Author

@phillipleblanc phillipleblanc Jun 23, 2024

I asked on Discord why we need to enforce the DuplicateUnqualifiedField check; Postgres itself does not error if two columns have the same name.

Contributor Author

I've raised #11082 as an alternative way to do this that only affects the Unparser.

let nullable = left_field.is_nullable() || right_field.is_nullable();
let data_type = comparison_coercion(
left_field.data_type(),
right_field.data_type(),
)
.ok_or_else(|| {
plan_datafusion_err!(
- "UNION Column {} (type: {}) is not compatible with column {} (type: {})",
- right_field.name(),
- right_field.data_type(),
- left_field.name(),
- left_field.data_type()
- )
+ "UNION Column {} (type: {}) is not compatible with column {} (type: {})",
+ right_field.name(),
+ right_field.data_type(),
+ left_field.name(),
+ left_field.data_type()
+ )
})?;
Ok((
- left_qualifier.cloned(),
+ None,
Arc::new(Field::new(left_field.name(), data_type, nullable)),
))
},
)
.collect::<Result<Vec<_>>>()?;
- let union_schema =
- DFSchema::new_with_metadata(union_qualified_fields, HashMap::new())?;
+ let union_schema = DFSchema::new_with_metadata(union_fields, HashMap::new())?;

let inputs = vec![left_plan, right_plan]
.into_iter()
@@ -2255,4 +2254,29 @@ mod tests {

Ok(())
}

+ #[test]
+ fn test_union_strips_qualifiers() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("foo", DataType::Int32, false),
+ Field::new("bar", DataType::Int32, false),
+ ]);
+ let result = table_scan(Some("t1"), &schema, None)?
+ .union(table_scan(Some("t2"), &schema, None)?.build()?)?
+ .build()?;
+
+ let LogicalPlan::Union(union) = result else {
+ panic!("expected union, got {result:?}")
+ };
+
+ assert!(
+ union
+ .schema
+ .iter()
+ .all(|(qualifier, _)| qualifier.is_none()),
+ "Expected the schema from a Union to not have any table qualifiers"
+ );
+
+ Ok(())
+ }
}
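
The per-column rule in the `union` change above can be sketched without DataFusion as follows. This is a minimal model under stated assumptions: `Ty`, `Col`, `coerce`, and `union_fields` are hypothetical names, and `coerce` is a toy stand-in for `comparison_coercion` that only widens `Int32` to `Int64`. What it mirrors from the diff is that the output name comes from the left input, nullability is the OR of both sides, and the qualifier is always `None`.

```rust
/// Toy type lattice; a hypothetical stand-in for Arrow's `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Ty {
    Int32,
    Int64,
    Utf8,
}

/// Hypothetical stand-in for a qualified schema field.
#[derive(Debug, Clone, PartialEq)]
struct Col {
    qualifier: Option<String>,
    name: String,
    ty: Ty,
    nullable: bool,
}

/// Toy coercion rule (an assumption, not `comparison_coercion`):
/// equal types pass through, Int32 widens to Int64, anything else is incompatible.
fn coerce(left: Ty, right: Ty) -> Option<Ty> {
    match (left, right) {
        (a, b) if a == b => Some(a),
        (Ty::Int32, Ty::Int64) | (Ty::Int64, Ty::Int32) => Some(Ty::Int64),
        _ => None,
    }
}

/// Pairs up the columns of both inputs and builds the unqualified union columns.
fn union_fields(left: &[Col], right: &[Col]) -> Result<Vec<Col>, String> {
    left.iter()
        .zip(right.iter())
        .map(|(l, r)| {
            let ty = coerce(l.ty, r.ty).ok_or_else(|| {
                format!(
                    "UNION Column {} (type: {:?}) is not compatible with column {} (type: {:?})",
                    r.name, r.ty, l.name, l.ty
                )
            })?;
            Ok(Col {
                qualifier: None,                       // table qualifier is dropped
                name: l.name.clone(),                  // name comes from the left input
                ty,                                    // coerced common type
                nullable: l.nullable || r.nullable,    // nullable if either side is
            })
        })
        .collect()
}
```

Collecting into `Result<Vec<_>, _>` stops at the first incompatible column pair, which matches the shape of the `.collect::<Result<Vec<_>>>()?` call in the diff.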
14 changes: 13 additions & 1 deletion datafusion/sql/tests/cases/plan_to_sql.rs
@@ -230,6 +230,16 @@ fn roundtrip_statement_with_dialect() -> Result<()> {
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect: Box::new(UnparserDefaultDialect {}),
},
+ TestStatementWithDialect {
+ sql: "SELECT j1_id FROM j1
+ UNION ALL
+ SELECT tb.j2_id as j1_id FROM j2 tb
+ ORDER BY j1_id
+ LIMIT 10;",
+ expected: r#"SELECT j1.j1_id FROM j1 UNION ALL SELECT tb.j2_id AS j1_id FROM j2 AS tb ORDER BY j1_id ASC NULLS LAST LIMIT 10"#,
+ parser_dialect: Box::new(GenericDialect {}),
+ unparser_dialect: Box::new(UnparserDefaultDialect {}),
+ },
];

for query in tests {
@@ -239,7 +249,9 @@ fn roundtrip_statement_with_dialect() -> Result<()> {

let context = MockContextProvider::default();
let sql_to_rel = SqlToRel::new(&context);
- let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap();
+ let plan = sql_to_rel
+ .sql_statement_to_plan(statement)
+ .unwrap_or_else(|e| panic!("Failed to parse sql: {}\n{e}", query.sql));

let unparser = Unparser::new(&*query.unparser_dialect);
let roundtrip_statement = unparser.plan_to_sql(&plan)?;