diff --git a/Cargo.lock b/Cargo.lock index 39d834f2..2e5e2e29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -611,6 +611,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "data-encoding" version = "2.7.0" @@ -2208,6 +2229,7 @@ dependencies = [ "brotli", "bytes", "const_format", + "csv", "flate2", "futures-lite", "headers", @@ -2307,6 +2329,7 @@ version = "0.2.0-alpha.7" dependencies = [ "bytes", "const_format", + "csv", "futures-core", "futures-lite", "headers", diff --git a/Cargo.toml b/Cargo.toml index 0cb29bb3..ea9bd6cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ brotli = "7" bytes = "1" clap = { version = "4.5.15", features = ["derive"] } crossterm = "0.27" +csv = "1.3.1" flate2 = "1.0" futures-lite = "2.3.0" futures-core = "0.3" diff --git a/rama-http-types/Cargo.toml b/rama-http-types/Cargo.toml index 13233f85..2a8b6df7 100644 --- a/rama-http-types/Cargo.toml +++ b/rama-http-types/Cargo.toml @@ -16,6 +16,7 @@ workspace = true [dependencies] bytes = { workspace = true } const_format = { workspace = true } +csv = { workspace = true } futures-core = { workspace = true } futures-lite = { workspace = true } headers = { workspace = true } diff --git a/rama-http-types/src/response/csv.rs b/rama-http-types/src/response/csv.rs new file mode 100644 index 00000000..9d887b95 --- /dev/null +++ b/rama-http-types/src/response/csv.rs @@ -0,0 +1,145 @@ +use crate::response::{IntoResponse, Response}; +use crate::{dep::http::StatusCode, Body}; +use bytes::{BufMut, BytesMut}; +use csv; +use headers::ContentType; +use http::header::CONTENT_TYPE; +use http::HeaderValue; +use rama_error::OpaqueError; +use rama_utils::macros::impl_deref; +use serde::Serialize; +use std::fmt; + +use super::Headers; + +/// Wrapper used to create Csv Http [`Response`]s, +/// as well as to extract Csv from Http [`Request`] bodies. +/// +/// [`Request`]: crate::Request +/// [`Response`]: crate::Response +/// +/// # Examples +/// +/// ## Creating a Csv Response +/// +/// ``` +/// use serde_json::json; +/// use rama_http_types::{IntoResponse, response::Csv}; +/// +/// async fn handler() -> impl IntoResponse { +/// Csv( +/// vec![ +/// json!({ +/// "name": "john", +/// "age": 30, +/// "is_student": false +/// }) +/// ] +/// ) +/// } +/// ``` +/// +/// ## Extracting Csv from a Request +/// +/// ``` +/// use serde_json::json; +/// use rama_http_types::response::Csv; +/// +/// #[derive(Debug, serde::Deserialize)] +/// struct Input { +/// name: String, +/// age: u8, +/// alive: Option, +/// } +/// +/// # fn bury(name: impl AsRef) {} +/// +/// async fn handler(Csv(input): Csv>) { +/// if !input[0].alive.unwrap_or_default() { +/// bury(&input[0].name); +/// } +/// } +/// ``` +pub struct Csv(pub T); + +impl fmt::Debug for Csv { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Csv").field(&self.0).finish() + } +} + +impl Clone for Csv { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl_deref!(Csv); + +impl From for Csv { + fn from(inner: T) -> Self { + Self(inner) + } +} + +impl IntoResponse for Csv +where + T: IntoIterator + std::fmt::Debug, +{ + fn into_response(self) -> Response { + // Use a small initial capacity of 128 bytes like serde_json::to_vec + // https://docs.rs/serde_json/1.0.82/src/serde_json/ser.rs.html#2189 + let mut buf = BytesMut::with_capacity(128).writer(); + { + let mut wtr = csv::Writer::from_writer(&mut buf); + let res: Result, _> = self.0.into_iter().map(|rec| wtr.serialize(rec)).collect(); + if let Err(err) = res { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Headers::single(ContentType::text_utf8()), + err.to_string(), + ) + .into_response(); + } + if let Err(err) = wtr.flush() { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Headers::single(ContentType::text_utf8()), + err.to_string(), + ) + .into_response(); + } + } + + ( + [(CONTENT_TYPE, HeaderValue::from_static("text/csv"))], + buf.into_inner().freeze(), + ) + .into_response() + } +} + +impl TryInto for Csv +where + T: IntoIterator, +{ + type Error = OpaqueError; + + fn try_into(self) -> Result { + // Use a small initial capacity of 128 bytes like serde_json::to_vec + // https://docs.rs/serde_json/1.0.82/src/serde_json/ser.rs.html#2189 + let mut buf = BytesMut::with_capacity(128).writer(); + { + let mut wtr = csv::Writer::from_writer(&mut buf); + let res: Result, _> = self.0.into_iter().map(|rec| wtr.serialize(rec)).collect(); + if let Err(err) = res { + return Err(OpaqueError::from_std(err)); + } + if let Err(err) = wtr.flush() { + return Err(OpaqueError::from_std(err)); + } + } + + Ok(buf.into_inner().freeze().into()) + } +} diff --git a/rama-http-types/src/response/mod.rs b/rama-http-types/src/response/mod.rs index 7f1be832..1bf0bdb2 100644 --- a/rama-http-types/src/response/mod.rs +++ b/rama-http-types/src/response/mod.rs @@ -25,6 +25,10 @@ mod json; #[doc(inline)] pub use json::Json; +mod csv; +#[doc(inline)] +pub use csv::Csv; + mod form; #[doc(inline)] pub use form::Form; diff --git a/rama-http/Cargo.toml b/rama-http/Cargo.toml index a7bf21e2..ac596aa2 100644 --- a/rama-http/Cargo.toml +++ b/rama-http/Cargo.toml @@ -31,6 +31,7 @@ base64 = { workspace = true } bitflags = { workspace = true } bytes = { workspace = true } const_format = { workspace = true } +csv = { workspace = true } futures-lite = { workspace = true } headers = { workspace = true } http = { workspace = true } diff --git a/rama-http/src/service/web/endpoint/extract/body/csv.rs b/rama-http/src/service/web/endpoint/extract/body/csv.rs new file mode 100644 index 00000000..6521afcb --- /dev/null +++ b/rama-http/src/service/web/endpoint/extract/body/csv.rs @@ -0,0 +1,155 @@ +use super::BytesRejection; +use crate::dep::http_body_util::BodyExt; +use crate::service::web::extract::FromRequest; +use crate::utils::macros::{composite_http_rejection, define_http_rejection}; +use crate::Request; +use bytes::Buf; + +pub use crate::response::Csv; + +define_http_rejection! { + #[status = UNSUPPORTED_MEDIA_TYPE] + #[body = "Csv requests must have `Content-Type: text/csv`"] + /// Rejection type for [`Csv`] + /// used if the `Content-Type` header is missing + /// or its value is not `text/csv`. + pub struct InvalidCsvContentType; +} + +define_http_rejection! { + #[status = BAD_REQUEST] + #[body = "Failed to deserialize csv payload"] + /// Rejection type used if the [`Csv`] + /// deserialize the payload into the target type. + pub struct FailedToDeserializeCsv(Error); +} + +composite_http_rejection! { + /// Rejection used for [`Csv`] + /// + /// Contains one variant for each way the [`Csv`] extractor + /// can fail. + pub enum CsvRejection { + InvalidCsvContentType, + FailedToDeserializeCsv, + BytesRejection, + } +} + +impl FromRequest for Csv> +where + T: serde::de::DeserializeOwned + Send + Sync + 'static, +{ + type Rejection = CsvRejection; + + async fn from_request(req: Request) -> Result { + if !crate::service::web::extract::has_any_content_type(req.headers(), &[&mime::TEXT_CSV]) { + return Err(InvalidCsvContentType.into()); + } + + let body = req.into_body(); + match body.collect().await { + Ok(c) => { + let b = c.to_bytes(); + let mut rdr = csv::Reader::from_reader(b.clone().reader()); + + let out: Result, _> = rdr + .deserialize() + .map(|rec| { + let record: Result = rec; + record + }) + .collect(); + + match out { + Ok(s) => Ok(Self(s)), + Err(err) => Err(FailedToDeserializeCsv::from_err(err).into()), + } + } + Err(err) => Err(BytesRejection::from_err(err).into()), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::service::web::WebService; + use crate::StatusCode; + use rama_core::{Context, Service}; + + #[tokio::test] + async fn test_csv() { + #[derive(serde::Deserialize)] + struct Input { + name: String, + age: u8, + alive: Option, + } + + let service = WebService::default().post("/", |Csv(body): Csv>| async move { + assert_eq!(body.len(), 2); + + assert_eq!(body[0].name, "glen"); + assert_eq!(body[0].age, 42); + assert_eq!(body[0].alive, None); + + assert_eq!(body[1].name, "adr"); + assert_eq!(body[1].age, 40); + assert_eq!(body[1].alive, Some(true)); + StatusCode::OK + }); + + let req = http::Request::builder() + .method(http::Method::POST) + .header(http::header::CONTENT_TYPE, "text/csv; charset=utf-8") + .body("name,age,alive\nglen,42,\nadr,40,true\n".into()) + .unwrap(); + let resp = service.serve(Context::default(), req).await.unwrap(); + println!("debug {:?}", resp); + assert_eq!(resp.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_csv_missing_content_type() { + #[derive(Debug, serde::Deserialize)] + struct Input { + _name: String, + _age: u8, + _alive: Option, + } + + let service = WebService::default() + .post("/", |Csv(_): Csv>| async move { StatusCode::OK }); + + let req = http::Request::builder() + .method(http::Method::POST) + .header(http::header::CONTENT_TYPE, "text/plain") + .body(r#"{"name": "glen", "age": 42}"#.into()) + .unwrap(); + let resp = service.serve(Context::default(), req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + } + + #[tokio::test] + async fn test_csv_invalid_body() { + #[derive(Debug, serde::Deserialize)] + struct Input { + _name: String, + _age: u8, + _alive: Option, + } + + let service = WebService::default() + .post("/", |Csv(_): Csv>| async move { StatusCode::OK }); + + let req = http::Request::builder() + .method(http::Method::POST) + .header(http::header::CONTENT_TYPE, "text/csv; charset=utf-8") + // the missing column last line should trigger an error + .body("name,age,alive\nglen,42,\nadr,40\n".into()) + .unwrap(); + let resp = service.serve(Context::default(), req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } +} diff --git a/rama-http/src/service/web/endpoint/extract/body/mod.rs b/rama-http/src/service/web/endpoint/extract/body/mod.rs index 07108a2c..681d7888 100644 --- a/rama-http/src/service/web/endpoint/extract/body/mod.rs +++ b/rama-http/src/service/web/endpoint/extract/body/mod.rs @@ -15,6 +15,10 @@ mod json; #[doc(inline)] pub use json::*; +mod csv; +#[doc(inline)] +pub use csv::*; + mod form; #[doc(inline)] pub use form::*; diff --git a/rama-http/src/service/web/endpoint/extract/mod.rs b/rama-http/src/service/web/endpoint/extract/mod.rs index 96d9ae82..858fffe9 100644 --- a/rama-http/src/service/web/endpoint/extract/mod.rs +++ b/rama-http/src/service/web/endpoint/extract/mod.rs @@ -29,7 +29,7 @@ pub use typed_header::{TypedHeader, TypedHeaderRejection, TypedHeaderRejectionRe mod body; #[doc(inline)] -pub use body::{Body, Bytes, Form, Json, Text}; +pub use body::{Body, Bytes, Csv, Form, Json, Text}; mod option; #[doc(inline)]