Tokio timeout doesn't wake up when streaming while another tokio thread is running #3489

Open
SeseMueller opened this issue Oct 24, 2024 · 1 comment

SeseMueller commented Oct 24, 2024

Expected Behavior

Tokio's timeout and sleep should fire after the supplied duration.

Current Behavior

They are "blocked" while another Tokio task is executing, and do not fire until that task has finished.

Steps to Reproduce (for bugs)

I created two examples: one using actix_web, where the problem occurs, and one using plain Tokio with very similar code, where it doesn't. See Context for an explanation.

Tokio example: working code

```rust
use futures_util::{Stream, StreamExt};
use std::convert::Infallible;
use std::{process::Command, time::Duration};
use tokio::pin;
use tokio::sync::mpsc::Receiver;

use futures_util::stream::unfold;
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let stream = tokio::spawn(serve()).await;
    let stream = stream.unwrap();
    pin!(stream);
    loop {
        let item = stream.next().await;
        dbg!(item);
    }
}
// Whether this is async or not doesn't make a difference; it just keeps the two examples similar
async fn serve() -> impl Stream<Item = Result<String, Infallible>> {
    let c = |state: Option<Receiver<String>>| async move {
        let mut rx = if state.is_none() {
            // Construct a new receiver if it isn't initialized yet
            let (tx, rx) = tokio::sync::mpsc::channel(100);
            println!("created channel");
            tokio::spawn(do_work(tx));
            println!("spawned work");
            rx
        } else {
            state.unwrap()
        };
        // vvv Problem line (in the actix_web version): the timeout doesn't fire; the command is waited on instead
        let t = timeout(Duration::from_secs(1), rx.recv()).await;
        // Tokio select! and Tokio sleep also don't work there.
        dbg!(&t);
        if t == Ok(None) {
            // Stop the stream if the command ended
            return None;
        }
        Some((
            // Send a message on timeout
            Ok::<String, Infallible>("Stream fragment\n".to_owned()),
            Some(rx),
        ))
    };
    let stream = unfold(None, c); // Construct stream from closure
    stream
}

async fn do_work(tx: Sender<String>) {
    println!("Running Command");
    // NOTE: std::process::Command::output() is synchronous: it blocks the current OS thread until the child exits
    let _ = Command::new("sh").arg("-c").arg("sleep 10").output();
    println!("Command finished!");
    if let Err(e) = tx.send("".to_owned()).await {
        eprintln!("Sending errored! {:?}", e);
    }
}
// Expected output: nine or ten lines of "Stream fragment", one sent every second, followed by a panic! because the stream was polled after it ended
// Actual output: as expected
```

actix_web example: has the problem

```rust
use actix_web::body::MessageBody;
use actix_web::web::Bytes;
use actix_web::{services, web, HttpResponse, Responder};
use actix_web::{App, HttpServer};
use std::convert::Infallible;
use std::{process::Command, time::Duration};
use tokio::sync::mpsc::Receiver;

use futures_util::stream::unfold;
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        // Available at localhost:8080/a/b
        let services = services![web::scope("/a").route("/b", web::get().to(serve))];
        App::new().service(services)
    })
    .bind(("localhost", 8080))
    .unwrap()
    .run()
    .await
}
async fn serve() -> impl Responder {
    let c = |state: Option<Receiver<String>>| async move {
        let mut rx = if state.is_none() {
            // Construct a new receiver if it isn't initialized yet
            let (tx, rx) = tokio::sync::mpsc::channel(100);
            println!("created channel");
            //tokio::spawn(do_work(tx));
            actix_web::rt::spawn(do_work(tx)); // Both have the same problem
            println!("spawned work");
            rx
        } else {
            state.unwrap()
        };
        // vvv Problem line: the timeout doesn't fire; the command is waited on instead
        let t = timeout(Duration::from_secs(1), rx.recv()).await;
        // Tokio select! and Tokio sleep also don't work here.
        dbg!(&t);
        if t == Ok(None) {
            // Stop the stream if the command ended
            return None;
        }
        Some((
            // Send a message on timeout
            Ok::<Bytes, Infallible>("Stream fragment\n".to_owned().try_into_bytes().unwrap()),
            Some(rx),
        ))
    };
    let stream = unfold(None, c); // Construct stream from closure
    HttpResponse::Ok().streaming(stream) // Stream out the answer
}
async fn do_work(tx: Sender<String>) {
    println!("Running Command");
    // NOTE: std::process::Command::output() is synchronous: it blocks the current OS thread until the child exits
    let _ = Command::new("sh").arg("-c").arg("sleep 10").output();
    println!("Command finished!");
    if let Err(e) = tx.send("".to_owned()).await {
        eprintln!("Sending errored! {:?}", e);
    }
}
// Expected output: nine or ten lines of "Stream fragment", one sent every second
// Actual output: a single line of "Stream fragment" is output at the very end.
```

Edit: Using async_process::Command and awaiting its output, instead of using std::process::Command, does solve this particular case.

Context

I am writing a REST API in actix_web that relies heavily on streaming responses to the client.

While adding a heartbeat, so that the connection to the client wouldn't time out and the client could be sure the server was still alive, I found that some Tokio functionality (sleep, select! with a sleep branch, and timeout) doesn't work properly.

I managed to reduce it to the two examples above.

In both cases, a channel is created inside an async closure, and its sender is given to a newly spawned Tokio task that takes a long time to finish.
If the Receiver is then awaited with a Tokio timeout (or a Tokio sleep is called), the timeout (or sleep) doesn't fire after the given time, but only once the Receiver has received a value.

In my actual code, the async closure was then unfolded into a stream, which was served to a client.
This causes the stream to become "stuck" and not send a heartbeat until the Receiver can receive.

In the tokio example, the stream is instead given to the main function, pinned, and iterated through until completion. Here, the stream does not become "stuck" and instead sends a heartbeat once every second, as expected.

(I opened this issue on actix-web because the problem seems to come from the way streaming is handled, and because the Tokio example works.)

Your Environment

The examples above run on a fresh install.

  • Rust Version (i.e., output of rustc -V): rustc 1.81.0 (eeb90cda1 2024-09-04)
  • Actix Web Version: 4.9.0

SeseMueller commented Oct 28, 2024

After some digging, this seems to be caused by rust-lang/futures-rs#2775.
See the comment there for a good explanation of why it happens.

The issue is also tracked in tokio-rs/tokio#2542, which arrived at an example similar to mine.

For me, this means I can fix my problem by moving from std::process::Command to async_process::Command, because its output is awaited, but the general issue still persists.

(A similar problem was discussed in bytecodealliance/wasmtime#2876, but the cause is unlikely to be the same, as I did not find any evidence of mixed executors.)
