Merge pull request #69 from http-rs/fix-unexpected-end
Fix stream freezing when body ends unexpectedly
yoshuawuyts authored Mar 15, 2020
2 parents 739e481 + e2ec881 commit e798b22
Showing 6 changed files with 107 additions and 10 deletions.
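In short: the encoders in client.rs and server.rs previously used the ready! macro, which suspends the whole poll_read call whenever the inner body returns Poll::Pending, even if bytes are already sitting in the output buffer; the chunked decoder likewise had no handling for a body that ends before its declared length. This commit replaces ready! with an explicit match so already-buffered bytes are returned immediately, and moves the decoder to State::Done on a zero-byte read. A minimal sketch of the shared encoder-side pattern, assuming a futures_io::AsyncRead body (the poll_body helper name is illustrative and not part of the crate):

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_io::AsyncRead;

// Poll the inner body for more bytes without losing what is already buffered.
// `bytes_read` counts bytes this poll_read call has already written into `buf`.
fn poll_body<R: AsyncRead + Unpin>(
    body: &mut R,
    cx: &mut Context<'_>,
    buf: &mut [u8],
    bytes_read: usize,
) -> Poll<io::Result<usize>> {
    match Pin::new(body).poll_read(cx, &mut buf[bytes_read..]) {
        // `n == 0` signals EOF; the caller decides whether that was expected.
        Poll::Ready(Ok(n)) => Poll::Ready(Ok(bytes_read + n)),
        Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
        // Only park the task when there is nothing at all to hand back;
        // otherwise return what we already have instead of freezing the stream.
        Poll::Pending if bytes_read == 0 => Poll::Pending,
        Poll::Pending => Poll::Ready(Ok(bytes_read)),
    }
}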
13 changes: 13 additions & 0 deletions src/chunked.rs
@@ -99,6 +99,10 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
read += n;
let new_state = if new_current == len {
State::ChunkEnd
} else if n == 0 {
// Unexpected end
// TODO: do something?
State::Done
} else {
State::Chunk(new_current, len)
};
@@ -279,6 +283,15 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
return Poll::Pending;
}
};
match (bytes_read, &this.state) {
(0, State::Done) => {}
(0, _) => {
// Unexpected end
// TODO: do something?
this.state = State::Done;
}
_ => {}
}
n.end += bytes_read;
}
match this.poll_read_inner(cx, buffer, &n, buf)? {
15 changes: 13 additions & 2 deletions src/client.rs
@@ -3,7 +3,6 @@
use async_std::io::{self, BufReader, Read, Write};
use async_std::prelude::*;
use async_std::task::{Context, Poll};
use futures_core::ready;
use http_types::{ensure, ensure_eq, format_err};
use http_types::{
headers::{HeaderName, HeaderValue, CONTENT_LENGTH, DATE, TRANSFER_ENCODING},
@@ -241,7 +240,19 @@ impl Read for Encoder {
}

if !self.body_done {
let n = ready!(Pin::new(&mut self.request).poll_read(cx, &mut buf[bytes_read..]))?;
let inner_poll_result =
Pin::new(&mut self.request).poll_read(cx, &mut buf[bytes_read..]);
let n = match inner_poll_result {
Poll::Ready(Ok(n)) => n,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => {
if bytes_read == 0 {
return Poll::Pending;
} else {
return Poll::Ready(Ok(bytes_read as usize));
}
}
};
bytes_read += n;
self.body_bytes_read += n;
if bytes_read == 0 {
51 changes: 43 additions & 8 deletions src/server.rs
@@ -9,7 +9,6 @@ use async_std::io::{self, BufReader};
use async_std::io::{Read, Write};
use async_std::prelude::*;
use async_std::task::{Context, Poll};
use futures_core::ready;
use http_types::headers::{HeaderName, HeaderValue, CONTENT_LENGTH, TRANSFER_ENCODING};
use http_types::{ensure, ensure_eq, format_err};
use http_types::{Body, Method, Request, Response};
@@ -198,9 +197,19 @@ impl Read for Encoder {
// Figure out how many bytes we can read.
let upper_bound = (bytes_read + body_len - body_bytes_read).min(buf.len());
// Read bytes from body
let new_body_bytes_read =
ready!(Pin::new(&mut self.res)
.poll_read(cx, &mut buf[bytes_read..upper_bound]))?;
let inner_poll_result =
Pin::new(&mut self.res).poll_read(cx, &mut buf[bytes_read..upper_bound]);
let new_body_bytes_read = match inner_poll_result {
Poll::Ready(Ok(n)) => n,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => {
if bytes_read == 0 {
return Poll::Pending;
} else {
break;
}
}
};
body_bytes_read += new_body_bytes_read;
bytes_read += new_body_bytes_read;

@@ -212,8 +221,13 @@
body_len,
body_bytes_read
);
// If we've read the `len` number of bytes, end
if body_len == body_bytes_read {
// If we've read the `len` number of bytes, end
self.state = EncoderState::Done;
break;
} else if new_body_bytes_read == 0 {
// If we've reached unexpected EOF, end anyway
// TODO: do something?
self.state = EncoderState::Done;
break;
} else {
Expand All @@ -237,8 +251,18 @@ impl Read for Encoder {
// it into the actual buffer
let mut chunk_buf = vec![0; buffer_remaining];
// Read bytes from body reader
let chunk_length =
ready!(Pin::new(&mut self.res).poll_read(cx, &mut chunk_buf))?;
let inner_poll_result = Pin::new(&mut self.res).poll_read(cx, &mut chunk_buf);
let chunk_length = match inner_poll_result {
Poll::Ready(Ok(n)) => n,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => {
if bytes_read == 0 {
return Poll::Pending;
} else {
break;
}
}
};

// serialize chunk length as hex
let chunk_length_string = format!("{:X}", chunk_length);
@@ -311,7 +335,18 @@ impl Read for Encoder {
ref mut chunk,
is_last,
} => {
bytes_read += ready!(Pin::new(chunk).poll_read(cx, &mut buf))?;
let inner_poll_result = Pin::new(chunk).poll_read(cx, &mut buf);
bytes_read += match inner_poll_result {
Poll::Ready(Ok(n)) => n,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => {
if bytes_read == 0 {
return Poll::Pending;
} else {
break;
}
}
};
if bytes_read == 0 {
self.state = match is_last {
true => EncoderState::Done,
5 changes: 5 additions & 0 deletions tests/fixtures/request-unexpected-eof.txt
@@ -0,0 +1,5 @@
POST / HTTP/1.1
content-type: text/plain
content-length: 11

aaaaabbbbb
6 changes: 6 additions & 0 deletions tests/fixtures/response-unexpected-eof.txt
@@ -0,0 +1,6 @@
HTTP/1.1 200 OK
content-length: 11
date: {DATE}
content-type: text/plain

aaaaabbbbb
27 changes: 27 additions & 0 deletions tests/server.rs
@@ -72,3 +72,30 @@ async fn test_chunked_echo() {

case.assert().await;
}

#[async_std::test]
async fn test_unexpected_eof() {
// We can't predict unexpected EOF, so the response content-length is still 11
let case = TestCase::new_server(
"fixtures/request-unexpected-eof.txt",
"fixtures/response-unexpected-eof.txt",
)
.await;
let addr = "http://example.com";

async_h1::accept(addr, case.clone(), |req| async {
let mut resp = Response::new(StatusCode::Ok);
let ct = req.content_type();
let body: Body = req.into();
resp.set_body(body);
if let Some(ct) = ct {
resp.set_content_type(ct);
}

Ok(resp)
})
.await
.unwrap();

case.assert().await;
}
