Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC Consistency Proposal #2473

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
a8f5f41
Start structuring middleware
acerone85 Dec 5, 2024
778dd7c
Use a static atomic to keep track of the current block height
acerone85 Dec 5, 2024
d1b1c33
Add changelog entry
acerone85 Dec 5, 2024
b77506c
Add tests and fix middleware
acerone85 Dec 5, 2024
b764cd4
Update crates/fuel-core/src/graphql_api/api_service.rs
acerone85 Dec 6, 2024
d9091e6
Set headers in fuel client
acerone85 Dec 6, 2024
90c411f
AxumRequest/AxumResponse -> http::Request/http::Response
acerone85 Dec 6, 2024
625bf8f
Fix compilation
acerone85 Dec 6, 2024
085d173
Extension instead of Axum middleware
acerone85 Dec 7, 2024
a98ef9b
Merge branch 'master' into 1897-rpc-consistency-proposal
xgreenx Dec 12, 2024
c95f09a
Add response header
acerone85 Dec 12, 2024
6099651
Better handling of response
acerone85 Dec 14, 2024
3b81537
Use BlockHeight type when possible
acerone85 Dec 14, 2024
11e6876
Fix compilation error
acerone85 Dec 14, 2024
88b9c52
Use Request data to retrieve required block height
acerone85 Dec 14, 2024
c5f82ff
Remove useless .into
acerone85 Dec 15, 2024
844e4e6
Move up const definitions
acerone85 Dec 15, 2024
71f8087
Remove useless .into()
acerone85 Dec 16, 2024
cbea694
Inject request data in subscription handler
acerone85 Dec 16, 2024
da7f057
Improvements
acerone85 Dec 16, 2024
f5e63f6
Add documentation
acerone85 Dec 16, 2024
5f4500b
Do not fetch the read view twice
acerone85 Dec 16, 2024
37dd63e
Remove commented code
acerone85 Dec 16, 2024
9913620
Typo
acerone85 Dec 16, 2024
f849eb5
Update crates/fuel-core/src/graphql_api/api_service.rs
acerone85 Dec 16, 2024
1fddd80
Improve setting headers in client
acerone85 Dec 16, 2024
e1f71d4
Revert to setting header in graphql extension when possible
acerone85 Dec 16, 2024
be7f927
Update comment
acerone85 Dec 16, 2024
0b1e31f
Remove use of Arc<Mutex<_>>
acerone85 Dec 16, 2024
8e1b0a5
Remove stray comment
acerone85 Dec 16, 2024
abe13c2
WIP
acerone85 Dec 16, 2024
eae1e22
Example of how it can be implemented via `extensions`
xgreenx Dec 16, 2024
0a3da1f
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Dec 18, 2024
e7e1db7
Add current height header on failed requests
acerone85 Dec 18, 2024
6b76769
Return current level after request has been executed
acerone85 Dec 19, 2024
b6f206d
WIP: Use only graphql extensions: tests still to be adjusted
acerone85 Dec 31, 2024
2f52aaa
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Jan 20, 2025
cd3be50
Add capability to the client to set required_fuel_block_height + fix …
acerone85 Jan 20, 2025
dbc5cdc
Fix other tests
acerone85 Jan 20, 2025
c5db12f
Rename test file
acerone85 Jan 20, 2025
24b8d69
Fix BlockHeight base format in extension error
acerone85 Jan 20, 2025
4a8d839
Fix typo
acerone85 Jan 21, 2025
51ec4a9
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Jan 21, 2025
37b03f1
Fix compilation error after rename
acerone85 Jan 21, 2025
396a953
Reference follow-up issue
acerone85 Jan 21, 2025
af0c6d6
Fix rustfmt
acerone85 Jan 21, 2025
b1e89d6
Fix subscriptions feature
acerone85 Jan 21, 2025
6e50f3b
Use HashMap::new instead of Default::default
acerone85 Jan 21, 2025
dcec472
Downgrade netlink-proto
acerone85 Jan 21, 2025
8c6af1f
Revert "Downgrade netlink-proto"
acerone85 Jan 21, 2025
818a717
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added
- [2551](https://github.com/FuelLabs/fuel-core/pull/2551): Enhanced the DA compressed block header to include block id.
- [2473](https://github.com/FuelLabs/fuel-core/pull/2473): Graphql requests and responses make use of a new `extensions` object to specify request/response metadata. A request `extensions` object can contain an integer-valued `required_fuel_block_height` field. When specified, the request will return an error unless the node's current fuel block height is at least the value specified in the `required_fuel_block_height` field. All graphql responses now contain an integer-valued `current_fuel_block_height` field in the `extensions` object, which contains the block height of the last block processed by the node.

### Fixed
- [2599](https://github.com/FuelLabs/fuel-core/pull/2599): Use the proper `url` apis to construct full url path in `BlockCommitterHttpApi` client
Expand Down
87 changes: 83 additions & 4 deletions crates/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use base64::prelude::{
#[cfg(feature = "subscriptions")]
use cynic::StreamingOperation;
use cynic::{
http::ReqwestExt,
GraphQlResponse,
Id,
MutationBuilder,
Expand Down Expand Up @@ -78,6 +77,13 @@ use pagination::{
PaginatedResult,
PaginationRequest,
};
use reqwest::header::{
AsHeaderName,
HeaderMap,
HeaderValue,
IntoHeaderName,
CONTENT_TYPE,
};
use schema::{
assets::AssetInfoArg,
balance::BalanceArgs,
Expand Down Expand Up @@ -119,6 +125,7 @@ use schema::{
#[cfg(feature = "subscriptions")]
use std::future;
use std::{
collections::HashMap,
convert::TryInto,
io::{
self,
Expand Down Expand Up @@ -151,12 +158,30 @@ pub mod types;

type RegisterId = u32;

#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
/// Error occurring during interaction with the FuelClient
// anyhow::Error is wrapped inside a custom Error type,
// so that we can specific error variants in the future.
pub enum Error {
/// Unknown or not expected(by architecture) error.
#[from]
Other(anyhow::Error),
}

#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct RequestExtensions {
pub required_fuel_block_height: Option<u64>,
}

#[derive(Debug, Clone)]
pub struct FuelClient {
client: reqwest::Client,
#[cfg(feature = "subscriptions")]
cookie: std::sync::Arc<reqwest::cookie::Jar>,
url: reqwest::Url,
headers: HeaderMap,
extensions: HashMap<&'static str, serde_json::Value>,
}

impl FromStr for FuelClient {
Expand Down Expand Up @@ -184,13 +209,19 @@ impl FromStr for FuelClient {
client,
cookie,
url,
headers: HeaderMap::new(),
extensions: Default::default(),
})
}

#[cfg(not(feature = "subscriptions"))]
{
let client = reqwest::Client::new();
Ok(Self { client, url })
Ok(Self {
client,
url,
headers: HeaderMap::new(),
})
}
}
}
Expand Down Expand Up @@ -223,6 +254,34 @@ impl FuelClient {
Self::from_str(url.as_ref())
}

pub fn set_header(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't use it and can remove

&mut self,
key: impl IntoHeaderName,
value: impl TryInto<HeaderValue>,
) -> Result<&mut Self, Error> {
let header_value: HeaderValue = value
.try_into()
.map_err(|_err| anyhow::anyhow!("Cannot parse value for header"))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we now have an proper Error, maybe we should introduce specific variant instead of anyhow?

self.headers.insert(key, header_value);
Ok(self)
}

pub fn remove_header(&mut self, key: impl AsHeaderName) -> &mut Self {
self.headers.remove(key);
self
}

pub fn with_required_fuel_block_height(&mut self, height: u64) -> &mut Self {
self.extensions
.insert("required_fuel_block_height", height.into());
self
}

pub fn without_required_fuel_block_height(&mut self) -> &mut Self {
self.extensions.remove("required_fuel_block_height");
self
}

/// Send the GraphQL query to the client.
pub async fn query<ResponseData, Vars>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about fn subscribe? We also want to support setting heights for the submit_and_await_commit like queries

&self,
Expand All @@ -232,13 +291,33 @@ impl FuelClient {
Vars: serde::Serialize,
ResponseData: serde::de::DeserializeOwned + 'static,
{
let response = self
let mut operation_json = serde_json::to_value(&q)?;
let operation_object = operation_json
.as_object_mut()
.expect("Graphql operation is a valid json object");
Comment on lines +291 to +293
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that you can be sure that it is an object. It would be better if you checked it via if let Value::Object() ...

operation_object.insert(
"extensions".to_owned(),
serde_json::to_value(self.extensions.clone())?,
);

println!("{}", serde_json::to_string_pretty(&operation_json).unwrap());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
println!("{}", serde_json::to_string_pretty(&operation_json).unwrap());

let request_builder = self
.client
.post(self.url.clone())
.run_graphql(q)
.headers(self.headers.clone())
.header(CONTENT_TYPE, "application/json")
.body(serde_json::to_string(&operation_json)?);

let response = request_builder
.send()
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.text()
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

println!("Response: {}", response);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
println!("Response: {}", response);

let response = serde_json::from_str(&response)?;
Self::decode_response(response)
}

Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod database;
pub(crate) mod indexation;
pub(crate) mod metrics_extension;
pub mod ports;
pub(crate) mod required_fuel_block_height_extension;
pub mod storage;
pub(crate) mod validation_extension;
pub(crate) mod view_extension;
Expand Down
18 changes: 15 additions & 3 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use crate::{
view_extension::ViewExtension,
Config,
},
graphql_api,
graphql_api::{
self,
required_fuel_block_height_extension::RequiredFuelBlockHeightExtension,
},
schema::{
CoreSchema,
CoreSchemaBuilder,
Expand Down Expand Up @@ -85,6 +88,9 @@ use tower_http::{
trace::TraceLayer,
};

pub(crate) const REQUIRED_FUEL_BLOCK_HEIGHT: &str = "required_fuel_block_height";
pub(crate) const CURRENT_FUEL_BLOCK_HEIGHT: &str = "current_fuel_block_height";

pub type Service = fuel_core_services::ServiceRunner<GraphqlService>;

pub use super::database::ReadDatabase;
Expand Down Expand Up @@ -282,6 +288,9 @@ where
))
.extension(async_graphql::extensions::Tracing)
.extension(ViewExtension::new())
// `RequiredFuelBlockHeightExtension` uses the view set by the ViewExtension.
// Do not reorder this line before adding the `ViewExtension`.
.extension(RequiredFuelBlockHeightExtension::new())
.finish();

let graphql_endpoint = "/v1/graphql";
Expand Down Expand Up @@ -358,15 +367,18 @@ async fn graphql_handler(
schema: Extension<CoreSchema>,
req: Json<Request>,
) -> Json<Response> {
schema.execute(req.0).await.into()
let response = schema.execute(req.0).await;

response.into()
Comment on lines +370 to +372
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let response = schema.execute(req.0).await;
response.into()
schema.execute(req.0).await.into()

}

async fn graphql_subscription_handler(
schema: Extension<CoreSchema>,
req: Json<Request>,
) -> Sse<impl Stream<Item = anyhow::Result<Event, serde_json::Error>>> {
let request = req.0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let request = req.0;

let stream = schema
.execute_stream(req.0)
.execute_stream(request)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.execute_stream(request)
.execute_stream(req.0)

.map(|r| Event::default().json_data(r));
Sse::new(stream)
.keep_alive(axum::response::sse::KeepAlive::new().text("keep-alive-text"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use super::database::ReadView;
use crate::fuel_core_graphql_api::api_service::{
CURRENT_FUEL_BLOCK_HEIGHT,
REQUIRED_FUEL_BLOCK_HEIGHT,
};
use async_graphql::{
extensions::{
Extension,
ExtensionContext,
ExtensionFactory,
NextExecute,
NextPrepareRequest,
},
Pos,
Request,
Response,
ServerError,
ServerResult,
Value,
};
use async_graphql_value::ConstValue;
use fuel_core_types::fuel_types::BlockHeight;
use std::sync::{
Arc,
OnceLock,
};

/// The extension that implements the logic for checking whether
/// the precondition that REQUIRED_FUEL_BLOCK_HEADER must
/// be higher than the current block height is met.
/// The value of the REQUIRED_FUEL_BLOCK_HEADER is set in
/// the request data by the graphql handler as a value of type
/// `RequiredHeight`.
#[derive(Debug, derive_more::Display, derive_more::From)]
pub(crate) struct RequiredFuelBlockHeightExtension;

impl RequiredFuelBlockHeightExtension {
pub fn new() -> Self {
Self
}
}

pub(crate) struct RequiredFuelBlockHeightInner {
required_height: OnceLock<BlockHeight>,
}

impl RequiredFuelBlockHeightInner {
pub fn new() -> Self {
Self {
required_height: OnceLock::new(),
}
}
}

impl ExtensionFactory for RequiredFuelBlockHeightExtension {
fn create(&self) -> Arc<dyn Extension> {
Arc::new(RequiredFuelBlockHeightInner::new())
}
}

#[async_trait::async_trait]
impl Extension for RequiredFuelBlockHeightInner {
async fn prepare_request(
&self,
ctx: &ExtensionContext<'_>,
request: Request,
next: NextPrepareRequest<'_>,
) -> ServerResult<Request> {
let required_fuel_block_height =
request.extensions.get(REQUIRED_FUEL_BLOCK_HEIGHT);

if let Some(ConstValue::Number(required_fuel_block_height)) =
required_fuel_block_height
{
if let Some(required_fuel_block_height) = required_fuel_block_height.as_u64()
{
let required_fuel_block_height: u32 =
required_fuel_block_height.try_into().unwrap_or(u32::MAX);
let required_block_height: BlockHeight =
required_fuel_block_height.into();
self.required_height
.set(required_block_height)
.expect("`prepare_request` called only once; qed");
}
}

next.run(ctx, request).await
}

async fn execute(
acerone85 marked this conversation as resolved.
Show resolved Hide resolved
&self,
ctx: &ExtensionContext<'_>,
operation_name: Option<&str>,
next: NextExecute<'_>,
) -> Response {
let view: &ReadView = ctx.data_unchecked();

let current_block_height = view.latest_block_height();

if let Some(required_block_height) = self.required_height.get() {
if let Ok(current_block_height) = current_block_height {
if *required_block_height > current_block_height {
let (line, column) = (line!(), column!());
let mut response = Response::from_errors(vec![ServerError::new(
format!(
"The required fuel block height is higher than the current block height. \
Required: {}, Current: {}",
// required_block_height: &BlockHeight, dereference twice to get the
// corresponding value as u32. This is necessary because the Display
// implementation for BlockHeight displays values in hexadecimal format.
**required_block_height,
// current_fuel_block_height: BlockHeight, dereference once to get the
// corresponding value as u32.
*current_block_height
),
Some(Pos {
line: line as usize,
column: column as usize,
}),
)]);

response.extensions.insert(
CURRENT_FUEL_BLOCK_HEIGHT.to_string(),
Value::Number((*current_block_height).into()),
);

return response
}
}
}

let mut response = next.run(ctx, operation_name).await;

let current_block_height = view.latest_block_height();
netrome marked this conversation as resolved.
Show resolved Hide resolved

if let Ok(current_block_height) = current_block_height {
let current_block_height: u32 = *current_block_height;
response.extensions.insert(
CURRENT_FUEL_BLOCK_HEIGHT.to_string(),
Value::Number(current_block_height.into()),
);
} else {
tracing::error!("Failed to get the current block height");
}

response
}
}
2 changes: 2 additions & 0 deletions tests/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ mod regenesis;
#[cfg(not(feature = "only-p2p"))]
mod relayer;
#[cfg(not(feature = "only-p2p"))]
mod required_fuel_block_height_extension;
#[cfg(not(feature = "only-p2p"))]
mod snapshot;
#[cfg(not(feature = "only-p2p"))]
mod state_rewind;
Expand Down
Loading
Loading