From 2b72a0f91423a6ada8b5257131a53669975c96ef Mon Sep 17 00:00:00 2001 From: Luca Bruno Date: Thu, 11 Oct 2018 08:42:23 +0000 Subject: [PATCH] dkregstry: update to latest hyper and hyper-rustls --- Cargo.toml | 6 ++- src/errors.rs | 9 ++-- src/lib.rs | 2 + src/mediatypes.rs | 5 +-- src/v2/auth.rs | 94 +++++++++++++++++++++--------------------- src/v2/blobs.rs | 75 ++++++++++++++++++--------------- src/v2/catalog.rs | 42 ++++++++++--------- src/v2/config.rs | 13 +++--- src/v2/manifest/mod.rs | 89 ++++++++++++++++++++------------------- src/v2/mod.rs | 67 ++++++++++++++++-------------- src/v2/tags.rs | 42 ++++++++++--------- 11 files changed, 235 insertions(+), 209 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ce48adca..c0ccdafe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,10 +16,12 @@ exclude = [ base64 = "0.9" error-chain = { version = "0.12", default-features = false } futures = "0.1" -hyper = "0.11" -hyper-rustls = "0.12" +hyper = "0.12" +hyper-rustls = "0.14" +http = "0.1" libflate = "0.1" log = "0.4" +mime = "0.3" mockito = { version = "0.13", optional = true } serde = "1" serde_derive = "1" diff --git a/src/errors.rs b/src/errors.rs index 972d9cb2..61d8a81d 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,17 +1,20 @@ //! Error chains, types and traits. use base64; +use http; use hyper; use serde_json; use std::{io, string}; error_chain! { foreign_links { - UriParse(hyper::error::UriError); + Base64Decode(base64::DecodeError); + HeaderInvalid(hyper::header::InvalidHeaderValue); + HeaderParse(hyper::header::ToStrError); Hyper(hyper::Error); - Json(serde_json::Error); Io(io::Error); + Json(serde_json::Error); + UriParse(http::uri::InvalidUri); Utf8Parse(string::FromUtf8Error); - Base64Decode(base64::DecodeError); } } diff --git a/src/lib.rs b/src/lib.rs index 91a2aae0..3dec8fd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,8 +35,10 @@ extern crate base64; extern crate futures; +extern crate http; extern crate hyper; extern crate hyper_rustls; +extern crate mime; extern crate serde; extern crate serde_json; extern crate tokio_core; diff --git a/src/mediatypes.rs b/src/mediatypes.rs index d63f8368..cc1998b8 100644 --- a/src/mediatypes.rs +++ b/src/mediatypes.rs @@ -2,7 +2,7 @@ use errors::*; use futures; -use hyper::{header, mime}; +use mime; use strum::EnumProperty; pub type FutureMediaType = Box, Error = Error>>; @@ -77,7 +77,4 @@ impl MediaTypes { } } } - pub fn to_qitem(&self) -> header::QualityItem { - header::qitem(self.to_mime()) - } } diff --git a/src/v2/auth.rs b/src/v2/auth.rs index cf559fc1..b1afa154 100644 --- a/src/v2/auth.rs +++ b/src/v2/auth.rs @@ -1,4 +1,6 @@ +use base64; use futures::Stream; +use hyper::header; use v2::*; /// Convenience alias for future `TokenAuth` result. @@ -25,21 +27,18 @@ impl Client { let url = try!(hyper::Uri::from_str( (self.base_url.clone() + "/v2/").as_str() )); - let req = self.new_request(hyper::Method::Get, url); + let req = self.new_request(hyper::Method::GET, url); let freq = self.hclient.request(req); let www_auth = freq - .and_then(|r| Ok(r)) + .from_err() .and_then(|r| { let a = r .headers() - .get_raw("www-authenticate") - .ok_or(hyper::Error::Header)? - .one() - .ok_or(hyper::Error::Header)?; - let chal = try!(String::from_utf8(a.to_vec())); + .get(hyper::header::WWW_AUTHENTICATE) + .ok_or(Error::from("get_token: missing Auth header"))?; + let chal = String::from_utf8(a.as_bytes().to_vec())?; Ok(chal) - }).from_err::() - .and_then(move |hdr| { + }).and_then(move |hdr| { let mut auth_ep = "".to_owned(); let mut service = None; for item in hdr.trim_left_matches("Bearer ").split(',') { @@ -58,7 +57,7 @@ impl Client { } Ok(auth_ep) }); - return Ok(Box::new(www_auth)); + Ok(Box::new(www_auth)) } /// Set the token to be used for further registry requests. @@ -85,37 +84,36 @@ impl Client { trace!("Token endpoint: {}", auth_ep); hyper::Uri::from_str(auth_ep.as_str()).map_err(|e| e.into()) }).and_then(move |u| { - let mut auth_req = client::Request::new(hyper::Method::Get, u); + let mut auth_req = hyper::Request::default(); + *auth_req.method_mut() = hyper::Method::GET; + *auth_req.uri_mut() = u; if let Some(c) = creds { - let hdr = hyper::header::Authorization(hyper::header::Basic { - username: c.0, - password: Some(c.1), - }); - auth_req.headers_mut().set(hdr); + let plain = format!("{}:{}", c.0, c.1); + let basic = format!("Basic {}", base64::encode(&plain)); + auth_req.headers_mut().append( + header::AUTHORIZATION, + header::HeaderValue::from_str(&basic).unwrap(), + ); }; subclient.request(auth_req).map_err(|e| e.into()) }).and_then(|r| { - trace!("Got status {}", r.status()); - if r.status() != hyper::StatusCode::Ok { - Err(Error::from(hyper::Error::Status)) - } else { - Ok(r) + let status = r.status(); + trace!("Got status {}", status); + match status { + hyper::StatusCode::OK => Ok(r), + _ => Err(format!("login: wrong HTTP status '{}'", status).into()), } }).and_then(|r| { - r.body() - .fold(Vec::new(), |mut v, chunk| { - v.extend(&chunk[..]); - futures::future::ok::<_, hyper::Error>(v) - }).map_err(|e| e.into()) + r.into_body() + .concat2() + .map_err(|e| format!("login: failed to fetch the whole body: {}", e).into()) }).and_then(|body| { - let s = String::from_utf8(body)?; - let ta = serde_json::from_slice(s.as_bytes()).map_err(|e| e.into()); - if let Ok(_) = ta { - trace!("Got token"); - }; - ta + let s = String::from_utf8(body.into_bytes().to_vec())?; + serde_json::from_slice(s.as_bytes()).map_err(|e| e.into()) + }).inspect(|_| { + trace!("Got token"); }); - return Ok(Box::new(auth)); + Ok(Box::new(auth)) } /// Check whether the client is authenticated with the registry. @@ -123,27 +121,29 @@ impl Client { let url = try!(hyper::Uri::from_str( (self.base_url.clone() + "/v2/").as_str() )); - let mut req = self.new_request(hyper::Method::Get, url.clone()); + let mut req = self.new_request(hyper::Method::GET, url.clone()); if let Some(t) = token { - req.headers_mut() - .set(hyper::header::Authorization(hyper::header::Bearer { - token: t.to_owned(), - })); + let bearer = format!("Bearer {}", t); + req.headers_mut().append( + header::AUTHORIZATION, + header::HeaderValue::from_str(&bearer).unwrap(), + ); }; let freq = self.hclient.request(req); let fres = freq - .map(move |r| { + .from_err() + .inspect(move |_| { trace!("GET {:?}", url); - r }).and_then(move |r| { - trace!("Got status {}", r.status()); - match r.status() { - hyper::StatusCode::Ok => Ok(true), - hyper::StatusCode::Unauthorized => Ok(false), - _ => Err(hyper::error::Error::Status), + let status = r.status(); + trace!("Got status {}", status); + match status { + hyper::StatusCode::OK => Ok(true), + hyper::StatusCode::UNAUTHORIZED => Ok(false), + _ => Err(format!("is_auth: wrong HTTP status '{}'", status).into()), } - }).from_err(); - return Ok(Box::new(fres)); + }); + Ok(Box::new(fres)) } } diff --git a/src/v2/blobs.rs b/src/v2/blobs.rs index 5f48644a..7f3d0af6 100644 --- a/src/v2/blobs.rs +++ b/src/v2/blobs.rs @@ -10,26 +10,26 @@ impl Client { /// Check if a blob exists. pub fn has_blob(&self, name: &str, digest: &str) -> Result { let url = { - let ep = format!("{}/v2/{}/blobs/{}", self.base_url.clone(), name, digest); - try!(hyper::Uri::from_str(ep.as_str())) + let ep = format!("{}/v2/{}/blobs/{}", self.base_url, name, digest); + hyper::Uri::from_str(ep.as_str())? }; - let req = self.new_request(hyper::Method::Head, url.clone()); + let req = self.new_request(hyper::Method::HEAD, url.clone()); let freq = self.hclient.request(req); let fres = freq - .map(move |r| { + .from_err() + .inspect(move |_| { trace!("HEAD {:?}", url); - r }).and_then(|r| { trace!("Blob check result: {:?}", r.status()); match r.status() { - StatusCode::MovedPermanently - | StatusCode::TemporaryRedirect - | StatusCode::Found - | StatusCode::Ok => Ok(true), + StatusCode::MOVED_PERMANENTLY + | StatusCode::TEMPORARY_REDIRECT + | StatusCode::FOUND + | StatusCode::OK => Ok(true), _ => Ok(false), } - }).map_err(|e| e.into()); - return Ok(Box::new(fres)); + }); + Ok(Box::new(fres)) } /// Retrieve blob. @@ -37,48 +37,55 @@ impl Client { let cl = self.clone(); let url = { let ep = format!("{}/v2/{}/blobs/{}", self.base_url.clone(), name, digest); - try!(hyper::Uri::from_str(ep.as_str())) + hyper::Uri::from_str(ep.as_str())? }; - let req = self.new_request(hyper::Method::Get, url.clone()); + let req = self.new_request(hyper::Method::GET, url.clone()); let freq = self.hclient.request(req); let fres = freq - .map(move |r| { + .from_err() + .inspect(move |_| { trace!("GET {:?}", url); - r }).and_then(move |r| { match r.status() { - StatusCode::MovedPermanently - | StatusCode::TemporaryRedirect - | StatusCode::Found => { + StatusCode::MOVED_PERMANENTLY + | StatusCode::TEMPORARY_REDIRECT + | StatusCode::FOUND => { trace!("Got moved status {:?}", r.status()); } _ => return Either::A(future::ok(r)), }; - let redirect: Option = match r.headers().get_raw("Location") { - None => return Either::A(future::result(Err(hyper::error::Error::Status))), - Some(ct) => { - trace!("Got Location header {:?}", ct); - ct.clone() - .one() - .and_then(|h| String::from_utf8(h.to_vec()).ok()) + let redirect: Option = match r.headers().get("Location") { + None => { + return Either::A(future::err(Error::from( + "get_blob: missing location header", + ))) + } + Some(loc) => { + trace!("Got Location header {:?}", loc); + String::from_utf8(loc.as_bytes().to_vec()).ok() } }; if let Some(u) = redirect { let new_url = match hyper::Uri::from_str(u.as_str()) { Ok(u) => u, - Err(e) => return Either::A(future::result(Err(hyper::Error::Uri(e)))), + _ => { + return Either::A(future::err( + format!("get_blob: wrong URL '{}'", u).into(), + )) + } }; trace!("Following redirection to {}", new_url); - let req = client::Request::new(hyper::Method::Get, new_url); - return Either::B(cl.hclient.request(req)); + let mut req = hyper::Request::default(); + *req.method_mut() = hyper::Method::GET; + *req.uri_mut() = new_url; + return Either::B(cl.hclient.request(req).from_err()); }; Either::A(future::ok(r)) }).and_then(|r| { - r.body().fold(Vec::new(), |mut v, chunk| { - v.extend(&chunk[..]); - futures::future::ok::<_, hyper::Error>(v) - }) - }).from_err(); - return Ok(Box::new(fres)); + r.into_body() + .concat2() + .map_err(|e| format!("get_blob: failed to fetch the whole body: {}", e).into()) + }).and_then(|body| Ok(body.into_bytes().to_vec())); + Ok(Box::new(fres)) } } diff --git a/src/v2/catalog.rs b/src/v2/catalog.rs index bf0b1ac2..c3ab35be 100644 --- a/src/v2/catalog.rs +++ b/src/v2/catalog.rs @@ -1,5 +1,9 @@ -use futures::Stream; -use v2::*; +use errors::{Error, Result}; +use futures::{self, Future, Stream}; +use hyper; +use serde_json; +use std::str::FromStr; +use v2; /// Convenience alias for a stream of `String` repos. pub type StreamCatalog = Box>; @@ -9,7 +13,7 @@ struct Catalog { pub repositories: Vec, } -impl Client { +impl v2::Client { pub fn get_catalog(&self, paginate: Option) -> Result { let url = { let mut s = self.base_url.clone() + "/v2/_catalog"; @@ -18,27 +22,25 @@ impl Client { }; try!(hyper::Uri::from_str(s.as_str())) }; - let req = self.new_request(hyper::Method::Get, url); + let req = self.new_request(hyper::Method::GET, url); let freq = self.hclient.request(req); let fres = freq - .and_then(|resp| { - if resp.status() != hyper::StatusCode::Ok { - return Err(hyper::Error::Status); - }; - Ok(resp) + .from_err() + .and_then(|r| { + let status = r.status(); + trace!("Got status: {:?}", status); + match status { + hyper::StatusCode::OK => Ok(r), + _ => Err(format!("get_catalog: wrong HTTP status '{}'", status).into()), + } }).and_then(|r| { - r.body().fold(Vec::new(), |mut v, chunk| { - v.extend(&chunk[..]); - futures::future::ok::<_, hyper::Error>(v) + r.into_body().concat2().map_err(|e| { + format!("get_catalog: failed to fetch the whole body: {}", e).into() }) - }).map_err(|e| e.into()) - .and_then(|chunks| String::from_utf8(chunks).map_err(|e| e.into())) - .and_then(|body| -> Result { - serde_json::from_slice(body.as_bytes()).map_err(|e| e.into()) - }).map(|cat| -> Vec> { - cat.repositories.into_iter().map(|r| Ok(r)).collect() - }).map(|rs| futures::stream::iter(rs.into_iter())) + }).and_then(|body| -> Result { + serde_json::from_slice(&body.into_bytes()).map_err(|e| e.into()) + }).map(|cat| futures::stream::iter_ok(cat.repositories.into_iter())) .flatten_stream(); - return Ok(Box::new(fres)); + Ok(Box::new(fres)) } } diff --git a/src/v2/config.rs b/src/v2/config.rs index e11f3bc2..0ff52975 100644 --- a/src/v2/config.rs +++ b/src/v2/config.rs @@ -1,9 +1,10 @@ +use hyper::client; use v2::*; /// Configuration for a `Client`. #[derive(Debug)] pub struct Config { - config: client::Config, + config: client::Client, hyper::Body>, handle: reactor::Handle, index: String, insecure_registry: bool, @@ -15,9 +16,10 @@ pub struct Config { impl Config { /// Initialize `Config` with default values. pub fn default(handle: &reactor::Handle) -> Self { + let dns_threads = 4; Self { - config: hyper::client::Client::configure() - .connector(hyper_rustls::HttpsConnector::new(4, handle)), + config: hyper::client::Client::builder() + .build(hyper_rustls::HttpsConnector::new(dns_threads)), handle: handle.clone(), index: "registry-1.docker.io".into(), insecure_registry: false, @@ -68,7 +70,6 @@ impl Config { /// Return a `Client` to interact with a v2 registry. pub fn build(self) -> Result { - let hclient = self.config.build(&self.handle); let base = match self.insecure_registry { false => "https://".to_string() + &self.index, true => "http://".to_string() + &self.index, @@ -86,11 +87,11 @@ impl Config { let c = Client { base_url: base, credentials: creds, - hclient: hclient, + hclient: self.config, index: self.index, user_agent: self.user_agent, token: None, }; - return Ok(c); + Ok(c) } } diff --git a/src/v2/manifest/mod.rs b/src/v2/manifest/mod.rs index bdcab493..9dfd7e8c 100644 --- a/src/v2/manifest/mod.rs +++ b/src/v2/manifest/mod.rs @@ -3,9 +3,9 @@ use mediatypes; use v2::*; use futures::Stream; -use hyper::header::{Accept, QualityItem}; -use hyper::mime; +use hyper::header; use hyper::StatusCode; +use mime; mod manifest_schema1; pub use self::manifest_schema1::*; @@ -26,29 +26,30 @@ impl Client { reference ))); let req = { - let accept_types = Accept(vec![mediatypes::MediaTypes::ManifestV2S2.to_qitem()]); - let mut r = self.new_request(hyper::Method::Get, url.clone()); - r.headers_mut().set(accept_types); + let mut r = self.new_request(hyper::Method::GET, url.clone()); + let mtype = mediatypes::MediaTypes::ManifestV2S2.to_string(); + r.headers_mut() + .append(header::ACCEPT, header::HeaderValue::from_str(&mtype)?); r }; let freq = self.hclient.request(req); let fres = freq - .map(move |r| { + .from_err() + .inspect(move |_| { trace!("GET {:?}", url); - r - }).and_then(move |r| { - trace!("Got status: {:?}", r.status()); - if r.status() != hyper::StatusCode::Ok { - return Err(hyper::Error::Status); - }; - Ok(r) - }).and_then(move |r| { - r.body().fold(Vec::new(), |mut v, chunk| { - v.extend(&chunk[..]); - futures::future::ok::<_, hyper::Error>(v) + }).and_then(|r| { + let status = r.status(); + trace!("Got status: {:?}", status); + match status { + hyper::StatusCode::OK => Ok(r), + _ => Err(format!("get_manifest: wrong HTTP status '{}'", status).into()), + } + }).and_then(|r| { + r.into_body().concat2().map_err(|e| { + format!("get_manifest: failed to fetch the whole body: {}", e).into() }) - }).from_err(); - return Ok(Box::new(fres)); + }).and_then(|body| Ok(body.into_bytes().to_vec())); + Ok(Box::new(fres)) } /// Check if an image manifest exists. @@ -71,48 +72,52 @@ impl Client { try!(hyper::Uri::from_str(ep.as_str())) }; let accept_types = match mediatypes { - None => Accept(vec![mediatypes::MediaTypes::ManifestV2S2.to_qitem()]), - Some(ref v) => Accept(try!(to_mimes(v))), + None => vec![mediatypes::MediaTypes::ManifestV2S2.to_mime()], + Some(ref v) => try!(to_mimes(v)), }; let req = { - let mut r = self.new_request(hyper::Method::Head, url.clone()); - r.headers_mut().set(accept_types); + let mut r = self.new_request(hyper::Method::HEAD, url.clone()); + for v in accept_types { + let _ = header::HeaderValue::from_str(&v.to_string()) + .map(|hval| r.headers_mut().append(hyper::header::ACCEPT, hval)); + } r }; let freq = self.hclient.request(req); let fres = freq - .map(move |r| { + .from_err() + .inspect(move |_| { trace!("HEAD {:?}", url); - r }).and_then(|r| { + let status = r.status(); let mut ct = None; - let hdr = r.headers().get::(); - if let Some(h) = hdr { - ct = mediatypes::MediaTypes::from_mime(h).ok(); - }; + if let Some(h) = r.headers().get(header::CONTENT_TYPE) { + if let Ok(s) = h.to_str() { + ct = mediatypes::MediaTypes::from_str(s).ok(); + } + } trace!("Manifest check result: {:?}", r.status()); - let res = match r.status() { - StatusCode::MovedPermanently - | StatusCode::TemporaryRedirect - | StatusCode::Found - | StatusCode::Ok => ct, - StatusCode::NotFound => None, - _ => return Err(hyper::Error::Status), + let res = match status { + StatusCode::MOVED_PERMANENTLY + | StatusCode::TEMPORARY_REDIRECT + | StatusCode::FOUND + | StatusCode::OK => ct, + StatusCode::NOT_FOUND => None, + _ => return Err(format!("has_manifest: wrong HTTP status '{}'", status).into()), }; Ok(res) - }).from_err(); - return Ok(Box::new(fres)); + }); + Ok(Box::new(fres)) } } -fn to_mimes(v: &[&str]) -> Result>> { - let res: Vec>; - res = v +fn to_mimes(v: &[&str]) -> Result> { + let res = v .iter() .filter_map(|x| { let mtype = mediatypes::MediaTypes::from_str(x); match mtype { - Ok(m) => Some(m.to_qitem()), + Ok(m) => Some(m.to_mime()), _ => None, } }).collect(); diff --git a/src/v2/mod.rs b/src/v2/mod.rs index a09affa6..089c3473 100644 --- a/src/v2/mod.rs +++ b/src/v2/mod.rs @@ -31,7 +31,7 @@ use super::errors::*; use futures; -use hyper::{self, client}; +use hyper::{self, client, header}; use hyper_rustls; use serde_json; use tokio_core::reactor; @@ -61,16 +61,16 @@ pub use self::blobs::FutureBlob; pub struct Client { base_url: String, credentials: Option<(String, String)>, - hclient: client::Client, + hclient: client::Client>, index: String, user_agent: Option, token: Option, } -/// Convenience alias for future boolean result. +/// Convenience alias for a future boolean result. pub type FutureBool = Box>; -/// Convenience alias for future manifest blob. +/// Convenience alias for a future manifest blob. pub type FutureManifest = Box, Error = Error>>; impl Client { @@ -78,21 +78,29 @@ impl Client { Config::default(handle) } - fn new_request(&self, method: hyper::Method, url: hyper::Uri) -> hyper::client::Request { - let mut req = client::Request::new(method, url); - let host = hyper::header::Host::new(self.index.clone(), None); - req.headers_mut().set(host); + fn new_request(&self, method: hyper::Method, url: hyper::Uri) -> hyper::Request { + // TODO(lucab): get rid of all unwraps here. + let mut req = hyper::Request::default(); + *req.method_mut() = method; + *req.uri_mut() = url; + req.headers_mut().append( + header::HOST, + header::HeaderValue::from_str(&self.index).unwrap(), + ); if let Some(ref t) = self.token { - req.headers_mut() - .set(hyper::header::Authorization(hyper::header::Bearer { - token: t.to_owned(), - })); + let bearer = format!("Bearer {}", t); + req.headers_mut().append( + header::AUTHORIZATION, + header::HeaderValue::from_str(&bearer).unwrap(), + ); }; if let Some(ref ua) = self.user_agent { - req.headers_mut() - .set(hyper::header::UserAgent::new(ua.to_owned())); + req.headers_mut().append( + header::USER_AGENT, + header::HeaderValue::from_str(ua).unwrap(), + ); }; - return req; + req } pub fn is_v2_supported(&self) -> Result { @@ -102,26 +110,23 @@ impl Client { let url = try!(hyper::Uri::from_str( (self.base_url.clone() + "/v2/").as_str() )); - let req = self.new_request(hyper::Method::Get, url.clone()); + let req = self.new_request(hyper::Method::GET, url.clone()); let freq = self.hclient.request(req); let fres = freq - .map(move |r| { + .from_err() + .inspect(move |_| { trace!("GET {:?}", url); - r - }).and_then( - move |r| match (r.status(), r.headers().get_raw(api_header)) { - (hyper::StatusCode::Ok, Some(x)) => Ok(x == api_version), - (hyper::StatusCode::Unauthorized, Some(x)) => Ok(x == api_version), - (s, v) => { - trace!("Got status {}, header version {:?}", s, v); - Ok(false) - } - }, - ).and_then(|b| { + }).and_then(move |r| match (r.status(), r.headers().get(api_header)) { + (hyper::StatusCode::OK, Some(x)) => Ok(x == api_version), + (hyper::StatusCode::UNAUTHORIZED, Some(x)) => Ok(x == api_version), + (s, v) => { + trace!("Got status {}, header version {:?}", s, v); + Ok(false) + } + }).inspect(|b| { trace!("v2 API supported: {}", b); - Ok(b) - }).from_err(); - return Ok(Box::new(fres)); + }); + Ok(Box::new(fres)) } } diff --git a/src/v2/tags.rs b/src/v2/tags.rs index 01db1ff1..1b0189e9 100644 --- a/src/v2/tags.rs +++ b/src/v2/tags.rs @@ -1,4 +1,5 @@ -use futures::Stream; +use futures::{self, Stream}; +use hyper::{self, header}; use v2::*; /// Convenience alias for a stream of `String` tags. @@ -20,32 +21,33 @@ impl Client { }; try!(hyper::Uri::from_str(s.as_str())) }; - let req = self.new_request(hyper::Method::Get, url); + let req = self.new_request(hyper::Method::GET, url); let freq = self.hclient.request(req); let fres = freq + .from_err() .and_then(|r| { - if r.status() != hyper::StatusCode::Ok { - return Err(hyper::Error::Status); + let status = r.status(); + if status != hyper::StatusCode::OK { + return Err(format!("get_tags: wrong HTTP status '{}", status).into()); }; - let ok = match r.headers().get_raw("Content-type") { - None => false, - Some(ct) => ct.iter().any(|v| v == b"application/json"), - }; - if !ok { - return Err(hyper::Error::Header); + { + let ct_hdr = r.headers().get(header::CONTENT_TYPE); + let ok = match ct_hdr { + None => false, + Some(ct) => ct.to_str()?.starts_with("application/json"), + }; + if !ok { + return Err(format!("get_tags: wrong content type '{:?}'", ct_hdr).into()); + } } Ok(r) }).and_then(|r| { - r.body().fold(Vec::new(), |mut v, chunk| { - v.extend(&chunk[..]); - futures::future::ok::<_, hyper::Error>(v) - }) - }).from_err() - .and_then(|chunks| String::from_utf8(chunks).map_err(|e| e.into())) - .and_then(|body| -> Result { - serde_json::from_slice(body.as_bytes()).map_err(|e| e.into()) - }).map(|ts| -> Vec> { ts.tags.into_iter().map(|r| Ok(r)).collect() }) - .map(|rs| futures::stream::iter(rs.into_iter())) + r.into_body() + .concat2() + .map_err(|e| format!("get_tags: failed to fetch the whole body: {}", e).into()) + }).and_then(|body| -> Result { + serde_json::from_slice(&body.into_bytes()).map_err(|e| e.into()) + }).map(|ts| futures::stream::iter_ok(ts.tags.into_iter())) .flatten_stream(); Ok(Box::new(fres)) }