Merge pull request #834 from twmb/805
kgo: ignore OOOSN where possible
twmb authored Oct 15, 2024
2 parents 019799a + 3548d1f commit 3d71b14
Showing 3 changed files with 37 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pkg/kgo/config.go
@@ -1113,8 +1113,8 @@ func RecordDeliveryTimeout(timeout time.Duration) ProducerOpt {
// For Kafka-to-Kafka transactions, the transactional ID is only one half of
// the equation. You must also assign a group to consume from.
//
-// To produce transactionally, you first BeginTransaction, then produce records
-// consumed from a group, then you EndTransaction. All records produced outside
+// To produce transactionally, you first [BeginTransaction], then produce records
+// consumed from a group, then you [EndTransaction]. All records produced outside
// of a transaction will fail immediately with an error.
//
// After producing a batch, you must commit what you consumed. Auto committing
@@ -1449,7 +1449,7 @@ func Balancers(balancers ...GroupBalancer) GroupOpt {
// in this timeout, the broker will remove the member from the group and
// initiate a rebalance.
//
-// If you are using a GroupTransactSession for EOS, wish to lower this, and are
+// If you are using a [GroupTransactSession] for EOS, wish to lower this, and are
// talking to a Kafka cluster pre 2.5, consider lowering the
// TransactionTimeout. If you do not, you risk a transaction finishing after a
// group has rebalanced, which could lead to duplicate processing. If you are
1 change: 1 addition & 0 deletions pkg/kgo/metadata.go
@@ -504,6 +504,7 @@ func (mp metadataPartition) newPartition(cl *Client, isProduce bool) *topicParti
failing: mp.loadErr != 0,
sink: mp.sns.sink,
topicPartitionData: td,
+lastAckedOffset: -1,
}
} else {
p.cursor = &cursor{
33 changes: 33 additions & 0 deletions pkg/kgo/sink.go
@@ -859,6 +859,20 @@ func (s *sink) handleReqRespBatch(
// handling, but KIP-360 demonstrated that resetting sequence
// numbers is fundamentally unsafe, so we treat it like OOOSN.
//
+// KAFKA-5793 specifically mentions for OOOSN "when you get it,
+// it should always mean data loss". Sometime after KIP-360,
+// Kafka changed the client to remove all places
+// UnknownProducerID was returned, and then started referring
+// to OOOSN as retryable. KIP-890 definitively says OOOSN is
+// retryable. However, the Kafka source as of 24-10-10 still
+// only retries OOOSN for batches that are NOT the expected
+// next batch (i.e., it's next + 1, for when there are multiple
+// in flight). With KIP-890, we still just disregard whatever
+// supposedly non-retryable / actually-is-retryable error is
+// returned if the LogStartOffset is _after_ what we previously
+// produced. Specifically, this is step (4) in the wiki link
+// within KAFKA-5793.
+//
// InvalidMapping is similar to UnknownProducerID, but occurs
// when the txnal coordinator timed out our transaction.
//
@@ -886,6 +900,22 @@ func (s *sink) handleReqRespBatch(
// txn coordinator requests, which have PRODUCER_FENCED vs
// TRANSACTION_TIMED_OUT.

+if batch.owner.lastAckedOffset >= 0 && rp.LogStartOffset > batch.owner.lastAckedOffset {
+	s.cl.cfg.logger.Log(LogLevelInfo, "partition prefix truncation to after our last produce caused the broker to forget us; no loss occurred, bumping producer epoch and resetting sequence numbers",
+		"broker", logID(s.nodeID),
+		"topic", topic,
+		"partition", rp.Partition,
+		"producer_id", producerID,
+		"producer_epoch", producerEpoch,
+		"err", err,
+	)
+	s.cl.failProducerID(producerID, producerEpoch, errReloadProducerID)
+	if debug {
+		fmt.Fprintf(b, "resetting@%d,%d(%s)}, ", rp.BaseOffset, nrec, err)
+	}
+	return true, false
+}
+
if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss {
s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID",
"broker", logID(s.nodeID),
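The reset path above ends in failProducerID with errReloadProducerID, which causes the client to re-initialize its idempotent-producer state. A toy model of that state transition, assuming the usual Kafka idempotency fields (producer ID, epoch, per-partition sequence); a real client re-acquires state via an InitProducerID request rather than bumping the epoch locally:

```go
package main

import "fmt"

// producerState is a toy model of the idempotency fields every produce
// request carries. Reloading bumps the epoch and resets sequences to 0, so
// the broker validates the next batch fresh instead of comparing it to
// state it has already forgotten. (Illustrative only; not the client's
// actual types.)
type producerState struct {
	id    int64
	epoch int16
	seq   int32 // next sequence number to assign to a batch
}

func (p *producerState) reload() {
	p.epoch++ // stand-in for re-initializing via InitProducerID
	p.seq = 0
}

func main() {
	p := producerState{id: 7, epoch: 0, seq: 123}
	p.reload()
	fmt.Println(p.epoch, p.seq) // 1 0
}
```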
@@ -956,6 +986,7 @@ func (s *sink) handleReqRespBatch(
)
} else {
batch.owner.okOnSink = true
+batch.owner.lastAckedOffset = rp.BaseOffset + int64(len(batch.records))
}
s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.Partition, rp.BaseOffset, err)
didProduce = err == nil
@@ -1227,6 +1258,8 @@ type recBuf struct {
// to drain.
inflight uint8

+lastAckedOffset int64 // last ProduceResponse's BaseOffset + how many records we produced

topicPartitionData // updated in metadata migrateProductionTo (same spot sink is updated)

// seq is used for the seq in each record batch. It is incremented when
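The new recBuf field is maintained in one place: after a successful produce, it becomes the response's BaseOffset plus the number of records in the acked batch (i.e., one past the last offset written). A minimal stand-in for that bookkeeping:

```go
package main

import "fmt"

// recBuf here is a trimmed stand-in for the client's per-partition record
// buffer: lastAckedOffset starts at -1 and, on each successful
// ProduceResponse, advances to the batch's base offset plus the number of
// records acked. (Illustrative; the real type carries much more state.)
type recBuf struct {
	lastAckedOffset int64
}

func (b *recBuf) onAck(baseOffset int64, numRecords int) {
	b.lastAckedOffset = baseOffset + int64(numRecords)
}

func main() {
	b := recBuf{lastAckedOffset: -1}
	b.onAck(40, 10) // a 10-record batch landed at base offset 40
	fmt.Println(b.lastAckedOffset) // 50
}
```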
