Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added ability to specify backup rpc for connecting to the network #28

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/specify_backup_rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use near_api::{prelude::*, types::reference::Reference};

#[tokio::main]
async fn main() {
let mut network = NetworkConfig::mainnet();
network.rpc_endpoints.push(
RPCEndpoint::new("https://rpc.mainnet.pagoda.co/".parse().unwrap())
.with_api_key("potential api key".parse().unwrap())
.with_retries(5),
);
// Query latest block
let _block = Chain::block()
.at(Reference::Optimistic)
.fetch_from_mainnet()
.await
.unwrap();
}
97 changes: 28 additions & 69 deletions src/common/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use serde::de::DeserializeOwned;
use tracing::{debug, error, info, instrument, trace, warn};

use crate::{
common::utils::{retry, RetryResponse},
config::NetworkConfig,
config::{retry, NetworkConfig, RetryResponse},
errors::QueryError,
types::Data,
};
Expand Down Expand Up @@ -155,9 +154,6 @@ where
reference: Reference,
requests: Vec<Arc<dyn QueryCreator<Method, RpcReference = Reference> + Send + Sync>>,
handler: ResponseHandler,
retries: u8,
sleep_duration: std::time::Duration,
exponential_backoff: bool,
}

impl<Handler, Method, Reference> MultiRpcBuilder<Handler, Method, Reference>
Expand All @@ -173,10 +169,6 @@ where
reference,
requests: vec![],
handler,
retries: 5,
// 50ms, 100ms, 200ms, 400ms, 800ms
sleep_duration: std::time::Duration::from_millis(50),
exponential_backoff: true,
}
}

Expand Down Expand Up @@ -205,8 +197,6 @@ where
self,
network: &NetworkConfig,
) -> ResultWithMethod<Handler::Response, Method> {
let json_rpc_client = network.json_rpc_client();

debug!(target: QUERY_EXECUTOR_TARGET, "Preparing queries");
let requests: Vec<_> = self
.requests
Expand All @@ -219,32 +209,27 @@ where
.collect::<Result<_, _>>()?;

info!(target: QUERY_EXECUTOR_TARGET, "Sending {} queries", requests.len());
let requests = requests.into_iter().map(|(query, request)| {
let json_rpc_client = json_rpc_client.clone();
async move {
retry(
|| async {
let result = match json_rpc_client.call(&query).await {
Ok(result) => RetryResponse::Ok(result),
Err(err) if request.is_critical_error(&err) => {
RetryResponse::Critical(err)
}
Err(err) => RetryResponse::Retry(err),
};
tracing::debug!(
target: QUERY_EXECUTOR_TARGET,
"Querying RPC with {:?} resulted in {:?}",
query,
result
);
let requests = requests.into_iter().map(|(query, request)| async move {
retry(network.clone(), |json_rpc_client| {
let query = &query;
let request = &request;

async move {
let result = match json_rpc_client.call(&query).await {
Ok(result) => RetryResponse::Ok(result),
Err(err) if request.is_critical_error(&err) => RetryResponse::Critical(err),
Err(err) => RetryResponse::Retry(err),
};
tracing::debug!(
target: QUERY_EXECUTOR_TARGET,
"Querying RPC with {:?} resulted in {:?}",
query,
result
},
self.retries,
self.sleep_duration,
self.exponential_backoff,
)
.await
}
);
result
}
})
.await
});

let requests: Vec<_> = join_all(requests)
Expand Down Expand Up @@ -275,9 +260,6 @@ pub struct RpcBuilder<Handler, Method, Reference> {
reference: Reference,
request: Arc<dyn QueryCreator<Method, RpcReference = Reference> + Send + Sync>,
handler: Handler,
retries: u8,
sleep_duration: std::time::Duration,
exponential_backoff: bool,
}

impl<Handler, Method, Reference> RpcBuilder<Handler, Method, Reference>
Expand All @@ -297,10 +279,6 @@ where
reference,
request: Arc::new(request),
handler,
retries: 5,
// 50ms, 100ms, 200ms, 400ms, 800ms
sleep_duration: std::time::Duration::from_millis(50),
exponential_backoff: true,
}
}

Expand All @@ -311,37 +289,21 @@ where
}
}

