
doc: RecordBatchReceiverStreamBuilder::spawn_blocking does not abort threads #9152

Open
tv42 opened this issue Feb 7, 2024 · 5 comments · May be fixed by #14995
Labels
documentation Improvements or additions to documentation good first issue Good for newcomers

Comments

@tv42
Contributor

tv42 commented Feb 7, 2024

Describe the bug

https://docs.rs/datafusion/latest/datafusion/physical_plan/stream/struct.RecordBatchReceiverStreamBuilder.html#method.spawn_blocking docs say

Spawn a blocking task that will be aborted if this builder (or the stream built from it) are dropped

There's no aborting happening, and I don't think one can do that safely (no abort in https://doc.rust-lang.org/std/thread/struct.Thread.html).

What actually happens is that tx.blocking_send and friends start to give errors, and the caller-provided closure is responsible for returning when that happens.
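To make this concrete, here is a minimal std-only sketch of the behavior described above. It swaps tokio's `Sender::blocking_send` for `std::sync::mpsc::SyncSender::send`, which has an analogous contract: it blocks while the buffer is full and returns `Err` once the receiver has been dropped. `produce` is a hypothetical stand-in for the closure passed to `spawn_blocking`. The producer thread ends only because it returns on the send error.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical producer body: it keeps sending until the receiving side
/// is gone, then returns instead of looping forever.
fn produce(tx: SyncSender<u64>) -> u64 {
    let mut sent = 0;
    loop {
        // `send` blocks while the buffer is full and returns `Err` once the
        // receiver has been dropped; that error is the only "stop" signal.
        if tx.send(sent).is_err() {
            return sent;
        }
        sent += 1;
    }
}

fn main() {
    // Capacity 1, mirroring the `capacity` argument used in the repro.
    let (tx, rx) = sync_channel(1);
    let producer = thread::spawn(move || produce(tx));

    // Consume three items, then drop the receiver (like `drop(stream)`).
    for expected in 0..3 {
        assert_eq!(rx.recv().unwrap(), expected);
    }
    drop(rx);

    // The thread finishes only because `produce` returns on the send error.
    let sent = producer.join().unwrap();
    println!("producer stopped after {sent} successful sends");
}
```

The exact count printed depends on thread timing (one item may already be buffered when the receiver is dropped). If the error were ignored instead, as the reproduction does deliberately, the loop would never end.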

It's probably worthwhile checking whether this is also true for the async variant, or whether it actually aborts the tokio task at any await point.

https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html#method.blocking_send

To Reproduce

```rust
use datafusion::arrow::array::{ArrayRef, UInt64Array};
use datafusion::arrow::datatypes::{DataType, Field, SchemaBuilder};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;

fn make_dummy_record_batch() -> Result<RecordBatch, DataFusionError> {
    let a: ArrayRef = Arc::new(UInt64Array::from(vec![1, 2]));
    let record_batch = RecordBatch::try_from_iter(vec![("a", a)])
        .map_err(|error| DataFusionError::External(Box::new(error)))?;
    Ok(record_batch)
}

#[tokio::main]
async fn main() {
    let schema = {
        let mut builder = SchemaBuilder::new();
        builder.push(Field::new("a", DataType::UInt64, false));
        builder.finish()
    };
    let mut stream = {
        let mut builder = RecordBatchReceiverStreamBuilder::new(Arc::new(schema), 1);
        {
            let tx = builder.tx();
            builder.spawn_blocking(move || loop {
                let record_batch = make_dummy_record_batch()?;
                match tx.blocking_send(Ok(record_batch)) {
                    Ok(()) => (),
                    Err(_sent) => {
                        // If we ignore this error, nothing "aborts" us.
                        eprintln!("ignoring send error for demonstration purposes");
                    }
                };
                println!("tick");
                std::thread::sleep(Duration::from_millis(300));
            });
        }
        builder.build()
    };

    for i in 0..3 {
        let record_batch = stream.next().await.unwrap().unwrap();
        println!("Record Batch #{i}");
        print_batches(&[record_batch]).unwrap();
    }

    // Docs claim this will abort the producer.
    drop(stream);

    // If it were aborted, we should stop seeing "tick" messages any moment now!
    tokio::time::sleep(Duration::new(5, 0)).await;
    println!("is it still going? bailing out.");
    std::process::exit(0);
}
```

Expected behavior

The behavior makes sense, but should be documented.

Additional context

No response

@tv42 tv42 added the bug Something isn't working label Feb 7, 2024
@alamb
Contributor

alamb commented Feb 8, 2024

It seems to me that a blocking task is actually aborted / cancelled on drop, but once a blocking task starts it can't be interrupted.

So, for example, if the task hasn't started yet, it will never start. However, once the blocking task starts running, it will run to completion, as it never yields control back to tokio (via await).
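Because a thread that is already running cannot be interrupted from the outside, the closure has to cooperate. Besides reacting to send errors, one common pattern (shown here only as an illustration; it is not something DataFusion provides) is a shared "stop" flag that the blocking work polls. In this std-only sketch, `StopOnDrop` and `spawn_worker` are hypothetical names: the guard plays the role of the stream whose drop should end the work.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical guard: setting a flag on drop is the closest thing to
/// "abort on drop" that a plain thread can offer.
struct StopOnDrop(Arc<AtomicBool>);

impl Drop for StopOnDrop {
    fn drop(&mut self) {
        self.0.store(true, Ordering::Relaxed);
    }
}

/// Spawns a worker that does small units of blocking work and checks the
/// flag between them. Returns the guard and a handle yielding the tick count.
fn spawn_worker() -> (StopOnDrop, thread::JoinHandle<u32>) {
    let stop = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&stop);
    let handle = thread::spawn(move || {
        let mut ticks = 0;
        // The flag is only checked between iterations: a single long
        // blocking call inside the loop would still run to completion.
        while !flag.load(Ordering::Relaxed) {
            ticks += 1;
            thread::sleep(Duration::from_millis(10));
        }
        ticks
    });
    (StopOnDrop(stop), handle)
}

fn main() {
    let (guard, handle) = spawn_worker();
    thread::sleep(Duration::from_millis(50));
    drop(guard); // the worker notices at its next flag check
    let ticks = handle.join().unwrap();
    println!("worker stopped after {ticks} ticks");
}
```

The granularity of cancellation is whatever the closure chooses: a flag check (or a failed send) between units of work, never in the middle of one.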

@alamb alamb added good first issue Good for newcomers documentation Improvements or additions to documentation and removed bug Something isn't working labels Feb 8, 2024
@alamb
Contributor

alamb commented Feb 8, 2024

I think we can summarize the discussion on this ticket into the doc comments and that would make a good first issue

@zhenglin-charlie-li
Contributor

zhenglin-charlie-li commented Feb 15, 2024

Hi @alamb, I'm a newcomer to this project and interested in addressing this specific issue as my starting point.

May I ask for clarification: is this issue a task to refine the documentation so that it accurately describes the current behavior of the code?

@shruti2522

Hey @alamb, since the issue I have been working on is on hold for now, I would like to look into this in the meantime.

@shruti2522

take


4 participants