Async User Defined Functions (UDF) #6518
Comments
I agree there is currently no good way to make a scalar function async. You could potentially use a table provider and write to your function like That might not work for your use case, however. |
My idea was to do something like this:
SELECT call('localhost:3000', num, letter) FROM (SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num, letter));
It would probably work with your example:
CREATE EXTERNAL TABLE your_table STORED AS CSV WITH HEADER ROW LOCATION 'localhost:3000';
INSERT INTO your_table SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num, letter);
SELECT * FROM your_table;
The endpoint would be called on insert and the returned value stored. That seems cumbersome to use and implement. |
@alamb wouldn't it be possible to turn the SQL statement
SELECT call('localhost:3000', num, letter) FROM (SELECT * FROM (VALUES (1, 'one'), (2, 'two'), (3, 'three')) AS t (num, letter))
into a LogicalPlan, and then rewrite the Expr::ScalarUDF to an Expr::SubQuery with a LogicalPlan::Extension? |
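The rewrite proposed above can be sketched on a toy expression tree. This is only an illustration under stated assumptions: the `Expr` enum, the `ExtensionSubquery` variant, and `rewrite_async_udfs` are hypothetical stand-ins written for this sketch, not DataFusion's real `Expr` or `LogicalPlan` types.

```rust
// Toy stand-ins for illustration only; these are NOT DataFusion's types.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    ScalarUdf { name: String, args: Vec<Expr> },
    // Hypothetical node: a subquery backed by a user-defined extension node.
    ExtensionSubquery { node: String, args: Vec<Expr> },
}

/// Recursively replace calls to the UDFs named in `async_udfs`
/// with an extension-backed subquery node.
fn rewrite_async_udfs(expr: Expr, async_udfs: &[&str]) -> Expr {
    match expr {
        Expr::ScalarUdf { name, args } => {
            // Rewrite the arguments first so nested calls are handled too.
            let args: Vec<Expr> = args
                .into_iter()
                .map(|arg| rewrite_async_udfs(arg, async_udfs))
                .collect();
            if async_udfs.contains(&name.as_str()) {
                Expr::ExtensionSubquery { node: format!("async_{name}"), args }
            } else {
                Expr::ScalarUdf { name, args }
            }
        }
        Expr::ExtensionSubquery { node, args } => Expr::ExtensionSubquery {
            node,
            args: args
                .into_iter()
                .map(|arg| rewrite_async_udfs(arg, async_udfs))
                .collect(),
        },
        // Columns and literals have no children to rewrite.
        other => other,
    }
}
```

In a real implementation the same traversal would presumably run over the plan's expressions inside an optimizer or analyzer rule; the toy version only shows the shape of the substitution.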
Yes, that sounds like it would work (I am sorry I didn't suggest that). If you get it to work, I think it would be a great example to include in DataFusion to show the power of both the existing extension APIs and custom table functions. |
I got an example working which replaces a UDF with a user-defined Extension. It works great with VALUES as input but not with a CSV file. I will clean it up and create a pull request. I hope you can help me figure out why input from VALUES is processed differently from input from a CSV file. |
I filed #7926 to track user defined table functions |
@alamb this is very useful for me now but it is blocking, any chance I can resume the existing work? |
Sure -- sounds good to me @edmondop. What I would personally suggest doing is to make an example showing what you are trying to do -- and then, with that example, modify the DataFusion APIs accordingly. That way you'll have both the API changes needed and an example of what you were trying to do that serves as documentation. |
I have been working with @goldmedal on a POC of this, and I am quite pleased with how it has come out. See: Basically, the user just has to implement the following trait:
/// A scalar UDF that can invoke using async methods
///
/// Note this is less efficient than the ScalarUDFImpl, but it can be used
/// to register remote functions in the context.
///
/// The name is chosen to mirror ScalarUDFImpl
#[async_trait]
pub trait AsyncScalarUDFImpl: Debug + Send + Sync {
    /// the function cast as any
    fn as_any(&self) -> &dyn Any;

    /// The name of the function
    fn name(&self) -> &str;

    /// The signature of the function
    fn signature(&self) -> &Signature;

    /// The return type of the function
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>;

    /// Invoke the function asynchronously with the async arguments
    async fn invoke_async(&self, args: &RecordBatch) -> Result<ArrayRef>;
} |
This is amazing! And extremely useful (I have an idea of invoking Flight endpoints from a UDF!). Does the planning need to be adapted? |
Not sure what you mean here @edmondop -- the implementation in goldmedal/datafusion-llm-function#1 does need a custom physical optimizer pass, but works with an unmodified DataFusion 44.0.0 |
I read the code in the PR and it totally makes sense: there is an optimizer pass that intercepts async UDFs and creates a separate physical node for their execution. In the past I brainstormed about the implications of adding asynchronous UDFs for aspects such as RecordBatch size (e.g. one could send a larger RecordBatch into the UDF, which would emit smaller RecordBatches because of the latency of the remote system). I will maybe move the conversation to that repo. |
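The batch-size idea in the comment above (one large input batch, several smaller output batches) can be illustrated with a minimal sketch. It assumes rows held in a plain slice as a stand-in for a `RecordBatch`, and `split_batch` is a hypothetical helper written for this illustration, not part of DataFusion or the linked PR.

```rust
/// Split one large input batch into output batches of at most `max_rows` rows.
/// A plain slice stands in for an Arrow `RecordBatch` in this sketch.
fn split_batch<T: Clone>(rows: &[T], max_rows: usize) -> Vec<Vec<T>> {
    assert!(max_rows > 0, "max_rows must be positive");
    rows.chunks(max_rows).map(|chunk| chunk.to_vec()).collect()
}
```

With Arrow data the same effect could be obtained without copying by taking windows with `RecordBatch::slice`; how an async execution node should actually size its output is left open in the thread.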
let sql = r#"
CREATE FUNCTION an_llm_function(STRING)
RETURNS STRING
LANGUAGE MODEL
AS 'microsoft/phi-4'
"#;
ctx.sql(sql).await?.show().await?;
Greatest way to disrespect all those tears and sweat put into getting DataFusion to be as performant as it is 😀 Joke aside, good job @goldmedal & @alamb! |
BTW @Omega359 pointed out in Discord that there is something similar-looking in Arroyo. It looks like it came in via this PR from @mwylde and @jacksonrnewhouse. |
LOL, though I think the idea of combining fast local execution to prep the data to send could still be compelling. Something crazy like this to have an LLM summarize bad comments 🤔
SELECT llm_summarize(array_agg(comments)), company_id
WHERE company_id IN (1,2,3) AND comments ILIKE '%not good%'
GROUP BY company_id
I hadn't thought this far -- I think it would definitely make sense to be in datafusion-contrib. Maybe depending on how much interest there is more broadly we could also potentially put it into datafusion. However, the fact that you can implement async functions via the existing extension mechanisms is pretty neat -- the benefit of putting it into the core seems like it would make it easier to integrate |
It definitely expands the reach of DataFusion. Databricks has a whole set of AI functions: https://docs.databricks.com/en/large-language-models/ai-functions.html With this change we can make datafusion-contrib-ai 😀 |
I just came across this use case today and am very interested, it would be amazing if DataFusion just had #6518 (comment) built in. My use case is essentially storing a link to an asset in a column and materializing that so that I can search it e.g. |
Sounds good to me -- I am not sure I have time over the next few weeks to push this along, but I would be happy to do code reviews, etc. I think the biggest piece of work needed if we ported goldmedal/datafusion-llm-function#1 to the core would be tests and support for other plan nodes (goldmedal/datafusion-llm-function#1 only handles ProjectionExec, but supporting others is, I think, more mechanical). |
I'd like to port the implementation to the core, possibly by first implementing the trait for async UDFs and the corresponding planning behavior. As for the LLM functions, I think they would be better suited for a separate datafusion-contrib repo. I'm also considering implementing this at the logical plan level. Some thoughts here: goldmedal/datafusion-llm-function#6. Feel free to share your opinion. |
I have created a draft PR for this issue. It still has some remaining work, but feel free to share your opinion. |
Is your feature request related to a problem or challenge?
I would like to use async code in a UDF. I couldn't find an example or API documentation on how to do that. It would be nice if this were possible and documented.
Describe the solution you'd like
datafusion::physical_plan::functions::make_scalar_function() accepts functions which return a Future.
Describe alternatives you've considered
Creating another tokio runtime and offloading the async function onto it.
The main runtime then waits inside the UDF until the async function has finished executing.
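The blocking workaround described above can be sketched as follows. To stay self-contained this uses a tiny hand-written `block_on` built only on the standard library as a stand-in for a dedicated tokio runtime; `ThreadWaker`, `remote_call`, and `call_udf` are hypothetical names introduced for this sketch, and `remote_call` merely formats its arguments instead of contacting an endpoint.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal std-only executor: drive one future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async call; a real one would contact the remote endpoint.
async fn remote_call(num: i64, letter: &str) -> String {
    format!("{num}:{letter}")
}

/// Synchronous UDF body that blocks on the async call once per row.
fn call_udf(nums: &[i64], letters: &[&str]) -> Vec<String> {
    nums.iter()
        .zip(letters.iter())
        .map(|(num, letter)| block_on(remote_call(*num, letter)))
        .collect()
}
```

The drawback is the one the issue is about: the calling thread is parked for the whole duration of every call, so a slow endpoint stalls the query thread that runs the UDF.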
Additional context
No response