Skip to content

Commit

Permalink
Extend SDK Producer with optional retries in case of send messages fa…
Browse files Browse the repository at this point in the history
…ilure (#1485)

Adds new method for `IggyProducerBuilder` -> `send_retries()` which allows to specify an optional number of retries and the interval between them, in case of failure when sending the messages (e.g. client disconnection or server rejecting the messages). It also removes the existing `reconnection_retry_interval()` from the builder, and unifies the retries policy under the new method.
  • Loading branch information
spetz authored Feb 3, 2025
1 parent 081d882 commit e566b46
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.100"
version = "0.6.101"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "Apache-2.0"
Expand Down
18 changes: 9 additions & 9 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub struct IggyConsumer {
store_after_every_nth_message: u64,
last_polled_at: Arc<AtomicU64>,
current_partition_id: Arc<AtomicU32>,
retry_interval: IggyDuration,
reconnection_retry_interval: IggyDuration,
allow_replay: bool,
}

Expand Down Expand Up @@ -160,7 +160,7 @@ impl IggyConsumer {
},
last_polled_at: Arc::new(AtomicU64::new(0)),
current_partition_id: Arc::new(AtomicU32::new(0)),
retry_interval,
reconnection_retry_interval: retry_interval,
allow_replay,
}
}
Expand Down Expand Up @@ -498,7 +498,7 @@ impl IggyConsumer {
let interval = self.poll_interval_micros;
let last_polled_at = self.last_polled_at.clone();
let can_poll = self.can_poll.clone();
let retry_interval = self.retry_interval;
let retry_interval = self.reconnection_retry_interval;
let last_stored_offset = self.last_stored_offsets.clone();
let last_consumed_offset = self.last_consumed_offsets.clone();
let allow_replay = self.allow_replay;
Expand Down Expand Up @@ -867,7 +867,7 @@ pub struct IggyConsumerBuilder {
auto_join_consumer_group: bool,
create_consumer_group_if_not_exists: bool,
encryptor: Option<Arc<EncryptorKind>>,
retry_interval: IggyDuration,
reconnection_retry_interval: IggyDuration,
allow_replay: bool,
}

Expand Down Expand Up @@ -900,7 +900,7 @@ impl IggyConsumerBuilder {
create_consumer_group_if_not_exists: true,
encryptor,
polling_interval,
retry_interval: IggyDuration::ONE_SECOND,
reconnection_retry_interval: IggyDuration::ONE_SECOND,
allow_replay: false,
}
}
Expand Down Expand Up @@ -1005,10 +1005,10 @@ impl IggyConsumerBuilder {
}
}

/// Sets the retry interval in case of server disconnection.
pub fn retry_interval(self, interval: IggyDuration) -> Self {
/// Sets the reconnection retry interval in case of server disconnection.
pub fn reconnection_retry_interval(self, interval: IggyDuration) -> Self {
Self {
retry_interval: interval,
reconnection_retry_interval: interval,
..self
}
}
Expand Down Expand Up @@ -1039,7 +1039,7 @@ impl IggyConsumerBuilder {
self.auto_join_consumer_group,
self.create_consumer_group_if_not_exists,
self.encryptor,
self.retry_interval,
self.reconnection_retry_interval,
self.allow_replay,
)
}
Expand Down
178 changes: 143 additions & 35 deletions sdk/src/clients/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use futures_util::StreamExt;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tokio::time::{sleep, Interval};
use tracing::{error, info, trace, warn};

const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst;
Expand Down Expand Up @@ -47,7 +47,8 @@ pub struct IggyProducer {
default_partitioning: Arc<Partitioning>,
can_send_immediately: bool,
last_sent_at: Arc<AtomicU64>,
retry_interval: IggyDuration,
send_retries_count: Option<u32>,
send_retries_interval: Option<IggyDuration>,
}

