Skip to content

Commit

Permalink
Fix consumer group member partition assignments for no partitions (#1480
Browse files Browse the repository at this point in the history
)
  • Loading branch information
spetz authored Feb 2, 2025
1 parent 13c06c5 commit 70b44e6
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 48 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.92"
version = "0.6.93"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/models/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub const POLLED_MESSAGE_METADATA: u32 = 8 + 1 + 8 + 4;
/// - `messages`: the collection of messages.
#[derive(Debug, Serialize, Deserialize)]
pub struct PolledMessages {
/// The identifier of the partition.
/// The identifier of the partition. If it's '0', then there's no partition assigned to the consumer group member.
pub partition_id: u32,
/// The current offset of the partition.
pub current_offset: u64,
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.131"
version = "0.4.140"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
11 changes: 9 additions & 2 deletions server/src/streaming/systems/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,17 @@ impl System {
return Err(IggyError::NoPartitions(topic.topic_id, topic.stream_id));
}

let (polling_consumer, partition_id) = topic
// There might be no partition assigned, if it's the consumer group member without any partitions.
let Some((polling_consumer, partition_id)) = topic
.resolve_consumer_with_partition_id(consumer, session.client_id, partition_id, true)
.await
.with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {}, partition ID: {:?}", session.client_id, partition_id))?;
.with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {}, partition ID: {:?}", session.client_id, partition_id))? else {
return Ok(PolledMessages {
messages: vec![],
partition_id: 0,
current_offset: 0,
})
};

let mut polled_messages = topic
.get_messages(polling_consumer, partition_id, args.strategy, args.count)
Expand Down
61 changes: 33 additions & 28 deletions server/src/streaming/topics/consumer_group.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ahash::AHashMap;
use iggy::error::IggyError;
use std::collections::HashMap;
use tokio::sync::RwLock;
use tracing::trace;

Expand All @@ -9,15 +9,15 @@ pub struct ConsumerGroup {
pub group_id: u32,
pub name: String,
pub partitions_count: u32,
members: HashMap<u32, RwLock<ConsumerGroupMember>>,
members: AHashMap<u32, RwLock<ConsumerGroupMember>>,
}

#[derive(Debug)]
pub struct ConsumerGroupMember {
pub id: u32,
partitions: HashMap<u32, u32>,
current_partition_index: u32,
current_partition_id: u32,
partitions: AHashMap<u32, u32>,
current_partition_index: Option<u32>,
current_partition_id: Option<u32>,
}

impl ConsumerGroup {
Expand All @@ -27,7 +27,7 @@ impl ConsumerGroup {
group_id,
name: name.to_string(),
partitions_count,
members: HashMap::new(),
members: AHashMap::new(),
}
}

Expand All @@ -40,7 +40,7 @@ impl ConsumerGroup {
self.assign_partitions().await;
}

