Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC Consistency Proposal #2473

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
a8f5f41
Start structuring middleware
acerone85 Dec 5, 2024
778dd7c
Use a static atomic to keep track of the current block height
acerone85 Dec 5, 2024
d1b1c33
Add changelog entry
acerone85 Dec 5, 2024
b77506c
Add tests and fix middleware
acerone85 Dec 5, 2024
b764cd4
Update crates/fuel-core/src/graphql_api/api_service.rs
acerone85 Dec 6, 2024
d9091e6
Set headers in fuel client
acerone85 Dec 6, 2024
90c411f
AxumRequest/AxumResponse -> http::Request/http::Response
acerone85 Dec 6, 2024
625bf8f
Fix compilation
acerone85 Dec 6, 2024
085d173
Extension instead of Axum middleware
acerone85 Dec 7, 2024
a98ef9b
Merge branch 'master' into 1897-rpc-consistency-proposal
xgreenx Dec 12, 2024
c95f09a
Add response header
acerone85 Dec 12, 2024
6099651
Better handling of response
acerone85 Dec 14, 2024
3b81537
Use BlockHeight type when possible
acerone85 Dec 14, 2024
11e6876
Fix compilation error
acerone85 Dec 14, 2024
88b9c52
Use Request data to retrieve required block height
acerone85 Dec 14, 2024
c5f82ff
Remove useless .into
acerone85 Dec 15, 2024
844e4e6
Move up const definitions
acerone85 Dec 15, 2024
71f8087
Remove useless .into()
acerone85 Dec 16, 2024
cbea694
Inject request data in subscription handler
acerone85 Dec 16, 2024
da7f057
Improvements
acerone85 Dec 16, 2024
f5e63f6
Add documentation
acerone85 Dec 16, 2024
5f4500b
Do not fetch the read view twice
acerone85 Dec 16, 2024
37dd63e
Remove commented code
acerone85 Dec 16, 2024
9913620
Typo
acerone85 Dec 16, 2024
f849eb5
Update crates/fuel-core/src/graphql_api/api_service.rs
acerone85 Dec 16, 2024
1fddd80
Improve setting headers in client
acerone85 Dec 16, 2024
e1f71d4
Revert to setting header in graphql extension when possible
acerone85 Dec 16, 2024
be7f927
Update comment
acerone85 Dec 16, 2024
0b1e31f
Remove use of Arc<Mutex<_>>
acerone85 Dec 16, 2024
8e1b0a5
Remove stray comment
acerone85 Dec 16, 2024
abe13c2
WIP
acerone85 Dec 16, 2024
eae1e22
Example of how it can be implemented via `extensions`
xgreenx Dec 16, 2024
0a3da1f
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Dec 18, 2024
e7e1db7
Add current height header on failed requests
acerone85 Dec 18, 2024
6b76769
Return current level after request has been executed
acerone85 Dec 19, 2024
b6f206d
WIP: Use only graphql extensions: tests still to be adjusted
acerone85 Dec 31, 2024
2f52aaa
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Jan 20, 2025
cd3be50
Add capability to the client to set required_fuel_block_height + fix …
acerone85 Jan 20, 2025
dbc5cdc
Fix other tests
acerone85 Jan 20, 2025
c5db12f
Rename test file
acerone85 Jan 20, 2025
24b8d69
Fix BlockHeight base format in extension error
acerone85 Jan 20, 2025
4a8d839
Fix typo
acerone85 Jan 21, 2025
51ec4a9
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Jan 21, 2025
37b03f1
Fix compilation error after rename
acerone85 Jan 21, 2025
396a953
Reference follow-up issue
acerone85 Jan 21, 2025
af0c6d6
Fix rustfmt
acerone85 Jan 21, 2025
b1e89d6
Fix subscriptions feature
acerone85 Jan 21, 2025
6e50f3b
Use HashMap::new instead of Default::default
acerone85 Jan 21, 2025
dcec472
Downgrade netlink-proto
acerone85 Jan 21, 2025
8c6af1f
Revert "Downgrade netlink-proto"
acerone85 Jan 21, 2025
818a717
Merge branch 'master' into 1897-rpc-consistency-proposal
acerone85 Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks
- [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently.
- [2233](https://github.com/FuelLabs/fuel-core/pull/2233): Introduce a new column `modification_history_v2` for storing the modification history in the historical rocksDB. Keys in this column are stored in big endian order. Changed the behaviour of the historical rocksDB to write changes for new block heights to the new column, and to perform lookup of values from the `modification_history_v2` table first, and then from the `modification_history` table, performing a migration upon access if necessary.
- [2473](https://github.com/FuelLabs/fuel-core/pull/2473): Graphql requests look for a `REQUIRED_FUEL_BLOCK_HEIGHT` header. If the header is specified, the request will not be served unless the node's current fuel block height is at least the value specified in the header. All graphql responses now contain a `CURRENT_FUEL_BLOCK_HEIGHT` header which contains the block height of the last block processed by the node.

#### Breaking
- [2389](https://github.com/FuelLabs/fuel-core/pull/2258): Updated the `messageProof` GraphQL schema to return a non-nullable `MessageProof`.
Expand Down
48 changes: 48 additions & 0 deletions crates/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,28 @@ impl FuelClient {
Self::decode_response(response)
}

pub async fn query_with_headers<ResponseData, Vars>(
&self,
q: Operation<ResponseData, Vars>,
headers: impl IntoIterator<Item = (String, String)>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: HeadersMap here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to mention this is done, see d9091e6

) -> io::Result<ResponseData>
where
Vars: serde::Serialize,
ResponseData: serde::de::DeserializeOwned + 'static,
{
let mut request_builder = self.client.post(self.url.clone());
for (header_name, header_value) in headers {
request_builder = request_builder.header(header_name, header_value);
}

let response = request_builder
.run_graphql(q)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Self::decode_response(response)
}

fn decode_response<R>(response: GraphQlResponse<R>) -> io::Result<R>
where
R: serde::de::DeserializeOwned + 'static,
Expand Down Expand Up @@ -1080,6 +1102,32 @@ impl FuelClient {
Ok(balance.amount)
}

pub async fn balance_with_required_block_header(
&self,
owner: &Address,
asset_id: Option<&AssetId>,
required_block_height: u32,
) -> io::Result<u128> {
netrome marked this conversation as resolved.
Show resolved Hide resolved
let owner: schema::Address = (*owner).into();
let asset_id: schema::AssetId = match asset_id {
Some(asset_id) => (*asset_id).into(),
None => schema::AssetId::default(),
};
let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
let balance: types::Balance = self
.query_with_headers(
query,
vec![(
"REQUIRED_FUEL_BLOCK_HEIGHT".to_string(),
netrome marked this conversation as resolved.
Show resolved Hide resolved
required_block_height.to_string(),
)],
)
.await?
.balance
.into();
Ok(balance.amount)
}

// Retrieve a page of balances by their owner
pub async fn balances(
&self,
Expand Down
77 changes: 75 additions & 2 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use crate::{
view_extension::ViewExtension,
Config,
},
graphql_api,
graphql_api::{
self,
},
schema::{
CoreSchema,
CoreSchemaBuilder,
Expand All @@ -42,6 +44,13 @@ use axum::{
ACCESS_CONTROL_ALLOW_ORIGIN,
},
HeaderValue,
Request as AxumRequest,
Response as AxumResponse,
netrome marked this conversation as resolved.
Show resolved Hide resolved
StatusCode,
},
middleware::{
self,
Next,
},
response::{
sse::Event,
Expand Down Expand Up @@ -88,7 +97,10 @@ use tower_http::{
pub type Service = fuel_core_services::ServiceRunner<GraphqlService>;

pub use super::database::ReadDatabase;
use super::ports::worker;
use super::{
ports::worker,
worker_service::LAST_KNOWN_BLOCK_HEIGHT,
};

pub type BlockProducer = Box<dyn BlockProducerPort>;
// In the future GraphQL should not be aware of `TxPool`. It should
Expand Down Expand Up @@ -214,6 +226,66 @@ impl RunnableTask for Task {
}
}

const REQUIRED_FUEL_BLOCK_HEIGHT_HEADER: &str = "REQUIRED_FUEL_BLOCK_HEIGHT";
const CURRENT_FUEL_BLOCK_HEIGHT_HEADER: &str = "CURRENT_FUEL_BLOCK_HEIGHT";

async fn required_fuel_block_height<B>(
req: AxumRequest<B>,
next: Next<B>,
) -> impl IntoResponse {
acerone85 marked this conversation as resolved.
Show resolved Hide resolved
let last_known_block_height: BlockHeight = LAST_KNOWN_BLOCK_HEIGHT
.get()
//Maybe too strict?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it weird that this can be unset. We could always default to 0 as well, which would just mean that we're not in sync yet (which I presume we're not if this value hasn't been set).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be unset until the first block is processed. But if we are going to fetch the value directly from the DB then it won't be the case anymore. I am waiting to hear what approach @xgreenx prefers before refactoring this bit

.ok_or(StatusCode::INTERNAL_SERVER_ERROR)?
.load(std::sync::atomic::Ordering::Acquire)
.into();

let Some(required_fuel_block_height_header) = req
.headers()
.get(REQUIRED_FUEL_BLOCK_HEIGHT_HEADER)
.map(|value| value.to_str())
else {
// Header is not present, so we don't have any requirements.
let mut response = next.run(req).await;
add_current_fuel_block_height_header_to_response(
&mut response,
&last_known_block_height,
);

return Ok(response);
};

let raw_required_fuel_block_height =
required_fuel_block_height_header.map_err(|_err| StatusCode::BAD_REQUEST)?;

let required_fuel_block_height: BlockHeight = raw_required_fuel_block_height
.parse::<u32>()
.map_err(|_| StatusCode::BAD_REQUEST)?
.into();

if required_fuel_block_height > last_known_block_height {
Err(StatusCode::PRECONDITION_FAILED)
} else {
let mut response = next.run(req).await;
add_current_fuel_block_height_header_to_response(
&mut response,
&last_known_block_height,
);

Ok(response)
}
}

fn add_current_fuel_block_height_header_to_response<Body>(
response: &mut AxumResponse<Body>,
last_known_block_height: &BlockHeight,
) {
response.headers_mut().insert(
CURRENT_FUEL_BLOCK_HEIGHT_HEADER,
HeaderValue::from_str(&last_known_block_height.to_string()).unwrap(),
);
}

// Need a separate Data Object for each Query endpoint, cannot be avoided
#[allow(clippy::too_many_arguments)]
pub fn new_service<OnChain, OffChain>(
Expand Down Expand Up @@ -287,6 +359,7 @@ where
.route(
graphql_endpoint,
post(graphql_handler)
.layer(middleware::from_fn(required_fuel_block_height))
.layer(ConcurrencyLimitLayer::new(concurrency_limit))
.options(ok),
)
Expand Down
12 changes: 12 additions & 0 deletions crates/fuel-core/src/graphql_api/worker_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,18 @@ use futures::{
use std::{
borrow::Cow,
ops::Deref,
sync::{
atomic::AtomicU32,
OnceLock,
},
};

#[cfg(test)]
mod tests;

// The last known block height that was processed by the GraphQL service.
pub static LAST_KNOWN_BLOCK_HEIGHT: OnceLock<AtomicU32> = OnceLock::new();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: remove Static and store in Service? Should still be accessible from middleware

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this so performance critical that we can't do a database lookup for this value? I'd default to querying this from the database, to ensure we don't have any weird consistency issues when this field and the db isn't in sync. I guess this isn't a big problem right now, but it just doesn't feel right to allow for inconsistencies in a field used to synchronize with other nodes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't use static. Each extension has access to the context and you can get ReadDatabase from the context. ReadDatabase knows the latest block height.

You can check how ViewExtension works

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we should fetch the value from the DB (it is probably going to be fetched from the memtable anyway, since it's frequently updated). I don't agree that we should do this with async_graphql Extensions, the reason being that support for headers is pretty limited in there.

  1. Headers do not seem to be set when building the query context in async_graphql (see https://github.com/async-graphql/async-graphql/blob/7f1791488463d4e9c5adcd543962173e2f6cbd34/src/schema.rs#L927).
    I have confirmed this by applying the patch below to my PR and running the test. The output confirms that the header is not set by the time we execute the query.
diff --git a/crates/fuel-core/src/schema/balance.rs b/crates/fuel-core/src/schema/balance.rs
index b6b95228e8..956d5961bb 100644
--- a/crates/fuel-core/src/schema/balance.rs
+++ b/crates/fuel-core/src/schema/balance.rs
@@ -60,6 +60,10 @@ impl BalanceQuery {
         #[graphql(desc = "address of the owner")] owner: Address,
         #[graphql(desc = "asset_id of the coin")] asset_id: AssetId,
     ) -> async_graphql::Result<Balance> {
+        println!(
+            "header set: {}",
+            ctx.http_header_contains("REQUIRED_FUEL_BLOCK_HEIGHT")
+        );
         let query = ctx.read_view()?;
         let base_asset_id = *ctx
             .data_unchecked::<ConsensusProvider>()
  1. Maybe we could get the header somehow from the ExtensionContext, but this is going to be an ad hoc, undocumented solution and likely not readable.

What I think we should do instead is stick with Axum's middleware: get a handle to the OnchainDatabase from the CombinedDatabase (just because I don't want to wrap the instance to the CombinedDatabase inside an Arc), and use it to fetch the value of the last height in the middleware.

Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry completely lost this discussion, I think this makes sense. I guess you've already implemented this. Re-reviewing now...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for context: Headers cannot be used anymore, the reason being that subscriptions will return multiple graphql responses in a single HTTP response (and therefore have a single HeaderMap), while we require that each item in the response has its associated CURRENT_FUEL_BLOCK_HEIGHT field. Moreover the Response metadata for subscriptions is returned before the actual response body (as it is standard with HTTP).

in the end graphql has a (very undocumented) way to add an extensions field to responses, at the same level of data/errors field. This does not require to tweak anything at the HTTP level.


#[derive(Debug, Clone)]
pub enum DaCompressionConfig {
Disabled,
Expand Down Expand Up @@ -186,6 +193,11 @@ where
// update the importer metrics after the block is successfully committed
graphql_metrics().total_txs_count.set(total_tx_count as i64);

// update the block height recorded in memory
LAST_KNOWN_BLOCK_HEIGHT
.get_or_init(|| AtomicU32::new(**height))
.store(**height, std::sync::atomic::Ordering::Release);

Ok(())
}
}
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ mod regenesis;
#[cfg(not(feature = "only-p2p"))]
mod relayer;
#[cfg(not(feature = "only-p2p"))]
mod required_fuel_block_height_header;
#[cfg(not(feature = "only-p2p"))]
mod snapshot;
#[cfg(not(feature = "only-p2p"))]
mod state_rewind;
Expand Down
50 changes: 50 additions & 0 deletions tests/tests/required_fuel_block_height_header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use fuel_core::{
chain_config::StateConfig,
service::{
Config,
FuelService,
},
};
use fuel_core_client::client::{
types::primitives::{
Address,
AssetId,
},
FuelClient,
};

#[tokio::test]
async fn balance_with_block_height_header() {
let owner = Address::default();
let asset_id = AssetId::BASE;

// setup config
let state_config = StateConfig::default();
let config = Config::local_node_with_state_config(state_config);

// setup server & client
let srv = FuelService::new_node(config).await.unwrap();
let client = FuelClient::from(srv.bound_address);

// Issue a request with wrong precondition
let error = client
.balance_with_required_block_header(&owner, Some(&asset_id), 100)
.await
.unwrap_err();

let error_str = format!("{:?}", error);
assert_eq!(
error_str,
"Custom { kind: Other, error: ErrorResponse(412, \"\") }"
);

// Meet precondition on server side
client.produce_blocks(100, None).await.unwrap();

// Issue request again
let result = client
.balance_with_required_block_header(&owner, Some(&asset_id), 100)
.await;

assert!(result.is_ok());
}
Loading