Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codewide: small performance optimisations #1169

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions scylla/src/client/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ where
let retry_decision = self.retry_session.decide_should_retry(query_info);
trace!(
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
retry_decision = ?retry_decision
);

last_error = request_error.into_query_error();
Expand Down Expand Up @@ -856,7 +856,7 @@ impl QueryPager {
serialized_values_size,
);
if let Some(replicas) = replicas.as_ref() {
span.record_replicas(replicas);
span.record_replicas(replicas.iter().map(|(node, shard)| (node, *shard)));
}
span
};
Expand Down
8 changes: 3 additions & 5 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1426,10 +1426,8 @@ where
if !span.span().is_disabled() {
if let (Some(table_spec), Some(token)) = (statement_info.table, token) {
let cluster_data = self.get_cluster_data();
let replicas: smallvec::SmallVec<[_; 8]> = cluster_data
.get_token_endpoints_iter(table_spec, token)
.collect();
span.record_replicas(&replicas)
let replicas = cluster_data.get_token_endpoints_iter(table_spec, token);
span.record_replicas(replicas)
}
}

Expand Down Expand Up @@ -2096,7 +2094,7 @@ where
let retry_decision = context.retry_session.decide_should_retry(query_info);
trace!(
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
retry_decision = ?retry_decision
);

last_error = Some(request_error.into_query_error());
Expand Down
31 changes: 12 additions & 19 deletions scylla/src/cluster/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::policies::host_filter::HostFilter;
use crate::routing::Token;
use crate::statement::query::Query;
use crate::utils::parse::{ParseErrorCause, ParseResult, ParserState};
use crate::utils::pretty::{CommaSeparatedDisplayer, DisplayUsingDebug};

