-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid using errors to communicate the state of other shards to the leader during query-status API. #1520
base: main
Are you sure you want to change the base?
Avoid using errors to communicate the state of other shards to the leader during query-status API. #1520
Changes from all commits
30116f1
c062333
d13aff6
8afb0ab
fe5064b
ee77604
2889f1c
967fdca
c7927c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,11 @@ | ||
use std::{fmt::Debug, future::Future, marker::PhantomData}; | ||
|
||
use async_trait::async_trait; | ||
use serde::de::DeserializeOwned; | ||
use futures_util::TryStreamExt; | ||
use serde::{de::DeserializeOwned, Deserialize, Serialize}; | ||
use serde_json::json; | ||
|
||
use super::BytesStream; | ||
use crate::{ | ||
error::BoxError, | ||
helpers::{ | ||
|
@@ -113,6 +115,18 @@ impl HelperResponse { | |
pub fn try_into_owned<T: DeserializeOwned>(self) -> Result<T, serde_json::Error> { | ||
serde_json::from_slice(&self.body) | ||
} | ||
|
||
/// Asynchronously collects and returns a newly created `HelperResponse`. | ||
/// | ||
/// # Errors | ||
/// | ||
/// If the `BytesStream` cannot be collected into a `BytesMut`, an error is returned. | ||
pub async fn from_bytesstream<B: BytesStream>(value: B) -> Result<HelperResponse, BoxError> { | ||
let bytes: bytes::BytesMut = value.try_collect().await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't remember this API in detail, but is |
||
Ok(Self { | ||
body: bytes.to_vec(), | ||
}) | ||
} | ||
} | ||
|
||
impl From<PrepareQuery> for HelperResponse { | ||
|
@@ -128,13 +142,26 @@ impl From<()> for HelperResponse { | |
} | ||
} | ||
|
||
#[derive(Deserialize, Serialize)] | ||
struct QueryStatusResponse { | ||
status: QueryStatus, | ||
} | ||
|
||
impl From<QueryStatus> for HelperResponse { | ||
fn from(value: QueryStatus) -> Self { | ||
let v = serde_json::to_vec(&json!({"status": value})).unwrap(); | ||
let response = QueryStatusResponse { status: value }; | ||
let v = serde_json::to_vec(&response).unwrap(); | ||
Self { body: v } | ||
} | ||
} | ||
|
||
impl From<HelperResponse> for QueryStatus { | ||
fn from(value: HelperResponse) -> Self { | ||
let response: QueryStatusResponse = serde_json::from_slice(value.body.as_ref()).unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is likely a fallible conversion, so a method on |
||
response.status | ||
} | ||
} | ||
|
||
impl From<QueryKilled> for HelperResponse { | ||
fn from(value: QueryKilled) -> Self { | ||
let v = serde_json::to_vec(&json!({"query_id": value.0, "status": "killed"})).unwrap(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -310,7 +310,7 @@ impl RouteParams<RouteId, QueryId, NoStep> for (RouteId, QueryId) { | |
} | ||
|
||
#[derive(thiserror::Error, Debug)] | ||
#[error("One or more peers rejected the request: {failures:?}")] | ||
#[error("One or more peer shards rejected the broadcast request: {failures:?}")] | ||
pub struct BroadcastError<I: TransportIdentity, E: Debug> { | ||
pub failures: Vec<(I, E)>, | ||
} | ||
|
@@ -325,7 +325,10 @@ impl<I: TransportIdentity, E: Debug> From<Vec<(I, E)>> for BroadcastError<I, E> | |
#[async_trait] | ||
pub trait Transport: Clone + Send + Sync + 'static { | ||
type Identity: TransportIdentity; | ||
/// They type used by [`receive`]. | ||
type RecordsStream: BytesStream; | ||
/// The type used for responses to [`send`] and [`broadcast`]. | ||
type SendResponse: BytesStream; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure this is the right way to do it. Send requests could be very different in nature and may not have the common response type. One may return a json while others can return a stream. Ideally, the return type should be defined by |
||
type Error: Debug + Send; | ||
|
||
/// Return my identity in the network (MPC or Sharded) | ||
|
@@ -349,7 +352,7 @@ pub trait Transport: Clone + Send + Sync + 'static { | |
dest: Self::Identity, | ||
route: R, | ||
data: D, | ||
) -> Result<(), Self::Error> | ||
) -> Result<Option<Self::SendResponse>, Self::Error> | ||
where | ||
Option<QueryId>: From<Q>, | ||
Option<Gate>: From<S>, | ||
|
@@ -371,7 +374,10 @@ pub trait Transport: Clone + Send + Sync + 'static { | |
async fn broadcast<Q, S, R>( | ||
&self, | ||
route: R, | ||
) -> Result<(), BroadcastError<Self::Identity, Self::Error>> | ||
) -> Result< | ||
Vec<(Self::Identity, Option<Self::SendResponse>)>, | ||
BroadcastError<Self::Identity, Self::Error>, | ||
> | ||
where | ||
Option<QueryId>: From<Q>, | ||
Option<Gate>: From<S>, | ||
|
@@ -388,14 +394,16 @@ pub trait Transport: Clone + Send + Sync + 'static { | |
} | ||
|
||
let mut errs = Vec::new(); | ||
let mut responses = Vec::new(); | ||
while let Some(r) = futs.next().await { | ||
if let Err(e) = r.1 { | ||
errs.push((r.0, e)); | ||
match r.1 { | ||
Err(e) => errs.push((r.0, e)), | ||
Ok(re) => responses.push((r.0, re)), | ||
} | ||
} | ||
|
||
if errs.is_empty() { | ||
Ok(()) | ||
Ok(responses) | ||
} else { | ||
Err(errs.into()) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.