Skip to content

Commit

Permalink
KAFKA-18134; Disallow group upgrades when custom assignors are used (apache#18046)
Browse files Browse the repository at this point in the history

Disallow upgrades from classic groups to consumer groups when any member's assignment has non-empty userData.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
squah-confluent authored Dec 9, 2024
1 parent d5c2029 commit 9ae1b0f
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,121 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
)
}

/**
 * The test method checks the following scenario:
 * 1. Creating a classic group with member 1, whose assignment has non-empty user data.
 * 2. Member 2 using consumer protocol joins. The group cannot be upgraded and the join is
 * rejected.
 * 3. Member 1 leaves.
 * 4. Member 2 using consumer protocol joins. The group is upgraded.
 */
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
// "bidirectional" allows online upgrades (classic -> consumer) as well as downgrades.
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = "bidirectional")
)
)
def testOnlineMigrationWithNonEmptyUserDataInAssignment(): Unit = {
// Create the __consumer_offsets topic manually: this test never calls the
// FindCoordinator API, which is what would normally trigger its auto-creation.
createOffsetsTopic()

// Create the topic the members subscribe to.
createTopic(
topic = "foo",
numPartitions = 3
)

// Classic member 1 joins the classic group. Its assignment carries one byte of
// custom userData (ByteBuffer.allocate(1)), which blocks an online upgrade.
val groupId = "grp"

val memberId1 = joinDynamicConsumerGroupWithOldProtocol(
groupId = groupId,
metadata = metadata(List.empty),
assignment = assignment(List(0, 1, 2), ByteBuffer.allocate(1))
)._1

// Member 2, using the new consumer protocol, attempts to join. The heartbeat is
// rejected with GROUP_ID_NOT_FOUND because member 1's non-empty userData would be
// lost during an upgrade.
val errorMessage = consumerGroupHeartbeat(
groupId = groupId,
memberId = Uuid.randomUuid.toString,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty,
expectedError = Errors.GROUP_ID_NOT_FOUND
).errorMessage

// The error message must tell the user why the upgrade was refused and how to proceed.
assertEquals(
"Cannot upgrade classic group grp to consumer group because an unsupported custom assignor is in use. " +
"Please refer to the documentation or switch to a default assignor before re-attempting the upgrade.",
errorMessage
)

// The group is still a classic group. (A classic group that uses the embedded
// consumer protocol reports protocolType "consumer" while its type stays CLASSIC.)
assertEquals(
List(
new ListGroupsResponseData.ListedGroup()
.setGroupId(groupId)
.setProtocolType("consumer")
.setGroupState(ClassicGroupState.STABLE.toString)
.setGroupType(Group.GroupType.CLASSIC.toString)
),
listGroups(
statesFilter = List.empty,
typesFilter = List(Group.GroupType.CLASSIC.toString)
)
)

// Classic member 1 leaves the group, taking its custom userData with it.
leaveGroup(
groupId = groupId,
memberId = memberId1,
useNewProtocol = false,
version = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)
)

// Verify that the group is empty.
assertEquals(
List(
new ListGroupsResponseData.ListedGroup()
.setGroupId(groupId)
.setProtocolType("consumer")
.setGroupState(ClassicGroupState.EMPTY.toString)
.setGroupType(Group.GroupType.CLASSIC.toString)
),
listGroups(
statesFilter = List.empty,
typesFilter = List(Group.GroupType.CLASSIC.toString)
)
)

// With the blocking userData gone, the consumer-protocol join is now accepted.
consumerGroupHeartbeat(
groupId = groupId,
memberId = Uuid.randomUuid.toString,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty,
expectedError = Errors.NONE
)

// The group has become a consumer group.
assertEquals(
List(
new ListGroupsResponseData.ListedGroup()
.setGroupId(groupId)
.setProtocolType("consumer")
.setGroupState(ConsumerGroupState.STABLE.toString)
.setGroupType(Group.GroupType.CONSUMER.toString)
),
listGroups(
statesFilter = List.empty,
typesFilter = List(Group.GroupType.CONSUMER.toString)
)
)
}

private def testUpgradeFromEmptyClassicToConsumerGroup(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
Expand Down Expand Up @@ -1262,10 +1377,11 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
).array
}

