Async User Defined Functions (UDF) #6518

Open
Tracked by #7013
marshauf opened this issue Jun 1, 2023 · 20 comments · May be fixed by #14837
Labels
enhancement New feature or request

Comments

@marshauf

marshauf commented Jun 1, 2023

Is your feature request related to a problem or challenge?

I would like to use async code in a UDF, but I couldn't find an example or API documentation showing how to do that. It would be nice if this were possible and documented.

Describe the solution you'd like

datafusion::physical_plan::functions::make_scalar_function() accepts functions which return a Future.

Describe alternatives you've considered

Creating another tokio runtime and offloading the async function onto it.
The main runtime then blocks inside the UDF until the async function has finished executing.
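
The workaround described above can be sketched without any dependencies. This is only an illustration under stated assumptions, not DataFusion or tokio code: `block_on`, `remote_upper`, and `sync_udf` are hypothetical names, and a hand-rolled single-future executor running on a worker thread stands in for the second tokio runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Unparks the thread that is blocked in `block_on` when the future is woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor (assumption: stands in for a second runtime).
/// Polls one future to completion, parking the thread while it is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async work, e.g. a call to a remote service.
async fn remote_upper(input: String) -> String {
    input.to_uppercase()
}

/// Synchronous UDF body: offloads the async work to a worker thread
/// and blocks the calling thread until the result is available.
fn sync_udf(values: &[&str]) -> Vec<String> {
    let owned: Vec<String> = values.iter().map(|s| s.to_string()).collect();
    let worker = thread::spawn(move || {
        block_on(async move {
            let mut results = Vec::with_capacity(owned.len());
            for value in owned {
                results.push(remote_upper(value).await);
            }
            results
        })
    });
    worker.join().expect("worker thread panicked")
}
```

Calling `sync_udf(&["one", "two"])` blocks the caller for the full duration of the async work, which is the cost this feature request is trying to avoid.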

Additional context

No response

@marshauf marshauf added the enhancement New feature or request label Jun 1, 2023
@alamb
Contributor

alamb commented Jun 1, 2023

I agree there is currently no good way to make a scalar function async.

You could potentially use a table provider and write to your function like INSERT INTO your_table SELECT ... 🤔

That might not work for your use case, however.

@alamb alamb changed the title Async UDF Async User Defined Functions (UDF) Jun 1, 2023
@marshauf
Author

marshauf commented Jun 1, 2023

My idea was to do something like this:

SELECT call('localhost:3000', num, letter) FROM (SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num,letter))

It would probably work with your example.

CREATE EXTERNAL TABLE your_table STORED AS CSV WITH HEADER ROW LOCATION 'localhost:3000';
INSERT INTO your_table SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num,letter);
SELECT * FROM your_table;

On insert, the table provider would call the endpoint and store the returned value. That seems cumbersome to use and implement.

@marshauf
Author

@alamb wouldn't it be possible to turn the SQL statement

SELECT call('localhost:3000', num, letter) FROM (SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num,letter))

into a LogicalPlan, and then rewrite the Expr::ScalarUDF to an Expr::SubQuery with a LogicalPlan::Extension?
The Extension would point to a custom ExecutionPlan, in which I could run async code,
similar to what is done in datafusion/core/tests/user_defined_plan.rs.
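
A toy model of that kind of rewrite, as a sketch only: the `Expr` enum below is a hypothetical stand-in and not DataFusion's `Expr`, and a placeholder column takes the place of the subquery or extension node output. It shows the core step of pulling calls to a named async function out of an expression tree so that a separate plan node could evaluate them.

```rust
/// Hypothetical, simplified expression tree (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    ScalarUdf { name: String, args: Vec<Expr> },
}

/// Replaces every call to `async_name` with a placeholder column and
/// collects the removed calls in `extracted`, innermost calls first.
fn extract_async_calls(expr: Expr, async_name: &str, extracted: &mut Vec<Expr>) -> Expr {
    match expr {
        Expr::ScalarUdf { name, args } => {
            // Rewrite the arguments first so nested calls are handled too.
            let mut new_args = Vec::with_capacity(args.len());
            for arg in args {
                new_args.push(extract_async_calls(arg, async_name, extracted));
            }
            if name == async_name {
                // The separate node would produce this column.
                let placeholder = format!("__async_{}", extracted.len());
                extracted.push(Expr::ScalarUdf { name, args: new_args });
                Expr::Column(placeholder)
            } else {
                Expr::ScalarUdf { name, args: new_args }
            }
        }
        other => other,
    }
}

/// Applies the rewrite to `call('localhost:3000', num, letter)`.
fn demo() -> (Expr, Vec<Expr>) {
    let call = Expr::ScalarUdf {
        name: "call".to_string(),
        args: vec![
            Expr::Literal("localhost:3000".to_string()),
            Expr::Column("num".to_string()),
            Expr::Column("letter".to_string()),
        ],
    };
    let mut extracted = Vec::new();
    let rewritten = extract_async_calls(call, "call", &mut extracted);
    (rewritten, extracted)
}
```

After the rewrite, the projection only references the placeholder column, and the extracted call is what the custom node would run asynchronously.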

@alamb
Contributor

alamb commented Jun 15, 2023

@alamb wouldn't it be possible to turn the SQL statement

Yes, that sounds like it would work (I am sorry I didn't suggest that)

If you get it to work, I think it would be a great example to include in DataFusion to show the power of both the existing extension APIs and custom table functions.

@marshauf
Author

I got an example working which replaces a UDF with a user-defined Extension.
The extension processes RecordBatches in an async function.

It works great with VALUES as input but not with a CSV file.

I will clean it up and create a pull request. I hope you can help me figure out why input from VALUES is processed differently from input from a CSV file.

@alamb
Contributor

alamb commented Oct 25, 2023

I filed #7926 to track user defined table functions

@edmondop
Contributor

edmondop commented Aug 1, 2024

@alamb this is very useful for me now but it is blocking, any chance I can resume the existing work?

@alamb
Contributor

alamb commented Aug 1, 2024

Sure -- sounds good to me @edmondop

What I would personally suggest doing is making an example showing what you are trying to do -- and then, with that example, modifying the DataFusion APIs accordingly. That way you'll have both the API changes needed and an example of what you were trying to do that serves as documentation.

@alamb
Contributor

alamb commented Jan 11, 2025

I have been working with @goldmedal on a POC of this, and I am quite pleased with how it has come out. See:

Basically, the user just has to implement the following trait :bowtie:

/// A scalar UDF that can be invoked using async methods
///
/// Note this is less efficient than the ScalarUDFImpl, but it can be used
/// to register remote functions in the context.
///
/// The name is chosen to mirror ScalarUDFImpl
#[async_trait]
pub trait AsyncScalarUDFImpl: Debug + Send + Sync {
    /// the function cast as any
    fn as_any(&self) -> &dyn Any;

    /// The name of the function
    fn name(&self) -> &str;

    /// The signature of the function
    fn signature(&self) -> &Signature;

    /// The return type of the function
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>;

    /// Invoke the function asynchronously with the async arguments
    async fn invoke_async(&self, args: &RecordBatch) -> Result<ArrayRef>;
}
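
For readers unfamiliar with `#[async_trait]`, the macro turns an `async fn` in a trait into a method that returns a boxed future. The sketch below shows that shape with the standard library only. Everything in it is a hypothetical stand-in rather than the POC's code: `Batch` replaces `RecordBatch`, `Vec<String>` replaces `ArrayRef`, `AsyncScalarFn` is a cut-down analogue of the trait, and `poll_once` is a test helper that only suits futures that finish without waiting.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Boxed future, the shape an async trait method has once desugared.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Stand-in for a RecordBatch with a single string column (assumption).
struct Batch {
    column: Vec<String>,
}

/// Cut-down analogue of an async scalar UDF trait (hypothetical).
trait AsyncScalarFn: Send + Sync {
    fn name(&self) -> &str;
    fn invoke_async<'a>(&'a self, args: &'a Batch) -> BoxFuture<'a, Result<Vec<String>, String>>;
}

/// Example function: upper-cases every value in the input column.
struct Shout;

impl AsyncScalarFn for Shout {
    fn name(&self) -> &str {
        "shout"
    }

    fn invoke_async<'a>(&'a self, args: &'a Batch) -> BoxFuture<'a, Result<Vec<String>, String>> {
        Box::pin(async move {
            // A real implementation would await a remote call here.
            let out: Vec<String> = args.column.iter().map(|s| s.to_uppercase()).collect();
            Ok::<Vec<String>, String>(out)
        })
    }
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once and returns its output if it is already complete.
fn poll_once<T>(mut fut: BoxFuture<'_, T>) -> Option<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

/// Invokes `Shout` on a two-row batch.
fn demo() -> Option<Result<Vec<String>, String>> {
    let udf = Shout;
    let batch = Batch {
        column: vec!["one".to_string(), "two".to_string()],
    };
    let result = poll_once(udf.invoke_async(&batch));
    result
}
```

Because the method takes a whole batch and returns a whole column, one remote round trip can serve many rows, which matters when each call carries network latency.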

@edmondop
Contributor

This is amazing! And extremely useful (I have an idea of invoking flight endpoints from a udf !)

Does the planning need to be adapted?

@alamb
Contributor

alamb commented Jan 11, 2025

Does the planning need to be adapted?

Not sure what you mean here @edmondop -- the implementation in goldmedal/datafusion-llm-function#1 does need a custom physical optimizer pass, but works with an unmodified DataFusion 44.0.0

@edmondop
Contributor

I read the code in the PR and it totally makes sense: there is an optimizer that intercepts async UDFs and creates a separate physical node for their execution.

In the past I brainstormed about the implications of adding asynchronous UDFs on aspects such as RecordBatch size, etc (i.e. one could send a larger RecordBatch into the UDF that will emit smaller RecordBatches due to latency of the remote system, for example).

Will move the conversation to that repo maybe
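
The batch-size idea mentioned above can be illustrated with a small sketch. It is only an assumption-laden toy: `split_batch` is a hypothetical helper, and a plain slice stands in for a RecordBatch. It splits one large input into output batches of at most `max_rows` rows so that downstream operators can start consuming results before the whole input has been processed.

```rust
/// Splits `rows` into consecutive batches of at most `max_rows` rows each.
/// Hypothetical helper; a slice stands in for a RecordBatch.
fn split_batch<T: Clone>(rows: &[T], max_rows: usize) -> Vec<Vec<T>> {
    assert!(max_rows > 0, "max_rows must be greater than zero");
    rows.chunks(max_rows).map(|chunk| chunk.to_vec()).collect()
}

/// Splits five rows into batches of at most two rows.
fn demo() -> Vec<Vec<i32>> {
    split_batch(&[1, 2, 3, 4, 5], 2)
}
```

With Arrow data, the same effect would more likely be achieved by slicing the batch rather than copying rows; the copy here just keeps the sketch dependency-free.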

@milenkovicm
Contributor

let sql = r#"
CREATE FUNCTION an_llm_function(STRING)
RETURNS STRING
LANGUAGE MODEL
AS 'microsoft/phi-4'
"#;

ctx.sql(sql).await?.show().await?;

greatest way to disrespect all those tears and sweat put into getting datafusion to be as performant as it is 😀

Joke aside, good job @goldmedal & @alamb!
Do you plan to get it merged into datafusion or df contrib?

@alamb
Contributor

alamb commented Jan 12, 2025

BTW @Omega359 pointed out in Discord that there is something that looks similar in Arroyo

https://github.com/ArroyoSystems/arroyo/blob/4014db4824d52d535638da42958fbaf9961866e8/crates/arroyo-udf/arroyo-udf-plugin/src/async_udf.rs

Looks like it came in via this PR from @mwylde and @jacksonrnewhouse

@alamb
Contributor

alamb commented Jan 12, 2025

let sql = r#"
CREATE FUNCTION an_llm_function(STRING)
RETURNS STRING
LANGUAGE MODEL
AS 'microsoft/phi-4'
"#;

ctx.sql(sql).await?.show().await?;

greatest way to disrespect all those tears and sweat put into getting datafusion to be as performant as it is 😀

LOL, though I think the idea of combining fast local execution to prep the data to send could still be compelling.

Something crazy like this to have an LLM summarize bad comments 🤔

SELECT llm_summarize(array_agg(comments)), company_id 
GROUP BY company_id
WHERE company_id IN (1,2,3) and comments ILIKE '%not good%'

joke aside good job @goldmedal & @alamb! do you plan getting it merged in datafusion or df contrib ?

I hadn't thought this far -- I think it would definitely make sense to be in datafusion-contrib. Maybe depending on how much interest there is more broadly we could also potentially put it into datafusion.

However, the fact that you can implement async functions via the existing extension mechanisms is pretty neat -- the benefit of putting it into the core seems to be that it would make it easier to integrate.

@milenkovicm
Contributor

It definitely expands the reach of datafusion.

Databricks have a whole set of AI functions https://docs.databricks.com/en/large-language-models/ai-functions.html

With this change we can make datafusion-contrib-ai 😀

@adriangb
Contributor

I just came across this use case today and am very interested, it would be amazing if DataFusion just had #6518 (comment) built in.

My use case is essentially storing a link to an asset in a column and materializing it so that I can search it, e.g. select * from chat_history where unblob(prompt) like 'You are an expert in%'. I don't want to store prompt inline with the rest of the data for several reasons. The only one that is easy to explain is that I don't want to push the data around when I do compaction and other optimization of files.

@alamb
Contributor

alamb commented Jan 16, 2025

I just came across this use case today and am very interested, it would be amazing if DataFusion just had #6518 (comment) built in.

Sounds good to me -- I am not sure I have time over the next few weeks to push this along, but I would be happy to do code reviews, etc.

I think the biggest piece of work needed if we ported goldmedal/datafusion-llm-function#1 to the core would be tests and support for other plan nodes (goldmedal/datafusion-llm-function#1 only does ProjectionExec, but supporting others I think is more mechanical)

@goldmedal
Contributor

I think the biggest piece of work needed if we ported goldmedal/datafusion-llm-function#1 to the core would be tests and support for other plan nodes (goldmedal/datafusion-llm-function#1 only does ProjectionExec, but supporting others I think is more mechanical)

I'd like to port the implementation to the core, possibly by first implementing the trait for async UDFs and the corresponding planning behavior. As for the LLM functions, I think they would be better suited for a separate datafusion-contrib repo.

I'm also considering implementing this at the logical plan level. Some thoughts here: goldmedal/datafusion-llm-function#6. Feel free to share your opinion.

@goldmedal goldmedal linked a pull request Feb 23, 2025 that will close this issue
6 tasks
@goldmedal
Contributor

I have created a draft PR for this issue.

It still has some remaining work, but feel free to share your opinion.

6 participants