Skip to content

Commit

Permalink
iterator: return NextRowError from next() methods
Browse files Browse the repository at this point in the history
  • Loading branch information
muzarski committed Dec 27, 2024
1 parent 6c416a4 commit c4b17fe
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
14 changes: 7 additions & 7 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::statement::{prepared_statement::PreparedStatement, query::Query};
use crate::statement::{Consistency, PagingState, SerialConsistency};
use crate::transport::cluster::ClusterData;
use crate::transport::connection::{Connection, NonErrorQueryResponse, QueryResponse};
use crate::transport::errors::{QueryError, UserRequestError};
use crate::transport::errors::UserRequestError;
use crate::transport::load_balancing::{self, RoutingInfo};
use crate::transport::metrics::Metrics;
use crate::transport::retry_policy::{RequestInfo, RetryDecision, RetrySession};
Expand Down Expand Up @@ -592,11 +592,11 @@ impl QueryPager {
/// borrows from self.
///
/// This is cancel-safe.
async fn next(&mut self) -> Option<Result<ColumnIterator, QueryError>> {
async fn next(&mut self) -> Option<Result<ColumnIterator, NextRowError>> {
let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await;
match res {
Some(Ok(())) => {}
Some(Err(err)) => return Some(Err(err.into())),
Some(Err(err)) => return Some(Err(err)),
None => return None,
}

Expand All @@ -605,7 +605,7 @@ impl QueryPager {
self.current_page
.next()
.unwrap()
.map_err(|err| NextRowError::RowDeserializationError(err).into()),
.map_err(NextRowError::RowDeserializationError),
)
}

Expand Down Expand Up @@ -1043,14 +1043,14 @@ impl<RowT> Stream for TypedRowStream<RowT>
where
RowT: DeserializeOwnedRow,
{
type Item = Result<RowT, QueryError>;
type Item = Result<RowT, NextRowError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next_fut = async {
self.raw_row_lending_stream.next().await.map(|res| {
res.and_then(|column_iterator| {
<RowT as DeserializeRow>::deserialize(column_iterator)
.map_err(|err| NextRowError::RowDeserializationError(err).into())
.map_err(NextRowError::RowDeserializationError)
})
})
};
Expand Down Expand Up @@ -1183,7 +1183,7 @@ mod legacy {
pub enum LegacyNextRowError {
/// Query to fetch next page has failed
#[error(transparent)]
QueryError(#[from] QueryError),
NextRowError(#[from] NextRowError),

/// Parsing values in row as given types failed
#[error(transparent)]
Expand Down
7 changes: 6 additions & 1 deletion scylla/src/transport/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,7 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
Ok::<_, QueryError>(rows_stream)
})
.into_stream()
.map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten()
.and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result)));

Expand All @@ -844,6 +845,7 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
Ok::<_, QueryError>(rows_stream)
})
.into_stream()
.map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten()
.and_then(|row_result| future::ok((NodeInfoSource::Local, row_result)));

Expand Down Expand Up @@ -986,7 +988,9 @@ where
pager.rows_stream::<R>().map_err(convert_typecheck_error)?;
Ok::<_, QueryError>(stream)
};
fut.into_stream().try_flatten()
fut.into_stream()
.map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten()
}

async fn query_keyspaces(
Expand Down Expand Up @@ -1740,6 +1744,7 @@ async fn query_table_partitioners(
Ok::<_, QueryError>(stream)
})
.into_stream()
.map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten();

let result = rows
Expand Down

0 comments on commit c4b17fe

Please sign in to comment.