You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The tokio timeout and sleep should work and trigger after the supplied time.
Current Behavior
They are "blocked" by another tokio thread executing and do not trigger before that thread is done.
Steps to Reproduce (for bugs)
I created two examples; one where the problem occurs when using actix_web, and one where it doesn't occur when using tokio on very similar code. See Context for explanation.
Tokio example: working code
use futures_util::{Stream,StreamExt};use std::convert::Infallible;use std::{process::Command, time::Duration};use tokio::pin;use tokio::sync::mpsc::Receiver;use futures_util::stream::unfold;use tokio::sync::mpsc::Sender;use tokio::time::timeout;#[tokio::main]asyncfnmain(){let stream = tokio::spawn(serve()).await;let stream = stream.unwrap();pin!(stream);loop{let item = stream.next().await;dbg!(item);}}// Whether this is async or not doesn't make a difference; just to make them more similarasyncfnserve() -> implStream<Item = Result<String,Infallible>>{let c = |state:Option<Receiver<String>>| asyncmove{letmut rx = if state.is_none(){let(tx, rx) = tokio::sync::mpsc::channel(100);// Construct a new reciever if it's not// initialized yetprintln!("created channel");
tokio::spawn(do_work(tx));println!("spawned work");
rx
}else{
state.unwrap()};// vvv Problem line: timeout doesn't happen; the command is instead being waited onlet t = timeout(Duration::from_secs(1), rx.recv()).await;// Tokio select! and Tokio sleep also don't work here. dbg!(&t);if t == Ok(None){// Stop the stream if the command endedreturnNone;}Some((// Send a message on timeoutOk::<String,Infallible>("Stream fragment\n".to_owned()),Some(rx),))};let stream = unfold(None, c);// Construct stream from closure
stream
}asyncfndo_work(tx:Sender<String>){println!("Running Command");let _ = Command::new("sh").arg("-c").arg("sleep 10").output();println!("Command finished!");ifletErr(e) = tx.send("".to_owned()).await{eprintln!("Sending errored! {:?}", e);}}// Expected Output: ten or nine lines of "Stream Fragment", and one is sent every second; followed by a panic! because the stream was polled after it ended// Actual output: as expected
Actix_web example: Has the problem
use actix_web::body::MessageBody;use actix_web::web::Bytes;use actix_web::{services, web,HttpResponse,Responder};use actix_web::{App,HttpServer};use std::convert::Infallible;use std::{process::Command, time::Duration};use tokio::sync::mpsc::Receiver;use futures_util::stream::unfold;use tokio::sync::mpsc::Sender;use tokio::time::timeout;#[tokio::main]asyncfnmain() -> std::io::Result<()>{HttpServer::new(|| {let services = services![web::scope("/a").route("/b", web::get().to(serve))];// Available// at localhost:8080/a/bApp::new().service(services)}).bind(("localhost",8080)).unwrap().run().await}asyncfnserve() -> implResponder{let c = |state:Option<Receiver<String>>| asyncmove{letmut rx = if state.is_none(){let(tx, rx) = tokio::sync::mpsc::channel(100);// Construct a new reciever if it's not// initialized yetprintln!("created channel");//tokio::spawn(do_work(tx));
actix_web::rt::spawn(do_work(tx));// Both have the same problemprintln!("spawned work");
rx
}else{
state.unwrap()};// vvv Problem line: timeout doesn't happen; the command is instead being waited onlet t = timeout(Duration::from_secs(1), rx.recv()).await;// Tokio select! and Tokio sleep also don't work here. dbg!(&t);if t == Ok(None){// Stop the stream if the command endedreturnNone;}Some((// Send a message on timeoutOk::<Bytes,Infallible>("Stream fragment\n".to_owned().try_into_bytes().unwrap()),Some(rx),))};let stream = unfold(None, c);// Construct stream from closureHttpResponse::Ok().streaming(stream)// Stream out the answer}asyncfndo_work(tx:Sender<String>){println!("Running Command");let _ = Command::new("sh").arg("-c").arg("sleep 10").output();println!("Command finished!");ifletErr(e) = tx.send("".to_owned()).await{eprintln!("Sending errored! {:?}", e);}}// Expected Output: ten or nine lines of "Stream Fragment", and one is sent every second// Actual output: a single line of "Stream Fragment" is output at the very end.
Edit: Using async_process::Command and then awaiting instead of using std::process::Command does solve this particular case.
Context
I am writing a REST API in actix_web that is heavily relying on streams to the client.
While I was adding a heartbeat so that the connection to the client wouldn't time out and the client could be sure that the server was still alive, I came across the issue that some tokio functionality like sleep, select! (with sleep) and timeout don't properly work.
I managed to reduce it to the two examples above.
In both cases, a channel is spawned inside an async closure, which sender is given to a new tokio task that takes long to finish.
If the Receiver is then awaited using a tokio timeout (or a tokio sleep is called), the timeout (or sleep) doesn't trigger after the given time, but only after the Receiver is available.
In the context I used it, the async closure was then unfolded into a stream, which is then served to a client.
This causes the stream to become "stuck" and not send a heartbeat, until the Receiver can recieve.
In the tokio example, the stream is instead given to the main function, pinned, and iterated through until completion. Here, the stream does not become "stuck" and instead sends a heartbeat once every second, as expected.
(I put this issue on actix-web because it seems to happen because of the way streaming is handeled and because the tokio example works)
Your Environment
The given examples work on fresh install.
Rust Version (I.e, output of rustc -V): rustc 1.81.0 (eeb90cda1 2024-09-04)
Actix Web Version: 4.9.0
The text was updated successfully, but these errors were encountered:
After doing some digging, this seems to be caused by rust-lang/futures-rs#2775.
See their comment for a good explanation of why it's happening.
The issue is also known at tokio-rs/tokio#2542, which came up with a similar example as I did.
For me, this means that I can fix my problem by moving from std::process::Command to async_process::Command, because that is await-ed, but the general issue still persists.
(A similar problem was discussed:bytecodealliance/wasmtime#2876, but it's unlikely that the cause is the same as I did not find any evidence of mixed executors.)
Expected Behavior
The tokio
timeout
andsleep
should work and trigger after the supplied time.Current Behavior
They are "blocked" by another tokio thread executing and do not trigger before that thread is done.
Steps to Reproduce (for bugs)
I created two examples; one where the problem occurs when using actix_web, and one where it doesn't occur when using tokio on very similar code. See Context for explanation.
Tokio example: working code
Actix_web example: Has the problem
Edit: Using
async_process::Command
and then awaiting instead of usingstd::process::Command
does solve this particular case.Context
I am writing a REST API in actix_web that is heavily relying on streams to the client.
While I was adding a heartbeat so that the connection to the client wouldn't time out and the client could be sure that the server was still alive, I came across the issue that some tokio functionality like
sleep
,select!
(with sleep) andtimeout
don't properly work.I managed to reduce it to the two examples above.
In both cases, a channel is spawned inside an async closure, which sender is given to a new tokio task that takes long to finish.
If the Receiver is then awaited using a tokio timeout (or a tokio sleep is called), the timeout (or sleep) doesn't trigger after the given time, but only after the Receiver is available.
In the context I used it, the async closure was then unfolded into a stream, which is then served to a client.
This causes the stream to become "stuck" and not send a heartbeat, until the Receiver can recieve.
In the tokio example, the stream is instead given to the main function, pinned, and iterated through until completion. Here, the stream does not become "stuck" and instead sends a heartbeat once every second, as expected.
(I put this issue on actix-web because it seems to happen because of the way streaming is handeled and because the tokio example works)
Your Environment
The given examples work on fresh install.
rustc -V
): rustc 1.81.0 (eeb90cda1 2024-09-04)The text was updated successfully, but these errors were encountered: