Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid using errors to communicate the state of other shards to the leader during query-status API. #1520

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ipa-core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
cli::LoggingHandle,
executor::IpaRuntime,
helpers::{
query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput},
query::{PrepareQuery, QueryConfig, QueryInput},
routing::{Addr, RouteId},
ApiError, BodyStream, HandlerBox, HandlerRef, HelperIdentity, HelperResponse,
MpcTransportImpl, RequestHandler, ShardTransportImpl, Transport, TransportIdentity,
Expand Down Expand Up @@ -208,8 +208,8 @@ impl RequestHandler<ShardIndex> for Inner {
HelperResponse::from(qp.prepare_shard(&self.shard_transport, req)?)
}
RouteId::QueryStatus => {
let req = req.into::<CompareStatusRequest>()?;
HelperResponse::from(qp.shard_status(&self.shard_transport, &req)?)
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.shard_status(&self.shard_transport, query_id)?)
}
RouteId::CompleteQuery => {
// The processing flow for this API is exactly the same, regardless
Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/helpers/gateway/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub(super) struct Transports<M: Transport<Identity = Role>, S: Transport<Identit
impl Transport for RoleResolvingTransport {
type Identity = Role;
type RecordsStream = <MpcTransportImpl as Transport>::RecordsStream;
type SendResponse = <MpcTransportImpl as Transport>::SendResponse;
type Error = SendToRoleError;

fn identity(&self) -> Role {
Expand All @@ -60,7 +61,7 @@ impl Transport for RoleResolvingTransport {
dest: Role,
route: R,
data: D,
) -> Result<(), Self::Error>
) -> Result<Option<Self::SendResponse>, Self::Error>
where
Option<QueryId>: From<Q>,
Option<Gate>: From<S>,
Expand Down
31 changes: 29 additions & 2 deletions ipa-core/src/helpers/transport/handler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{fmt::Debug, future::Future, marker::PhantomData};

use async_trait::async_trait;
use serde::de::DeserializeOwned;
use futures_util::TryStreamExt;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::json;

use super::BytesStream;
use crate::{
error::BoxError,
helpers::{
Expand Down Expand Up @@ -113,6 +115,18 @@ impl HelperResponse {
pub fn try_into_owned<T: DeserializeOwned>(self) -> Result<T, serde_json::Error> {
serde_json::from_slice(&self.body)
}

/// Asynchronously collects and returns a newly created `HelperResponse`.
///
/// # Errors
///
/// If the `BytesStream` cannot be collected into a `BytesMut`, an error is returned.
pub async fn from_bytesstream<B: BytesStream>(value: B) -> Result<HelperResponse, BoxError> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub async fn from_bytesstream<B: BytesStream>(value: B) -> Result<HelperResponse, BoxError> {
pub async fn from_bytes_stream<B: BytesStream>(value: B) -> Result<HelperResponse, BoxError> {

let bytes: bytes::BytesMut = value.try_collect().await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember this API in detail, but is BytesMut required or Bytes would work too?

Ok(Self {
body: bytes.to_vec(),
})
}
}

impl From<PrepareQuery> for HelperResponse {
Expand All @@ -128,13 +142,26 @@ impl From<()> for HelperResponse {
}
}

#[derive(Deserialize, Serialize)]
struct QueryStatusResponse {
status: QueryStatus,
}

impl From<QueryStatus> for HelperResponse {
fn from(value: QueryStatus) -> Self {
let v = serde_json::to_vec(&json!({"status": value})).unwrap();
let response = QueryStatusResponse { status: value };
let v = serde_json::to_vec(&response).unwrap();
Self { body: v }
}
}

impl From<HelperResponse> for QueryStatus {
fn from(value: HelperResponse) -> Self {
let response: QueryStatusResponse = serde_json::from_slice(value.body.as_ref()).unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is likely a fallible conversion, so a method on HelperResponse will do better imo. Or a TryFrom implementation

response.status
}
}

impl From<QueryKilled> for HelperResponse {
fn from(value: QueryKilled) -> Self {
let v = serde_json::to_vec(&json!({"query_id": value.0, "status": "killed"})).unwrap();
Expand Down
18 changes: 13 additions & 5 deletions ipa-core/src/helpers/transport/in_memory/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use ::tokio::sync::{
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::{stream, Stream, StreamExt};
#[cfg(all(feature = "shuttle", test))]
use shuttle::future as tokio;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -156,6 +156,7 @@ impl<I: TransportIdentity> InMemoryTransport<I> {
impl<I: TransportIdentity> Transport for Weak<InMemoryTransport<I>> {
type Identity = I;
type RecordsStream = ReceiveRecords<I, InMemoryStream>;
type SendResponse = InMemoryStream;
type Error = Error<I>;

fn identity(&self) -> I {
Expand All @@ -182,7 +183,7 @@ impl<I: TransportIdentity> Transport for Weak<InMemoryTransport<I>> {
dest: I,
route: R,
data: D,
) -> Result<(), Error<I>>
) -> Result<Option<Self::SendResponse>, Error<I>>
where
Option<QueryId>: From<Q>,
Option<Gate>: From<S>,
Expand Down Expand Up @@ -214,7 +215,7 @@ impl<I: TransportIdentity> Transport for Weak<InMemoryTransport<I>> {
io::Error::new::<String>(io::ErrorKind::ConnectionAborted, "channel closed".into())
})?;

ack_rx
let res = ack_rx
.await
.map_err(|_recv_error| Error::Rejected {
dest,
Expand All @@ -224,8 +225,11 @@ impl<I: TransportIdentity> Transport for Weak<InMemoryTransport<I>> {
dest,
inner: e.into(),
})?;

Ok(())
let body_bytes = res.into_body();
if body_bytes.is_empty() {
return Ok(None);
}
Ok(Some(InMemoryStream::wrap_bytes(body_bytes)))
}

fn receive<R: RouteParams<NoResourceIdentifier, QueryId, Gate>>(
Expand All @@ -247,6 +251,10 @@ pub struct InMemoryStream {
}

impl InMemoryStream {
fn wrap_bytes(bytes: Vec<u8>) -> Self {
InMemoryStream::wrap(stream::once(async { Ok(Bytes::from(bytes)) }))
}

fn wrap<S: Stream<Item = StreamItem> + Send + 'static>(value: S) -> Self {
Self {
inner: Box::pin(value),
Expand Down
20 changes: 14 additions & 6 deletions ipa-core/src/helpers/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl RouteParams<RouteId, QueryId, NoStep> for (RouteId, QueryId) {
}

#[derive(thiserror::Error, Debug)]
#[error("One or more peers rejected the request: {failures:?}")]
#[error("One or more peer shards rejected the broadcast request: {failures:?}")]
pub struct BroadcastError<I: TransportIdentity, E: Debug> {
pub failures: Vec<(I, E)>,
}
Expand All @@ -325,7 +325,10 @@ impl<I: TransportIdentity, E: Debug> From<Vec<(I, E)>> for BroadcastError<I, E>
#[async_trait]
pub trait Transport: Clone + Send + Sync + 'static {
type Identity: TransportIdentity;
/// They type used by [`receive`].
type RecordsStream: BytesStream;
/// The type used for responses to [`send`] and [`broadcast`].
type SendResponse: BytesStream;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is the right way to do it. Send requests could be very different in nature and may not have the common response type. One may return a json while others can return a stream. Ideally, the return type should be defined by R: RouteParams trait but I am not sure if I have a suggestion how to use it here

type Error: Debug + Send;

/// Return my identity in the network (MPC or Sharded)
Expand All @@ -349,7 +352,7 @@ pub trait Transport: Clone + Send + Sync + 'static {
dest: Self::Identity,
route: R,
data: D,
) -> Result<(), Self::Error>
) -> Result<Option<Self::SendResponse>, Self::Error>
where
Option<QueryId>: From<Q>,
Option<Gate>: From<S>,
Expand All @@ -371,7 +374,10 @@ pub trait Transport: Clone + Send + Sync + 'static {
async fn broadcast<Q, S, R>(
&self,
route: R,
) -> Result<(), BroadcastError<Self::Identity, Self::Error>>
) -> Result<
Vec<(Self::Identity, Option<Self::SendResponse>)>,
BroadcastError<Self::Identity, Self::Error>,
>
where
Option<QueryId>: From<Q>,
Option<Gate>: From<S>,
Expand All @@ -388,14 +394,16 @@ pub trait Transport: Clone + Send + Sync + 'static {
}

let mut errs = Vec::new();
let mut responses = Vec::new();
while let Some(r) = futs.next().await {
if let Err(e) = r.1 {
errs.push((r.0, e));
match r.1 {
Err(e) => errs.push((r.0, e)),
Ok(re) => responses.push((r.0, re)),
}
}

if errs.is_empty() {
Ok(())
Ok(responses)
} else {
Err(errs.into())
}
Expand Down
28 changes: 0 additions & 28 deletions ipa-core/src/helpers/transport/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::{
RoleAssignment, RouteParams,
},
protocol::QueryId,
query::QueryStatus,
};

#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize)]
Expand Down Expand Up @@ -239,33 +238,6 @@ impl Debug for QueryInput {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct CompareStatusRequest {
pub query_id: QueryId,
pub status: QueryStatus,
}

impl RouteParams<RouteId, QueryId, NoStep> for CompareStatusRequest {
type Params = String;

fn resource_identifier(&self) -> RouteId {
RouteId::QueryStatus
}

fn query_id(&self) -> QueryId {
self.query_id
}

fn gate(&self) -> NoStep {
NoStep
}

fn extra(&self) -> Self::Params {
serde_json::to_string(self).unwrap()
}
}

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum QueryType {
Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/helpers/transport/stream/axum_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::{error::BoxError, helpers::BytesStream};
pub struct WrappedAxumBodyStream(#[pin] BodyDataStream);

impl WrappedAxumBodyStream {
pub(super) fn new(b: Body) -> Self {
#[must_use]
pub fn new(b: Body) -> Self {
Self(b.into_data_stream())
}

Expand Down
81 changes: 36 additions & 45 deletions ipa-core/src/net/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ use crate::{
},
executor::IpaRuntime,
helpers::{
query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput},
TransportIdentity,
query::{PrepareQuery, QueryConfig, QueryInput},
BodyStream, TransportIdentity,
},
net::{error::ShardQueryStatusMismatchError, http_serde, Error, CRYPTO_PROVIDER},
net::{http_serde, Error, CRYPTO_PROVIDER},
protocol::{Gate, QueryId},
};

Expand Down Expand Up @@ -385,30 +385,44 @@ impl<F: ConnectionFlavor> IpaHttpClient<F> {
resp_ok(resp).await
}

/// This API is used by leader shards in MPC to request query status information on peers.
/// If a given peer has status that doesn't match the one provided by the leader, it responds
/// with 412 error and encodes its status inside the response body. Otherwise, 200 is returned.
/// Sends a query status request and returns the response bytes.
///
/// # Errors
/// If the request has illegal arguments, or fails to be delivered
pub async fn status_match(&self, data: CompareStatusRequest) -> Result<(), Error> {
let req = http_serde::query::status_match::try_into_http_request(
&data,
self.scheme.clone(),
self.authority.clone(),
)?;
/// If the request has illegal arguments, or fails to deliver to helper
async fn query_status_impl(&self, query_id: QueryId) -> Result<Bytes, Error> {
let req = http_serde::query::status::Request::new(query_id);
let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?;
let resp = self.request(req).await?;

match resp.status() {
StatusCode::OK => Ok(()),
StatusCode::PRECONDITION_FAILED => {
let bytes = response_to_bytes(resp).await?;
let err = serde_json::from_slice::<ShardQueryStatusMismatchError>(&bytes)?;
Err(err.into())
}
_ => Err(Error::from_failed_resp(resp).await),
if resp.status().is_success() {
Ok(response_to_bytes(resp).await?)
} else {
Err(Error::from_failed_resp(resp).await)
}
}
/// Retrieves the status of a query as a byte stream.
///
/// This function calls `query_status_impl` and returns the response bytes as a `BodyStream`.
///
/// # Errors
/// If the request has illegal arguments, or fails to deliver to helper
pub async fn query_status_bytes(&self, query_id: QueryId) -> Result<BodyStream, Error> {
let bytes = self.query_status_impl(query_id).await?;
Ok(BodyStream::from(bytes.to_vec()))
}
/// Retrieves the status of a query.
///
/// This function calls `query_status_impl` and deserializes the response bytes into a `QueryStatus` struct.
///
/// # Errors
/// If the request has illegal arguments, or fails to deliver to helper
pub async fn query_status(
&self,
query_id: QueryId,
) -> Result<crate::query::QueryStatus, Error> {
let bytes = self.query_status_impl(query_id).await?;
let http_serde::query::status::ResponseBody { status } = serde_json::from_slice(&bytes)?;
Ok(status)
}
}

impl IpaHttpClient<Helper> {
Expand Down Expand Up @@ -467,29 +481,6 @@ impl IpaHttpClient<Helper> {
resp_ok(resp).await
}

/// Retrieve the status of a query.
///
/// ## Errors
/// If the request has illegal arguments, or fails to deliver to helper
#[cfg(any(all(test, not(feature = "shuttle")), feature = "cli"))]
pub async fn query_status(
&self,
query_id: QueryId,
) -> Result<crate::query::QueryStatus, Error> {
let req = http_serde::query::status::Request::new(query_id);
let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?;

let resp = self.request(req).await?;
if resp.status().is_success() {
let bytes = response_to_bytes(resp).await?;
let http_serde::query::status::ResponseBody { status } =
serde_json::from_slice(&bytes)?;
Ok(status)
} else {
Err(Error::from_failed_resp(resp).await)
}
}

/// Wait for completion of the query and pull the results of this query. This is a blocking
/// API so it is not supposed to be used outside of CLI context.
///
Expand Down
Loading
Loading