Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support marking columns as system columns via Field's metadata #14362

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

adriangb
Copy link
Contributor

@adriangb adriangb commented Jan 29, 2025

@github-actions github-actions bot added logical-expr Logical plan and expressions core Core DataFusion crate labels Jan 29, 2025
@adriangb
Copy link
Contributor Author

adriangb commented Jan 29, 2025

I don't know if there's any other "known" metadata, but I feel like it would be good to have an extension trait along the lines of:

/// Extension of [`Field`] to manage DataFusion specific field metadata.
pub trait DataFusionFieldInfo {
    /// A nice docstring!
    fn is_system_column(&self) -> bool;
   /// Another nice docstring :)
    fn as_system_column(mut self) -> Self;
}

impl DataFusionFieldInfo for Field { ... }

And then we can call field.is_system_column() from anywhere that needs to check.

Edit: done!

@github-actions github-actions bot added the common Related to common crate label Jan 29, 2025
@adriangb adriangb changed the title Support marking columns as system columns via metadata Support marking columns as system columns via Field's metadata Jan 29, 2025
Comment on lines -743 to +815
let flat_name = format!("{}.{}", qualifier, field.name());
let flat_name =
Column::new(Some(qualifier.clone()), field.name())
.flat_name();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nit of the existing code. I'd rather re-use the logic for creating flat names at a trivial cost just to avoid having arbitrary string formatting going on here.

@adriangb
Copy link
Contributor Author

@chenkovsky @jayzhan211 could you please review?

@chenkovsky
Copy link

as described in document, https://spark.apache.org/docs/3.5.1/api/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.html

If a table column and a metadata column have the same name, the conflict is resolved by either renaming or suppressing the metadata column. I think we should also guarantee suppressing the metadata column in case of arbitrary fields order.

@adriangb
Copy link
Contributor Author

@chenkovsky could you maybe translate that into a test that we can add? I'm having trouble imagining in what sorts of situations this would apply. Generally in SQL if you have two columns with the same name it's an error (and this is enforced by DFSchema).

@chenkovsky
Copy link

something like this.

#[tokio::test]
async fn test_name_conflict() {
    let batch = record_batch!(
        ("_rowid", UInt32, [0, 1, 2]),
        ("_rowid", Utf8, ["file-0", "file-1", "file-2"])
    )
    .unwrap();
    let batch = batch
        .with_schema(Arc::new(Schema::new(vec![
            Field::new("_rowid", DataType::UInt32, true).to_system_column(),
            Field::new("_rowid", DataType::Utf8, true),
        ])))
        .unwrap();

    let ctx = SessionContext::new_with_config(
        SessionConfig::new().with_information_schema(true),
    );
    let _ = ctx.register_batch("test", batch);
    let select = "SELECT _rowid FROM test";
    let df = ctx.sql(select).await.unwrap();
    let batchs = df.collect().await.unwrap();
    let expected = [
        "+--------+",
        "| _rowid |",
        "+--------+",
        "| file-0 |",
        "| file-1 |",
        "| file-2 |",
        "+--------+",
    ];
    assert_batches_sorted_eq!(expected, &batchs);
}

Currently DuplicateQualifiedField error is thrown. But it's valid in spark.

@adriangb
Copy link
Contributor Author

Is this that important to support? The example seems a bit contrived, I think it'd be more reasonable if it occurred naturally as part of a join or something where a user could unexpectedly run into it. Otherwise it seems to me like something very spark specific.

Postgres for example doesn't let you duplicate system columns:

create table t (ctid int);
ERROR:  column name "ctid" conflicts with a system column name

@adriangb
Copy link
Contributor Author

I will still give a shot at adding that feature tomorrow. But I'm not sold on the behavior being ideal even if that's what Spark does. Besides if it's an error now we can always make it not error in the future.

@chenkovsky
Copy link

Is this that important to support? The example seems a bit contrived, I think it'd be more reasonable if it occurred naturally as part of a join or something where a user could unexpectedly run into it. Otherwise it seems to me like something very spark specific.

