-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RPC Consistency Proposal #2473
base: master
Are you sure you want to change the base?
RPC Consistency Proposal #2473
Changes from 4 commits
a8f5f41
778dd7c
d1b1c33
b77506c
b764cd4
d9091e6
90c411f
625bf8f
085d173
a98ef9b
c95f09a
6099651
3b81537
11e6876
88b9c52
c5f82ff
844e4e6
71f8087
cbea694
da7f057
f5e63f6
5f4500b
37dd63e
9913620
f849eb5
1fddd80
e1f71d4
be7f927
0b1e31f
8e1b0a5
abe13c2
eae1e22
0a3da1f
e7e1db7
6b76769
b6f206d
2f52aaa
cd3be50
dbc5cdc
c5db12f
24b8d69
4a8d839
51ec4a9
37b03f1
396a953
af0c6d6
b1e89d6
6e50f3b
dcec472
8c6af1f
818a717
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,9 @@ use crate::{ | |
view_extension::ViewExtension, | ||
Config, | ||
}, | ||
graphql_api, | ||
graphql_api::{ | ||
self, | ||
}, | ||
schema::{ | ||
CoreSchema, | ||
CoreSchemaBuilder, | ||
|
@@ -42,6 +44,13 @@ use axum::{ | |
ACCESS_CONTROL_ALLOW_ORIGIN, | ||
}, | ||
HeaderValue, | ||
Request as AxumRequest, | ||
Response as AxumResponse, | ||
netrome marked this conversation as resolved.
Show resolved
Hide resolved
|
||
StatusCode, | ||
}, | ||
middleware::{ | ||
self, | ||
Next, | ||
}, | ||
response::{ | ||
sse::Event, | ||
|
@@ -88,7 +97,10 @@ use tower_http::{ | |
pub type Service = fuel_core_services::ServiceRunner<GraphqlService>; | ||
|
||
pub use super::database::ReadDatabase; | ||
use super::ports::worker; | ||
use super::{ | ||
ports::worker, | ||
worker_service::LAST_KNOWN_BLOCK_HEIGHT, | ||
}; | ||
|
||
pub type BlockProducer = Box<dyn BlockProducerPort>; | ||
// In the future GraphQL should not be aware of `TxPool`. It should | ||
|
@@ -214,6 +226,66 @@ impl RunnableTask for Task { | |
} | ||
} | ||
|
||
const REQUIRED_FUEL_BLOCK_HEIGHT_HEADER: &str = "REQUIRED_FUEL_BLOCK_HEIGHT"; | ||
const CURRENT_FUEL_BLOCK_HEIGHT_HEADER: &str = "CURRENT_FUEL_BLOCK_HEIGHT"; | ||
|
||
async fn required_fuel_block_height<B>( | ||
req: AxumRequest<B>, | ||
next: Next<B>, | ||
) -> impl IntoResponse { | ||
acerone85 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let last_known_block_height: BlockHeight = LAST_KNOWN_BLOCK_HEIGHT | ||
.get() | ||
//Maybe too strict? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find it weird that this can be unset. We could always default to 0 as well, which would just mean that we're not in sync yet (which I presume we're not if this value hasn't been set). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will be unset until the first block is processed. But if we are going to fetch the value directly from the DB then it won't be the case anymore. I am waiting to hear what approach @xgreenx prefers before refactoring this bit |
||
.ok_or(StatusCode::INTERNAL_SERVER_ERROR)? | ||
.load(std::sync::atomic::Ordering::Acquire) | ||
.into(); | ||
|
||
let Some(required_fuel_block_height_header) = req | ||
.headers() | ||
.get(REQUIRED_FUEL_BLOCK_HEIGHT_HEADER) | ||
.map(|value| value.to_str()) | ||
else { | ||
// Header is not present, so we don't have any requirements. | ||
let mut response = next.run(req).await; | ||
add_current_fuel_block_height_header_to_response( | ||
&mut response, | ||
&last_known_block_height, | ||
); | ||
|
||
return Ok(response); | ||
}; | ||
|
||
let raw_required_fuel_block_height = | ||
required_fuel_block_height_header.map_err(|_err| StatusCode::BAD_REQUEST)?; | ||
|
||
let required_fuel_block_height: BlockHeight = raw_required_fuel_block_height | ||
.parse::<u32>() | ||
.map_err(|_| StatusCode::BAD_REQUEST)? | ||
.into(); | ||
|
||
if required_fuel_block_height > last_known_block_height { | ||
Err(StatusCode::PRECONDITION_FAILED) | ||
} else { | ||
let mut response = next.run(req).await; | ||
add_current_fuel_block_height_header_to_response( | ||
&mut response, | ||
&last_known_block_height, | ||
); | ||
|
||
Ok(response) | ||
} | ||
} | ||
|
||
fn add_current_fuel_block_height_header_to_response<Body>( | ||
response: &mut AxumResponse<Body>, | ||
last_known_block_height: &BlockHeight, | ||
) { | ||
response.headers_mut().insert( | ||
CURRENT_FUEL_BLOCK_HEIGHT_HEADER, | ||
HeaderValue::from_str(&last_known_block_height.to_string()).unwrap(), | ||
); | ||
} | ||
|
||
// Need a separate Data Object for each Query endpoint, cannot be avoided | ||
#[allow(clippy::too_many_arguments)] | ||
pub fn new_service<OnChain, OffChain>( | ||
|
@@ -287,6 +359,7 @@ where | |
.route( | ||
graphql_endpoint, | ||
post(graphql_handler) | ||
.layer(middleware::from_fn(required_fuel_block_height)) | ||
.layer(ConcurrencyLimitLayer::new(concurrency_limit)) | ||
.options(ok), | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,11 +100,18 @@ use futures::{ | |
use std::{ | ||
borrow::Cow, | ||
ops::Deref, | ||
sync::{ | ||
atomic::AtomicU32, | ||
OnceLock, | ||
}, | ||
}; | ||
|
||
#[cfg(test)] | ||
mod tests; | ||
|
||
// The last known block height that was processed by the GraphQL service. | ||
pub static LAST_KNOWN_BLOCK_HEIGHT: OnceLock<AtomicU32> = OnceLock::new(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. todo: remove Static and store in Service? Should still be accessible from middleware There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this so performance critical that we can't do a database lookup for this value? I'd default to querying this from the database, to ensure we don't have any weird consistency issues when this field and the db isn't in sync. I guess this isn't a big problem right now, but it just doesn't feel right to allow for inconsistencies in a field used to synchronize with other nodes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't use static. Each extension has access to the context and you can get You can check how There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that we should fetch the value from the DB (it is probably going to be fetched from the memtable anyway, since it's frequently updated). I don't agree that we should do this with
diff --git a/crates/fuel-core/src/schema/balance.rs b/crates/fuel-core/src/schema/balance.rs
index b6b95228e8..956d5961bb 100644
--- a/crates/fuel-core/src/schema/balance.rs
+++ b/crates/fuel-core/src/schema/balance.rs
@@ -60,6 +60,10 @@ impl BalanceQuery {
#[graphql(desc = "address of the owner")] owner: Address,
#[graphql(desc = "asset_id of the coin")] asset_id: AssetId,
) -> async_graphql::Result<Balance> {
+ println!(
+ "header set: {}",
+ ctx.http_header_contains("REQUIRED_FUEL_BLOCK_HEIGHT")
+ );
let query = ctx.read_view()?;
let base_asset_id = *ctx
.data_unchecked::<ConsensusProvider>()
What I think we should do instead is stick with Axum's middleware: get a handle to the OnchainDatabase from the CombinedDatabase (just because I don't want to wrap the instance to the CombinedDatabase inside an Arc), and use it to fetch the value of the last height in the middleware. Let me know what you think. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry completely lost this discussion, I think this makes sense. I guess you've already implemented this. Re-reviewing now... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for context: Headers cannot be used anymore, the reason being that subscriptions will return multiple graphql responses in a single HTTP response (and therefore have a single HeaderMap), while we require that each item in the response has its associated in the end graphql has a (very undocumented) way to add an |
||
|
||
#[derive(Debug, Clone)] | ||
pub enum DaCompressionConfig { | ||
Disabled, | ||
|
@@ -186,6 +193,11 @@ where | |
// update the importer metrics after the block is successfully committed | ||
graphql_metrics().total_txs_count.set(total_tx_count as i64); | ||
|
||
// update the block height recorded in memory | ||
LAST_KNOWN_BLOCK_HEIGHT | ||
.get_or_init(|| AtomicU32::new(**height)) | ||
.store(**height, std::sync::atomic::Ordering::Release); | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
use fuel_core::{ | ||
chain_config::StateConfig, | ||
service::{ | ||
Config, | ||
FuelService, | ||
}, | ||
}; | ||
use fuel_core_client::client::{ | ||
types::primitives::{ | ||
Address, | ||
AssetId, | ||
}, | ||
FuelClient, | ||
}; | ||
|
||
#[tokio::test] | ||
async fn balance_with_block_height_header() { | ||
let owner = Address::default(); | ||
let asset_id = AssetId::BASE; | ||
|
||
// setup config | ||
let state_config = StateConfig::default(); | ||
let config = Config::local_node_with_state_config(state_config); | ||
|
||
// setup server & client | ||
let srv = FuelService::new_node(config).await.unwrap(); | ||
let client = FuelClient::from(srv.bound_address); | ||
|
||
// Issue a request with wrong precondition | ||
let error = client | ||
.balance_with_required_block_header(&owner, Some(&asset_id), 100) | ||
.await | ||
.unwrap_err(); | ||
|
||
let error_str = format!("{:?}", error); | ||
assert_eq!( | ||
error_str, | ||
"Custom { kind: Other, error: ErrorResponse(412, \"\") }" | ||
); | ||
|
||
// Meet precondition on server side | ||
client.produce_blocks(100, None).await.unwrap(); | ||
|
||
// Issue request again | ||
let result = client | ||
.balance_with_required_block_header(&owner, Some(&asset_id), 100) | ||
.await; | ||
|
||
assert!(result.is_ok()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: HeadersMap here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume this one https://docs.rs/http/1.2.0/http/header/struct.HeaderMap.html - makes sense 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot to mention this is done, see d9091e6