UDFs are no longer executed in a thread #7453

Closed
orf opened this issue Aug 30, 2023 · 2 comments
Labels: bug (Something isn't working)


orf commented Aug 30, 2023

Describe the bug

In DataFusion 28 and below, UDFs were executed in a separate thread when writing to Parquet. The example code below does not fail the assertion in version 28, but does in version 30 and on git main.

If you have a UDF that expects to run in a separate thread, or that performs some form of blocking computation, this change means your previously parallel dataframe plan becomes serial.
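
To illustrate the concern in isolation, here is a minimal std-only sketch. It does not use DataFusion at all: `blocking_udf`, `run_serial`, and `run_parallel` are hypothetical names, and the sleep merely stands in for blocking work. It records which threads a blocking function runs on when it is called inline versus on one thread per partition.

```rust
use std::collections::HashSet;
use std::thread::{self, ThreadId};
use std::time::Duration;

/// Hypothetical stand-in for a blocking, synchronous UDF applied to one partition.
/// Returns the ID of the thread it ran on.
fn blocking_udf(_partition: usize) -> ThreadId {
    thread::sleep(Duration::from_millis(10)); // simulate blocking work
    thread::current().id()
}

/// Calls the UDF inline for every partition: all calls share the caller's thread.
fn run_serial(partitions: usize) -> HashSet<ThreadId> {
    (0..partitions).map(blocking_udf).collect()
}

/// Calls the UDF on one scoped thread per partition, so the calls can overlap.
fn run_parallel(partitions: usize) -> HashSet<ThreadId> {
    thread::scope(|s| {
        // Spawn every worker before joining any of them.
        let handles: Vec<_> = (0..partitions)
            .map(|p| s.spawn(move || blocking_udf(p)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .collect::<HashSet<ThreadId>>()
    })
}

fn main() {
    println!("distinct threads (serial):   {}", run_serial(4).len());
    println!("distinct threads (parallel): {}", run_parallel(4).len());
}
```

When the function is called inline, every call lands on the calling thread, so the blocking calls cannot overlap. That is the effect described above, reduced to plain threads.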

I couldn't spot anything in the release notes about this.

To Reproduce

deps:

[dependencies]
tokio = { version = "^1.0", features = ["rt-multi-thread", "full"] }
datafusion = { version = "=30", default-features = false, features = ["encoding_expressions", "zstd"] }

code:

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() {
    use std::sync::Arc;
    use datafusion::arrow::array::{ArrayRef, BooleanArray};
    use datafusion::arrow::datatypes::DataType;
    use datafusion::logical_expr::Volatility;
    use datafusion::parquet::file::properties::WriterProperties;
    use datafusion::physical_plan::functions::make_scalar_function;
    use datafusion::prelude::*;

    let config = SessionConfig::default();
    let ctx = SessionContext::with_config(config);
    let df = ctx
        .read_parquet("input.parquet", ParquetReadOptions::default())
        .await
        .unwrap();

    let main_thread_id = std::thread::current().id();

    let func = create_udf(
        "some_func",
        vec![DataType::Binary],
        Arc::new(DataType::Boolean),
        Volatility::Immutable,
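        make_scalar_function(move |args: &[ArrayRef]| {
            // Fails if the UDF runs on the same thread as the `main` future.
            let func_thread_id = std::thread::current().id();
            assert_ne!(main_thread_id, func_thread_id);
            // Return `true` for every input row.
            Ok(Arc::new(BooleanArray::from(vec![true; args[0].len()])) as ArrayRef)
        }));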

    let df = df
        .select(vec![col("hash"), func.call(vec![col("hash")])])
        .unwrap();

    let props = WriterProperties::builder()
        .build();
    df.write_parquet("some_dir/", Some(props)).await.unwrap();
}

Expected behavior

Sync UDF functions should be executed in a blocking thread pool.
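
As a rough picture of what "executed in a blocking thread pool" means here, the sketch below hands a closure to a dedicated worker thread and waits for the result. It is a std-only illustration under stated assumptions: `BlockingPool` and `run` are hypothetical names, the pool has a single worker, and nothing here reflects how DataFusion schedules UDFs. In a tokio application, `tokio::task::spawn_blocking` is the existing facility that plays this role.

```rust
use std::sync::mpsc;
use std::thread;

/// A unit of work that can be sent to the worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical single-worker "blocking pool": jobs run on a dedicated thread.
struct BlockingPool {
    sender: Option<mpsc::Sender<Job>>,
    worker: Option<thread::JoinHandle<()>>,
}

impl BlockingPool {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        // The worker runs jobs until every sender has been dropped.
        let worker = thread::spawn(move || {
            for job in receiver {
                job();
            }
        });
        Self { sender: Some(sender), worker: Some(worker) }
    }

    /// Runs `f` on the worker thread and blocks the caller until it finishes.
    fn run<T, F>(&self, f: F) -> T
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            let _ = tx.send(f());
        });
        self.sender
            .as_ref()
            .expect("pool is running")
            .send(job)
            .expect("worker thread is alive");
        rx.recv().expect("worker dropped the result")
    }
}

impl Drop for BlockingPool {
    fn drop(&mut self) {
        // Closing the channel ends the worker loop; then wait for it to exit.
        drop(self.sender.take());
        if let Some(worker) = self.worker.take() {
            let _ = worker.join();
        }
    }
}

fn main() {
    let pool = BlockingPool::new();
    let caller = thread::current().id();
    let ran_on = pool.run(|| thread::current().id());
    println!("ran on a different thread: {}", caller != ran_on);
}
```

With this arrangement the thread-ID assertion from the reproduction would hold, because the closure never runs on the thread that submitted it.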

Additional context

I thought this might be related to #7205, but it doesn't appear to be the culprit.

orf added the bug label Aug 30, 2023
Contributor

alamb commented Sep 7, 2023

Hi @orf -- I am not sure that DataFusion guarantees that UDFs will run in a separate thread, or that they can perform blocking operations without stalling DataFusion.

What is your blocking UDF doing? Is it doing network operations, or something else where async UDFs (like in #6518) would help?

Contributor

alamb commented Jan 9, 2025

Given the age of this ticket, I am going to close it as not a bug. Please reopen if you disagree - otherwise let's continue the conversation on

alamb closed this as not planned Jan 9, 2025