Skip to content

Commit

Permalink
remove h2 lock as well. rama backend http client is now lock-free at …
Browse files Browse the repository at this point in the history
…the surface
  • Loading branch information
GlenDC committed Dec 28, 2024
1 parent 444547b commit 662c97a
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 18 deletions.
3 changes: 1 addition & 2 deletions rama-http-backend/src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use rama_net::{

use rama_utils::macros::define_inner_service_accessors;
use std::fmt;
use tokio::sync::Mutex;
use tracing::trace;

#[cfg(any(feature = "rustls", feature = "boring"))]
Expand Down Expand Up @@ -115,7 +114,7 @@ where
}
});

let svc = HttpClientService(SendRequest::Http2(Mutex::new(sender)));
let svc = HttpClientService(SendRequest::Http2(sender));

Ok(EstablishedClientConnection {
ctx,
Expand Down
7 changes: 2 additions & 5 deletions rama-http-backend/src/client/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ use rama_http_types::{
Method, Request, Response, Version,
};
use rama_net::{address::ProxyAddress, http::RequestContext};
use tokio::sync::Mutex;

#[derive(Debug)]
// TODO: once we have hyper as `rama_core` we can
// drop this mutex as there is no inherint reason for `sender` to be mutable...
pub(super) enum SendRequest<Body> {
Http1(rama_http_core::client::conn::http1::SendRequest<Body>),
Http2(Mutex<rama_http_core::client::conn::http2::SendRequest<Body>>),
Http2(rama_http_core::client::conn::http2::SendRequest<Body>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -48,7 +45,7 @@ where

let resp = match &self.0 {
SendRequest::Http1(sender) => sender.send_request(req).await,
SendRequest::Http2(sender) => sender.lock().await.send_request(req).await,
SendRequest::Http2(sender) => sender.send_request(req).await,
}?;

Ok(resp.map(rama_http_types::Body::new))
Expand Down
2 changes: 1 addition & 1 deletion rama-http-core/src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ where
/// Absolute-form `Uri`s are not required. If received, they will be serialized
/// as-is.
pub fn send_request(
&mut self,
&self,
req: Request<B>,
) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
let sent = self.dispatch.send(req);
Expand Down
2 changes: 1 addition & 1 deletion rama-http-core/src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl<T, U> UnboundedSender<T, U> {
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}

pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
pub(crate) fn send(&self, val: T) -> Result<Promise<U>, T> {
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
Expand Down
6 changes: 3 additions & 3 deletions rama-http-core/src/server/conn/auto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ mod tests {
#[tokio::test]
async fn http2() {
let addr = start_server(false, false).await;
let mut sender = connect_h2(addr).await;
let sender = connect_h2(addr).await;

let response = sender
.send_request(Request::new(Empty::<Bytes>::new()))
Expand All @@ -832,7 +832,7 @@ mod tests {
#[tokio::test]
async fn http2_only() {
let addr = start_server(false, true).await;
let mut sender = connect_h2(addr).await;
let sender = connect_h2(addr).await;

let response = sender
.send_request(Request::new(Empty::<Bytes>::new()))
Expand Down Expand Up @@ -876,7 +876,7 @@ mod tests {
#[tokio::test]
async fn http1_only_fail_if_client_is_http2() {
let addr = start_server(true, false).await;
let mut sender = connect_h2(addr).await;
let sender = connect_h2(addr).await;

let _ = sender
.send_request(Request::new(Empty::<Bytes>::new()))
Expand Down
8 changes: 4 additions & 4 deletions tests/http-core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2315,7 +2315,7 @@ mod conn {
let (tx2, rx2) = oneshot::channel::<()>();
let mut rxs = vec![rx, rx2];
for _i in 0..requests {
let mut client = clients.pop().unwrap();
let client = clients.pop().unwrap();
let rx = rxs.pop().unwrap();
let req = Request::builder()
.method(Method::CONNECT)
Expand Down Expand Up @@ -2530,7 +2530,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new(Executor::new())
let (client, conn) = conn::http2::Builder::new(Executor::new())
.handshake(io)
.await
.expect("http handshake");
Expand Down Expand Up @@ -2585,7 +2585,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new(Executor::new())
let (client, conn) = conn::http2::Builder::new(Executor::new())
.handshake(io)
.await
.expect("http handshake");
Expand Down Expand Up @@ -2635,7 +2635,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new(Executor::new())
let (client, conn) = conn::http2::Builder::new(Executor::new())
.handshake::<_, Empty<Bytes>>(io)
.await
.expect("http handshake");
Expand Down
4 changes: 2 additions & 2 deletions tests/http-core/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ async fn async_test(cfg: __TestConfig) {
let stream = TcpStream::connect(addr).await.unwrap();

let res = if http2_only {
let (mut sender, conn) =
let (sender, conn) =
rama::http::core::client::conn::http2::Builder::new(Executor::new())
.handshake(stream)
.await
Expand Down Expand Up @@ -550,7 +550,7 @@ async fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Output = ()>)
.unwrap();

let result = if http2_only {
let (mut sender, conn) =
let (sender, conn) =
rama::http::core::client::conn::http2::Builder::new(
Executor::new(),
)
Expand Down

0 comments on commit 662c97a

Please sign in to comment.