-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RPC Consistency Proposal #2473
base: master
Are you sure you want to change the base?
RPC Consistency Proposal #2473
Changes from all commits
a8f5f41
778dd7c
d1b1c33
b77506c
b764cd4
d9091e6
90c411f
625bf8f
085d173
a98ef9b
c95f09a
6099651
3b81537
11e6876
88b9c52
c5f82ff
844e4e6
71f8087
cbea694
da7f057
f5e63f6
5f4500b
37dd63e
9913620
f849eb5
1fddd80
e1f71d4
be7f927
0b1e31f
8e1b0a5
abe13c2
eae1e22
0a3da1f
e7e1db7
6b76769
b6f206d
2f52aaa
cd3be50
dbc5cdc
c5db12f
24b8d69
4a8d839
51ec4a9
37b03f1
396a953
af0c6d6
b1e89d6
6e50f3b
dcec472
8c6af1f
818a717
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -39,7 +39,6 @@ use base64::prelude::{ | |||
#[cfg(feature = "subscriptions")] | ||||
use cynic::StreamingOperation; | ||||
use cynic::{ | ||||
http::ReqwestExt, | ||||
GraphQlResponse, | ||||
Id, | ||||
MutationBuilder, | ||||
|
@@ -78,6 +77,13 @@ use pagination::{ | |||
PaginatedResult, | ||||
PaginationRequest, | ||||
}; | ||||
use reqwest::header::{ | ||||
AsHeaderName, | ||||
HeaderMap, | ||||
HeaderValue, | ||||
IntoHeaderName, | ||||
CONTENT_TYPE, | ||||
}; | ||||
use schema::{ | ||||
assets::AssetInfoArg, | ||||
balance::BalanceArgs, | ||||
|
@@ -119,6 +125,7 @@ use schema::{ | |||
#[cfg(feature = "subscriptions")] | ||||
use std::future; | ||||
use std::{ | ||||
collections::HashMap, | ||||
convert::TryInto, | ||||
io::{ | ||||
self, | ||||
|
@@ -151,12 +158,25 @@ pub mod types; | |||
|
||||
type RegisterId = u32; | ||||
|
||||
#[derive(Debug, derive_more::Display, derive_more::From)] | ||||
#[non_exhaustive] | ||||
/// Error occurring during interaction with the FuelClient | ||||
// anyhow::Error is wrapped inside a custom Error type, | ||||
// so that we can specific error variants in the future. | ||||
pub enum Error { | ||||
/// Unknown or not expected(by architecture) error. | ||||
#[from] | ||||
Other(anyhow::Error), | ||||
} | ||||
|
||||
#[derive(Debug, Clone)] | ||||
pub struct FuelClient { | ||||
client: reqwest::Client, | ||||
#[cfg(feature = "subscriptions")] | ||||
cookie: std::sync::Arc<reqwest::cookie::Jar>, | ||||
url: reqwest::Url, | ||||
headers: HeaderMap, | ||||
extensions: HashMap<&'static str, serde_json::Value>, | ||||
} | ||||
|
||||
impl FromStr for FuelClient { | ||||
|
@@ -184,13 +204,20 @@ impl FromStr for FuelClient { | |||
client, | ||||
cookie, | ||||
url, | ||||
headers: HeaderMap::new(), | ||||
extensions: HashMap::new(), | ||||
}) | ||||
} | ||||
|
||||
#[cfg(not(feature = "subscriptions"))] | ||||
{ | ||||
let client = reqwest::Client::new(); | ||||
Ok(Self { client, url }) | ||||
Ok(Self { | ||||
client, | ||||
url, | ||||
headers: HeaderMap::new(), | ||||
extensions: HashMap::new(), | ||||
}) | ||||
} | ||||
} | ||||
} | ||||
|
@@ -223,6 +250,34 @@ impl FuelClient { | |||
Self::from_str(url.as_ref()) | ||||
} | ||||
|
||||
pub fn set_header( | ||||
&mut self, | ||||
key: impl IntoHeaderName, | ||||
value: impl TryInto<HeaderValue>, | ||||
) -> Result<&mut Self, Error> { | ||||
let header_value: HeaderValue = value | ||||
.try_into() | ||||
.map_err(|_err| anyhow::anyhow!("Cannot parse value for header"))?; | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we now have an proper |
||||
self.headers.insert(key, header_value); | ||||
Ok(self) | ||||
} | ||||
|
||||
pub fn remove_header(&mut self, key: impl AsHeaderName) -> &mut Self { | ||||
self.headers.remove(key); | ||||
self | ||||
} | ||||
|
||||
pub fn with_required_fuel_block_height(&mut self, height: u64) -> &mut Self { | ||||
self.extensions | ||||
.insert("required_fuel_block_height", height.into()); | ||||
self | ||||
} | ||||
|
||||
pub fn without_required_fuel_block_height(&mut self) -> &mut Self { | ||||
self.extensions.remove("required_fuel_block_height"); | ||||
self | ||||
} | ||||
|
||||
/// Send the GraphQL query to the client. | ||||
pub async fn query<ResponseData, Vars>( | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about |
||||
&self, | ||||
|
@@ -232,13 +287,33 @@ impl FuelClient { | |||
Vars: serde::Serialize, | ||||
ResponseData: serde::de::DeserializeOwned + 'static, | ||||
{ | ||||
let response = self | ||||
let mut operation_json = serde_json::to_value(&q)?; | ||||
let operation_object = operation_json | ||||
.as_object_mut() | ||||
.expect("Graphql operation is a valid json object"); | ||||
Comment on lines
+291
to
+293
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that you can be sure that it is an object. It would be better if you checked it via |
||||
operation_object.insert( | ||||
"extensions".to_owned(), | ||||
serde_json::to_value(self.extensions.clone())?, | ||||
); | ||||
|
||||
println!("{}", serde_json::to_string_pretty(&operation_json).unwrap()); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||
let request_builder = self | ||||
.client | ||||
.post(self.url.clone()) | ||||
.run_graphql(q) | ||||
.headers(self.headers.clone()) | ||||
.header(CONTENT_TYPE, "application/json") | ||||
.body(serde_json::to_string(&operation_json)?); | ||||
|
||||
let response = request_builder | ||||
.send() | ||||
.await | ||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))? | ||||
.text() | ||||
.await | ||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; | ||||
|
||||
println!("Response: {}", response); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||
let response = serde_json::from_str(&response)?; | ||||
Self::decode_response(response) | ||||
} | ||||
|
||||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -15,7 +15,10 @@ use crate::{ | |||||||||
view_extension::ViewExtension, | ||||||||||
Config, | ||||||||||
}, | ||||||||||
graphql_api, | ||||||||||
graphql_api::{ | ||||||||||
self, | ||||||||||
required_fuel_block_height_extension::RequiredFuelBlockHeightExtension, | ||||||||||
}, | ||||||||||
schema::{ | ||||||||||
CoreSchema, | ||||||||||
CoreSchemaBuilder, | ||||||||||
|
@@ -85,6 +88,9 @@ use tower_http::{ | |||||||||
trace::TraceLayer, | ||||||||||
}; | ||||||||||
|
||||||||||
pub(crate) const REQUIRED_FUEL_BLOCK_HEIGHT: &str = "required_fuel_block_height"; | ||||||||||
pub(crate) const CURRENT_FUEL_BLOCK_HEIGHT: &str = "current_fuel_block_height"; | ||||||||||
|
||||||||||
pub type Service = fuel_core_services::ServiceRunner<GraphqlService>; | ||||||||||
|
||||||||||
pub use super::database::ReadDatabase; | ||||||||||
|
@@ -282,6 +288,9 @@ where | |||||||||
)) | ||||||||||
.extension(async_graphql::extensions::Tracing) | ||||||||||
.extension(ViewExtension::new()) | ||||||||||
// `RequiredFuelBlockHeightExtension` uses the view set by the ViewExtension. | ||||||||||
// Do not reorder this line before adding the `ViewExtension`. | ||||||||||
.extension(RequiredFuelBlockHeightExtension::new()) | ||||||||||
.finish(); | ||||||||||
|
||||||||||
let graphql_endpoint = "/v1/graphql"; | ||||||||||
|
@@ -358,15 +367,18 @@ async fn graphql_handler( | |||||||||
schema: Extension<CoreSchema>, | ||||||||||
req: Json<Request>, | ||||||||||
) -> Json<Response> { | ||||||||||
schema.execute(req.0).await.into() | ||||||||||
let response = schema.execute(req.0).await; | ||||||||||
|
||||||||||
response.into() | ||||||||||
Comment on lines
+370
to
+372
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
} | ||||||||||
|
||||||||||
async fn graphql_subscription_handler( | ||||||||||
schema: Extension<CoreSchema>, | ||||||||||
req: Json<Request>, | ||||||||||
) -> Sse<impl Stream<Item = anyhow::Result<Event, serde_json::Error>>> { | ||||||||||
let request = req.0; | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
let stream = schema | ||||||||||
.execute_stream(req.0) | ||||||||||
.execute_stream(request) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
.map(|r| Event::default().json_data(r)); | ||||||||||
Sse::new(stream) | ||||||||||
.keep_alive(axum::response::sse::KeepAlive::new().text("keep-alive-text")) | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
use super::database::ReadView; | ||
use crate::fuel_core_graphql_api::api_service::{ | ||
CURRENT_FUEL_BLOCK_HEIGHT, | ||
REQUIRED_FUEL_BLOCK_HEIGHT, | ||
}; | ||
use async_graphql::{ | ||
extensions::{ | ||
Extension, | ||
ExtensionContext, | ||
ExtensionFactory, | ||
NextExecute, | ||
NextPrepareRequest, | ||
}, | ||
Pos, | ||
Request, | ||
Response, | ||
ServerError, | ||
ServerResult, | ||
Value, | ||
}; | ||
use async_graphql_value::ConstValue; | ||
use fuel_core_types::fuel_types::BlockHeight; | ||
use std::sync::{ | ||
Arc, | ||
OnceLock, | ||
}; | ||
|
||
/// The extension that implements the logic for checking whether | ||
/// the precondition that REQUIRED_FUEL_BLOCK_HEADER must | ||
/// be higher than the current block height is met. | ||
/// The value of the REQUIRED_FUEL_BLOCK_HEADER is set in | ||
/// the request data by the graphql handler as a value of type | ||
/// `RequiredHeight`. | ||
#[derive(Debug, derive_more::Display, derive_more::From)] | ||
pub(crate) struct RequiredFuelBlockHeightExtension; | ||
|
||
impl RequiredFuelBlockHeightExtension { | ||
pub fn new() -> Self { | ||
Self | ||
} | ||
} | ||
|
||
pub(crate) struct RequiredFuelBlockHeightInner { | ||
required_height: OnceLock<BlockHeight>, | ||
} | ||
|
||
impl RequiredFuelBlockHeightInner { | ||
pub fn new() -> Self { | ||
Self { | ||
required_height: OnceLock::new(), | ||
} | ||
} | ||
} | ||
|
||
impl ExtensionFactory for RequiredFuelBlockHeightExtension { | ||
fn create(&self) -> Arc<dyn Extension> { | ||
Arc::new(RequiredFuelBlockHeightInner::new()) | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Extension for RequiredFuelBlockHeightInner { | ||
async fn prepare_request( | ||
&self, | ||
ctx: &ExtensionContext<'_>, | ||
request: Request, | ||
next: NextPrepareRequest<'_>, | ||
) -> ServerResult<Request> { | ||
let required_fuel_block_height = | ||
request.extensions.get(REQUIRED_FUEL_BLOCK_HEIGHT); | ||
|
||
if let Some(ConstValue::Number(required_fuel_block_height)) = | ||
required_fuel_block_height | ||
{ | ||
if let Some(required_fuel_block_height) = required_fuel_block_height.as_u64() | ||
{ | ||
let required_fuel_block_height: u32 = | ||
required_fuel_block_height.try_into().unwrap_or(u32::MAX); | ||
let required_block_height: BlockHeight = | ||
required_fuel_block_height.into(); | ||
self.required_height | ||
.set(required_block_height) | ||
.expect("`prepare_request` called only once; qed"); | ||
} | ||
} | ||
|
||
next.run(ctx, request).await | ||
} | ||
|
||
async fn execute( | ||
acerone85 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
&self, | ||
ctx: &ExtensionContext<'_>, | ||
operation_name: Option<&str>, | ||
next: NextExecute<'_>, | ||
) -> Response { | ||
let view: &ReadView = ctx.data_unchecked(); | ||
|
||
let current_block_height = view.latest_block_height(); | ||
|
||
if let Some(required_block_height) = self.required_height.get() { | ||
if let Ok(current_block_height) = current_block_height { | ||
if *required_block_height > current_block_height { | ||
let (line, column) = (line!(), column!()); | ||
let mut response = Response::from_errors(vec![ServerError::new( | ||
format!( | ||
"The required fuel block height is higher than the current block height. \ | ||
Required: {}, Current: {}", | ||
// required_block_height: &BlockHeight, dereference twice to get the | ||
// corresponding value as u32. This is necessary because the Display | ||
// implementation for BlockHeight displays values in hexadecimal format. | ||
**required_block_height, | ||
// current_fuel_block_height: BlockHeight, dereference once to get the | ||
// corresponding value as u32. | ||
*current_block_height | ||
), | ||
Some(Pos { | ||
line: line as usize, | ||
column: column as usize, | ||
}), | ||
)]); | ||
|
||
response.extensions.insert( | ||
CURRENT_FUEL_BLOCK_HEIGHT.to_string(), | ||
Value::Number((*current_block_height).into()), | ||
); | ||
|
||
return response | ||
} | ||
} | ||
} | ||
|
||
let mut response = next.run(ctx, operation_name).await; | ||
|
||
let current_block_height = view.latest_block_height(); | ||
netrome marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if let Ok(current_block_height) = current_block_height { | ||
let current_block_height: u32 = *current_block_height; | ||
response.extensions.insert( | ||
CURRENT_FUEL_BLOCK_HEIGHT.to_string(), | ||
Value::Number(current_block_height.into()), | ||
); | ||
} else { | ||
tracing::error!("Failed to get the current block height"); | ||
} | ||
|
||
response | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't use it and can remove