Skip to content

Commit

Permalink
kgo: add AssumeConsumersRequireStable()
Browse files Browse the repository at this point in the history
This allows group transact sessions to NOT internally force an abort if
a revoke happens.

Closes #754.
  • Loading branch information
twmb committed Oct 11, 2024
1 parent b66ceb7 commit 17340ca
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 33 deletions.
20 changes: 16 additions & 4 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,11 @@ type cfg struct {
balancers []GroupBalancer // balancers we can use
protocol string // "consumer" by default, expected to never be overridden

sessionTimeout time.Duration
rebalanceTimeout time.Duration
heartbeatInterval time.Duration
requireStable bool
sessionTimeout time.Duration
rebalanceTimeout time.Duration
heartbeatInterval time.Duration
requireStable bool
assumeConsumersRequireStable bool

onAssigned func(context.Context, *Client, map[string][]int32)
onRevoked func(context.Context, *Client, map[string][]int32)
Expand Down Expand Up @@ -1503,10 +1504,21 @@ func HeartbeatInterval(interval time.Duration) GroupOpt {
// transactional timeouts to a small value (10s) rather than the default 60s.
// Lowering the transactional timeout will reduce the chance that consumers are
// entirely blocked.
//
// If all consumers in your group also require stable fetch offsets, you may
// want to additionally use [AssumeConsumersRequireStable].
func RequireStableFetchOffsets() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.requireStable = true }}
}

// AssumeConsumersRequireStable opts the [GroupTransactSession] into NOT
// aborting whenever rebalance occur (i.e., opts this client into assuming
// every other client in the group also requires stable offsets). This
// option should be used in tandem with [RequireStableFetchOffsets].
func AssumeConsumersRequireStable() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.assumeConsumersRequireStable = true }}
}

// BlockRebalanceOnPoll switches the client to block rebalances whenever you
// poll until you explicitly call AllowRebalance. This option also ensures that
// any OnPartitions{Assigned,Revoked,Lost} callbacks are only called when you
Expand Down
1 change: 1 addition & 0 deletions pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestGroupETL(t *testing.T) {
errs,
false,
tc.balancer,
false,
)
})
}
Expand Down
27 changes: 14 additions & 13 deletions pkg/kgo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ func (c *testConsumer) wait() {
c.wg.Wait()
}

func (c *testConsumer) goRun(transactional bool, etlsBeforeQuit int) {
func (c *testConsumer) goRun(transactional bool, etlsBeforeQuit int, assumeStable bool) {
if transactional {
c.goTransact(etlsBeforeQuit)
c.goTransact(etlsBeforeQuit, assumeStable)
} else {
c.goGroupETL(etlsBeforeQuit)
}
Expand All @@ -419,6 +419,7 @@ func testChainETL(
errs chan error,
transactional bool,
balancer GroupBalancer,
assumeStable bool,
) {
var (
/////////////
Expand Down Expand Up @@ -484,24 +485,24 @@ func testChainETL(
////////////////////

for i := 0; i < 3; i++ { // three consumers start with standard poll&commit behavior
consumers1.goRun(transactional, -1)
consumers2.goRun(transactional, -1)
consumers3.goRun(transactional, -1)
consumers1.goRun(transactional, -1, assumeStable)
consumers2.goRun(transactional, -1, assumeStable)
consumers3.goRun(transactional, -1, assumeStable)
}

consumers1.goRun(transactional, 0) // bail immediately
consumers1.goRun(transactional, 2) // bail after two txns
consumers2.goRun(transactional, 2) // same
consumers1.goRun(transactional, 0, assumeStable) // bail immediately
consumers1.goRun(transactional, 2, assumeStable) // bail after two txns
consumers2.goRun(transactional, 2, assumeStable) // same

time.Sleep(5 * time.Second)
for i := 0; i < 3; i++ { // trigger rebalance after 5s with more consumers
consumers1.goRun(transactional, -1)
consumers2.goRun(transactional, -1)
consumers3.goRun(transactional, -1)
consumers1.goRun(transactional, -1, assumeStable)
consumers2.goRun(transactional, -1, assumeStable)
consumers3.goRun(transactional, -1, assumeStable)
}

consumers2.goRun(transactional, 0) // bail immediately
consumers1.goRun(transactional, 1) // bail after one txn
consumers2.goRun(transactional, 0, assumeStable) // bail immediately
consumers1.goRun(transactional, 1, assumeStable) // bail after one txn

doneConsume := make(chan struct{})
go func() {
Expand Down
20 changes: 16 additions & 4 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ const (
// (EOS).
//
// If you are running Kafka 2.5+, it is strongly recommended that you also use
// RequireStableFetchOffsets. See that config option's documentation for more
// details.
// [RequireStableFetchOffsets]. See that config option's documentation for more
// details. By default, if the client detects any rebalance, any active transaction
// is aborted for safety. You can use the [AssumeConsumersRequireStable] to opt into
// NOT aborting automatically on rebalance. See issue 754 for more detail.
type GroupTransactSession struct {
cl *Client

Expand Down Expand Up @@ -94,18 +96,27 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
userRevoked := cfg.onRevoked
cfg.onRevoked = func(ctx context.Context, cl *Client, rev map[string][]int32) {
s.failMu.Lock()
defer s.failMu.Unlock()
if s.revoked {
s.failMu.Unlock()
return
}

if cl.consumer.g.cooperative.Load() && len(rev) == 0 && !s.revoked {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit")
} else if cl.cfg.assumeConsumersRequireStable {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke, but we are assuming all consumers require stable; allowing commit while in user revoked")
defer func() {
s.failMu.Lock()
s.revoked = true
close(s.revokedCh)
s.failMu.Unlock()
}()
} else {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction")
s.revoked = true
close(s.revokedCh)
}
s.failMu.Unlock()

if userRevoked != nil {
userRevoked(ctx, cl, rev)
Expand All @@ -115,14 +126,15 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
userLost := cfg.onLost
cfg.onLost = func(ctx context.Context, cl *Client, lost map[string][]int32) {
s.failMu.Lock()
defer s.failMu.Unlock()
if s.lost {
s.failMu.Unlock()
return
}

cl.cfg.logger.Log(LogLevelInfo, "transact session in on_lost; aborting next commit if we are currently in a transaction")
s.lost = true
close(s.lostCh)
s.failMu.Unlock()

if userLost != nil {
userLost(ctx, cl, lost)
Expand Down
36 changes: 24 additions & 12 deletions pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,15 @@ func TestTxnEtl(t *testing.T) {
////////////////////////////

for _, tc := range []struct {
name string
balancer GroupBalancer
name string
balancer GroupBalancer
assumeStable bool
}{
{"roundrobin", RoundRobinBalancer()},
{"range", RangeBalancer()},
{"sticky", StickyBalancer()},
{"cooperative-sticky", CooperativeStickyBalancer()},
{"roundrobin", RoundRobinBalancer(), false},
{"range", RangeBalancer(), true},
{"sticky", StickyBalancer(), false},
{"cooperative-sticky", CooperativeStickyBalancer(), true},
{"cooperative-sticky", CooperativeStickyBalancer(), false},
} {
t.Run(tc.name, func(t *testing.T) {
testChainETL(
Expand All @@ -124,17 +126,18 @@ func TestTxnEtl(t *testing.T) {
errs,
true,
tc.balancer,
tc.assumeStable,
)
})
}
}

func (c *testConsumer) goTransact(txnsBeforeQuit int) {
func (c *testConsumer) goTransact(txnsBeforeQuit int, assumeStable bool) {
c.wg.Add(1)
go c.transact(txnsBeforeQuit)
go c.transact(txnsBeforeQuit, assumeStable)
}

func (c *testConsumer) transact(txnsBeforeQuit int) {
func (c *testConsumer) transact(txnsBeforeQuit int, assumeStable bool) {
defer c.wg.Done()

opts := []Opt{
Expand All @@ -155,12 +158,21 @@ func (c *testConsumer) transact(txnsBeforeQuit int) {
Balancers(c.balancer),
MaxBufferedRecords(10000),
}
if requireStableFetch {
opts = append(opts, RequireStableFetchOffsets())
var txnSess *GroupTransactSession
if assumeStable {
opts = append(opts, OnPartitionsRevoked(func(ctx context.Context, cl *Client, _ map[string][]int32) {

Check failure on line 163 in pkg/kgo/txn_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

unused-parameter: parameter 'cl' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 163 in pkg/kgo/txn_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

unused-parameter: parameter 'cl' seems to be unused, consider removing or renaming it as _ (revive)
txnSess.End(ctx, TryCommit)
}))
}
if requireStableFetch || assumeStable {
opts = append(opts,
RequireStableFetchOffsets(),
AssumeConsumersRequireStable(),
)
}
opts = append(opts, testClientOpts()...)

txnSess, _ := NewGroupTransactSession(opts...)
txnSess, _ = NewGroupTransactSession(opts...)
defer txnSess.Close()

ntxns := 0 // for if txnsBeforeQuit is non-negative
Expand Down

0 comments on commit 17340ca

Please sign in to comment.