From a0eeac8e7a73485e3c22cc5505fcb96d9a368fa8 Mon Sep 17 00:00:00 2001 From: Wonwoo Choi Date: Sun, 8 Mar 2020 22:13:59 +0900 Subject: [PATCH 1/2] Fix stream freezing when body ends unexpectedly --- src/chunked.rs | 13 ++++++ src/client.rs | 15 ++++++- src/server.rs | 51 ++++++++++++++++++---- tests/fixtures/request-unexpected-eof.txt | 5 +++ tests/fixtures/response-unexpected-eof.txt | 6 +++ tests/server.rs | 27 ++++++++++++ 6 files changed, 107 insertions(+), 10 deletions(-) create mode 100644 tests/fixtures/request-unexpected-eof.txt create mode 100644 tests/fixtures/response-unexpected-eof.txt diff --git a/src/chunked.rs b/src/chunked.rs index bebf43d..a1d5fb3 100644 --- a/src/chunked.rs +++ b/src/chunked.rs @@ -99,6 +99,10 @@ impl ChunkedDecoder { read += n; let new_state = if new_current == len { State::ChunkEnd + } else if n == 0 { + // Unexpected end + // TODO: do something? + State::Done } else { State::Chunk(new_current, len) }; @@ -279,6 +283,15 @@ impl Read for ChunkedDecoder { return Poll::Pending; } }; + match (bytes_read, &this.state) { + (0, State::Done) => {} + (0, _) => { + // Unexpected end + // TODO: do something? + this.state = State::Done; + } + _ => {} + } n.end += bytes_read; } match this.poll_read_inner(cx, buffer, &n, buf)? { diff --git a/src/client.rs b/src/client.rs index bf00301..ff05c42 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,7 +3,6 @@ use async_std::io::{self, BufReader, Read, Write}; use async_std::prelude::*; use async_std::task::{Context, Poll}; -use futures_core::ready; use http_types::{ensure, ensure_eq, format_err}; use http_types::{ headers::{HeaderName, HeaderValue, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}, @@ -241,7 +240,19 @@ impl Read for Encoder { } if !self.body_done { - let n = ready!(Pin::new(&mut self.request).poll_read(cx, &mut buf[bytes_read..]))?; + let inner_poll_result = + Pin::new(&mut self.request).poll_read(cx, &mut buf[bytes_read..]); + let n = match inner_poll_result { + Poll::Ready(Ok(n)) => n, + Poll::Pending => { + if bytes_read == 0 { + return Poll::Pending; + } else { + return Poll::Ready(Ok(bytes_read as usize)); + } + } + e => return e, + }; bytes_read += n; self.body_bytes_read += n; if bytes_read == 0 { diff --git a/src/server.rs b/src/server.rs index d39ac33..b5be0ec 100644 --- a/src/server.rs +++ b/src/server.rs @@ -9,7 +9,6 @@ use async_std::io::{self, BufReader}; use async_std::io::{Read, Write}; use async_std::prelude::*; use async_std::task::{Context, Poll}; -use futures_core::ready; use http_types::headers::{HeaderName, HeaderValue, CONTENT_LENGTH, TRANSFER_ENCODING}; use http_types::{ensure, ensure_eq, format_err}; use http_types::{Body, Method, Request, Response}; @@ -198,9 +197,19 @@ impl Read for Encoder { // Figure out how many bytes we can read. let upper_bound = (bytes_read + body_len - body_bytes_read).min(buf.len()); // Read bytes from body - let new_body_bytes_read = - ready!(Pin::new(&mut self.res) - .poll_read(cx, &mut buf[bytes_read..upper_bound]))?; + let inner_poll_result = + Pin::new(&mut self.res).poll_read(cx, &mut buf[bytes_read..upper_bound]); + let new_body_bytes_read = match inner_poll_result { + Poll::Ready(Ok(n)) => n, + Poll::Pending => { + if bytes_read == 0 { + return Poll::Pending; + } else { + break; + } + } + e => return e, + }; body_bytes_read += new_body_bytes_read; bytes_read += new_body_bytes_read; @@ -212,8 +221,13 @@ impl Read for Encoder { body_len, body_bytes_read ); - // If we've read the `len` number of bytes, end if body_len == body_bytes_read { + // If we've read the `len` number of bytes, end + self.state = EncoderState::Done; + break; + } else if new_body_bytes_read == 0 { + // If we've reached unexpected EOF, end anyway + // TODO: do something? self.state = EncoderState::Done; break; } else { @@ -237,8 +251,18 @@ impl Read for Encoder { // it into the actual buffer let mut chunk_buf = vec![0; buffer_remaining]; // Read bytes from body reader - let chunk_length = - ready!(Pin::new(&mut self.res).poll_read(cx, &mut chunk_buf))?; + let inner_poll_result = Pin::new(&mut self.res).poll_read(cx, &mut chunk_buf); + let chunk_length = match inner_poll_result { + Poll::Ready(Ok(n)) => n, + Poll::Pending => { + if bytes_read == 0 { + return Poll::Pending; + } else { + break; + } + } + e => return e, + }; // serialize chunk length as hex let chunk_length_string = format!("{:X}", chunk_length); @@ -311,7 +335,18 @@ impl Read for Encoder { ref mut chunk, is_last, } => { - bytes_read += ready!(Pin::new(chunk).poll_read(cx, &mut buf))?; + let inner_poll_result = Pin::new(chunk).poll_read(cx, &mut buf); + bytes_read += match inner_poll_result { + Poll::Ready(Ok(n)) => n, + Poll::Pending => { + if bytes_read == 0 { + return Poll::Pending; + } else { + break; + } + } + e => return e, + }; if bytes_read == 0 { self.state = match is_last { true => EncoderState::Done, diff --git a/tests/fixtures/request-unexpected-eof.txt b/tests/fixtures/request-unexpected-eof.txt new file mode 100644 index 0000000..9e02003 --- /dev/null +++ b/tests/fixtures/request-unexpected-eof.txt @@ -0,0 +1,5 @@ +POST / HTTP/1.1 +content-type: text/plain +content-length: 11 + +aaaaabbbbb \ No newline at end of file diff --git a/tests/fixtures/response-unexpected-eof.txt b/tests/fixtures/response-unexpected-eof.txt new file mode 100644 index 0000000..acb7e76 --- /dev/null +++ b/tests/fixtures/response-unexpected-eof.txt @@ -0,0 +1,6 @@ +HTTP/1.1 200 OK +content-length: 11 +date: {DATE} +content-type: text/plain + +aaaaabbbbb \ No newline at end of file diff --git a/tests/server.rs b/tests/server.rs index 3cea066..a3355c3 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -72,3 +72,30 @@ async fn test_chunked_echo() { case.assert().await; } + +#[async_std::test] +async fn test_unexpected_eof() { + // We can't predict unexpected EOF, so the response content-length is still 11 + let case = TestCase::new_server( + "fixtures/request-unexpected-eof.txt", + "fixtures/response-unexpected-eof.txt", + ) + .await; + let addr = "http://example.com"; + + async_h1::accept(addr, case.clone(), |req| async { + let mut resp = Response::new(StatusCode::Ok); + let ct = req.content_type(); + let body: Body = req.into(); + resp.set_body(body); + if let Some(ct) = ct { + resp.set_content_type(ct); + } + + Ok(resp) + }) + .await + .unwrap(); + + case.assert().await; +} From e2ec8811ae8dfff4b4d2702a3a5f6d2861b5a8b2 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 15 Mar 2020 12:59:03 +0100 Subject: [PATCH 2/2] Add feedback from review --- src/client.rs | 2 +- src/server.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/client.rs b/src/client.rs index ff05c42..0abe587 100644 --- a/src/client.rs +++ b/src/client.rs @@ -244,6 +244,7 @@ impl Read for Encoder { Pin::new(&mut self.request).poll_read(cx, &mut buf[bytes_read..]); let n = match inner_poll_result { Poll::Ready(Ok(n)) => n, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => { if bytes_read == 0 { return Poll::Pending; @@ -251,7 +252,6 @@ impl Read for Encoder { return Poll::Ready(Ok(bytes_read as usize)); } } - e => return e, }; bytes_read += n; self.body_bytes_read += n; diff --git a/src/server.rs b/src/server.rs index b5be0ec..da1196f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -201,6 +201,7 @@ impl Read for Encoder { Pin::new(&mut self.res).poll_read(cx, &mut buf[bytes_read..upper_bound]); let new_body_bytes_read = match inner_poll_result { Poll::Ready(Ok(n)) => n, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => { if bytes_read == 0 { return Poll::Pending; @@ -208,7 +209,6 @@ impl Read for Encoder { break; } } - e => return e, }; body_bytes_read += new_body_bytes_read; bytes_read += new_body_bytes_read; @@ -254,6 +254,7 @@ impl Read for Encoder { let inner_poll_result = Pin::new(&mut self.res).poll_read(cx, &mut chunk_buf); let chunk_length = match inner_poll_result { Poll::Ready(Ok(n)) => n, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => { if bytes_read == 0 { return Poll::Pending; @@ -261,7 +262,6 @@ impl Read for Encoder { break; } } - e => return e, }; // serialize chunk length as hex @@ -338,6 +338,7 @@ impl Read for Encoder { let inner_poll_result = Pin::new(chunk).poll_read(cx, &mut buf); bytes_read += match inner_poll_result { Poll::Ready(Ok(n)) => n, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => { if bytes_read == 0 { return Poll::Pending; @@ -345,7 +346,6 @@ impl Read for Encoder { break; } } - e => return e, }; if bytes_read == 0 { self.state = match is_last {