pub async fn calculate_partition_id(&self, member_id: u32) -> Result<u32, IggyError> {
pub async fn calculate_partition_id(&self, member_id: u32) -> Result<Option<u32>, IggyError> {
let member = self.members.get(&member_id);
if let Some(member) = member {
return Ok(member.write().await.calculate_partition_id());
Expand All @@ -52,7 +52,7 @@ impl ConsumerGroup {
))
}

pub async fn get_current_partition_id(&self, member_id: u32) -> Result<u32, IggyError> {
pub async fn get_current_partition_id(&self, member_id: u32) -> Result<Option<u32>, IggyError> {
let member = self.members.get(&member_id);
if let Some(member) = member {
return Ok(member.read().await.current_partition_id);
Expand All @@ -69,9 +69,9 @@ impl ConsumerGroup {
member_id,
RwLock::new(ConsumerGroupMember {
id: member_id,
partitions: HashMap::new(),
current_partition_index: 0,
current_partition_id: 0,
partitions: AHashMap::new(),
current_partition_index: None,
current_partition_id: None,
}),
);
trace!(
Expand Down Expand Up @@ -104,8 +104,8 @@ impl ConsumerGroup {
let members_count = members.len() as u32;
for member in members.iter_mut() {
let mut member = member.write().await;
member.current_partition_index = 0;
member.current_partition_id = 0;
member.current_partition_index = None;
member.current_partition_id = None;
member.partitions.clear();
}

Expand All @@ -118,6 +118,10 @@ impl ConsumerGroup {
member
.partitions
.insert(member_partition_index, partition_id);
if member.current_partition_id.is_none() {
member.current_partition_id = Some(partition_id);
member.current_partition_index = Some(member_partition_index);
}
trace!("Assigned partition ID: {} to member with ID: {} for topic with ID: {} in consumer group: {}",
partition_id, member.id, self.topic_id, self.group_id)
}
Expand All @@ -129,30 +133,30 @@ impl ConsumerGroupMember {
self.partitions.values().copied().collect()
}

pub fn calculate_partition_id(&mut self) -> u32 {
let partition_index = self.current_partition_index;
let partition_id = if let Some(partition_id) = self.partitions.get(&partition_index) {
*partition_id
} else {
pub fn calculate_partition_id(&mut self) -> Option<u32> {
let partition_index = self.current_partition_index?;
let Some(partition_id) = self.partitions.get(&partition_index) else {
trace!(
"No partition ID found for index: {} for member with ID: {}.",
partition_index,
self.id
);
return 1;
return None;
};
self.current_partition_id = partition_id;

let partition_id = *partition_id;
self.current_partition_id = Some(partition_id);
if self.partitions.len() <= (partition_index + 1) as usize {
self.current_partition_index = 0;
self.current_partition_index = Some(0);
} else {
self.current_partition_index += 1;
self.current_partition_index = Some(partition_index + 1);
}
trace!(
"Calculated partition ID: {} for member with ID: {}",
partition_id,
self.id
);
partition_id
Some(partition_id)
}
}

Expand All @@ -168,15 +172,16 @@ mod tests {
group_id: 1,
name: "test".to_string(),
partitions_count: 3,
members: HashMap::new(),
members: AHashMap::new(),
};

consumer_group.add_member(member_id).await;
for i in 0..1000 {
let partition_id = consumer_group
.calculate_partition_id(member_id)
.await
.unwrap();
.unwrap()
.expect("Partition ID not found");
assert_eq!(partition_id, (i % consumer_group.partitions_count) + 1);
}
}
Expand All @@ -189,7 +194,7 @@ mod tests {
group_id: 1,
name: "test".to_string(),
partitions_count: 3,
members: HashMap::new(),
members: AHashMap::new(),
};

consumer_group.add_member(member_id).await;
Expand All @@ -214,7 +219,7 @@ mod tests {
group_id: 1,
name: "test".to_string(),
partitions_count: 3,
members: HashMap::new(),
members: AHashMap::new(),
};

consumer_group.add_member(member1_id).await;
Expand Down Expand Up @@ -251,7 +256,7 @@ mod tests {
group_id: 1,
name: "test".to_string(),
partitions_count: 1,
members: HashMap::new(),
members: AHashMap::new(),
};

consumer_group.add_member(member1_id).await;
Expand Down
21 changes: 15 additions & 6 deletions server/src/streaming/topics/consumer_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ impl Topic {
partition_id: Option<u32>,
client_id: u32,
) -> Result<(), IggyError> {
let (polling_consumer, partition_id) = self
let Some((polling_consumer, partition_id)) = self
.resolve_consumer_with_partition_id(&consumer, client_id, partition_id, false)
.await
.with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, partition ID: {:?}", consumer.id, client_id, partition_id))?;
.with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, partition ID: {:?}", consumer.id, client_id, partition_id))? else {
return Err(IggyError::ConsumerOffsetNotFound(client_id));
};

let partition = self.get_partition(partition_id).with_error_context(|_| {
format!("{COMPONENT} - failed to get partition with id: {partition_id}")
})?;
Expand Down Expand Up @@ -48,10 +51,13 @@ impl Topic {
partition_id: Option<u32>,
client_id: u32,
) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
let (polling_consumer, partition_id) = self
let Some((polling_consumer, partition_id)) = self
.resolve_consumer_with_partition_id(consumer, client_id, partition_id, false)
.await
.with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {client_id}, partition ID: {:?}", partition_id))?;
.with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {client_id}, partition ID: {:?}", partition_id))? else {
return Ok(None);
};

let partition = self.get_partition(partition_id).with_error_context(|_| {
format!("{COMPONENT} - failed to get partition with id: {partition_id}")
})?;
Expand Down Expand Up @@ -79,10 +85,13 @@ impl Topic {
partition_id: Option<u32>,
client_id: u32,
) -> Result<(), IggyError> {
let (polling_consumer, partition_id) = self
let Some((polling_consumer, partition_id)) = self
.resolve_consumer_with_partition_id(&consumer, client_id, partition_id, false)
.await
.with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, partition ID: {:?}", consumer.id, client_id, partition_id))?;
.with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer ID: {}, client ID: {}, partition ID: {:?}", consumer.id, client_id, partition_id))? else {
return Err(IggyError::ConsumerOffsetNotFound(client_id));
};

let partition = self.get_partition(partition_id).with_error_context(|_| {
format!("{COMPONENT} - failed to get partition with id: {partition_id}")
})?;
Expand Down
18 changes: 11 additions & 7 deletions server/src/streaming/topics/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,33 +176,37 @@ impl Topic {
client_id: u32,
partition_id: Option<u32>,
calculate_partition_id: bool,
) -> Result<(PollingConsumer, u32), IggyError> {
) -> Result<Option<(PollingConsumer, u32)>, IggyError> {
match consumer.kind {
ConsumerKind::Consumer => {
let partition_id = partition_id.unwrap_or(1);
Ok((
Ok(Some((
PollingConsumer::consumer(&consumer.id, partition_id),
partition_id,
))
)))
}
ConsumerKind::ConsumerGroup => {
let consumer_group = self.get_consumer_group(&consumer.id)?.read().await;
if let Some(partition_id) = partition_id {
return Ok((
return Ok(Some((
PollingConsumer::consumer_group(consumer_group.group_id, client_id),
partition_id,
));
)));
}

let partition_id = if calculate_partition_id {
consumer_group.calculate_partition_id(client_id).await?
} else {
consumer_group.get_current_partition_id(client_id).await?
};
Ok((
let Some(partition_id) = partition_id else {
return Ok(None);
};

Ok(Some((
PollingConsumer::consumer_group(consumer_group.group_id, client_id),
partition_id,
))
)))
}
}
}
Expand Down

0 comments on commit 70b44e6

Please sign in to comment.