Postgres for example doesn't let you duplicate system columns:

create table t (ctid int);
ERROR:  column name "ctid" conflicts with a system column name

because spark is not a database, it's a compute engine. For databases such as postgres, it can manipulate schema and data by itself. so it's ok to disable system column and normal column name conflict. but for spark, the data comes from other systems. it cannot guarantee data doesn't contain these conflict fields.

@adriangb
Copy link
Contributor Author

Was that working in #14057? I didn't see a test for it.

Hypothetically speaking we could do something in DFSchema to deduplicate but I worry that won't make it work e.g. we'll end up with mismatched fields and arrays.

@chenkovsky
Copy link

Yes, it supports. it's my fault. I have added the missing ut.

@chenkovsky
Copy link

by the way, I also encapsulate METADATA_OFFSET.

@adriangb
Copy link
Contributor Author

adriangb commented Jan 30, 2025

@chenkovsky does 3cfce67 satisfy the use case?

@chenkovsky
Copy link

@chenkovsky does 3cfce67 satisfy the use case?

yes,it's the case. but here you decided whether this is system field. it should be decided by table provider automatically.

@adriangb
Copy link
Contributor Author

adriangb commented Jan 30, 2025

it should be decided by table provider automatically

I'm not sure I follow. At some point you have to mark a field as a system column. I used explicit batches / schemas just to make the tests easier, but of course you can do it via a TableProvider and register that, you just have to return the right schema from TableProvider::schema. I can add cook up an example of a TableProvider with system columns if you'd like.

@chenkovsky
Copy link

chenkovsky commented Jan 30, 2025

i dont mean that this way cannot reach the target. but when i use spark style api i dont need to take care it. it's battery included. no need to call extra api to do it.

please correct me if I'm wrong.
the benefit of this approch is

  1. dont need to change api

the drawbacks are

  1. table provider developers have to take care of that and call an api to merge system field and normal field.
  2. hashmap performance is not good.
  3. hashmap need more effort to take care. for example, maybe we have another method that can also change hashmap later. we have to make sure that method wont change this special key value.

Another thing I haven't checked for this approach is that system column doesn't only apply on scan. it can also be applied on

Projection

Join

SubqueryAlias

@adriangb
Copy link
Contributor Author

I don't think (2) or (3) are real concerns. Things manipulating metadata should already be careful about not clobbering existing metadata and I really, really don't think the performance of inserting into a small HashMap will ever be an issue.

@adriangb
Copy link
Contributor Author

For (1) yes implementers of TableProvider will have to decide what fields they return, but isn't that already the case? I don't know how DataFusion would automatically determine what is a system column or not. It really depends on the data source.

@adriangb
Copy link
Contributor Author

Another thing I haven't checked for this approach is that system column doesn't only apply on scan. it can also be applied on Projection, Join, SubqueryAlias

Could you give example test cases that you think are relevant? I have a hunch that the current implementation may already handle these correctly.

@chenkovsky
Copy link

Another thing I haven't checked for this approach is that system column doesn't only apply on scan. it can also be applied on Projection, Join, SubqueryAlias

Could you give example test cases that you think are relevant? I have a hunch that the current implementation may already handle these correctly.

sorry I have no test. I haven't implemented these features now. because my implementation changes some APIs, so I think it's better to do it when APIs are stable.

@chenkovsky
Copy link

I don't think (2) or (3) are real concerns. Things manipulating metadata should already be careful about not clobbering existing metadata and I really, really don't think the performance of inserting into a small HashMap will ever be an issue.

for(3) give an example. many file formats contain field metadata, e.g. parquet. if input file contains the magic metadata in this approach, normal field will also be treated as system field.

@adriangb
Copy link
Contributor Author

adriangb commented Jan 30, 2025

So you're worried that if a Parquet file has this special metadata on a field we will wrongly interpret it as a system column? Or are you saying that's a good thing / the goal?