use futures::future::{self, FutureExt};
use futures::stream::{self, StreamExt, TryStreamExt};
Expand All @@ -37,8 +38,7 @@ use scylla_macros::DeserializeRow;
use std::borrow::BorrowMut;
use std::cell::Cell;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;
use std::fmt::{self, Formatter};
use std::net::{IpAddr, SocketAddr};
use std::num::NonZeroUsize;
use std::str::FromStr;
Expand Down Expand Up @@ -562,11 +562,7 @@ impl MetadataReader {
self.known_peers.shuffle(&mut thread_rng());
debug!(
"Known peers: {}",
self.known_peers
.iter()
.map(|endpoint| format!("{:?}", endpoint))
.collect::<Vec<String>>()
.join(", ")
CommaSeparatedDisplayer(self.known_peers.iter().map(DisplayUsingDebug))
);

let address_of_failed_control_connection = self.control_connection_endpoint.address();
Expand Down Expand Up @@ -633,11 +629,9 @@ impl MetadataReader {
};

warn!(
control_connection_address = self
control_connection_address = tracing::field::display(self
.control_connection_endpoint
.address()
.to_string()
.as_str(),
.address()),
error = %err,
"Failed to fetch metadata using current control connection"
);
Expand Down Expand Up @@ -701,11 +695,9 @@ impl MetadataReader {
// and print an error message about this fact
if !metadata.peers.is_empty() && self.known_peers.is_empty() {
error!(
node_ips = ?metadata
.peers
.iter()
.map(|peer| peer.address)
.collect::<Vec<_>>(),
node_ips = tracing::field::display(CommaSeparatedDisplayer(
metadata.peers.iter().map(|peer| peer.address)
)),
"The host filter rejected all nodes in the cluster, \
no connections that can serve user queries have been \
established. The session cannot serve any queries!"
Expand All @@ -721,12 +713,12 @@ impl MetadataReader {
if let Some(peer) = control_connection_peer {
if !self.host_filter.as_ref().map_or(true, |f| f.accept(peer)) {
warn!(
filtered_node_ips = ?metadata
filtered_node_ips = tracing::field::display(CommaSeparatedDisplayer(metadata
.peers
.iter()
.filter(|peer| self.host_filter.as_ref().map_or(true, |p| p.accept(peer)))
.map(|peer| peer.address)
.collect::<Vec<_>>(),
)),
control_connection_address = ?self.control_connection_endpoint.address(),
"The node that the control connection is established to \
is not accepted by the host filter. Please verify that \
Expand Down Expand Up @@ -884,9 +876,10 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe

let peers = translated_peers_futures
.buffer_unordered(256)
.try_filter_map(|x| std::future::ready(Ok(x)))
.try_collect::<Vec<_>>()
.await?;
Ok(peers.into_iter().flatten().collect())
Ok(peers)
}

async fn create_peer_from_row(
Expand Down
2 changes: 1 addition & 1 deletion scylla/src/cluster/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl ClusterState {
&self,
table_spec: &TableSpec,
token: Token,
) -> impl Iterator<Item = (NodeRef<'_>, Shard)> {
) -> impl Iterator<Item = (NodeRef<'_>, Shard)> + Clone {
let keyspace = self.keyspaces.get(table_spec.ks_name());
let strategy = keyspace
.map(|k| &k.strategy)
Expand Down
12 changes: 5 additions & 7 deletions scylla/src/network/connection_pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[cfg(feature = "cloud")]
use crate::cloud::set_ssl_config_for_scylla_cloud_host;
use crate::utils::pretty::CommaSeparatedDisplayer;

use super::connection::{
open_connection, open_connection_to_shard_aware_port, Connection, ConnectionConfig,
Expand Down Expand Up @@ -358,12 +359,9 @@ impl NodeConnectionPool {

fn choose_random_connection_from_slice(v: &[Arc<Connection>]) -> Option<Arc<Connection>> {
trace!(
connections = v
.iter()
.map(|conn| conn.get_connect_address().to_string())
.collect::<Vec<String>>()
.join(",")
.as_str(),
connections = tracing::field::display(CommaSeparatedDisplayer(
v.iter().map(|conn| conn.get_connect_address())
)),
"Available"
);
if v.is_empty() {
Expand Down Expand Up @@ -587,7 +585,7 @@ impl PoolRefiller {
}
}
trace!(
pool_state = format!("{:?}", ShardedConnectionVectorWrapper(&self.conns)).as_str()
pool_state = ?ShardedConnectionVectorWrapper(&self.conns)
);

// Schedule refilling here
Expand Down
28 changes: 14 additions & 14 deletions scylla/src/observability/driver_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,26 @@ impl RequestSpan {
}
}

pub(crate) fn record_replicas<'a>(&'a self, replicas: &'a [(impl Borrow<Arc<Node>>, Shard)]) {
struct ReplicaIps<'a, N>(&'a [(N, Shard)]);
impl<N> Display for ReplicaIps<'_, N>
pub(crate) fn record_replicas<'a>(
&'a self,
replicas: impl Iterator<Item = (impl Borrow<Arc<Node>> + 'a, Shard)> + Clone,
) {
struct Replica<N>(N, Shard);
impl<N> Display for Replica<N>
where
N: Borrow<Arc<Node>>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut nodes_with_shards = self.0.iter();
if let Some((node, shard)) = nodes_with_shards.next() {
write!(f, "{}-shard{}", node.borrow().address.ip(), shard)?;

for (node, shard) in nodes_with_shards {
write!(f, ",{}-shard{}", node.borrow().address.ip(), shard)?;
}
}
Ok(())
let Self(node, shard) = self;
write!(f, "{}-shard{}", node.borrow().address.ip(), shard)
}
}
self.span
.record("replicas", tracing::field::display(&ReplicaIps(replicas)));
self.span.record(
"replicas",
tracing::field::display(CommaSeparatedDisplayer(
replicas.map(|(node, shard)| Replica(node, shard)),
)),
);
}

pub(crate) fn record_request_size(&self, size: usize) {
Expand Down
2 changes: 2 additions & 0 deletions scylla/src/routing/locator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ impl<'a> IntoIterator for ReplicaSet<'a> {
}
}

#[derive(Clone)]
enum ReplicaSetIteratorInner<'a> {
/// Token ring with SimpleStrategy, any datacenter
Plain {
Expand Down Expand Up @@ -502,6 +503,7 @@ enum ReplicaSetIteratorInner<'a> {
}

/// Iterator that returns replicas from some replica set.
#[derive(Clone)]
pub struct ReplicaSetIterator<'a> {
inner: ReplicaSetIteratorInner<'a>,
token: Token,
Expand Down
2 changes: 1 addition & 1 deletion scylla/src/routing/locator/replicas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{ops::Index, sync::Arc};
///
/// This type is very similar to `Cow`, but unlike `Cow`,
/// it holds references in an `Owned` variant `Vec`.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum ReplicasArray<'a> {
Borrowed(&'a [Arc<Node>]),
Owned(Vec<NodeRef<'a>>),
Expand Down
5 changes: 3 additions & 2 deletions scylla/src/routing/locator/replication_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ impl ReplicationInfo {

let unique_nodes_in_global_ring = global_ring
.iter()
.map(|(_t, n)| n.clone())
.map(|(_t, n)| n)
.unique()
.cloned()
.collect();

let mut datacenter_nodes: HashMap<&str, Vec<(Token, Arc<Node>)>> = HashMap::new();
Expand All @@ -82,7 +83,7 @@ impl ReplicationInfo {
for (datacenter_name, this_datacenter_nodes) in datacenter_nodes {
let dc_ring = TokenRing::new(this_datacenter_nodes.into_iter());
let unique_nodes_in_dc_ring =
dc_ring.iter().map(|(_t, n)| n.clone()).unique().collect();
dc_ring.iter().map(|(_t, n)| n).unique().cloned().collect();
// When counting racks consider None as a separate rack
let rack_count: usize = dc_ring
.iter()
Expand Down
7 changes: 7 additions & 0 deletions scylla/src/utils/pretty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ where
}
}

pub(crate) struct DisplayUsingDebug<T>(pub(crate) T);
impl<T: std::fmt::Debug> std::fmt::Display for DisplayUsingDebug<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
<T as std::fmt::Debug>::fmt(&self.0, f)
}
}

#[cfg(test)]
mod tests {
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
Expand Down
Loading