Skip to content

Commit

Permalink
feat(rama-http): add csv support (#401)
Browse files Browse the repository at this point in the history
* feat(rama-http): add csv support
* feat(rama-http): update FromRequest on Csv to Csv<Vec<T>>
  • Loading branch information
IamTossan authored Jan 29, 2025
1 parent dcdf410 commit 8916d9d
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 1 deletion.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ brotli = "7"
bytes = "1"
clap = { version = "4.5.15", features = ["derive"] }
crossterm = "0.27"
csv = "1.3.1"
flate2 = "1.0"
futures-lite = "2.3.0"
futures-core = "0.3"
Expand Down
1 change: 1 addition & 0 deletions rama-http-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ workspace = true
[dependencies]
bytes = { workspace = true }
const_format = { workspace = true }
csv = { workspace = true }
futures-core = { workspace = true }
futures-lite = { workspace = true }
headers = { workspace = true }
Expand Down
145 changes: 145 additions & 0 deletions rama-http-types/src/response/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use crate::response::{IntoResponse, Response};
use crate::{dep::http::StatusCode, Body};
use bytes::{BufMut, BytesMut};
use csv;
use headers::ContentType;
use http::header::CONTENT_TYPE;
use http::HeaderValue;
use rama_error::OpaqueError;
use rama_utils::macros::impl_deref;
use serde::Serialize;
use std::fmt;

use super::Headers;

/// Wrapper used to create Csv Http [`Response`]s,
/// as well as to extract Csv from Http [`Request`] bodies.
///
/// [`Request`]: crate::Request
/// [`Response`]: crate::Response
///
/// # Examples
///
/// ## Creating a Csv Response
///
/// ```
/// use serde_json::json;
/// use rama_http_types::{IntoResponse, response::Csv};
///
/// async fn handler() -> impl IntoResponse {
/// Csv(
/// vec![
/// json!({
/// "name": "john",
/// "age": 30,
/// "is_student": false
/// })
/// ]
/// )
/// }
/// ```
///
/// ## Extracting Csv from a Request
///
/// ```
/// use serde_json::json;
/// use rama_http_types::response::Csv;
///
/// #[derive(Debug, serde::Deserialize)]
/// struct Input {
/// name: String,
/// age: u8,
/// alive: Option<bool>,
/// }
///
/// # fn bury(name: impl AsRef<str>) {}
///
/// async fn handler(Csv(input): Csv<Vec<Input>>) {
/// if !input[0].alive.unwrap_or_default() {
/// bury(&input[0].name);
/// }
/// }
/// ```
pub struct Csv<T>(pub T);

impl<T: fmt::Debug> fmt::Debug for Csv<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Csv").field(&self.0).finish()
}
}

impl<T: Clone> Clone for Csv<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl_deref!(Csv);

impl<T> From<T> for Csv<T> {
fn from(inner: T) -> Self {
Self(inner)
}
}

impl<T> IntoResponse for Csv<T>
where
T: IntoIterator<Item: Serialize> + std::fmt::Debug,
{
fn into_response(self) -> Response {
// Use a small initial capacity of 128 bytes like serde_json::to_vec
// https://docs.rs/serde_json/1.0.82/src/serde_json/ser.rs.html#2189
let mut buf = BytesMut::with_capacity(128).writer();
{
let mut wtr = csv::Writer::from_writer(&mut buf);
let res: Result<Vec<_>, _> = self.0.into_iter().map(|rec| wtr.serialize(rec)).collect();
if let Err(err) = res {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Headers::single(ContentType::text_utf8()),
err.to_string(),
)
.into_response();
}
if let Err(err) = wtr.flush() {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Headers::single(ContentType::text_utf8()),
err.to_string(),
)
.into_response();
}
}

(
[(CONTENT_TYPE, HeaderValue::from_static("text/csv"))],
buf.into_inner().freeze(),
)
.into_response()
}
}

impl<T> TryInto<Body> for Csv<T>
where
T: IntoIterator<Item: Serialize>,
{
type Error = OpaqueError;

fn try_into(self) -> Result<Body, Self::Error> {
// Use a small initial capacity of 128 bytes like serde_json::to_vec
// https://docs.rs/serde_json/1.0.82/src/serde_json/ser.rs.html#2189
let mut buf = BytesMut::with_capacity(128).writer();
{
let mut wtr = csv::Writer::from_writer(&mut buf);
let res: Result<Vec<_>, _> = self.0.into_iter().map(|rec| wtr.serialize(rec)).collect();
if let Err(err) = res {
return Err(OpaqueError::from_std(err));
}
if let Err(err) = wtr.flush() {
return Err(OpaqueError::from_std(err));
}
}

Ok(buf.into_inner().freeze().into())
}
}
4 changes: 4 additions & 0 deletions rama-http-types/src/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ mod json;
#[doc(inline)]
pub use json::Json;

mod csv;
#[doc(inline)]
pub use csv::Csv;