Personally I think it's safe to say that if a parquet file has field metadata that begins with datafusion.x we should be able to interpret that.

@chenkovsky
Copy link

So you're worried that if a Parquet file has this special metadata on a field we will wrongly interpret it as a system column? Or are you saying that's a good thing / the goal?

I'm worried about it.

@adriangb
Copy link
Contributor Author

adriangb commented Jan 30, 2025

hashmap need more effort to take care. for example, maybe we have another method that can also change hashmap later. we have to make sure that method wont change this special key value.

See also #12736. If a method mutates the metadata and drops an unknown key on the floor that's a bug in that method / piece of code.

@adriangb
Copy link
Contributor Author

So you're worried that if a Parquet file has this special metadata on a field we will wrongly interpret it as a system column? Or are you saying that's a good thing / the goal?

I'm worried about it.

You're worried that someone by accident (eg because it was written by another system) would add field metadata with the key datafusion.system_column and the value true and that their intention would not be for DataFusion to interpret that as a system column?

@chenkovsky
Copy link

So you're worried that if a Parquet file has this special metadata on a field we will wrongly interpret it as a system column? Or are you saying that's a good thing / the goal?

I'm worried about it.

You're worried that someone by accident (eg because it was written by another system) would add field metadata with the key datafusion.system_column and the value true and that their intention would not be for DataFusion to interpret that as a system column?

event not by accident. sometimes we select _rowid and save it in parquet. the parquet will contain magic metadata.
then we read this parquet. we get two system columns _rowid.

@adriangb
Copy link
Contributor Author

adriangb commented Jan 31, 2025

I see. That makes sense for something like row_id: once you write it to a new file it may not be valid anymore. But for other metadata you might want to round trip it as a system column. I think the argument could be made both ways. You also should probably be doing EXCEPT (row_id) when you write.

But I do think there is a stronger argument for doing something fancier:

WITH cte AS (
  SELECT id, _row_id FROM t
)
SELECT * FROM cte

IMO _row_id should be included.
So should we say that a system column stops being a system column once it's projected?

@adriangb
Copy link
Contributor Author

One thing I didn't understand from you PR @chenkovsky: you got it to work only modifying TableScan? I had trouble understanding when I was grokking your PR. If you have a better way to implement this than modifying projection, join, etc. I'm all ears.

@github-actions github-actions bot added the catalog Related to the catalog crate label Jan 31, 2025
@adriangb
Copy link
Contributor Author

For now I pushed c2b58ee which has a test that I think covers all of the cases discussed.

So our job now is to get those tests passing, using either the approach in this PR or in #14057.

It seems to me the main difference is how to identify and track metadata columns. But the hooks needed into Projection, Join, etc. should be the same. Do you agree @chenkovsky ?

@chenkovsky
Copy link

One thing I didn't understand from you PR @chenkovsky: you got it to work only modifying TableScan? I had trouble understanding when I was grokking your PR. If you have a better way to implement this than modifying projection, join, etc. I'm all ears.

No, I have no better way. because for different logical plans, the system column propagation strategies are different. Currently in this PR, system column will be propagated by default. it's also not correct. you need to modify some logical plans to stop propagation. In my implementation, system columns are not propagated by default. I need to modify logical plan to do propagation. so I think logical plan should always be changed.

@adriangb
Copy link
Contributor Author

Makes sense.

I think I've gotten all of the important cases here working, could you take a look at the tests and see if you are happy with them?

I will note that if we drop just one of the requirements from Spark, namely the resolution of name conflicts in favor of non-system columns, the whole thing becomes a lot simpler: hiding system columns from SELECT * is not that difficult, but overriding how name collisions are resolved in the many places they can happen is a lot more involved. Although I think I did it correctly in this PR I fear that the extent of the modification to the rest of the codebase and risk of things falling through the cracks in the future (e.g. if someone adds a new join type or otherwise does more invasive work of their own) is higher than the other part of this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
catalog Related to the catalog crate common Related to common crate core Core DataFusion crate logical-expr Logical plan and expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants