Skip to content

Commit

Permalink
kgo: expose ConsiderMissingTopicDeletedAfter
Browse files Browse the repository at this point in the history
Updates #523.
  • Loading branch information
twmb committed Sep 16, 2023
1 parent 01651af commit 504a9d7
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (cl *Client) OptValues(opt any) []any {
return []any{cfg.hooks}
case namefn(ConcurrentTransactionsBackoff):
return []any{cfg.txnBackoff}
case namefn(considerMissingTopicDeletedAfter):
case namefn(ConsiderMissingTopicDeletedAfter):
return []any{cfg.missingTopicDelete}

case namefn(DefaultProduceTopic):
Expand Down
10 changes: 7 additions & 3 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,10 +833,14 @@ func ConcurrentTransactionsBackoff(backoff time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.txnBackoff = backoff }}
}

// considerMissingTopicDeletedAfter sets the amount of time a topic can be
// ConsiderMissingTopicDeletedAfter sets the amount of time a topic can be
// missing from metadata responses _after_ loading it at least once before it
// is considered deleted.
func considerMissingTopicDeletedAfter(t time.Duration) Opt {
// is considered deleted, overriding the default of 15s. Note that for newer
// versions of Kafka, it may take a bit of time (~15s) for the cluster to fully
// recognize a newly created topic. If this option is set too low, there is
// some risk that the client will internally purge and re-see a topic a few
// times until the cluster fully broadcasts the topic creation.
func ConsiderMissingTopicDeletedAfter(t time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.missingTopicDelete = t }}
}

Expand Down

0 comments on commit 504a9d7

Please sign in to comment.