Skip to content

Commit

Permalink
Merge pull request #840 from twmb/kafka-3.8.0
Browse files Browse the repository at this point in the history
Kafka 3.8.0
  • Loading branch information
twmb authored Oct 15, 2024
2 parents 03ae400 + 85097a3 commit a6d66a9
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 13 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ This library attempts to provide an intuitive API while interacting with Kafka t

## Features

- Feature complete client (Kafka >= 0.8.0 through v3.4+)
- Feature complete client (Kafka >= 0.8.0 through v3.8+) _minus_ the next generation group protocol
- Full Exactly-Once-Semantics (EOS)
- Idempotent & transactional producers
- Simple (legacy) consumer
Expand Down Expand Up @@ -403,11 +403,13 @@ generation.
| [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft)`AlterPartition.TopicID` | 3.3 | Supported |
| [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) — Next gen consumer rebalance protocol | 3.7 | Unsupported (proto supported) |
| [KIP-866](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) — ZK to Raft RPC changes | 3.4 | Supported |
| [KIP-890](https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense) — Transactions server side defense | 3.8 (partial) | Supported |
| [KIP-893](https://cwiki.apache.org/confluence/display/KAFKA/KIP-893%3A+The+Kafka+protocol+should+support+nullable+structs) — Nullable structs in the protocol | 3.5 | Supported |
| [KIP-899](https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+clients+to+rebootstrap) — Allow clients to rebootstrap | ? | Supported (`UpdateSeedBrokers`) |
| [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR) — Stale broker epoch fencing | 3.5 | Supported (proto) |
| [KIP-919](https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration) — Admin client talk to KRaft , Controller registration | 3.7 | Supported (proto) |
| [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client) — Leader discovery optimizations | 3.7 | Supported |
| [KIP-994](https://cwiki.apache.org/confluence/display/KAFKA/KIP-994%3A+Minor+Enhancements+to+ListTransactions+and+DescribeTransactions+APIs) — List/Describe transactions enhancements | 3.8 (partial) | Supported |

Missing from above but included in librdkafka is:

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
github.com/klauspost/compress v1.17.8
github.com/pierrec/lz4/v4 v4.1.21
github.com/twmb/franz-go/pkg/kmsg v1.8.0
github.com/twmb/franz-go/pkg/kmsg v1.9.0
golang.org/x/crypto v0.23.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0N
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
20 changes: 20 additions & 0 deletions pkg/kerr/kerr.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,21 @@ var (
MismatchedEndpointType = &Error{"MISMATCHED_ENDPOINT_TYPE", 114, false, "The request was sent to an endpoint of the wrong type."}
UnsupportedEndpointType = &Error{"UNSUPPORTED_ENDPOINT_TYPE", 115, false, "This endpoint type is not supported yet."}
UnknownControllerID = &Error{"UNKNOWN_CONTROLLER_ID", 116, false, "This controller ID is not known"}

// UnknownSubscriptionID = &Error{"UNKNOWN_SUBSCRIPTION_ID", 117, false, "Client sent a push telemetry request with an invalid or outdated subscription ID."}
// TelemetryTooLarge = &Error{"TELEMETRY_TOO_LARGE", 118, false, "Client sent a push telemetry request larger than the maximum size the broker will accept."}
// InvalidRegistration = &Error{"INVALID_REGISTRATION", 119, false, "The controller has considered the broker registration to be invalid."}

TransactionAbortable = &Error{"TRANSACTION_ABORTABLE", 120, false, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID."}

// InvalidRecordState = &Error{"INVALID_RECORD_STATE", 121, false, "The record state is invalid. The acknowledgement of delivery could not be completed."}
// ShareSessionNowFound = &Error{"SHARE_SESSION_NOT_FOUND", 122, false, "The share session was not found."}
// InvalidShareSessionEpoch = &Error{"INVALID_SHARE_SESSION_EPOCH", 123, false, "The share session epoch is invalid."}
// FencedStateEpoch = &Error{"FENCED_STATE_EPOCH", 124, false, "The share coordinator rejected the request because the share-group state epoch did not match."}
// InvalidVoterKey = &Error{"INVALID_VOTER_KEY", 125, false, "The voter key doesn't match the receiving replica's key."}
// DuplicateVoter = &Error{"DUPLICATE_VOTER", 126, false, "The voter is already part of the set of voters."}
// VoterNotFound = &Error{"VOTER_NOT_FOUND", 127, false, "The voter is not part of the set of voters."}
// InvalidRegularExpression = &Error{"INVALID_REGULAR_EXPRESSION", 128, false, "The regular expression is not valid."}
)

var code2err = map[int16]error{
Expand Down Expand Up @@ -312,4 +327,9 @@ var code2err = map[int16]error{
115: UnsupportedEndpointType, // ""
116: UnknownControllerID, // ""

// 117: UnknownSubscriptionID, // KIP-714 f1819f448 KAFKA-15778 & KAFKA-15779
// 118: TelemetryTooLarge, // ""
// 119: InvalidRegistration, // KIP-858 f467f6bb4 KAFKA-15361

120: TransactionAbortable, // KIP-890 2e8d69b78 KAFKA-16314
}
11 changes: 8 additions & 3 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {

if txnReq != nil {
// txnReq can fail from:
// - TransactionAbortable
// - retry failure
// - auth failure
// - producer id mapping / epoch errors
Expand All @@ -417,6 +418,10 @@ func (s *sink) produce(sem <-chan struct{}) bool {
batchesStripped, err := s.doTxnReq(req, txnReq)
if err != nil {
switch {
case errors.Is(err, kerr.TransactionAbortable):
// If we get TransactionAbortable, we continue into producing.
// The produce will fail with the same error, and this is the
// only way to notify the user to abort the txn.
case isRetryableBrokerErr(err) || isDialNonTimeoutErr(err):
s.cl.bumpRepeatedLoadErr(err)
s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retryable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err)
Expand All @@ -431,8 +436,8 @@ func (s *sink) produce(sem <-chan struct{}) bool {
// with produce request vs. end txn (KAFKA-12671)
s.cl.failProducerID(id, epoch, err)
s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", logID(s.nodeID), "err", err)
return false
}
return false
}

// If we stripped everything, ensure we backoff to force a
Expand Down Expand Up @@ -563,7 +568,7 @@ func (s *sink) issueTxnReq(
continue
}
for _, partition := range topic.Partitions {
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil && err != kerr.TransactionAbortable { // see below for txn abortable
// OperationNotAttempted is set for all partitions that are authorized
// if any partition is unauthorized _or_ does not exist. We simply remove
// unattempted partitions and treat them as retryable.
Expand Down Expand Up @@ -2057,7 +2062,7 @@ func (b *recBatch) tryBuffer(pr promisedRec, produceVersion, maxBatchBytes int32
//////////////

func (*produceRequest) Key() int16 { return 0 }
func (*produceRequest) MaxVersion() int16 { return 10 }
func (*produceRequest) MaxVersion() int16 { return 11 }
func (p *produceRequest) SetVersion(v int16) { p.version = v }
func (p *produceRequest) GetVersion() int16 { return p.version }
func (p *produceRequest) IsFlexible() bool { return p.version >= 9 }
Expand Down
15 changes: 11 additions & 4 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
errors.Is(err, kerr.CoordinatorLoadInProgress),
errors.Is(err, kerr.NotCoordinator),
errors.Is(err, kerr.ConcurrentTransactions),
errors.Is(err, kerr.UnknownServerError):
errors.Is(err, kerr.UnknownServerError),
errors.Is(err, kerr.TransactionAbortable):
return true
}
return false
Expand Down Expand Up @@ -408,6 +409,11 @@ retry:
willTryCommit = false
goto retry

case errors.Is(endTxnErr, kerr.TransactionAbortable):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction returned TransactionAbortable; retrying as abort")
willTryCommit = false
goto retry

case errors.Is(endTxnErr, kerr.UnknownServerError):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying")
after := time.NewTimer(s.cl.cfg.retryBackoff(tries))
Expand Down Expand Up @@ -517,7 +523,7 @@ const (
// Deprecated: Kafka 3.6 removed support for the hacky behavior that
// this option was abusing. Thus, as of Kafka 3.6, this option does not
// work against Kafka. This option also has never worked for Redpanda
// becuse Redpanda always strictly validated that partitions were a
// because Redpanda always strictly validated that partitions were a
// part of a transaction. Later versions of Kafka and Redpanda will
// remove the need for AddPartitionsToTxn at all and thus this option
// ultimately will be unnecessary anyway.
Expand Down Expand Up @@ -820,8 +826,9 @@ func (cl *Client) UnsafeAbortBufferedRecords() {
//
// If the producer ID has an error and you are trying to commit, this will
// return with kerr.OperationNotAttempted. If this happened, retry
// EndTransaction with TryAbort. Not other error is retryable, and you should
// not retry with TryAbort.
// EndTransaction with TryAbort. If this returns kerr.TransactionAbortable, you
// can retry with TryAbort. No other error is retryable, and you should not
// retry with TryAbort.
//
// If records failed with UnknownProducerID and your Kafka version is at least
// 2.5, then aborting here will potentially allow the client to recover for
Expand Down
36 changes: 34 additions & 2 deletions pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var versions = []struct {
{"v3.5", V3_5_0()},
{"v3.6", V3_6_0()},
{"v3.7", V3_7_0()},
{"v3.8", V3_8_0()},
}

// VersionStrings returns all recognized versions, minus any patch, that can be
Expand Down Expand Up @@ -333,7 +334,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
//
// TODO: add introduced-version to differentiate some specific
// keys.
skipKeys: []int16{4, 5, 6, 7, 27, 52, 53, 54, 55, 56, 57, 58, 59, 62, 63, 64, 67},
skipKeys: []int16{4, 5, 6, 7, 27, 52, 53, 54, 55, 56, 57, 58, 59, 62, 63, 64, 67, 74, 75},
}
for _, opt := range opts {
opt.apply(&cfg)
Expand Down Expand Up @@ -378,6 +379,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
{max350, "v3.5"},
{max360, "v3.6"},
{max370, "v3.7"},
{max380, "v3.8"},
} {
for k, v := range comparison.cmp.filter(cfg.listener) {
if v == -1 {
Expand Down Expand Up @@ -520,6 +522,7 @@ func V3_4_0() *Versions { return zkBrokerOf(max340) }
func V3_5_0() *Versions { return zkBrokerOf(max350) }
func V3_6_0() *Versions { return zkBrokerOf(max360) }
func V3_7_0() *Versions { return zkBrokerOf(max370) }
func V3_8_0() *Versions { return zkBrokerOf(max380) }

func zkBrokerOf(lks listenerKeys) *Versions {
return &Versions{lks.filter(zkBroker)}
Expand Down Expand Up @@ -1158,8 +1161,37 @@ var max370 = nextMax(max360, func(v listenerKeys) listenerKeys {
return v
})

var max380 = nextMax(max370, func(v listenerKeys) listenerKeys {
// KAFKA-16314 2e8d69b78ca52196decd851c8520798aa856c073 KIP-890
// Then error rename in cf1ba099c0723f9cf65dda4cd334d36b7ede6327
v[0].inc() // 11 produce
v[10].inc() // 5 find coordinator
v[22].inc() // 5 init producer id
v[24].inc() // 5 add partitions to txn
v[25].inc() // 4 add offsets to txn
v[26].inc() // 4 end txn
v[28].inc() // 4 txn offset commit

// KAFKA-15460 68745ef21a9d8fe0f37a8c5fbc7761a598718d46 KIP-848
v[16].inc() // 5 list groups

// KAFKA-14509 90e646052a17e3f6ec1a013d76c1e6af2fbb756e KIP-848 added
// 7b0352f1bd9b923b79e60b18b40f570d4bfafcc0
// b7c99e22a77392d6053fe231209e1de32b50a98b
// 68389c244e720566aaa8443cd3fc0b9d2ec4bb7a
// 5f410ceb04878ca44d2d007655155b5303a47907 stabilized
v = append(v,
k(zkBroker, rBroker), // 69 consumer group describe
)

// KAFKA-16265 b4e96913cc6c827968e47a31261e0bd8fdf677b5 KIP-994 (part 1)
v[66].inc()

return v
})

var (
maxStable = max370
maxStable = max380
maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys {
return v
})
Expand Down

0 comments on commit a6d66a9

Please sign in to comment.