
Commit

Upgrade hyper and related crates
robklg committed Jan 27, 2025
1 parent 9510b50 commit ea2f2f0
Showing 9 changed files with 522 additions and 413 deletions.
684 changes: 368 additions & 316 deletions Cargo.lock

Large diffs are not rendered by default.

26 changes: 18 additions & 8 deletions Cargo.toml
@@ -32,28 +32,30 @@ features = ["shipper"]
version = "0.1.0"

[dependencies]
async-trait = "0.1.83"
async-trait = "0.1.85"
base64 = "0.22.1"
clap = { version = "3.2.25", features = ["derive", "env"] }
console-subscriber = { version = "0.3.0", optional = true }
flate2 = "1.0.35"
futures = "0.3.31"
http = "0.2.12"
hyper = { version = "0.14.31", features = ["server", "http1"] }
hyper-rustls = "0.23.2"
http = "1.2.0"
hyper = { version = "1.5.2", features = ["server", "http1"] }
http-body-util = "0.1.2"
hyper-util = { version = "0.1.10", features = ["tokio"] }
hyper-rustls = "0.27.5"
lazy_static = "1.5.0"
libunftp = "0.20.3"
opendal = { version = "0.47.3", optional = true }
prometheus = { version = "0.13.4", features = ["process"] }
serde = { version = "1.0.216", features = ["derive"] }
serde_json = "1.0.133"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.137"
slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_info"] }
slog-async = "2.8.0"
slog-term = "2.9.1"
strum = { version = "0.26.3", features = ["derive"] }
strum_macros = "0.26.4"
thiserror = "1.0.69"
tokio = { version = "1.42.0", features = ["signal", "rt-multi-thread"] }
tokio = { version = "1.43.0", features = ["signal", "rt-multi-thread"] }
unftp-sbe-fs = "0.2.6"
unftp-sbe-gcs = { version = "0.2.7", optional = true }
unftp-sbe-opendal = { version = "0.0.1", optional = true }
@@ -62,9 +64,10 @@ unftp-auth-jsonfile = { version = "0.3.5", optional = true }
unftp-sbe-rooter = "0.2.1"
unftp-sbe-restrict = "0.1.2"
url = "2.5.4"
rustls = "0.23.21"

[target.'cfg(unix)'.dependencies]
unftp-auth-pam = { version = "0.2.5", optional = true }
unftp-auth-pam = { version = "0.2.6", optional = true }

[features]
default = ["rest_auth", "cloud_storage", "jsonfile_auth", "opendal"]
@@ -120,3 +123,10 @@ unlike your normal FTP server in that it provides:
With unFTP, you can present RFC compliant FTP(S) to the outside world while freeing yourself to use modern APIs and
techniques on the inside of your perimeter.
"""

[patch.crates-io]
libunftp = { git = "https://github.com/bolcom/libunftp.git", branch = "upgrades" }
unftp-sbe-fs = { git = "https://github.com/bolcom/libunftp.git", branch = "upgrades" }
unftp-sbe-gcs = { git = "https://github.com/bolcom/libunftp.git", branch = "upgrades" }
unftp-auth-rest = { git = "https://github.com/bolcom/libunftp.git", branch = "upgrades" }
unftp-auth-jsonfile = { git = "https://github.com/bolcom/libunftp.git", branch = "upgrades" }
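
Note: the dependency changes above reflect the crate split that came with hyper 1.x — the bundled Body type and the high-level Server/Client were removed from hyper itself, with http-body-util supplying body types and hyper-util supplying the runtime glue. A minimal sketch of the resulting server shape (the same shape src/http.rs adopts further down); the bind address and handler are illustrative only, and the required cargo features are assumed to be enabled:

use std::convert::Infallible;

use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // hyper 1.x has no hyper::Server; accept connections with tokio and drive
    // each one with hyper-util's connection builder.
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    loop {
        let (stream, _peer) = listener.accept().await?;
        let io = TokioIo::new(stream); // adapts tokio's stream to hyper's IO traits
        tokio::spawn(async move {
            let svc = service_fn(|_req: Request<Incoming>| async {
                // Full<Bytes> from http-body-util replaces the old Body::from(..)
                Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("hello"))))
            });
            // auto::Builder negotiates HTTP/1 or HTTP/2 per connection
            if let Err(e) = auto::Builder::new(TokioExecutor::new())
                .serve_connection(io, svc)
                .await
            {
                eprintln!("connection error: {e}");
            }
        });
    }
}
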
6 changes: 3 additions & 3 deletions crates/googlelog/Cargo.toml
@@ -17,9 +17,9 @@ shipper = []

[dependencies]
google-logging2 = "5.0.5"
serde_json = "1.0.133"
serde_json = "1.0.137"
chrono = "0.4.39"
tokio = { version = "1.42.0", features = ["macros", "time", "rt-multi-thread"] }
tokio = { version = "1.43.0", features = ["macros", "time", "rt-multi-thread"] }
slog = "2.7.0"
thiserror = "1.0.69"
reqwest = { version = "0.12.9", default-features = false, features = ["rustls-tls", "json"] }
reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls", "json"] }
2 changes: 1 addition & 1 deletion crates/redislog/Cargo.toml
@@ -17,7 +17,7 @@ chrono = { version = "0.4.39", default-features = false, features = ["std", "clo
r2d2 = "0.8.10"
r2d2_redis = "0.14.0"
redis = "0.20.2"
serde_json = "1.0.133"
serde_json = "1.0.137"
slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_info"] }

[dev-dependencies]
140 changes: 84 additions & 56 deletions src/http.rs
@@ -1,15 +1,19 @@
//! Contains code pertaining to unFTPs HTTP service it exposes, including prometheus metrics.
use crate::{app, metrics};

use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, StatusCode,
};
use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::{Empty, Full};
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use slog::*;
use std::convert::Infallible;
use std::net::{IpAddr, Ipv4Addr};
use std::{net::SocketAddr, result::Result};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt},
net::TcpListener,
};

const PATH_HOME: &str = "/";
const PATH_METRICS: &str = "/metrics";
@@ -28,31 +32,54 @@ pub async fn start(
.parse()
.map_err(|e| format!("unable to parse HTTP address {}: {}", bind_addr, e))?;

let make_svc = make_service_fn(|_socket: &AddrStream| {
async move {
// service_fn converts our function into a `Service`
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let handler = HttpHandler { ftp_addr };
handler.router(req).await
}))
}
});

let http_server = hyper::Server::bind(&http_addr)
.serve(make_svc)
.with_graceful_shutdown(async {
shutdown.recv().await.ok();
info!(log, "Shutting down HTTP server");
});
let listener = TcpListener::bind(http_addr)
.await
.map_err(|e| format!("unable to parse HTTP address {}: {}", bind_addr, e))?;
let http_server =
hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();

info!(log, "Starting HTTP service."; "address" => &http_addr);
info!(log, "Exposing {} service home.", app::NAME; "path" => PATH_HOME);
info!(log, "Exposing Prometheus {} exporter endpoint.", app::NAME; "path" => PATH_METRICS);
info!(log, "Exposing readiness endpoint."; "path" => PATH_READINESS);
info!(log, "Exposing liveness endpoint."; "path" => PATH_HEALTH);

if let Err(e) = http_server.await {
error!(log, "HTTP server error: {}", e)
loop {
tokio::select! {
conn = listener.accept() => {
let (stream, peer_addr) = match conn {
Ok(conn) => conn,
Err(e) => {
error!(log, "Accept error: {}", e);
continue;
}
};
info!(log, "Incoming connection accepted: {}", peer_addr);

let stream = hyper_util::rt::TokioIo::new(stream);

let conn = http_server.serve_connection_with_upgrades(stream, service_fn(move |req: Request<Incoming>| async move {
let handler = HttpHandler { ftp_addr };
handler.router(req).await
}));

let conn = graceful.watch(conn.into_owned());

let log_clone = log.clone();
tokio::spawn(async move {
if let Err(err) = conn.await {
error!(log_clone, "connection error: {}", err);
}
debug!(log_clone, "connection dropped: {}", peer_addr);
});
},
_ = shutdown.recv() => {
drop(listener);
info!(log, "Shutting down HTTP server");
break;
}
}
}

info!(log, "HTTP shutdown OK");
@@ -65,45 +92,46 @@ struct HttpHandler {
}

impl HttpHandler {
async fn router(&self, req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
let mut response: Response<Body> = Response::new(Body::empty());
match (req.method(), req.uri().path()) {
(&Method::GET, PATH_HOME) | (&Method::GET, "/index.html") => {
*response.body_mut() = self.service_home();
}
(&Method::GET, PATH_METRICS) => {
*response.body_mut() = Body::from(metrics::gather());
}
(&Method::GET, PATH_HEALTH) => {
self.health(&mut response).await;
}
(&Method::GET, PATH_READINESS) => {
*response.status_mut() = StatusCode::OK;
}
_ => {
*response.status_mut() = StatusCode::NOT_FOUND;
}
}
async fn router(
&self,
req: Request<Incoming>,
) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, http::Error> {
let (parts, _) = req.into_parts();

let response = match (parts.method, parts.uri.path()) {
(Method::GET, PATH_HOME) | (Method::GET, "/index.html") => Ok(Response::new(
UnsyncBoxBody::new(Full::new(self.service_home())),
)),
(Method::GET, PATH_METRICS) => Ok(Response::new(UnsyncBoxBody::new(Full::new(
metrics::gather().into(),
)))),
(Method::GET, PATH_HEALTH) => self.health().await,
(Method::GET, PATH_READINESS) => Response::builder()
.status(StatusCode::OK)
.body(UnsyncBoxBody::new(Empty::<Bytes>::new())),
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(UnsyncBoxBody::new(Empty::<Bytes>::new())),
};

Ok(response)
response
}

fn service_home(&self) -> Body {
fn service_home(&self) -> Bytes {
let index_html = include_str!(concat!(env!("PROJ_WEB_DIR"), "/index.html"));
Body::from(index_html.replace("{{ .AppVersion }}", app::VERSION))
Bytes::from(index_html.replace("{{ .AppVersion }}", app::VERSION))
}

async fn health(&self, response: &mut Response<Body>) {
async fn health(&self) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, http::Error> {
match self.ftp_probe().await {
Ok(_) => {
*response.body_mut() = Body::from("<html>OK!</html>");
*response.status_mut() = StatusCode::OK;
}
Err(_e) => {
// TODO: Log error
*response.body_mut() = Body::from("<html>Service unavailable!</html>");
*response.status_mut() = StatusCode::SERVICE_UNAVAILABLE;
}
Ok(_) => Response::builder()
.status(StatusCode::OK)
.body(UnsyncBoxBody::new(Full::<Bytes>::from("<html>OK!</html>"))),
Err(_e) => Response::builder()
.status(StatusCode::SERVICE_UNAVAILABLE)
.body(UnsyncBoxBody::new(Full::<Bytes>::from(
"<html>Service unavailable!</html>",
))),
}
}

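
Note: because hyper 1.x no longer has a single Body type, the rewritten router above boxes whichever concrete body each arm produces (Full for pages and metrics, Empty for status-only replies) into one UnsyncBoxBody<Bytes, Infallible>, so all match arms agree on the response type. A small sketch of that pattern in isolation; the helper names are illustrative, not taken from this commit:

use std::convert::Infallible;

use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::{Empty, Full};
use hyper::body::Bytes;
use hyper::{Response, StatusCode};

type BoxedBody = UnsyncBoxBody<Bytes, Infallible>;

// A response carrying a fixed HTML/text payload.
fn with_body(
    status: StatusCode,
    body: &'static str,
) -> Result<Response<BoxedBody>, http::Error> {
    Response::builder()
        .status(status)
        .body(UnsyncBoxBody::new(Full::new(Bytes::from_static(body.as_bytes()))))
}

// A response with no payload; Empty and Full box to the same type.
fn without_body(status: StatusCode) -> Result<Response<BoxedBody>, http::Error> {
    Response::builder()
        .status(status)
        .body(UnsyncBoxBody::new(Empty::<Bytes>::new()))
}

fn main() -> Result<(), http::Error> {
    let ok = with_body(StatusCode::OK, "<html>OK!</html>")?;
    let missing = without_body(StatusCode::NOT_FOUND)?;
    println!("{} / {}", ok.status(), missing.status());
    Ok(())
}
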
40 changes: 24 additions & 16 deletions src/infra/pubsub.rs
@@ -3,10 +3,14 @@ use crate::infra::workload_identity;
use async_trait::async_trait;
use base64::Engine;
use http::{header, Method, Request, StatusCode, Uri};
use hyper::client::connect::dns::GaiResolver;
use hyper::client::HttpConnector;
use hyper::{Body, Client, Response};
use http_body_util::{Either, Empty};
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper::Response;
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
@@ -24,7 +28,7 @@ pub struct PubsubEventDispatcher {
api_base_url: String,
project: String,
topic: String,
client: Client<HttpsConnector<HttpConnector>>,
client: Client<HttpsConnector<HttpConnector>, Either<String, Empty<Bytes>>>,
}

const DEFAULT_SERVICE_ENDPOINT: &str = "https://pubsub.googleapis.com";
@@ -52,14 +56,15 @@ impl PubsubEventDispatcher {
where
Str: Into<String>,
{
let client: Client<HttpsConnector<HttpConnector<GaiResolver>>, Body> = Client::builder()
.build(
HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.build(),
);
let https = HttpsConnectorBuilder::new()
.with_native_roots()
.expect("no native root CA certificates found")
.https_or_http()
.enable_http1()
.build();

let client = Client::builder(TokioExecutor::new()).build(https);

PubsubEventDispatcher {
log,
api_base_url: api_base.into(),
@@ -110,7 +115,7 @@ impl PubsubEventDispatcher {
// FIXME: When testing locally there won't be a token, we might want to handle this better.
let token = self.get_token().await.unwrap_or_else(|_| "".to_owned());

let request: Request<Body> = Request::builder()
let request: Request<Either<String, Empty<Bytes>>> = Request::builder()
.uri(
Uri::from_maybe_shared(format!(
"{}/v1/projects/{}/topics/{}:publish",
@@ -120,10 +125,10 @@ impl PubsubEventDispatcher {
)
.header(header::AUTHORIZATION, format!("Bearer {}", token))
.method(Method::POST)
.body(body_string.into())
.body(Either::Left(body_string))
.map_err(|e| format!("error with publish request: {}", e))?;

let response: Response<Body> = self.client.request(request).await.unwrap();
let response: Response<Incoming> = self.client.request(request).await.unwrap();
if response.status() != StatusCode::OK {
Err(format!(
"bad HTTP status code received: {}",
@@ -162,12 +167,15 @@ struct PubSubMsg {

#[cfg(test)]
mod tests {
use base64::engine::general_purpose;
use base64::Engine as _;

use crate::infra::pubsub::{PubSubMsg, PubSubRequest};
use std::collections::HashMap;

#[test]
fn pubub_request_serializes_correctly() {
let payload = base64::encode("123");
let payload = general_purpose::STANDARD.encode(b"123");
let r = PubSubRequest {
messages: vec![PubSubMsg {
data: payload,
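
Note: the client side of the upgrade mirrors the server side — hyper::Client became hyper_util::client::legacy::Client, hyper-rustls 0.27 made with_native_roots() fallible, and because the legacy client is generic over a single request-body type, the dispatcher above uses http-body-util's Either so one client can send either a String payload or no body at all. A rough, self-contained sketch of that combination; the URLs are placeholders, not the Pub/Sub endpoint:

use http_body_util::{Either, Empty};
use hyper::body::Bytes;
use hyper::{Method, Request};
use hyper_rustls::HttpsConnectorBuilder;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;

// One body type covering both "JSON string payload" and "no payload".
type ReqBody = Either<String, Empty<Bytes>>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // with_native_roots() now returns a Result: it fails if no OS root CAs are found.
    let https = HttpsConnectorBuilder::new()
        .with_native_roots()?
        .https_or_http()
        .enable_http1()
        .build();

    // The high-level pooling client now lives in hyper-util and needs an executor.
    let client: Client<_, ReqBody> = Client::builder(TokioExecutor::new()).build(https);

    let post = Request::builder()
        .method(Method::POST)
        .uri("https://example.com/publish") // placeholder URL
        .body(Either::Left(r#"{"messages":[]}"#.to_string()))?;
    let get = Request::builder()
        .method(Method::GET)
        .uri("https://example.com/health") // placeholder URL
        .body(Either::Right(Empty::new()))?;

    println!("POST -> {}", client.request(post).await?.status());
    println!("GET  -> {}", client.request(get).await?.status());
    Ok(())
}
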
15 changes: 10 additions & 5 deletions src/infra/userdetail_http.rs
@@ -5,7 +5,10 @@ use crate::domain::user::{User, UserDetailError, UserDetailProvider};
use crate::infra::usrdetail_json::JsonUserProvider;
use async_trait::async_trait;
use http::{Method, Request};
use hyper::{Body, Client};
use http_body_util::{BodyExt, Empty};
use hyper::body::Bytes;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use url::form_urlencoded;

/// A libunftp [`UserDetail`](libunftp::auth::UserDetail) provider that obtains user detail
@@ -44,25 +47,27 @@ impl UserDetailProvider for HTTPUserDetailProvider {
.method(Method::GET)
.header("Content-type", "application/json")
.uri(format!("{}{}", self.url, username))
.body(Body::empty())
.body(Empty::<Bytes>::new())
.map_err(|e| UserDetailError::with_source("error creating request", e))?;

let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.expect("no native root CA certificates found")
.https_or_http()
.enable_http1()
.build();

let client = Client::builder().build(https);
let client = Client::builder(TokioExecutor::new()).build(https);

let resp = client
.request(req)
.await
.map_err(|e| UserDetailError::with_source("error doing HTTP request", e))?;

let body_bytes = hyper::body::to_bytes(resp.into_body())
let body_bytes = BodyExt::collect(resp.into_body())
.await
.map_err(|e| UserDetailError::with_source("error parsing body", e))?;
.map_err(|e| UserDetailError::with_source("error parsing body", e))?
.to_bytes();

let json_str = std::str::from_utf8(body_bytes.as_ref())
.map_err(|e| UserDetailError::with_source("body is not a valid UTF string", e))?;
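
Note: hyper::body::to_bytes() is gone in hyper 1.x; as in the change above, the streaming body is buffered with BodyExt::collect() and then flattened with to_bytes(). A minimal helper showing just that step; the function name is illustrative:

use http_body_util::BodyExt;
use hyper::body::{Bytes, Incoming};

// Buffer an incoming response body fully into contiguous Bytes.
async fn read_full_body(body: Incoming) -> Result<Bytes, hyper::Error> {
    Ok(body.collect().await?.to_bytes())
}
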