From 3e1bc0676caa20e65ce8ad26241da041cc075bf9 Mon Sep 17 00:00:00 2001 From: Glen De Cauwsemaecker Date: Fri, 27 Dec 2024 18:54:44 +0100 Subject: [PATCH] sync w/ upstream(h2): v0.4.6 => v0.4.7 Changelog: ## What's Changed - Fix treating HEADERS frames with a non-zero content-length but END_STREAM flag as malformed. - Fix notifying the stream task when automatically reset on receipt of a stream error. Release url: raw changelog: --- FORK.md | 2 +- rama-http-core/src/h2/codec/framed_read.rs | 7 ++- rama-http-core/src/h2/frame/headers.rs | 16 ++++--- rama-http-core/src/h2/frame/util.rs | 6 +-- rama-http-core/src/h2/proto/streams/recv.rs | 12 ++++++ .../src/h2/proto/streams/streams.rs | 3 ++ tests/http-core/h2/client_request.rs | 43 +++++++++++++++++++ tests/http-core/h2/stream_states.rs | 7 ++- 8 files changed, 81 insertions(+), 15 deletions(-) diff --git a/FORK.md b/FORK.md index b41cf1c7..7095ab43 100644 --- a/FORK.md +++ b/FORK.md @@ -9,7 +9,7 @@ as a distant relative. ### hyperium -- +- - - diff --git a/rama-http-core/src/h2/codec/framed_read.rs b/rama-http-core/src/h2/codec/framed_read.rs index 85d04723..eed27b5b 100644 --- a/rama-http-core/src/h2/codec/framed_read.rs +++ b/rama-http-core/src/h2/codec/framed_read.rs @@ -8,7 +8,7 @@ use crate::h2::hpack; use futures_core::Stream; -use bytes::BytesMut; +use bytes::{Buf, BytesMut}; use std::io; @@ -146,8 +146,7 @@ fn decode_frame( macro_rules! header_block { ($frame:ident, $head:ident, $bytes:ident) => ({ // Drop the frame header - // TODO: Change to drain: carllerche/bytes#130 - let _ = $bytes.split_to(frame::HEADER_LEN); + $bytes.advance(frame::HEADER_LEN); // Parse the header frame w/o parsing the payload let (mut frame, mut payload) = match frame::$frame::load($head, $bytes) { @@ -227,7 +226,7 @@ fn decode_frame( .into() } Kind::Data => { - let _ = bytes.split_to(frame::HEADER_LEN); + bytes.advance(frame::HEADER_LEN); let res = frame::Data::load(head, bytes.freeze()); // TODO: Should this always be connection level? Probably not... diff --git a/rama-http-core/src/h2/frame/headers.rs b/rama-http-core/src/h2/frame/headers.rs index 27cfdc32..f0f70076 100644 --- a/rama-http-core/src/h2/frame/headers.rs +++ b/rama-http-core/src/h2/frame/headers.rs @@ -8,7 +8,7 @@ use rama_http_types::{ header, HeaderMap, HeaderName, HeaderValue, Method, Request, StatusCode, Uri, }; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use std::fmt; use std::io::Cursor; @@ -168,7 +168,7 @@ impl Headers { pad = src[0] as usize; // Drop the padding - let _ = src.split_to(1); + src.advance(1); } // Read the stream dependency @@ -183,7 +183,7 @@ impl Headers { } // Drop the next 5 bytes - let _ = src.split_to(5); + src.advance(5); Some(stream_dep) } else { @@ -256,6 +256,10 @@ impl Headers { &mut self.header_block.pseudo } + pub(crate) fn pseudo(&self) -> &Pseudo { + &self.header_block.pseudo + } + /// Whether it has status 1xx pub(crate) fn is_informational(&self) -> bool { self.header_block.pseudo.is_informational() @@ -426,7 +430,7 @@ impl PushPromise { pad = src[0] as usize; // Drop the padding - let _ = src.split_to(1); + src.advance(1); } if src.len() < 5 { @@ -435,7 +439,7 @@ impl PushPromise { let (promised_id, _) = StreamId::parse(&src[..4]); // Drop promised_id bytes - let _ = src.split_to(4); + src.advance(4); if pad > 0 { if pad > src.len() { @@ -658,7 +662,7 @@ impl EncodingHeaderBlock { // Now, encode the header payload let continuation = if self.hpack.len() > dst.remaining_mut() { - dst.put_slice(&self.hpack.split_to(dst.remaining_mut())); + dst.put((&mut self.hpack).take(dst.remaining_mut())); Some(Continuation { stream_id: head.stream_id(), diff --git a/rama-http-core/src/h2/frame/util.rs b/rama-http-core/src/h2/frame/util.rs index 0393c937..3eb3c8bb 100644 --- a/rama-http-core/src/h2/frame/util.rs +++ b/rama-http-core/src/h2/frame/util.rs @@ -1,7 +1,7 @@ use std::fmt; use super::Error; -use bytes::Bytes; +use bytes::{Buf, Bytes}; /// Strip padding from the given payload. /// @@ -32,8 +32,8 @@ pub(super) fn strip_padding(payload: &mut Bytes) -> Result { return Err(Error::TooMuchPadding); } - let _ = payload.split_to(1); - let _ = payload.split_off(payload_len - pad_len - 1); + payload.advance(1); + payload.truncate(payload_len - pad_len - 1); Ok(pad_len as u8) } diff --git a/rama-http-core/src/h2/proto/streams/recv.rs b/rama-http-core/src/h2/proto/streams/recv.rs index ad36027d..6642c99d 100644 --- a/rama-http-core/src/h2/proto/streams/recv.rs +++ b/rama-http-core/src/h2/proto/streams/recv.rs @@ -185,6 +185,18 @@ impl Recv { }; stream.content_length = ContentLength::Remaining(content_length); + // END_STREAM on headers frame with non-zero content-length is malformed. + // https://datatracker.ietf.org/doc/html/rfc9113#section-8.1.1 + if frame.is_end_stream() + && content_length > 0 + && frame + .pseudo() + .status + .map_or(true, |status| status != 204 && status != 304) + { + proto_err!(stream: "recv_headers with END_STREAM: content-length is not zero; stream={:?};", stream.id); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); + } } } diff --git a/rama-http-core/src/h2/proto/streams/streams.rs b/rama-http-core/src/h2/proto/streams/streams.rs index e2400904..1abb6d29 100644 --- a/rama-http-core/src/h2/proto/streams/streams.rs +++ b/rama-http-core/src/h2/proto/streams/streams.rs @@ -1587,6 +1587,9 @@ impl Actions { // Reset the stream. self.send .send_reset(reason, initiator, buffer, stream, counts, &mut self.task); + self.recv.enqueue_reset_expiration(stream, counts); + // if a RecvStream is parked, ensure it's notified + stream.notify_recv(); Ok(()) } else { tracing::warn!( diff --git a/tests/http-core/h2/client_request.rs b/tests/http-core/h2/client_request.rs index 975580f1..157fa572 100644 --- a/tests/http-core/h2/client_request.rs +++ b/tests/http-core/h2/client_request.rs @@ -1376,6 +1376,49 @@ async fn allow_empty_data_for_head() { join(srv, h2).await; } +#[tokio::test] +async fn reject_none_zero_content_length_header_with_end_stream() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + srv.send_frame( + frames::headers(1) + .response(200) + .field("content-length", 100) + .eos(), + ) + .await; + }; + + let h2 = async move { + let (mut client, h2) = client::Builder::new() + .handshake::<_, Bytes>(io) + .await + .unwrap(); + tokio::spawn(async { + h2.await.expect("connection failed"); + }); + let request = Request::builder() + .method(Method::GET) + .uri("https://example.com/") + .body(()) + .unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); + let _ = response.await.unwrap_err(); + }; + + join(srv, h2).await; +} + #[tokio::test] #[ignore] async fn early_hints() { diff --git a/tests/http-core/h2/stream_states.rs b/tests/http-core/h2/stream_states.rs index 1f68ddae..04290f38 100644 --- a/tests/http-core/h2/stream_states.rs +++ b/tests/http-core/h2/stream_states.rs @@ -548,7 +548,12 @@ async fn recv_next_stream_id_updated_by_malformed_headers() { client.recv_frame(frames::go_away(1).protocol_error()).await; }; let srv = async move { - let mut srv = server::handshake(io).await.expect("handshake"); + let mut srv = server::Builder::new() + // forget the bad stream immediately + .max_concurrent_reset_streams(0) + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); let res = srv.next().await.unwrap(); let err = res.unwrap_err(); assert_eq!(err.reason(), Some(h2::Reason::PROTOCOL_ERROR));