-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support marking columns as system columns via Field's metadata #14362
base: main
Are you sure you want to change the base?
Conversation
adriangb
commented
Jan 29, 2025
•
edited by alamb
Loading
edited by alamb
- Related feat: metadata columns #14057
- Related metadata column support #13975
I don't know if there's any other "known" metadata, but I feel like it would be good to have an extension trait along the lines of: /// Extension of [`Field`] to manage DataFusion specific field metadata.
pub trait DataFusionFieldInfo {
/// A nice docstring!
fn is_system_column(&self) -> bool;
/// Another nice docstring :)
fn as_system_column(mut self) -> Self;
}
impl DataFusionFieldInfo for Field { ... } And then we can call Edit: done! |
let flat_name = format!("{}.{}", qualifier, field.name()); | ||
let flat_name = | ||
Column::new(Some(qualifier.clone()), field.name()) | ||
.flat_name(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small nit of the existing code. I'd rather re-use the logic for creating flat names at a trivial cost just to avoid having arbitrary string formatting going on here.
@chenkovsky @jayzhan211 could you please review? |
as described in document, https://spark.apache.org/docs/3.5.1/api/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.html If a table column and a metadata column have the same name, the conflict is resolved by either renaming or suppressing the metadata column. I think we should also guarantee suppressing the metadata column in case of arbitrary fields order. |
@chenkovsky could you maybe translate that into a test that we can add? I'm having trouble imagining in what sorts of situations this would apply. Generally in SQL if you have two columns with the same name it's an error (and this is enforced by |
something like this. #[tokio::test]
async fn test_name_conflict() {
let batch = record_batch!(
("_rowid", UInt32, [0, 1, 2]),
("_rowid", Utf8, ["file-0", "file-1", "file-2"])
)
.unwrap();
let batch = batch
.with_schema(Arc::new(Schema::new(vec![
Field::new("_rowid", DataType::UInt32, true).to_system_column(),
Field::new("_rowid", DataType::Utf8, true),
])))
.unwrap();
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);
let _ = ctx.register_batch("test", batch);
let select = "SELECT _rowid FROM test";
let df = ctx.sql(select).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+",
"| _rowid |",
"+--------+",
"| file-0 |",
"| file-1 |",
"| file-2 |",
"+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);
} Currently DuplicateQualifiedField error is thrown. But it's valid in spark. |
Is this that important to support? The example seems a bit contrived, I think it'd be more reasonable if it occurred naturally as part of a join or something where a user could unexpectedly run into it. Otherwise it seems to me like something very spark specific. Postgres for example doesn't let you duplicate system columns: create table t (ctid int);
ERROR: column name "ctid" conflicts with a system column name |
I will still give a shot at adding that feature tomorrow. But I'm not sold on the behavior being ideal even if that's what Spark does. Besides if it's an error now we can always make it not error in the future. |
because spark is not a database, it's a compute engine. For databases such as postgres, it can manipulate schema and data by itself. so it's ok to disable system column and normal column name conflict. but for spark, the data comes from other systems. it cannot guarantee data doesn't contain these conflict fields. |
Was that working in #14057? I didn't see a test for it. Hypothetically speaking we could do something in DFSchema to deduplicate but I worry that won't make it work e.g. we'll end up with mismatched fields and arrays. |
Yes, it supports. it's my fault. I have added the missing ut. |
by the way, I also encapsulate METADATA_OFFSET. |
aa5abb9
to
3cfce67
Compare
@chenkovsky does 3cfce67 satisfy the use case? |
yes,it's the case. but here you decided whether this is system field. it should be decided by table provider automatically. |
I'm not sure I follow. At some point you have to mark a field as a system column. I used explicit batches / schemas just to make the tests easier, but of course you can do it via a |
i dont mean that this way cannot reach the target. but when i use spark style api i dont need to take care it. it's battery included. no need to call extra api to do it. please correct me if I'm wrong.
the drawbacks are
Another thing I haven't checked for this approach is that system column doesn't only apply on scan. it can also be applied on |
I don't think (2) or (3) are real concerns. Things manipulating metadata should already be careful about not clobbering existing metadata and I really, really don't think the performance of inserting into a small HashMap will ever be an issue. |
For (1) yes implementers of TableProvider will have to decide what fields they return, but isn't that already the case? I don't know how DataFusion would automatically determine what is a system column or not. It really depends on the data source. |
Could you give example test cases that you think are relevant? I have a hunch that the current implementation may already handle these correctly. |
sorry I have no test. I haven't implemented these features now. because my implementation changes some APIs, so I think it's better to do it when APIs are stable. |
for(3) give an example. many file formats contain field metadata, e.g. parquet. if input file contains the magic metadata in this approach, normal field will also be treated as system field. |
So you're worried that if a Parquet file has this special metadata on a field we will wrongly interpret it as a system column? Or are you saying that's a good thing / the goal? Personally I think it's safe to say that if a parquet file has field metadata that begins with |
I'm worried about it. |
See also #12736. If a method mutates the metadata and drops an unknown key on the floor that's a bug in that method / piece of code. |
You're worried that someone by accident (eg because it was written by another system) would add field metadata with the key |
event not by accident. sometimes we select _rowid and save it in parquet. the parquet will contain magic metadata. |
I see. That makes sense for something like row_id: once you write it to a new file it may not be valid anymore. But for other metadata you might want to round trip it as a system column. I think the argument could be made both ways. You also should probably be doing But I do think there is a stronger argument for doing something fancier: WITH cte AS (
SELECT id, _row_id FROM t
)
SELECT * FROM cte IMO _row_id should be included. |
One thing I didn't understand from you PR @chenkovsky: you got it to work only modifying TableScan? I had trouble understanding when I was grokking your PR. If you have a better way to implement this than modifying projection, join, etc. I'm all ears. |
450ed48
to
c2b58ee
Compare
For now I pushed c2b58ee which has a test that I think covers all of the cases discussed. So our job now is to get those tests passing, using either the approach in this PR or in #14057. It seems to me the main difference is how to identify and track metadata columns. But the hooks needed into Projection, Join, etc. should be the same. Do you agree @chenkovsky ? |
No, I have no better way. because for different logical plans, the system column propagation strategies are different. Currently in this PR, system column will be propagated by default. it's also not correct. you need to modify some logical plans to stop propagation. In my implementation, system columns are not propagated by default. I need to modify logical plan to do propagation. so I think logical plan should always be changed. |
0ff35d2
to
c574335
Compare
ff49c37
to
366398f
Compare
Makes sense. I think I've gotten all of the important cases here working, could you take a look at the tests and see if you are happy with them? I will note that if we drop just one of the requirements from Spark, namely the resolution of name conflicts in favor of non-system columns, the whole thing becomes a lot simpler: hiding system columns from |