Skip to content

Commit

Permalink
Merge pull request #43 from lucab/ups/update-hyper
Browse files Browse the repository at this point in the history
dkregistry: update to latest hyper and hyper-rustls
  • Loading branch information
steveej authored Oct 12, 2018
2 parents c305948 + 2b72a0f commit bacca3b
Show file tree
Hide file tree
Showing 11 changed files with 235 additions and 209 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ exclude = [
base64 = "0.9"
error-chain = { version = "0.12", default-features = false }
futures = "0.1"
hyper = "0.11"
hyper-rustls = "0.12"
hyper = "0.12"
hyper-rustls = "0.14"
http = "0.1"
libflate = "0.1"
log = "0.4"
mime = "0.3"
mockito = { version = "0.13", optional = true }
serde = "1"
serde_derive = "1"
Expand Down
9 changes: 6 additions & 3 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
//! Error chains, types and traits.
use base64;
use http;
use hyper;
use serde_json;
use std::{io, string};

error_chain! {
foreign_links {
UriParse(hyper::error::UriError);
Base64Decode(base64::DecodeError);
HeaderInvalid(hyper::header::InvalidHeaderValue);
HeaderParse(hyper::header::ToStrError);
Hyper(hyper::Error);
Json(serde_json::Error);
Io(io::Error);
Json(serde_json::Error);
UriParse(http::uri::InvalidUri);
Utf8Parse(string::FromUtf8Error);
Base64Decode(base64::DecodeError);
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
extern crate base64;
extern crate futures;
extern crate http;
extern crate hyper;
extern crate hyper_rustls;
extern crate mime;
extern crate serde;
extern crate serde_json;
extern crate tokio_core;
Expand Down
5 changes: 1 addition & 4 deletions src/mediatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use errors::*;
use futures;
use hyper::{header, mime};
use mime;
use strum::EnumProperty;

pub type FutureMediaType = Box<futures::Future<Item = Option<MediaTypes>, Error = Error>>;
Expand Down Expand Up @@ -77,7 +77,4 @@ impl MediaTypes {
}
}
}
pub fn to_qitem(&self) -> header::QualityItem<mime::Mime> {
header::qitem(self.to_mime())
}
}
94 changes: 47 additions & 47 deletions src/v2/auth.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use base64;
use futures::Stream;
use hyper::header;
use v2::*;

/// Convenience alias for future `TokenAuth` result.
Expand All @@ -25,21 +27,18 @@ impl Client {
let url = try!(hyper::Uri::from_str(
(self.base_url.clone() + "/v2/").as_str()
));
let req = self.new_request(hyper::Method::Get, url);
let req = self.new_request(hyper::Method::GET, url);
let freq = self.hclient.request(req);
let www_auth = freq
.and_then(|r| Ok(r))
.from_err()
.and_then(|r| {
let a = r
.headers()
.get_raw("www-authenticate")
.ok_or(hyper::Error::Header)?
.one()
.ok_or(hyper::Error::Header)?;
let chal = try!(String::from_utf8(a.to_vec()));
.get(hyper::header::WWW_AUTHENTICATE)
.ok_or(Error::from("get_token: missing Auth header"))?;
let chal = String::from_utf8(a.as_bytes().to_vec())?;
Ok(chal)
}).from_err::<Error>()
.and_then(move |hdr| {
}).and_then(move |hdr| {
let mut auth_ep = "".to_owned();
let mut service = None;
for item in hdr.trim_left_matches("Bearer ").split(',') {
Expand All @@ -58,7 +57,7 @@ impl Client {
}
Ok(auth_ep)
});
return Ok(Box::new(www_auth));
Ok(Box::new(www_auth))
}

/// Set the token to be used for further registry requests.
Expand All @@ -85,65 +84,66 @@ impl Client {
trace!("Token endpoint: {}", auth_ep);
hyper::Uri::from_str(auth_ep.as_str()).map_err(|e| e.into())
}).and_then(move |u| {
let mut auth_req = client::Request::new(hyper::Method::Get, u);
let mut auth_req = hyper::Request::default();
*auth_req.method_mut() = hyper::Method::GET;
*auth_req.uri_mut() = u;
if let Some(c) = creds {
let hdr = hyper::header::Authorization(hyper::header::Basic {
username: c.0,
password: Some(c.1),
});
auth_req.headers_mut().set(hdr);
let plain = format!("{}:{}", c.0, c.1);
let basic = format!("Basic {}", base64::encode(&plain));
auth_req.headers_mut().append(
header::AUTHORIZATION,
header::HeaderValue::from_str(&basic).unwrap(),
);
};
subclient.request(auth_req).map_err(|e| e.into())
}).and_then(|r| {
trace!("Got status {}", r.status());
if r.status() != hyper::StatusCode::Ok {
Err(Error::from(hyper::Error::Status))
} else {
Ok(r)
let status = r.status();
trace!("Got status {}", status);
match status {
hyper::StatusCode::OK => Ok(r),
_ => Err(format!("login: wrong HTTP status '{}'", status).into()),
}
}).and_then(|r| {
r.body()
.fold(Vec::new(), |mut v, chunk| {
v.extend(&chunk[..]);
futures::future::ok::<_, hyper::Error>(v)
}).map_err(|e| e.into())
r.into_body()
.concat2()
.map_err(|e| format!("login: failed to fetch the whole body: {}", e).into())
}).and_then(|body| {
let s = String::from_utf8(body)?;
let ta = serde_json::from_slice(s.as_bytes()).map_err(|e| e.into());
if let Ok(_) = ta {
trace!("Got token");
};
ta
let s = String::from_utf8(body.into_bytes().to_vec())?;
serde_json::from_slice(s.as_bytes()).map_err(|e| e.into())
}).inspect(|_| {
trace!("Got token");
});
return Ok(Box::new(auth));
Ok(Box::new(auth))
}

