Skip to content

Commit

Permalink
Merge pull request #537 from twmb/kversion
Browse files Browse the repository at this point in the history
kversion: fix version detection for Kafka v2.7 through 3.4
  • Loading branch information
twmb authored Aug 18, 2023
2 parents 83cb9fe + 5978156 commit 46dea4a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
16 changes: 12 additions & 4 deletions pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func SkipKeys(keys ...int16) VersionGuessOpt {
}

// TryRaftBroker changes from guessing the version for a classical ZooKeeper
// based broker to guessing for a raft based broker (v2.8.0+).
// based broker to guessing for a raft based broker (v2.8+).
//
// Note that with raft, there can be a TryRaftController attempt as well.
func TryRaftBroker() VersionGuessOpt {
Expand All @@ -147,7 +147,7 @@ func TryRaftBroker() VersionGuessOpt {

// TryRaftController changes from guessing the version for a classical
// ZooKeeper based broker to guessing for a raft based controller broker
// (v2.8.0+).
// (v2.8+).
//
// Note that with raft, there can be a TryRaftBroker attempt as well. Odds are
// that if you are an end user speaking to a raft based Kafka cluster, you are
Expand All @@ -164,7 +164,7 @@ type guessCfg struct {

// VersionGuess attempts to guess which version of Kafka these versions belong
// to. If an exact match can be determined, this returns a string in the format
// v0.#.# or v#.# (depending on whether Kafka is pre-1.0.0 or post). For
// v0.#.# or v#.# (depending on whether Kafka is pre-1.0 or post). For
// example, v0.8.0 or v2.7.
//
// Patch numbers are not included in the guess as it is not possible to
Expand Down Expand Up @@ -253,7 +253,15 @@ func (g guess) String() string {
func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
cfg := guessCfg{
listener: zkBroker,
skipKeys: []int16{4, 5, 6, 7, 27},
// Envelope was added in 2.7 for kraft and zkBroker in 3.4; we
// need to skip it for 2.7 through 3.4 otherwise the version
// detection fails. We can just skip it generally since there
// are enough differentiating factors that accurately detecting
// envelope doesn't matter.
//
// TODO: add introduced-version to differentiate some specific
// keys.
skipKeys: []int16{4, 5, 6, 7, 27, 58},
}
for _, opt := range opts {
opt.apply(&cfg)
Expand Down
72 changes: 72 additions & 0 deletions pkg/kversion/kversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,75 @@ func TestEqual(t *testing.T) {
t.Errorf("unexpectedly not equal after backing v0.8.1 down to v0.8.0, opposite direction")
}
}

func TestVersionProbeKafka3_1(t *testing.T) {
versions := map[int16]int16{
0: 9, // Produce
1: 13, // Fetch
2: 7, // ListOffsets
3: 12, // Metadata
4: 5, // LeaderAndISR
5: 3, // StopReplica
6: 7, // UpdateMetadata
7: 3, // ControlledShutdown
8: 8, // OffsetCommit
9: 8, // OffsetFetch
10: 4, // FindCoordinator
11: 7, // JoinGroup
12: 4, // Heartbeat
13: 4, // LeaveGroup
14: 5, // SyncGroup
15: 5, // DescribeGroups
16: 4, // ListGroups
17: 1, // SASLHandshake
18: 3, // ApiVersions
19: 7, // CreateTopics
20: 6, // DeleteTopics
21: 2, // DeleteRecords
22: 4, // InitProducerID
23: 4, // OffsetForLeaderEpoch
24: 3, // AddPartitionsToTxn
25: 3, // AddOffsetsToTxn
26: 3, // EndTxn
27: 1, // WriteTxnMarkers
28: 3, // TxnOffsetCommit
29: 2, // DescribeACLs
30: 2, // CreateACLs
31: 2, // DeleteACLs
32: 4, // DescribeConfigs
33: 2, // AlterConfigs
34: 2, // AlterReplicaLogDirs
35: 2, // DescribeLogDirs
36: 2, // SASLAuthenticate
37: 3, // CreatePartitions
38: 2, // CreateDelegationToken
39: 2, // RenewDelegationToken
40: 2, // ExpireDelegationToken
41: 2, // DescribeDelegationToken
42: 2, // DeleteGroups
43: 2, // ElectLeaders
44: 1, // IncrementalAlterConfigs
45: 0, // AlterPartitionAssignments
46: 0, // ListPartitionReassignments
47: 0, // OffsetDelete
48: 1, // DescribeClientQuotas
49: 1, // AlterClientQuotas
50: 0, // DescribeUserSCRAMCredentials
51: 0, // AlterUserSCRAMCredentials
56: 0, // AlterPartition
57: 0, // UpdateFeatures
60: 0, // DescribeCluster
61: 0, // DescribeProducers
65: 0, // DescribeTransactions
66: 0, // ListTransactions
67: 0, // AllocateProducerIDs
}

var vs Versions
for k, v := range versions {
vs.SetMaxKeyVersion(k, v)
}
if guess := vs.VersionGuess(); guess != "v3.1" {
t.Errorf("unexpected version guess, got %s != exp %s", guess, "v3.1")
}
}

0 comments on commit 46dea4a

Please sign in to comment.