Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: icx-proxy panic when decoding a response to http_request #221

Merged
merged 2 commits into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions ic-utils/src/interfaces/http_request.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{call::SyncCall, canister::CanisterBuilder, Canister};
use candid::{parser::value::IDLValue, CandidType, Deserialize};
use candid::{parser::value::IDLValue, CandidType, Deserialize, Func, Nat};
use ic_agent::{export::Principal, Agent};
use serde_bytes::ByteBuf;
use std::fmt::Debug;

#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
Expand All @@ -18,10 +19,20 @@ pub struct HttpRequest<'body> {
pub body: &'body [u8],
}

#[derive(CandidType, Deserialize)]
pub struct Token {
key: String,
content_encoding: String,
index: Nat,
// The sha ensures that a client doesn't stream part of one version of an asset
// followed by part of a different asset, even if not checking the certificate.
sha256: Option<ByteBuf>,
}

#[derive(CandidType, Deserialize)]
pub struct CallbackStrategy {
pub callback: IDLValue,
pub token: IDLValue,
pub callback: Func,
pub token: Token,
}

#[derive(CandidType, Deserialize)]
Expand All @@ -42,7 +53,7 @@ pub struct HttpResponse {
pub struct StreamingCallbackHttpResponse {
#[serde(with = "serde_bytes")]
pub body: Vec<u8>,
pub token: Option<IDLValue>,
pub token: Option<Token>,
}

impl HttpRequestCanister {
Expand Down Expand Up @@ -87,8 +98,8 @@ impl<'agent> Canister<'agent, HttpRequestCanister> {
pub fn http_request_stream_callback<'canister: 'agent, M: Into<String>>(
&'canister self,
method: M,
token: IDLValue,
token: Token,
) -> impl 'agent + SyncCall<(StreamingCallbackHttpResponse,)> {
self.query_(&method.into()).with_value_arg(token).build()
self.query_(&method.into()).with_arg(token).build()
}
}
80 changes: 33 additions & 47 deletions icx-proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::config::dns_canister_config::DnsCanisterConfig;
use candid::parser::value::IDLValue;
use clap::{crate_authors, crate_version, AppSettings, Clap};
use hyper::{
body,
Expand Down Expand Up @@ -256,58 +255,45 @@ async fn forward_request(

match streaming_strategy {
StreamingStrategy::Callback(callback) => {
match callback.callback {
IDLValue::Func(streaming_canister_id_id, method_name) => {
let mut callback_token = callback.token;
let logger = logger.clone();
tokio::spawn(async move {
let canister =
HttpRequestCanister::create(&agent, streaming_canister_id_id);
// We have not yet called http_request_stream_callback.
let mut count = 0;
loop {
count += 1;
if count > MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT {
let streaming_canister_id_id = callback.callback.principal;
let method_name = callback.callback.method;
let mut callback_token = callback.token;
let logger = logger.clone();
tokio::spawn(async move {
let canister = HttpRequestCanister::create(&agent, streaming_canister_id_id);
// We have not yet called http_request_stream_callback.
let mut count = 0;
loop {
count += 1;
if count > MAX_HTTP_REQUEST_STREAM_CALLBACK_CALL_COUNT {
sender.abort();
break;
}

match canister
.http_request_stream_callback(&method_name, callback_token)
.call()
.await
{
Ok((StreamingCallbackHttpResponse { body, token },)) => {
if sender.send_data(Bytes::from(body)).await.is_err() {
sender.abort();
break;
}

match canister
.http_request_stream_callback(&method_name, callback_token)
.call()
.await
{
Ok((StreamingCallbackHttpResponse { body, token },)) => {
if sender.send_data(Bytes::from(body)).await.is_err() {
sender.abort();
break;
}
if let Some(next_token) = token {
callback_token = next_token;
} else {
break;
}
}
Err(e) => {
slog::debug!(
logger,
"Error happened during streaming: {}",
e
);
sender.abort();
break;
}
if let Some(next_token) = token {
callback_token = next_token;
} else {
break;
}
}
});
}
_ => {
return Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body("Streaming callback must be a function.".into())
.unwrap())
Err(e) => {
slog::debug!(logger, "Error happened during streaming: {}", e);
sender.abort();
break;
}
}
}
}
});
}
}

Expand Down