pub const fn with_retries(mut self, retries: u8) -> Self {
self.retries = retries;
self
}

pub const fn with_sleep_duration(mut self, sleep_duration: std::time::Duration) -> Self {
self.sleep_duration = sleep_duration;
self
}

pub const fn with_exponential_backoff(mut self) -> Self {
self.exponential_backoff = true;
self
}

#[instrument(skip(self, network))]
pub async fn fetch_from(
self,
network: &NetworkConfig,
) -> ResultWithMethod<Handler::Response, Method> {
debug!(target: QUERY_EXECUTOR_TARGET, "Preparing query");
let json_rpc_client = network.json_rpc_client();
let query = self.request.create_query(network, self.reference)?;

let query_response = retry(
|| async {
let query_response = retry(network.clone(), |json_rpc_client| {
let query = &query;
let request = &self.request;
async move {
let result = match json_rpc_client.call(&query).await {
Ok(result) => RetryResponse::Ok(result),
Err(err) if self.request.is_critical_error(&err) => {
RetryResponse::Critical(err)
}
Err(err) if request.is_critical_error(&err) => RetryResponse::Critical(err),
Err(err) => RetryResponse::Retry(err),
};
tracing::debug!(
Expand All @@ -351,11 +313,8 @@ where
result
);
result
},
3,
std::time::Duration::from_secs(1),
false,
)
}
})
.await?;

debug!(target: QUERY_EXECUTOR_TARGET, "Processing query response");
Expand Down
49 changes: 8 additions & 41 deletions src/common/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use reqwest::Response;
use tracing::{debug, info};

use crate::{
common::utils::{is_critical_transaction_error, RetryResponse},
config::NetworkConfig,
common::utils::is_critical_transaction_error,
config::{retry, NetworkConfig, RetryResponse},
errors::{
ExecuteMetaTransactionsError, ExecuteTransactionError, MetaSignError, SignerError,
ValidationError,
Expand All @@ -22,8 +22,7 @@ use crate::{
};

use super::{
signed_delegate_action::SignedDelegateActionAsBase64, utils::retry,
META_TRANSACTION_VALID_FOR_DEFAULT,
signed_delegate_action::SignedDelegateActionAsBase64, META_TRANSACTION_VALID_FOR_DEFAULT,
};

const TX_EXECUTOR_TARGET: &str = "near_api::tx::executor";
Expand Down Expand Up @@ -80,42 +79,20 @@ impl From<SignedTransaction> for PrepopulateTransaction {
pub struct ExecuteSignedTransaction {
pub tr: TransactionableOrSigned<SignedTransaction>,
pub signer: Arc<Signer>,
pub retries: u8,
pub sleep_duration: std::time::Duration,
pub exponential_backoff: bool,
}

impl ExecuteSignedTransaction {
pub fn new<T: Transactionable + 'static>(tr: T, signer: Arc<Signer>) -> Self {
Self {
tr: TransactionableOrSigned::Transactionable(Box::new(tr)),
signer,
retries: 5,
// 50ms, 100ms, 200ms, 400ms, 800ms
sleep_duration: std::time::Duration::from_millis(50),
exponential_backoff: true,
}
}

pub fn meta(self) -> ExecuteMetaTransaction {
ExecuteMetaTransaction::from_box(self.tr.transactionable(), self.signer)
}

pub const fn with_retries(mut self, retries: u8) -> Self {
self.retries = retries;
self
}

pub const fn with_sleep_duration(mut self, sleep_duration: std::time::Duration) -> Self {
self.sleep_duration = sleep_duration;
self
}

pub const fn with_exponential_backoff(mut self) -> Self {
self.exponential_backoff = true;
self
}

pub async fn presign_offline(
mut self,
public_key: PublicKey,
Expand Down Expand Up @@ -169,9 +146,6 @@ impl ExecuteSignedTransaction {
mut self,
network: &NetworkConfig,
) -> Result<FinalExecutionOutcomeView, ExecuteTransactionError> {
let sleep_duration = self.sleep_duration;
let retries = self.retries;

let (signed, transactionable) = match &mut self.tr {
TransactionableOrSigned::Transactionable(tr) => {
debug!(target: TX_EXECUTOR_TARGET, "Preparing unsigned transaction");
Expand Down Expand Up @@ -212,7 +186,7 @@ impl ExecuteSignedTransaction {
signed.transaction.nonce(),
);

Self::send_impl(network, signed, retries, sleep_duration).await
Self::send_impl(network, signed).await
}

pub async fn send_to_mainnet(
Expand All @@ -232,15 +206,11 @@ impl ExecuteSignedTransaction {
async fn send_impl(
network: &NetworkConfig,
signed_tr: SignedTransaction,
retries: u8,
sleep_duration: std::time::Duration,
) -> Result<FinalExecutionOutcomeView, ExecuteTransactionError> {
retry(
|| {
let signed_tr = signed_tr.clone();
async move {
let result = match network
.json_rpc_client()
retry(network.clone(), |json_rpc_client| {
let signed_tr = signed_tr.clone();
async move {
let result = match json_rpc_client
.call(
near_jsonrpc_client::methods::broadcast_tx_commit::RpcBroadcastTxCommitRequest {
signed_transaction: signed_tr.clone(),
Expand All @@ -263,9 +233,6 @@ impl ExecuteSignedTransaction {
result
}
},
retries,
sleep_duration,
false,
)
.await
.map_err(ExecuteTransactionError::TransactionError)
Expand Down
51 changes: 1 addition & 50 deletions src/common/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// https://github.com/near/near-token-rs/blob/3feafec624e7d1028ed00695f2acf87e1d823fa7/src/utils.rs#L1-L49

use crate::errors::{DecimalNumberParsingError, RetryError};
use crate::errors::DecimalNumberParsingError;

/// Parsing decimal numbers from `&str` type in `u128`.
/// Function also takes a value of metric prefix in u128 type.
Expand Down Expand Up @@ -49,55 +49,6 @@ pub fn parse_decimal_number(s: &str, pref_const: u128) -> Result<u128, DecimalNu
Ok(result)
}

#[derive(Debug)]
pub enum RetryResponse<R, E> {
Ok(R),
Retry(E),
Critical(E),
}

impl<R, E> From<Result<R, E>> for RetryResponse<R, E> {
fn from(value: Result<R, E>) -> Self {
match value {
Ok(value) => Self::Ok(value),
Err(value) => Self::Retry(value),
}
}
}

pub async fn retry<R, E, T, F>(
mut task: F,
retries: u8,
initial_sleep: std::time::Duration,
exponential_backoff: bool,
) -> Result<R, RetryError<E>>
where
F: FnMut() -> T + Send,
T: core::future::Future<Output = RetryResponse<R, E>> + Send,
T::Output: Send,
{
let mut retries = (1..=retries).rev();
let mut sleep_duration = initial_sleep;
loop {
let result = task().await;
match result {
RetryResponse::Ok(result) => return Ok(result),
RetryResponse::Retry(_) if retries.next().is_some() => {
tokio::time::sleep(sleep_duration).await;
sleep_duration = if exponential_backoff {
sleep_duration * 2
} else {
sleep_duration
};
}
RetryResponse::Retry(err) => {
return Err(RetryError::RetriesExhausted(err));
}
RetryResponse::Critical(err) => return Err(RetryError::Critical(err)),
}
}
}

pub fn is_critical_blocks_error(
err: &near_jsonrpc_client::errors::JsonRpcError<
near_jsonrpc_primitives::types::blocks::RpcBlockError,
Expand Down
Loading
Loading