mod form;
#[doc(inline)]
pub use form::Form;
Expand Down
1 change: 1 addition & 0 deletions rama-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ base64 = { workspace = true }
bitflags = { workspace = true }
bytes = { workspace = true }
const_format = { workspace = true }
csv = { workspace = true }
futures-lite = { workspace = true }
headers = { workspace = true }
http = { workspace = true }
Expand Down
155 changes: 155 additions & 0 deletions rama-http/src/service/web/endpoint/extract/body/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use super::BytesRejection;
use crate::dep::http_body_util::BodyExt;
use crate::service::web::extract::FromRequest;
use crate::utils::macros::{composite_http_rejection, define_http_rejection};
use crate::Request;
use bytes::Buf;

pub use crate::response::Csv;

define_http_rejection! {
#[status = UNSUPPORTED_MEDIA_TYPE]
#[body = "Csv requests must have `Content-Type: text/csv`"]
/// Rejection type for [`Csv`]
/// used if the `Content-Type` header is missing
/// or its value is not `text/csv`.
pub struct InvalidCsvContentType;
}

define_http_rejection! {
#[status = BAD_REQUEST]
#[body = "Failed to deserialize csv payload"]
/// Rejection type used if the [`Csv`]
/// deserialize the payload into the target type.
pub struct FailedToDeserializeCsv(Error);
}

composite_http_rejection! {
/// Rejection used for [`Csv`]
///
/// Contains one variant for each way the [`Csv`] extractor
/// can fail.
pub enum CsvRejection {
InvalidCsvContentType,
FailedToDeserializeCsv,
BytesRejection,
}
}

impl<T> FromRequest for Csv<Vec<T>>
where
T: serde::de::DeserializeOwned + Send + Sync + 'static,
{
type Rejection = CsvRejection;

async fn from_request(req: Request) -> Result<Self, Self::Rejection> {
if !crate::service::web::extract::has_any_content_type(req.headers(), &[&mime::TEXT_CSV]) {
return Err(InvalidCsvContentType.into());
}

let body = req.into_body();
match body.collect().await {
Ok(c) => {
let b = c.to_bytes();
let mut rdr = csv::Reader::from_reader(b.clone().reader());

let out: Result<Vec<T>, _> = rdr
.deserialize()
.map(|rec| {
let record: Result<T, _> = rec;
record
})
.collect();

match out {
Ok(s) => Ok(Self(s)),
Err(err) => Err(FailedToDeserializeCsv::from_err(err).into()),
}
}
Err(err) => Err(BytesRejection::from_err(err).into()),
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::service::web::WebService;
use crate::StatusCode;
use rama_core::{Context, Service};

#[tokio::test]
async fn test_csv() {
#[derive(serde::Deserialize)]
struct Input {
name: String,
age: u8,
alive: Option<bool>,
}

let service = WebService::default().post("/", |Csv(body): Csv<Vec<Input>>| async move {
assert_eq!(body.len(), 2);

assert_eq!(body[0].name, "glen");
assert_eq!(body[0].age, 42);
assert_eq!(body[0].alive, None);

assert_eq!(body[1].name, "adr");
assert_eq!(body[1].age, 40);
assert_eq!(body[1].alive, Some(true));
StatusCode::OK
});

let req = http::Request::builder()
.method(http::Method::POST)
.header(http::header::CONTENT_TYPE, "text/csv; charset=utf-8")
.body("name,age,alive\nglen,42,\nadr,40,true\n".into())
.unwrap();
let resp = service.serve(Context::default(), req).await.unwrap();
println!("debug {:?}", resp);
assert_eq!(resp.status(), StatusCode::OK);
}

#[tokio::test]
async fn test_csv_missing_content_type() {
#[derive(Debug, serde::Deserialize)]
struct Input {
_name: String,
_age: u8,
_alive: Option<bool>,
}

let service = WebService::default()
.post("/", |Csv(_): Csv<Vec<Input>>| async move { StatusCode::OK });

let req = http::Request::builder()
.method(http::Method::POST)
.header(http::header::CONTENT_TYPE, "text/plain")
.body(r#"{"name": "glen", "age": 42}"#.into())
.unwrap();
let resp = service.serve(Context::default(), req).await.unwrap();
assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
}

#[tokio::test]
async fn test_csv_invalid_body() {
#[derive(Debug, serde::Deserialize)]
struct Input {
_name: String,
_age: u8,
_alive: Option<bool>,
}

let service = WebService::default()
.post("/", |Csv(_): Csv<Vec<Input>>| async move { StatusCode::OK });

let req = http::Request::builder()
.method(http::Method::POST)
.header(http::header::CONTENT_TYPE, "text/csv; charset=utf-8")
// the missing column last line should trigger an error
.body("name,age,alive\nglen,42,\nadr,40\n".into())
.unwrap();
let resp = service.serve(Context::default(), req).await.unwrap();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
}
4 changes: 4 additions & 0 deletions rama-http/src/service/web/endpoint/extract/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ mod json;
#[doc(inline)]
pub use json::*;

mod csv;
#[doc(inline)]
pub use csv::*;

mod form;
#[doc(inline)]
pub use form::*;
Expand Down
Loading

0 comments on commit 8916d9d

Please sign in to comment.