/// Check whether the client is authenticated with the registry.
pub fn is_auth(&self, token: Option<&str>) -> Result<FutureBool> {
let url = try!(hyper::Uri::from_str(
(self.base_url.clone() + "/v2/").as_str()
));
let mut req = self.new_request(hyper::Method::Get, url.clone());
let mut req = self.new_request(hyper::Method::GET, url.clone());
if let Some(t) = token {
req.headers_mut()
.set(hyper::header::Authorization(hyper::header::Bearer {
token: t.to_owned(),
}));
let bearer = format!("Bearer {}", t);
req.headers_mut().append(
header::AUTHORIZATION,
header::HeaderValue::from_str(&bearer).unwrap(),
);
};

let freq = self.hclient.request(req);
let fres = freq
.map(move |r| {
.from_err()
.inspect(move |_| {
trace!("GET {:?}", url);
r
}).and_then(move |r| {
trace!("Got status {}", r.status());
match r.status() {
hyper::StatusCode::Ok => Ok(true),
hyper::StatusCode::Unauthorized => Ok(false),
_ => Err(hyper::error::Error::Status),
let status = r.status();
trace!("Got status {}", status);
match status {
hyper::StatusCode::OK => Ok(true),
hyper::StatusCode::UNAUTHORIZED => Ok(false),
_ => Err(format!("is_auth: wrong HTTP status '{}'", status).into()),
}
}).from_err();
return Ok(Box::new(fres));
});
Ok(Box::new(fres))
}
}
75 changes: 41 additions & 34 deletions src/v2/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,75 +10,82 @@ impl Client {
/// Check if a blob exists.
pub fn has_blob(&self, name: &str, digest: &str) -> Result<FutureBool> {
let url = {
let ep = format!("{}/v2/{}/blobs/{}", self.base_url.clone(), name, digest);
try!(hyper::Uri::from_str(ep.as_str()))
let ep = format!("{}/v2/{}/blobs/{}", self.base_url, name, digest);
hyper::Uri::from_str(ep.as_str())?
};
let req = self.new_request(hyper::Method::Head, url.clone());
let req = self.new_request(hyper::Method::HEAD, url.clone());
let freq = self.hclient.request(req);
let fres = freq
.map(move |r| {
.from_err()
.inspect(move |_| {
trace!("HEAD {:?}", url);
r
}).and_then(|r| {
trace!("Blob check result: {:?}", r.status());
match r.status() {
StatusCode::MovedPermanently
| StatusCode::TemporaryRedirect
| StatusCode::Found
| StatusCode::Ok => Ok(true),
StatusCode::MOVED_PERMANENTLY
| StatusCode::TEMPORARY_REDIRECT
| StatusCode::FOUND
| StatusCode::OK => Ok(true),
_ => Ok(false),
}
}).map_err(|e| e.into());
return Ok(Box::new(fres));
});
Ok(Box::new(fres))
}

/// Retrieve blob.
pub fn get_blob(&self, name: &str, digest: &str) -> Result<FutureBlob> {
let cl = self.clone();
let url = {
let ep = format!("{}/v2/{}/blobs/{}", self.base_url.clone(), name, digest);
try!(hyper::Uri::from_str(ep.as_str()))
hyper::Uri::from_str(ep.as_str())?
};
let req = self.new_request(hyper::Method::Get, url.clone());
let req = self.new_request(hyper::Method::GET, url.clone());
let freq = self.hclient.request(req);
let fres = freq
.map(move |r| {
.from_err()
.inspect(move |_| {
trace!("GET {:?}", url);
r
}).and_then(move |r| {
match r.status() {
StatusCode::MovedPermanently
| StatusCode::TemporaryRedirect
| StatusCode::Found => {
StatusCode::MOVED_PERMANENTLY
| StatusCode::TEMPORARY_REDIRECT
| StatusCode::FOUND => {
trace!("Got moved status {:?}", r.status());
}
_ => return Either::A(future::ok(r)),
};
let redirect: Option<String> = match r.headers().get_raw("Location") {
None => return Either::A(future::result(Err(hyper::error::Error::Status))),
Some(ct) => {
trace!("Got Location header {:?}", ct);
ct.clone()
.one()
.and_then(|h| String::from_utf8(h.to_vec()).ok())
let redirect: Option<String> = match r.headers().get("Location") {
None => {
return Either::A(future::err(Error::from(
"get_blob: missing location header",
)))
}
Some(loc) => {
trace!("Got Location header {:?}", loc);
String::from_utf8(loc.as_bytes().to_vec()).ok()
}
};
if let Some(u) = redirect {
let new_url = match hyper::Uri::from_str(u.as_str()) {
Ok(u) => u,
Err(e) => return Either::A(future::result(Err(hyper::Error::Uri(e)))),
_ => {
return Either::A(future::err(
format!("get_blob: wrong URL '{}'", u).into(),
))
}
};
trace!("Following redirection to {}", new_url);
let req = client::Request::new(hyper::Method::Get, new_url);
return Either::B(cl.hclient.request(req));
let mut req = hyper::Request::default();
*req.method_mut() = hyper::Method::GET;
*req.uri_mut() = new_url;
return Either::B(cl.hclient.request(req).from_err());
};
Either::A(future::ok(r))
}).and_then(|r| {
r.body().fold(Vec::new(), |mut v, chunk| {
v.extend(&chunk[..]);
futures::future::ok::<_, hyper::Error>(v)
})
}).from_err();
return Ok(Box::new(fres));
r.into_body()
.concat2()
.map_err(|e| format!("get_blob: failed to fetch the whole body: {}", e).into())
}).and_then(|body| Ok(body.into_bytes().to_vec()));
Ok(Box::new(fres))
}
}
42 changes: 22 additions & 20 deletions src/v2/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use futures::Stream;
use v2::*;
use errors::{Error, Result};
use futures::{self, Future, Stream};
use hyper;
use serde_json;
use std::str::FromStr;
use v2;

/// Convenience alias for a stream of `String` repos.
pub type StreamCatalog = Box<futures::Stream<Item = String, Error = Error>>;
Expand All @@ -9,7 +13,7 @@ struct Catalog {
pub repositories: Vec<String>,
}

impl Client {
impl v2::Client {
pub fn get_catalog(&self, paginate: Option<u32>) -> Result<StreamCatalog> {
let url = {
let mut s = self.base_url.clone() + "/v2/_catalog";
Expand All @@ -18,27 +22,25 @@ impl Client {
};
try!(hyper::Uri::from_str(s.as_str()))
};
let req = self.new_request(hyper::Method::Get, url);
let req = self.new_request(hyper::Method::GET, url);
let freq = self.hclient.request(req);
let fres = freq
.and_then(|resp| {
if resp.status() != hyper::StatusCode::Ok {
return Err(hyper::Error::Status);
};
Ok(resp)
.from_err()
.and_then(|r| {
let status = r.status();
trace!("Got status: {:?}", status);
match status {
hyper::StatusCode::OK => Ok(r),
_ => Err(format!("get_catalog: wrong HTTP status '{}'", status).into()),
}
}).and_then(|r| {
r.body().fold(Vec::new(), |mut v, chunk| {
v.extend(&chunk[..]);
futures::future::ok::<_, hyper::Error>(v)
r.into_body().concat2().map_err(|e| {
format!("get_catalog: failed to fetch the whole body: {}", e).into()
})
}).map_err(|e| e.into())
.and_then(|chunks| String::from_utf8(chunks).map_err(|e| e.into()))
.and_then(|body| -> Result<Catalog> {
serde_json::from_slice(body.as_bytes()).map_err(|e| e.into())
}).map(|cat| -> Vec<Result<String>> {
cat.repositories.into_iter().map(|r| Ok(r)).collect()
}).map(|rs| futures::stream::iter(rs.into_iter()))
}).and_then(|body| -> Result<Catalog> {
serde_json::from_slice(&body.into_bytes()).map_err(|e| e.into())
}).map(|cat| futures::stream::iter_ok(cat.repositories.into_iter()))
.flatten_stream();
return Ok(Box::new(fres));
Ok(Box::new(fres))
}
}
Loading

0 comments on commit bacca3b

Please sign in to comment.