From 504a9d762ed380d202c6837fea04d91956a57bc7 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 16 Sep 2023 16:14:51 +0100 Subject: [PATCH] kgo: expose ConsiderMissingTopicDeletedAfter Updates #523. --- pkg/kgo/client.go | 2 +- pkg/kgo/config.go | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 322e9f10..2dbd33e9 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -281,7 +281,7 @@ func (cl *Client) OptValues(opt any) []any { return []any{cfg.hooks} case namefn(ConcurrentTransactionsBackoff): return []any{cfg.txnBackoff} - case namefn(considerMissingTopicDeletedAfter): + case namefn(ConsiderMissingTopicDeletedAfter): return []any{cfg.missingTopicDelete} case namefn(DefaultProduceTopic): diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 1281628f..8c4f6807 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -833,10 +833,14 @@ func ConcurrentTransactionsBackoff(backoff time.Duration) Opt { return clientOpt{func(cfg *cfg) { cfg.txnBackoff = backoff }} } -// considerMissingTopicDeletedAfter sets the amount of time a topic can be +// ConsiderMissingTopicDeletedAfter sets the amount of time a topic can be // missing from metadata responses _after_ loading it at least once before it -// is considered deleted. -func considerMissingTopicDeletedAfter(t time.Duration) Opt { +// is considered deleted, overriding the default of 15s. Note that for newer +// versions of Kafka, it may take a bit of time (~15s) for the cluster to fully +// recognize a newly created topic. If this option is set too low, there is +// some risk that the client will internally purge and re-see a topic a few +// times until the cluster fully broadcasts the topic creation. +func ConsiderMissingTopicDeletedAfter(t time.Duration) Opt { return clientOpt{func(cfg *cfg) { cfg.missingTopicDelete = t }} }