[DISCUSSION] Add separate crate to cover spark builtin functions #5600

Open
comphead opened this issue Mar 14, 2023 · 37 comments · May be fixed by #14392
Labels
enhancement New feature or request

Comments

@comphead
Contributor

comphead commented Mar 14, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
The discussion is to collect community thoughts on implementing the Spark builtin functions listed at https://spark.apache.org/docs/3.2.0/api/sql/

We increasingly see requests to implement Spark functions, and the use case depends heavily on each person's or company's stack: some treat Spark compatibility as more important, while for others it is the opposite and PG compatibility is the priority.

The builtin function lists of Postgres and Spark are, as expected, not the same. I believe the worst case should be rare: a function that has the same name but a different signature and/or return type.

The goal of this discussion is to work out how to organize DF so that it stays compatible with major systems such as Spark, Postgres, and perhaps others.

Describe the solution you'd like
@alamb proposed in #5568 (comment) creating an extensible crate for Spark functions; it could even be a separate subproject, so that PG users can exclude Spark functions.

Related Issues

Describe alternatives you've considered

Not doing this
Additional context

Created after #5568 (comment)

@comphead comphead added the enhancement New feature or request label Mar 14, 2023
@andygrove
Member

It seems reasonable to me to have separate Spark and PG implementations of functions. This would make it more attractive for users wanting to migrate from Spark to DataFusion.

I like the idea of having a new datafusion-spark crate with anything related to Spark compatibility and integration (such as Spark SQL dialect and Spark built-in functions). I wonder if @yjshen has any thoughts on this since he built a Spark + DataFusion integration in Blaze.

@yjshen
Member

yjshen commented Mar 14, 2023

Separating Spark functions into a special crate seems reasonable but supporting Spark UDFs requires significant effort. This is because many UDFs in Spark are designed to be compatible with Hive and handle corner cases differently than other databases like PG. These corner cases increase the workload of integrating Spark/Hive with DataFusion.

When developing Blaze, we must compare the implementations of both engines or port tests first to ensure that they have identical semantics before passing a UDF for execution by DataFusion.

@alamb
Contributor

alamb commented Mar 15, 2023

I also agree (unsurprisingly) that a separate crate for libraries of functions would be valuable.

Overall, my ideal would be to keep DataFusion set up as a core plus a bunch of extension points, so that different sets of functionality can be assembled for whatever use case people have.

For example, there are already a bunch of functions that in theory should be "optional" via feature flags--

https://github.com/apache/arrow-datafusion/blob/6bfede0e1e1a09bd06ea0166e01c0ad4834a1721/datafusion/physical-expr/Cargo.toml#L35-L42

But I am not sure how well the vision and reality match (as in does anyone use datafusion without them)

@comphead
Contributor Author

@alamb you are right, we can't provide optional builtin functions, that would be more than unexpected.

In the wonderful example of #5568 (Implement to_unixtime function), the same functionality is achieved differently in each engine:
Spark: to_unix_timestamp(now())
PG: extract(epoch from now())
Trino: select to_unixtime(now())

I'm not even sure we need extra crates now. My personal feeling is that Trino tries to adopt all possible syntaxes.
Perhaps we could go that way and just keep extending the list of builtin functions; in case of conflicts (if any), the PG signature/return type could be the dominant one.
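
As a rough illustration of that idea, here is a minimal std-only sketch in which several dialect-specific names point at one implementation and, on a name conflict, the earlier registration stays dominant. `Registry`, `register`, `epoch_seconds`, and the conflicting name `shared_name` are all hypothetical and are not DataFusion APIs.

```rust
use std::collections::HashMap;

/// Hypothetical scalar function type: epoch milliseconds in, seconds out.
type ScalarFn = fn(i64) -> i64;

/// One shared implementation behind several dialect-specific names.
fn epoch_seconds(epoch_millis: i64) -> i64 {
    epoch_millis.div_euclid(1000)
}

/// Hypothetical registry mapping a function name to (origin, implementation).
#[derive(Default)]
struct Registry {
    functions: HashMap<String, (String, ScalarFn)>,
}

impl Registry {
    /// Registers `name` unless it is already taken; returns whether it was added.
    /// Whichever package registers first therefore wins a conflict.
    fn register(&mut self, origin: &str, name: &str, f: ScalarFn) -> bool {
        if self.functions.contains_key(name) {
            return false;
        }
        self.functions.insert(name.to_string(), (origin.to_string(), f));
        true
    }

    /// Calls the function registered under `name`, if any.
    fn call(&self, name: &str, arg: i64) -> Option<i64> {
        self.functions.get(name).map(|(_, f)| f(arg))
    }

    /// Reports which package provided `name`.
    fn origin(&self, name: &str) -> Option<&str> {
        self.functions.get(name).map(|(origin, _)| origin.as_str())
    }
}

/// Builds a registry with two dialect aliases and one made-up conflict.
fn example_registry() -> Registry {
    let mut registry = Registry::default();
    registry.register("trino", "to_unixtime", epoch_seconds);
    registry.register("spark", "to_unix_timestamp", epoch_seconds);
    // Hypothetical conflict: PG registers first, so the Spark entry is ignored.
    registry.register("pg", "shared_name", epoch_seconds);
    registry.register("spark", "shared_name", epoch_seconds);
    registry
}
```

With this policy the order in which packages are registered decides which dialect wins, so a PG-first default would simply register the PG functions before any others.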

@comphead
Contributor Author

@alamb do we need more discussion?
We can try to adopt syntaxes from major engines like Spark, PG, etc.; this will make migration to DF easier.
The downside, however, is that supporting the same functionality through different implementations leads to code growth and probably more maintenance time.

@alamb
Contributor

alamb commented Mar 25, 2023

So my opinion on this matter is that ideally DataFusion should be an extensible engine and so people using it can pick whatever parts they want to use and replace what they don't with their own implementations.

DataFusion includes a bunch of pre-built functionality (like the mostly compatible PG functions, parquet / json / etc readers, a memory catalog, etc) in order to get people started so they can focus on extending whatever is most important for their usecase.

So I think it would be great to have a separate crate with "spark compatible functions" (maybe also the same could be done for a "postgres compatible functions crate"). I think the BuiltInFunction thing is not required long term and it would be better if all functions could behave the same as user defined functions

Then the question becomes "where is that crate's code stored" -- it is probably fine for it to live in the main datafusion repo initially, and if it gets too unwieldy we could break it into its own repo or something.

But the ability to customize the functions available I think is key

@alamb
Contributor

alamb commented Feb 1, 2024

BTW #8045 tracks the work to break apart the function libraries into smaller pieces. Once this is done I think making a crate of spark compatible functions will be pretty straightforward

@Omega359
Contributor

Omega359 commented Feb 1, 2024

I would like to recommend that each function crate also has a benches directory and that the Cargo.toml file for each of those crates has criterion set up as a dev dependency.

@Omega359
Contributor

Omega359 commented Feb 1, 2024

But I am not sure how well the vision and reality match (as in does anyone use datafusion without them)

Well, I can say that for my current use case crypto and array functions are something that I'm unlikely to need. Everyone's use case is different of course but it's a data point.

@alamb
Contributor

alamb commented Feb 1, 2024

Here is a proposal of how to extract / organize the functions: #9100

@alamb
Contributor

alamb commented Jan 26, 2025

FWIW we have now completed migrating all functions to User Defined Functions and I think there is growing interest in

BTW I think there are many people interested in spark compatible functions. For example:

Maybe now is time to actually start collaborating on a spark compatible UDF function library. If anyone is interested in working on this, let me know and I can make a repo in https://github.com/datafusion-contrib

This could be similar to what @demetribu was working on in https://github.com/datafusion-contrib/datafusion-functions-extra (and maybe we can take a friendly look at how that was organized)

@andygrove
Member

We already have a crate with Spark-compatible expressions maintained as part of the Comet subproject.

https://docs.rs/datafusion-comet-spark-expr/0.5.0/datafusion_comet_spark_expr/

It does not depend on Spark, JVM, or Comet—it is just a library of Spark-compatible DataFusion expressions.

We deliberately maintain them in one crate so that other DataFusion-based projects can use them.

@andygrove
Member

The crate supports more than 100 expressions so far, most of which are listed here:

https://datafusion.apache.org/comet/user-guide/expressions.html

@shehabgamin
Contributor

I love the idea of collaborating on Spark compatible UDFs.

As of writing, 243/402 Spark functions doc-tests pass on Sail. We haven't focused on performance yet and instead have been focusing on just knocking all of them out because there are so many of them. Our implementations can be found:

I will say that we have encountered numerous problems relying on downstream DataFusion-based crates, to the extent that we have removed all of them as dependencies (and included appropriate [Credit] comments in various places to acknowledge the original sources). The issue isn't with the crates themselves but arises when it's time to upgrade DataFusion versions, requiring us to wait for each crate to update and release a new version.

We haven't done as good a job as @andygrove and the Comet folks with documenting what we do and don't support (we're currently a small team of just two people). However, we do have test reports for every pull request to track coverage.

Sail has also put a lot of effort into supporting PySpark’s 10+ APIs for Python UDFs/UDAFs/UDWFs/UDTFs. We support all the PySpark UDF types except one (the experimental applyInPandasWithState() method of pyspark.sql.GroupedData):

@alamb
Contributor

alamb commented Jan 30, 2025

I love the idea of collaborating on Spark compatible UDFs.

As of writing, 243/402 Spark functions doc-tests pass on Sail. We haven't focused on performance yet and instead have been focusing on just knocking all of them out because there are so many of them. Our implementations can be found:

Nice!

I will say that we have encountered numerous problems relying on downstream DataFusion-based crates,...The issue isn't with the crates themselves but arises when it's time to upgrade DataFusion versions, requiring us to wait for each crate to update and release a new version.

Indeed -- I think the key challenge is mustering enough maintainer bandwidth across the crates to keep them up to date / updated quickly.

We haven't done as good a job as @andygrove and the Comet folks with documenting what we do and don't support (we're currently a small team of just two people). However, we do have test reports for every pull request to track coverage.

It sounds pretty amazing. There is a tradeoff between moving fast in the short term and building up capacity in the longer term to maintain it

@alamb
Contributor

alamb commented Jan 30, 2025

@andygrove (and maybe @Omega359 ) given the importance of spark and the fact that comet already has spark compatible expressions what would you say about moving those expressions into the datafusion core repo?

That would allow us to apply some community effort and share across projects that wanted spark based expressions.

That would look like moving this code
https://github.com/apache/datafusion-comet/tree/main/native/spark-expr

Into the main datafusion repo

@shehabgamin if we did that, would you be willing to help implement / upstream some of your implementations and tests?

@andygrove
Member

I almost started a conversation about this but held back. Moving this crate upstream has a lot of value, and I support doing so.

However, assuming that most DataFusion contributors would be unhappy about running Apache Spark as part of the test suite (I think this is a safe assumption), we need a testing plan.

In Comet, we rely on integration tests that run Spark with and without Comet enabled and compare results. We also run a subset of Spark's test suite with Comet enabled.

If we move the spark-expr crate into the main DataFusion repository, we will have to rely more on unit testing to demonstrate correct behavior. This is not a bad thing, of course, but it makes reviews much harder. There is an increased risk of PRs being accepted that are not fully compatible with Spark, and we won't find out in Comet until we upgrade to the next DataFusion version and run the Spark tests.
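
The compare-the-results approach described above can be sketched without Spark as a small differential test helper. This is only a std-only illustration: `first_mismatch` is a hypothetical name, and the two uppercase functions are stand-ins for a reference engine and a candidate implementation, not real Spark or Comet behavior.

```rust
/// Runs `reference` and `candidate` over the same inputs and returns the
/// first input on which their outputs differ, if any.
fn first_mismatch<'a, I, O>(
    inputs: &'a [I],
    reference: impl Fn(&I) -> O,
    candidate: impl Fn(&I) -> O,
) -> Option<&'a I>
where
    O: PartialEq,
{
    inputs
        .iter()
        .find(|&input| reference(input) != candidate(input))
}

/// Stand-in for the reference engine: full Unicode uppercasing.
fn reference_upper(s: &&str) -> String {
    s.to_uppercase()
}

/// Stand-in for a candidate that only handles ASCII letters.
fn candidate_upper(s: &&str) -> String {
    s.to_ascii_uppercase()
}
```

A unit test built this way only catches differences on the inputs it is given, which is the review risk raised above: the corner cases have to be known in advance.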

@shehabgamin
Contributor

@shehabgamin if we did that, would you be willing to help implement / upstream some of your implementations and tests?

Yes!

@alamb @andygrove I will give a more thoughtful response to the rest of the discussion after I exercise and eat dinner.

@kazuyukitanimura
Contributor

Echoing @andygrove's point. Also, if we move spark-expr to DataFusion core, release management might get harder. E.g. we may want to fix spark-expr bugs quickly, but after moving spark-expr to DataFusion, we would first need to release DataFusion before releasing Comet (we could potentially use a GitHub branch/hash id, but that is still not straightforward, even though the code gets streamlined).

@shehabgamin
Contributor

I will say that we have encountered numerous problems relying on downstream DataFusion-based crates,...The issue isn't with the crates themselves but arises when it's time to upgrade DataFusion versions, requiring us to wait for each crate to update and release a new version.

Indeed -- I think the key challenge is mustering enough maintainer bandwidth across the crates to keep them up to date / updated quickly.

Thinking through this further, the release cycle bottleneck I mentioned earlier wouldn’t apply here. With @linhr and me included as repo owners, version updates wouldn't face delays. Comet and Sail would naturally keep the codebase synchronized with each DataFusion release since we would have to handle these updates during our pre-release testing process anyway.

With that in mind, a joint effort on something in the main DataFusion repo or a datafusion-contrib repo could both work, and we are open to either option.

However, assuming that most DataFusion contributors would be unhappy about running Apache Spark as part of the test suite (I think this is a safe assumption), we need a testing plan.

Agreed, we should not bring in Spark as a hard dependency of the DataFusion core test suite.

Because there is no JVM involvement when running a Sail server, it would be a relatively straightforward process to port function tests from Sail upstream. Functions are tested in three different scopes:

  1. Gold Data Tests
    Gold Data tests are SQL queries; functions get tested using Scalar input. We should be able to port over the vast majority of Gold Data tests for functions upstream to the SLTs.
    The only caveat here is that Sail does not depend on DataFusion’s SqlToRel. We have to roll out our own implementation since Spark’s SQL semantics (Hive dialect) are different from DataFusion’s SQL semantics.
    The implication of this is that some Gold Data tests can’t be ported upstream. However, I expect that the number of tests we can’t port over will be very small. The vast majority of Gold Data tests specifically for functions are very basic queries (e.g., SELECT ascii(2)).

  2. Spark Functions Doc Tests
    These tests use the PySpark client (no JVM is required), and functions generally get tested using Array input (e.g., by calling a function with some DataFrame column as input). We should be able to translate almost all of these tests (if not all) to SQL.

  3. Spark Connect Functions Tests
    Thankfully, these tests are in the minority because they are the most impractical to port over. The JVM is required because results are being compared to the output from a Spark server.
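
To make the gold-data idea concrete, here is a minimal std-only sketch of a table of queries with recorded expected outputs, checked against a toy implementation. Everything in it is an assumption for illustration: `spark_like_ascii`, `GoldCase`, and `failing_cases` are hypothetical names, the expected values are hand-written ASCII codes rather than Sail's actual gold files, and the empty-string result of 0 is assumed.

```rust
/// Toy Spark-style `ascii`: numeric value of the first character.
/// Assumption for this sketch: an empty string maps to 0.
fn spark_like_ascii(input: &str) -> i32 {
    input.chars().next().map_or(0, |c| c as i32)
}

/// One gold-data case: the query text, the argument after casting to string,
/// and the expected output recorded from a reference run.
struct GoldCase {
    query: &'static str,
    arg: &'static str,
    expected: i32,
}

/// Returns the queries whose actual output differs from the recorded one.
fn failing_cases(cases: &[GoldCase]) -> Vec<&'static str> {
    cases
        .iter()
        .filter(|case| spark_like_ascii(case.arg) != case.expected)
        .map(|case| case.query)
        .collect()
}

/// Hand-written example cases ('2' is code point 50).
fn gold_cases() -> Vec<GoldCase> {
    vec![
        GoldCase { query: "SELECT ascii(2)", arg: "2", expected: 50 },
        GoldCase { query: "SELECT ascii('222')", arg: "222", expected: 50 },
    ]
}
```

Because each case is just a query plus its recorded output, cases of this shape should translate fairly directly into sqllogictest-style files.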

In Comet, we rely on integration tests that run Spark with and without Comet enabled and compare results. We also run a subset of Spark's test suite with Comet enabled.

If we move the spark-expr crate into the main DataFusion repository, we will have to rely more on unit testing to demonstrate correct behavior. This is not a bad thing, of course, but it makes reviews much harder. There is an increased risk of PRs being accepted that are not fully compatible with Spark, and we won't find out in Comet until we upgrade to the next DataFusion version and run the Spark tests.

Echoing @andygrove's point. Also, if we move spark-expr to DataFusion core, release management might get harder. E.g. we may want to fix spark-expr bugs quickly, but after moving spark-expr to DataFusion, we would first need to release DataFusion before releasing Comet (we could potentially use a GitHub branch/hash id, but that is still not straightforward, even though the code gets streamlined).

These are strong points, and they highlight why maintaining a joint effort in a standalone datafusion-contrib repo could be the best approach. A standalone repo would give us more flexibility in the test setup.

@alamb @andygrove @kazuyukitanimura

@Omega359
Contributor

Omega359 commented Jan 31, 2025

I'm of the opinion that while I could see the benefit of spark udfs in datafusion I really think they would be best handled as a datafusion-contrib. That is mostly for 3 reasons:

  1. Most df users would never need them
  2. It's more maintenance, and testing them is non-trivial (I spin up docker images of spark for my testing but it's single node - not clustered). It's generally slow, especially when compared to rust tests and sqllogictests.
  3. Lastly, while spark is a use case via comet, sail, etc for datafusion it's not the only one where a custom set of udf's might be useful. I'm not sure we want to say yes to spark but no to other udf suites.

I personally think it would be awesome to have all the udf suites within a common repo where you could feature include just the suite you wanted and then either bulk add them to a context or pick and choose.
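
A rough std-only sketch of "bulk add or pick and choose" might look like the following. `Suite`, `Context`, `register_all`, and `register_only` are hypothetical names, not DataFusion APIs; in a real crate each suite constructor could additionally sit behind its own Cargo feature, which is left out here so the sketch stays self-contained.

```rust
use std::collections::BTreeMap;

/// Hypothetical function signature used by this sketch.
type StringFn = fn(&str) -> String;

/// A named suite of functions, e.g. one per compatibility target.
struct Suite {
    name: &'static str,
    functions: Vec<(&'static str, StringFn)>,
}

/// Stand-in for a session context that holds registered functions.
#[derive(Default)]
struct Context {
    functions: BTreeMap<String, StringFn>,
}

impl Context {
    /// Bulk-adds every function in the suite.
    fn register_all(&mut self, suite: &Suite) {
        for (name, f) in &suite.functions {
            self.functions.insert(name.to_string(), *f);
        }
    }

    /// Adds only the requested functions; returns the names the suite lacks.
    fn register_only<'a>(&mut self, suite: &Suite, wanted: &[&'a str]) -> Vec<&'a str> {
        let mut missing = Vec::new();
        for &name in wanted {
            match suite.functions.iter().find(|(n, _)| *n == name) {
                Some((n, f)) => {
                    self.functions.insert(n.to_string(), *f);
                }
                None => missing.push(name),
            }
        }
        missing
    }
}

fn upper(s: &str) -> String {
    s.to_uppercase()
}

fn reverse(s: &str) -> String {
    s.chars().rev().collect()
}

/// Example suite with two toy functions.
fn example_suite() -> Suite {
    Suite {
        name: "example",
        functions: vec![("upper", upper as StringFn), ("reverse", reverse as StringFn)],
    }
}
```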

@andygrove
Member

With that in mind, a joint effort on something in the main DataFusion repo or a datafusion-contrib repo could both work, and we are open to either option.

I am -1 on moving any Comet code to datafusion-contrib. Apache governance is important to my employer and I would not be able to contribute to any datafusion-contrib repo. Of course, others are welcome to go create something there.

@andygrove
Member

3. I'm not sure we want to say yes to spark but no to other udf suites.

This is a valid point also.

@andygrove
Member

It seems to me that we already have an Apache DataFusion project that provides Spark-compatible DataFusion expressions (Comet).

I think @shehabgamin's main concern is that Comet releases tend to lag behind DataFusion releases. This is avoidable if we either keep the main branch using the latest main from DataFusion or pin it to a revision and update that revision regularly (perhaps weekly) so that we catch breaking changes early and don't fall far behind. We could aim to release within a few days of each DataFusion release.

I would love to see more contributors (and eventually committers) on the Comet project to help us move faster with releases.

@Blizzara
Contributor

I (personally and on behalf of my employer) am very much +1 for having Spark-compatible expressions. We currently use a mix of DF expressions, Comet's stuff, wrappers around DF's expressions, and custom-written ones. Since we originally only support a subset of Spark, we can sometimes take shortcuts in our own impls, but otherwise it'd be nice to share the work.

I don't have strong opinions on where they should live. So far the keeping of DF and Comet in sync has mostly been fine, but sometimes when DF breaks the API it can take a while before Comet catches up and during that time we're unable to upgrade either.

One challenge currently with Comet's expression is that since Comet operates on the physical plan level, many of the expressions have been written as implementing PhysicalExpr rather than eg. ScalarUDFImpl. That's not a deal-breaker, but a small annoyance in using them elsewhere :)

@andygrove
Member

One challenge currently with Comet's expression is that since Comet operates on the physical plan level, many of the expressions have been written as implementing PhysicalExpr rather than eg. ScalarUDFImpl. That's not a deal-breaker, but a small annoyance in using them elsewhere :)

This is mostly for historical reasons. It would be great to fix this.

@andygrove
Member

I don't have strong opinions on where they should live. So far the keeping of DF and Comet in sync has mostly been fine, but sometimes when DF breaks the API it can take a while before Comet catches up and during that time we're unable to upgrade either.

This has been painful. Almost every release of DataFusion/Arrow seems to have breaking changes for Comet. I would love to see a stabilized API for projects like Comet that are using DataFusion's physical plan.

Again, having more maintainers would help here.

@andygrove
Member

andygrove commented Jan 31, 2025

Another point worth mentioning. Previously, the Comet release schedule was being driven by progress on performance, but now that we have "ok" performance (2x on TPC-H) and based on the discussion in this issue, we've decided to switch to following the DataFusion release cadence.

@Omega359
Contributor

With that in mind, a joint effort on something in the main DataFusion repo or a datafusion-contrib repo could both work, and we are open to either option.

I am -1 on moving any Comet code to datafusion-contrib. Apache governance is important to my employer and I would not be able to contribute to any datafusion-contrib repo. Of course, others are welcome to go create something there.

Perhaps as an alternative we could setup a datafusion-udfs (pick an appropriate name) under the apache umbrella and managed by datafusion pmc's where this could live? Just a thought.

@comphead
Contributor Author

With that in mind, a joint effort on something in the main DataFusion repo or a datafusion-contrib repo could both work, and we are open to either option.

I am -1 on moving any Comet code to datafusion-contrib. Apache governance is important to my employer and I would not be able to contribute to any datafusion-contrib repo. Of course, others are welcome to go create something there.

Perhaps as an alternative we could setup a datafusion-udfs (pick an appropriate name) under the apache umbrella and managed by datafusion pmc's where this could live? Just a thought.

Agree, this idea has been floating around for some time: separate a core plus some set of common builtin functions, and have extension UDF crates for each specific use case (Spark, etc.). It might be an interesting challenge if someone wants to create a cross-DB use case and import the same function from both datafusion-spark-udfs-crate and datafusion-pg-udfs-crate: how would the two be differentiated in a SQL statement for a cross-system read? But this is a more complicated scenario, maybe not even a realistic one.
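
One possible way to tell the two apart, sketched here purely as an assumption, is to qualify function names with a namespace and fall back to a default namespace for bare names. `NamespacedFunctions`, the `ns.name` syntax, and the function name `f` are all hypothetical; nothing in this thread says DataFusion resolves functions this way.

```rust
use std::collections::HashMap;

/// Hypothetical lookup table keyed by (namespace, function name).
/// The value is just a label standing in for an implementation.
struct NamespacedFunctions {
    entries: HashMap<(String, String), &'static str>,
    default_namespace: String,
}

impl NamespacedFunctions {
    fn new(default_namespace: &str) -> Self {
        Self {
            entries: HashMap::new(),
            default_namespace: default_namespace.to_string(),
        }
    }

    fn register(&mut self, namespace: &str, name: &str, implementation: &'static str) {
        self.entries
            .insert((namespace.to_string(), name.to_string()), implementation);
    }

    /// Resolves `ns.name` explicitly, or a bare `name` in the default namespace.
    fn resolve(&self, reference: &str) -> Option<&'static str> {
        let (namespace, name) = match reference.split_once('.') {
            Some((namespace, name)) => (namespace, name),
            None => (self.default_namespace.as_str(), reference),
        };
        self.entries
            .get(&(namespace.to_string(), name.to_string()))
            .copied()
    }
}
```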

@alamb
Contributor

alamb commented Jan 31, 2025

Perhaps as an alternative we could setup a datafusion-udfs (pick an appropriate name) under the apache umbrella and managed by datafusion pmc's where this could live? Just a thought.

We can definitely do this with minimal organization overhead (aka create https://github.com/apache/datafusion-functions-spark for example)

However then there would be an additional release that would need to happen and the release overhead (3 day voting, etc) is pretty high

I'm of the opinion that while I could see the benefit of spark udfs in datafusion I really think they would be best handled as a datafusion-contrib. That is mostly for 3 reasons:

I personally think Spark functions satisfy my criteria for inclusion in the main datafusion repo:

  1. are used by many other systems (not just Comet, for example)
  2. have enough interest level that we would have the bandwidth to maintain them
  3. would be an excellent advertisement for DataFusion's extensibility

Having Spark compatible functions in the main repo would also help relieve the tension of postgres vs Spark semantics in functions

Proposal (looking for 🧱 -bats)

  1. Make a datafusion-functions-spark crate in datafusion/src/functions-spark
  2. Do not add a dependency on datafusion crate (datafusion/core)

I think we could also do some neat things like allow datafusion-cli to run in either mode, such as

$ datafusion-cli 
Running in Postgres mode
> 
$ datafusion-cli --spark
Running in Spark mode
> 
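
The mode switch above could be sketched as follows. This is a std-only illustration of the proposal: `FunctionMode`, `mode_from_args`, and `banner` are hypothetical names, and `--spark` is the flag suggested in this comment, not an existing datafusion-cli option.

```rust
/// Which function library the CLI would load (hypothetical).
#[derive(Debug, PartialEq, Clone, Copy)]
enum FunctionMode {
    Postgres,
    Spark,
}

/// Picks the mode from command-line arguments (program name excluded).
/// Defaults to Postgres unless the proposed `--spark` flag is present.
fn mode_from_args<I, S>(args: I) -> FunctionMode
where
    I: IntoIterator<Item = S>,
    S: AsRef<str>,
{
    if args.into_iter().any(|arg| arg.as_ref() == "--spark") {
        FunctionMode::Spark
    } else {
        FunctionMode::Postgres
    }
}

/// Startup banner matching the example session above.
fn banner(mode: FunctionMode) -> &'static str {
    match mode {
        FunctionMode::Postgres => "Running in Postgres mode",
        FunctionMode::Spark => "Running in Spark mode",
    }
}
```

A real binary would pass `std::env::args().skip(1)` to `mode_from_args` and then register the matching function library before starting the prompt.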

@alamb
Contributor

alamb commented Jan 31, 2025

In terms of testing, I think some combination of sqllogictest / gold data style tests and maybe even real-spark runs in the extended.yml suite (see XXX that @Omega359 made recently) could minimize CI overhead but still make sure the functions worked in spark

@kazuyukitanimura
Contributor

+1 for including spark functions in DataFusion main

Do not add a dependency on datafusion crate (datafusion/core)

@alamb just making sure I understand: CI will still run and check datafusion/src/functions-spark even if it is not a dependency? For example, testing interface changes that affect both datafusion/src/functions-spark and datafusion/core
Sounds like it, based on your follow-up comment

In terms of testing, I think some combination of sqllogictest / gold data style tests and maybe even real-spark runs in the extended.yml suite (see XXX that @Omega359 made recently) could minimize CI overhead but still make sure the functions worked in spark

@andygrove andygrove linked a pull request Jan 31, 2025 that will close this issue
4 tasks
@andygrove
Member

andygrove commented Jan 31, 2025

I created a draft PR #14392

@findepi
Member

findepi commented Jan 31, 2025

I feel positive in principle about creating a crate as a home for Spark-compatible functions, especially if it is going to be maintained ("not dead project") and released together with core DF. My assumption is this is a totally optional feature for downstream projects (core dependencies do not change).

I haven't read the discussion above yet (just responding to Andy's call for opinions on the mailing list), so there might be some arguments that I should be aware of.

@andygrove
Member

My assumption is this is a totally optional feature for downstream projects (core dependencies do not change).

Correct. It is part of the workspace, but no other DataFusion crates depend on it.

Perhaps there is an argument for datafusion-cli having an optional dependency on it in the future.

@alamb
Contributor

alamb commented Jan 31, 2025

@alamb just making sure I understand: CI will still run and check datafusion/src/functions-spark even if it is not a dependency?

Yes that is my assumption

For example, testing interface changes that affect both datafusion/src/functions-spark and datafusion/core

Yes that is what I was thinking. Exactly what those tests would be I am not 100% sure (e.g. there probably needs to be some way to test them without actually running spark)

My assumption is this is a totally optional feature for downstream projects (core dependencies do not change).

Correct. It is part of the workspace, but no other DataFusion crates depend on it.

Perhaps there is an argument for datafusion-cli having an optional dependency on it in the future.

100%

9 participants