impl IggyProducer {
Expand All @@ -69,7 +70,8 @@ impl IggyProducer {
topic_replication_factor: Option<u8>,
topic_message_expiry: IggyExpiry,
topic_max_size: MaxTopicSize,
retry_interval: IggyDuration,
send_retries_count: Option<u32>,
send_retries_interval: Option<IggyDuration>,
) -> Self {
Self {
initialized: false,
Expand All @@ -93,7 +95,8 @@ impl IggyProducer {
default_partitioning: Arc::new(Partitioning::balanced()),
can_send_immediately: interval.is_none(),
last_sent_at: Arc::new(AtomicU64::new(0)),
retry_interval,
send_retries_count,
send_retries_interval,
}
}

Expand Down Expand Up @@ -215,11 +218,6 @@ impl IggyProducer {
return Ok(());
}

if !self.can_send.load(ORDERING) {
trace!("Trying to send messages in {}...", self.retry_interval);
sleep(self.retry_interval.get_duration()).await;
}

if self.can_send_immediately {
return self
.send_immediately(&self.stream_id, &self.topic_id, messages, None)
Expand Down Expand Up @@ -249,11 +247,6 @@ impl IggyProducer {
return Ok(());
}

if !self.can_send.load(ORDERING) {
trace!("Trying to send messages in {}...", self.retry_interval);
sleep(self.retry_interval.get_duration()).await;
}

if self.can_send_immediately {
return self
.send_immediately(&self.stream_id, &self.topic_id, messages, partitioning)
Expand Down Expand Up @@ -281,11 +274,6 @@ impl IggyProducer {
return Ok(());
}

if !self.can_send.load(ORDERING) {
trace!("Trying to send messages in {}...", self.retry_interval);
sleep(self.retry_interval.get_duration()).await;
}

if self.can_send_immediately {
return self
.send_immediately(&self.stream_id, &self.topic_id, messages, partitioning)
Expand Down Expand Up @@ -324,9 +312,7 @@ impl IggyProducer {
);
self.last_sent_at
.store(IggyTimestamp::now().into(), ORDERING);
let client = self.client.read().await;
client
.send_messages(&self.stream_id, &self.topic_id, &partitioning, batch)
self.try_send_messages(&self.stream_id, &self.topic_id, &partitioning, batch)
.await?;
trace!("Sent {messages_count} messages ({current_batch}/{batches_count} batch(es)).");
current_batch += 1;
Expand All @@ -345,21 +331,18 @@ impl IggyProducer {
self.encrypt_messages(&mut messages)?;
let partitioning = self.get_partitioning(stream, topic, &messages, partitioning)?;
let batch_size = self.batch_size.unwrap_or(MAX_BATCH_SIZE);
let client = self.client.read().await;
if messages.len() <= batch_size {
self.last_sent_at
.store(IggyTimestamp::now().into(), ORDERING);
client
.send_messages(stream, topic, &partitioning, &mut messages)
self.try_send_messages(stream, topic, &partitioning, &mut messages)
.await?;
return Ok(());
}

for batch in messages.chunks_mut(batch_size) {
self.last_sent_at
.store(IggyTimestamp::now().into(), ORDERING);
client
.send_messages(stream, topic, &partitioning, batch)
self.try_send_messages(stream, topic, &partitioning, batch)
.await?;
}
Ok(())
Expand Down Expand Up @@ -392,6 +375,125 @@ impl IggyProducer {
Ok(())
}

async fn try_send_messages(
&self,
stream: &Identifier,
topic: &Identifier,
partitioning: &Arc<Partitioning>,
messages: &mut [Message],
) -> Result<(), IggyError> {
let client = self.client.read().await;
let Some(max_retries) = self.send_retries_count else {
return client
.send_messages(stream, topic, partitioning, messages)
.await;
};

if max_retries == 0 {
return client
.send_messages(stream, topic, partitioning, messages)
.await;
}

let mut timer = if let Some(interval) = self.send_retries_interval {
let mut timer = tokio::time::interval(interval.get_duration());
timer.tick().await;
Some(timer)
} else {
None
};

self.wait_until_connected(max_retries, stream, topic, &mut timer)
.await?;
self.send_with_retries(
max_retries,
stream,
topic,
partitioning,
messages,
&mut timer,
)
.await
}

async fn wait_until_connected(
&self,
max_retries: u32,
stream: &Identifier,
topic: &Identifier,
timer: &mut Option<Interval>,
) -> Result<(), IggyError> {
let mut retries = 0;
while !self.can_send.load(ORDERING) {
retries += 1;
if retries > max_retries {
error!(
"Failed to send messages to topic: {topic}, stream: {stream} \
after {max_retries} retries. Client is disconnected."
);
return Err(IggyError::CannotSendMessagesDueToClientDisconnection);
}

error!(
"Trying to send messages to topic: {topic}, stream: {stream} \
but the client is disconnected. Retrying {retries}/{max_retries}..."
);

if let Some(timer) = timer.as_mut() {
trace!(
"Waiting for the next retry to send messages to topic: {topic}, \
stream: {stream} for disconnected client..."
);
timer.tick().await;
}
}
Ok(())
}

async fn send_with_retries(
&self,
max_retries: u32,
stream: &Identifier,
topic: &Identifier,
partitioning: &Arc<Partitioning>,
messages: &mut [Message],
timer: &mut Option<Interval>,
) -> Result<(), IggyError> {
let client = self.client.read().await;
let mut retries = 0;
loop {
match client
.send_messages(stream, topic, partitioning, messages)
.await
{
Ok(_) => return Ok(()),
Err(error) => {
retries += 1;
if retries > max_retries {
error!(
"Failed to send messages to topic: {topic}, stream: {stream} \
after {max_retries} retries. {error}."
);
return Err(error);
}

error!(
"Failed to send messages to topic: {topic}, stream: {stream}. \
{error} Retrying {retries}/{max_retries}..."
);

if let Some(t) = timer.as_mut() {
trace!(
"Waiting for the next retry to send messages to topic: {topic}, \
stream: {stream}..."
);
t.tick().await;
}
}
}
}
}

fn get_partitioning(
&self,
stream: &Identifier,
Expand Down Expand Up @@ -430,9 +532,10 @@ pub struct IggyProducerBuilder {
create_topic_if_not_exists: bool,
topic_partitions_count: u32,
topic_replication_factor: Option<u8>,
retry_interval: IggyDuration,
pub topic_message_expiry: IggyExpiry,
pub topic_max_size: MaxTopicSize,
send_retries_count: Option<u32>,
send_retries_interval: Option<IggyDuration>,
topic_message_expiry: IggyExpiry,
topic_max_size: MaxTopicSize,
}

impl IggyProducerBuilder {
Expand Down Expand Up @@ -461,9 +564,10 @@ impl IggyProducerBuilder {
create_topic_if_not_exists: true,
topic_partitions_count: 1,
topic_replication_factor: None,
retry_interval: IggyDuration::ONE_SECOND,
topic_message_expiry: IggyExpiry::ServerDefault,
topic_max_size: MaxTopicSize::ServerDefault,
send_retries_count: Some(3),
send_retries_interval: Some(IggyDuration::ONE_SECOND),
}
}

Expand Down Expand Up @@ -603,10 +707,13 @@ impl IggyProducerBuilder {
}
}

/// Sets the retry interval in case of server disconnection.
pub fn retry_interval(self, interval: IggyDuration) -> Self {
/// Sets the retry policy (maximum number of retries and interval between them) in case of messages sending failure.
/// The error can be related either to disconnecting from the server or to the server rejecting the messages.
/// Default is 3 retries with 1 second interval between them.
pub fn send_retries(self, retries: Option<u32>, interval: Option<IggyDuration>) -> Self {
Self {
retry_interval: interval,
send_retries_count: retries,
send_retries_interval: interval,
..self
}
}
Expand All @@ -632,7 +739,8 @@ impl IggyProducerBuilder {
self.topic_replication_factor,
self.topic_message_expiry,
self.topic_max_size,
self.retry_interval,
self.send_retries_count,
self.send_retries_interval,
)
}
}
2 changes: 2 additions & 0 deletions sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ pub enum IggyError {
InvalidKeyValueLength = 4028,
#[error("Command length error: {0}")]
CommandLengthError(String) = 4029,
#[error("Cannot sed messages due to client disconnection")]
CannotSendMessagesDueToClientDisconnection = 4050,
#[error("Invalid offset: {0}")]
InvalidOffset(u64) = 4100,
#[error("Consumer group with ID: {0} for topic with ID: {1} was not found.")]
Expand Down

0 comments on commit e566b46

Please sign in to comment.