private def assignment(assignedPartitions: List[Int]): Array[Byte] = {
private def assignment(assignedPartitions: List[Int], userData: ByteBuffer = null): Array[Byte] = {
ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(
assignedPartitions.map(new TopicPartition("foo", _)).asJava
assignedPartitions.map(new TopicPartition("foo", _)).asJava,
userData
)
).array
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
Expand Down Expand Up @@ -186,6 +187,7 @@
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;

/**
Expand Down Expand Up @@ -674,7 +676,8 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
} else {
if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
} else if (createIfNotExists && group.type() == CLASSIC && validateOnlineUpgrade((ClassicGroup) group)) {
} else if (createIfNotExists && group.type() == CLASSIC) {
validateOnlineUpgrade((ClassicGroup) group);
return convertToConsumerGroup((ClassicGroup) group, records);
} else {
throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
Expand Down Expand Up @@ -1033,23 +1036,28 @@ private void convertToClassicGroup(
* Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
*
* @param classicGroup A ClassicGroup.
* @return A boolean indicating whether it's valid to online upgrade the classic group.
* @throws GroupIdNotFoundException if the group cannot be upgraded.
*/
private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
private void validateOnlineUpgrade(ClassicGroup classicGroup) {
if (!config.consumerGroupMigrationPolicy().isUpgradeEnabled()) {
log.info("Cannot upgrade classic group {} to consumer group because the online upgrade is disabled.",
log.info("Cannot upgrade classic group {} to consumer group because online upgrade is disabled.",
classicGroup.groupId());
return false;
throw new GroupIdNotFoundException(
String.format("Cannot upgrade classic group %s to consumer group because online upgrade is disabled.", classicGroup.groupId())
);
} else if (!classicGroup.usesConsumerGroupProtocol()) {
log.info("Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.",
classicGroup.groupId());
return false;
throw new GroupIdNotFoundException(
String.format("Cannot upgrade classic group %s to consumer group because the group does not use the consumer embedded protocol.", classicGroup.groupId())
);
} else if (classicGroup.numMembers() > config.consumerGroupMaxSize()) {
log.info("Cannot upgrade classic group {} to consumer group because the group size exceeds the consumer group maximum size.",
classicGroup.groupId());
return false;
throw new GroupIdNotFoundException(
String.format("Cannot upgrade classic group %s to consumer group because the group size exceeds the consumer group maximum size.", classicGroup.groupId())
);
}
return true;
}

/**
Expand Down Expand Up @@ -1078,12 +1086,21 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
metadataImage.topics()
);
} catch (SchemaException e) {
log.warn("Cannot upgrade the classic group " + classicGroup.groupId() +
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +
" to consumer group because the embedded consumer protocol is malformed: "
+ e.getMessage() + ".", e);

throw new GroupIdNotFoundException("Cannot upgrade the classic group " + classicGroup.groupId() +
" to consumer group because the embedded consumer protocol is malformed.");
throw new GroupIdNotFoundException(
String.format("Cannot upgrade classic group %s to consumer group because the embedded consumer protocol is malformed.", classicGroup.groupId())
);
} catch (UnsupportedVersionException e) {
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +
" to consumer group: " + e.getMessage() + ".", e);

throw new GroupIdNotFoundException(
String.format("Cannot upgrade classic group %s to consumer group because an unsupported custom assignor is in use. " +
"Please refer to the documentation or switch to a default assignor before re-attempting the upgrade.", classicGroup.groupId())
);
}
consumerGroup.createConsumerGroupRecords(records);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerProtocolAssignment;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
Expand Down Expand Up @@ -1131,6 +1133,9 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
* @param classicGroup The converted classic group.
* @param topicsImage The TopicsImage for topic id and topic name conversion.
* @return The created ConsumerGroup.
*
* @throws SchemaException if any member's subscription or assignment cannot be deserialized.
* @throws UnsupportedVersionException if userData from a custom assignor would be lost.
*/
public static ConsumerGroup fromClassicGroup(
SnapshotRegistry snapshotRegistry,
Expand All @@ -1150,12 +1155,13 @@ public static ConsumerGroup fromClassicGroup(
if (Arrays.equals(classicGroupMember.assignment(), EMPTY_ASSIGNMENT)) {
assignedPartitions = Collections.emptyMap();
} else {
assignedPartitions = toTopicPartitionMap(
ConsumerProtocol.deserializeConsumerProtocolAssignment(
ByteBuffer.wrap(classicGroupMember.assignment())
),
topicsImage
ConsumerProtocolAssignment assignment = ConsumerProtocol.deserializeConsumerProtocolAssignment(
ByteBuffer.wrap(classicGroupMember.assignment())
);
if (assignment.userData() != null && assignment.userData().hasRemaining()) {
throw new UnsupportedVersionException("userData from a custom assignor would be lost");
}
assignedPartitions = toTopicPartitionMap(assignment, topicsImage);
}

// Every member is guaranteed to have metadata set when it joins,
Expand Down
Loading

0 comments on commit 9ae1b0f

Please sign in to comment.