
Kafka topic creation uses a wrong configuration #3709

Closed
zappee opened this issue Jan 16, 2025 · 3 comments
zappee commented Jan 16, 2025

In what version(s) of Spring for Apache Kafka are you seeing this issue?

Spring Boot 3.4.1 and Spring Kafka 3.3.1

Describe the bug

Following the documented approach here, creating a new Kafka topic during startup fails. TopicBuilder uses the wrong configuration when creating the topic: it appears to fall back to the default config values (e.g. bootstrap.servers = [localhost:9092]) even though I provide my own configuration, and topic creation fails because my brokers are at a different address.

After startup, when I send a message to the topic, everything works fine and the KafkaTemplate uses my configuration.

I see there is a configs method on TopicBuilder here, but it expects a Map<String, String>, so I cannot reuse the config map I pass to DefaultKafkaProducerFactory, which is a Map<String, Object>.

To Reproduce

configuration

@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfiguration {

    public static final String KAFKA_TOPIC = "topic1";

    @Value("${KAFKA_SERVERS}")
    private String bootstrapAddresses;

    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate() {
        var factory = producerFactory();
        log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory));
        return new KafkaTemplate<>(factory);
    }

    @Bean
    public NewTopic topic() {
        log.debug("creating a new kafka topic: \"{}\"", KAFKA_TOPIC);
        return TopicBuilder.name(KAFKA_TOPIC)
                .partitions(1)
                .replicas(1)
                .build();
    }

    private String factoryConfigurationToString(ProducerFactory<String, Event> producerFactory) {
        var sb = new StringBuilder();
        producerFactory.
                getConfigurationProperties().
                forEach((key, value) -> sb.append(String.format("\"%s\": \"%s\"", key, value)));
        return sb.toString();
    }
}

sender

@Slf4j
@RestController
@RequestMapping("/api/kafka")
public class KafkaProducerController {

    private final KafkaTemplate<String, Event> kafkaTemplate;

    public KafkaProducerController(KafkaTemplate<String, Event> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/send")
    @MethodStatistics
    public String sendMessageToKafka() {
        var event = Event.builder().....build();
        kafkaTemplate.send(KafkaProducerConfiguration.KAFKA_TOPIC, event);
        return "Message has been sent to Kafka topic.";
    }
}

log

2025-01-16T15:29:04.448Z  INFO 206 --- [my-kafka-producer] [           main] c.r.g.s.message.producer.Application     : Starting Application v0.1.0 using Java 21.0.5 with PID 206 (/jar-to-run/remal-my-kafka-producer-0.1.0.jar started by root in /jar-to-run)
2025-01-16T15:29:04.450Z DEBUG 206 --- [my-kafka-producer] [           main] c.r.g.s.message.producer.Application     : Running with Spring Boot v3.4.1, Spring v6.2.1
2025-01-16T15:29:04.450Z  INFO 206 --- [my-kafka-producer] [           main] c.r.g.s.message.producer.Application     : No active profile set, falling back to 1 default profile: "default"
2025-01-16T15:29:06.204Z  INFO 206 --- [my-kafka-producer] [           main] o.s.cloud.context.scope.GenericScope     : BeanFactory id=bc87c8e9-04ef-3afc-981b-5d032d24e596
2025-01-16T15:29:07.358Z  INFO 206 --- [my-kafka-producer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8443 (https)
2025-01-16T15:29:07.378Z  INFO 206 --- [my-kafka-producer] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-01-16T15:29:07.378Z  INFO 206 --- [my-kafka-producer] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.34]
2025-01-16T15:29:07.416Z  INFO 206 --- [my-kafka-producer] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2025-01-16T15:29:07.416Z  INFO 206 --- [my-kafka-producer] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2918 ms

>>>>> 2025-01-16T15:29:08.062Z DEBUG 206 --- [my-kafka-producer] [           main] c.r.g.s.m.p.c.KafkaProducerConfiguration : initializing a KafkaTemplate using the following setting: {"bootstrap.servers": "kafka-1.hello.com:9092, kafka-2.hello.com:9092""key.serializer": "class org.apache.kafka.common.serialization.StringSerializer""value.serializer": "class org.springframework.kafka.support.serializer.JsonSerializer"}

>>>>> 2025-01-16T15:29:08.252Z DEBUG 206 --- [my-kafka-producer] [           main] c.r.g.s.m.p.c.KafkaProducerConfiguration : creating a new topic kafka topic: topic1

2025-01-16T15:29:10.119Z  WARN 206 --- [my-kafka-producer] [           main] iguration$LoadBalancerCaffeineWarnLogger : Spring Cloud LoadBalancer is currently working with the default cache. While this cache implementation is useful for development and tests, it's recommended to use Caffeine cache in production.You can switch to using Caffeine cache, by adding it and org.springframework.cache.caffeine.CaffeineCacheManager to the classpath.
2025-01-16T15:29:10.128Z  INFO 206 --- [my-kafka-producer] [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 3 endpoints beneath base path '/actuator'
2025-01-16T15:29:10.225Z  INFO 206 --- [my-kafka-producer] [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
	auto.include.jmx.reporter = true
	bootstrap.controllers = []

>>>>> bootstrap.servers = [localhost:9092]
	
	client.dns.lookup = use_all_dns_ips
	client.id = my-kafka-producer-admin-0
	connections.max.idle.ms = 300000
	default.api.timeout.ms = 60000
	enable.metrics.push = true
	metadata.max.age.ms = 300000
	metadata.recovery.strategy = none
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.max.ms = 1000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

2025-01-16T15:29:10.426Z  INFO 206 --- [my-kafka-producer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-01-16T15:29:10.427Z  INFO 206 --- [my-kafka-producer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-01-16T15:29:10.427Z  INFO 206 --- [my-kafka-producer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1737041350424
2025-01-16T15:29:10.460Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:10.463Z  WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2025-01-16T15:29:10.566Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:10.566Z  WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
...
2025-01-16T15:29:40.314Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:40.315Z  WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2025-01-16T15:29:40.432Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=my-kafka-producer-admin-0] Metadata update failed

>>>>> 2025-01-16T15:29:40.448Z ERROR 206 --- [my-kafka-producer] [           main] o.springframework.kafka.core.KafkaAdmin  : Could not configure topics

org.springframework.kafka.KafkaException: Timed out waiting to get existing topics
	at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$13(KafkaAdmin.java:571) ~[spring-kafka-3.3.1.jar!/:3.3.1]
	at java.base/java.util.HashMap.forEach(HashMap.java:1429) ~[na:na]
	at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1707) ~[na:na]
	at org.springframework.kafka.core.KafkaAdmin.checkPartitions(KafkaAdmin.java:550) ~[spring-kafka-3.3.1.jar!/:3.3.1]
	at org.springframework.kafka.core.KafkaAdmin.addOrModifyTopicsIfNeeded(KafkaAdmin.java:443) ~[spring-kafka-3.3.1.jar!/:3.3.1]
	at org.springframework.kafka.core.KafkaAdmin.initialize(KafkaAdmin.java:276) ~[spring-kafka-3.3.1.jar!/:3.3.1]
	at org.springframework.kafka.core.KafkaAdmin.afterSingletonsInstantiated(KafkaAdmin.java:245) ~[spring-kafka-3.3.1.jar!/:3.3.1]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:1057) ~[spring-beans-6.2.1.jar!/:6.2.1]
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:987) ~[spring-context-6.2.1.jar!/:6.2.1]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:627) ~[spring-context-6.2.1.jar!/:6.2.1]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.4.1.jar!/:3.4.1]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752) ~[spring-boot-3.4.1.jar!/:3.4.1]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439) ~[spring-boot-3.4.1.jar!/:3.4.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) ~[spring-boot-3.4.1.jar!/:3.4.1]
	at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:149) ~[spring-boot-3.4.1.jar!/:3.4.1]
	at com.remal.my.service.message.producer.Application.main(Application.java:20) ~[!/:0.1.0]
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
	at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:102) ~[remal-my-kafka-producer-0.1.0.jar:0.1.0]
	at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:64) ~[remal-my-kafka-producer-0.1.0.jar:0.1.0]
	at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:40) ~[remal-my-kafka-producer-0.1.0.jar:0.1.0]
Caused by: java.util.concurrent.TimeoutException: null
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.8.1.jar!/:na]
	at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$13(KafkaAdmin.java:553) ~[spring-kafka-3.3.1.jar!/:3.3.1]
	... 20 common frames omitted

2025-01-16T15:29:41.253Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:41.254Z  WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
...
2025-01-16T15:29:50.450Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.clients.admin.KafkaAdminClient     : [AdminClient clientId=my-kafka-producer-admin-0] Forcing a hard I/O thread shutdown. Requests in progress will be aborted.
2025-01-16T15:29:50.451Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for my-kafka-producer-admin-0 unregistered
2025-01-16T15:29:50.452Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=my-kafka-producer-admin-0] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata

2025-01-16T15:29:50.452Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.clients.admin.KafkaAdminClient     : [AdminClient clientId=my-kafka-producer-admin-0] Timed out 2 remaining operation(s) during close.
2025-01-16T15:29:50.456Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2025-01-16T15:29:50.456Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2025-01-16T15:29:50.456Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2025-01-16T15:29:50.523Z  INFO 206 --- [my-kafka-producer] [           main] o.a.t.util.net.NioEndpoint.certificate   : Connector [https-jsse-nio-8443], TLS virtual host [_default_], certificate type [UNDEFINED] configured from keystore [/root/.keystore] using alias [kafka-producer-service-1.hello.com] with trust store [null]
2025-01-16T15:29:50.532Z  INFO 206 --- [my-kafka-producer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8443 (https) with context path '/'
2025-01-16T15:29:50.539Z  INFO 206 --- [my-kafka-producer] [           main] o.s.c.c.s.ConsulServiceRegistry          : Registering service with consul: NewService{id='my-kafka-producer', name='my-kafka-producer', tags=[service, 0.1.0], address='kafka-producer-service-1.hello.com', meta={secure=true}, port=8443, enableTagOverride=null, check=Check{script='null', dockerContainerID='null', shell='null', interval='2s', ttl='null', http='https://kafka-producer-service-1.hello.com:8443/actuator/health', method='null', header={}, tcp='null', timeout='2s', deregisterCriticalServiceAfter='10s', tlsSkipVerify=null, status='null', grpc='null', grpcUseTLS=null}, checks=null}
2025-01-16T15:29:50.580Z  INFO 206 --- [my-kafka-producer] [           main] c.r.g.s.message.producer.Application     : Started Application in 51.927 seconds (process running for 52.723)
2025-01-16T15:29:50.636Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-6] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
{"status":"UP"}

>>>>>  2025-01-16T15:32:14.748Z DEBUG 206 --- [my-kafka-producer] [nio-8443-exec-7] c.r.g.c.m.MethodStatisticsAspect         : > calling the KafkaProducerController.sendMessageToKafka()...
2025-01-16T15:32:14.769Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = -1
	auto.include.jmx.reporter = true
	batch.size = 16384

>>>>> bootstrap.servers = [kafka-1.hello.com:9092, kafka-2.hello.com:9092]
	
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = my-kafka-producer-producer-1
	compression.gzip.level = -1
	compression.lz4.level = 9
	compression.type = none
	compression.zstd.level = 3
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	enable.metrics.push = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metadata.recovery.strategy = none
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.adaptive.partitioning.enable = true
	partitioner.availability.timeout.ms = 0
	partitioner.class = null
	partitioner.ignore.keys = false
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.max.ms = 1000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism = GSSAPI
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

2025-01-16T15:32:14.826Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2025-01-16T15:32:14.852Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=my-kafka-producer-producer-1] Instantiated an idempotent producer.
2025-01-16T15:32:14.884Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-01-16T15:32:14.884Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-01-16T15:32:14.885Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1737041534884
2025-01-16T15:32:15.448Z  WARN 206 --- [my-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=my-kafka-producer-producer-1] Error while fetching metadata with correlation id 2 : {topic1=LEADER_NOT_AVAILABLE}
2025-01-16T15:32:15.450Z  INFO 206 --- [my-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=my-kafka-producer-producer-1] Cluster ID: M_1qzbN3Q4e2C_SPkTcrhQ
2025-01-16T15:32:15.450Z  INFO 206 --- [my-kafka-producer] [ucer-producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=my-kafka-producer-producer-1] ProducerId set to 0 with epoch 0
>>>>> 2025-01-16T15:32:15.598Z DEBUG 206 --- [my-kafka-producer] [nio-8443-exec-7] c.r.g.c.m.MethodStatisticsAspect         : < call ended: {name: KafkaProducerController.sendMessageToKafka, arguments: , return: "Message has been sent to Kafka topic.", execution-in-ms: 851}

Expected behavior

TopicBuilder and KafkaTemplate must use the same configuration.

Sample

see above

@artembilan (Member) commented:

Feels like a duplicate of #3667.
It is not TopicBuilder's fault that it exposes a String-based API; that is how the NewTopic API in the Kafka client works.

Not sure what else you'd like to hear from us, but you may consider re-mapping that Map<String, Object> used for the producer to the Map<String, String> expected by TopicBuilder.
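
The suggested re-mapping can be sketched as below. The `toStringMap` helper is hypothetical, not part of any Kafka or Spring API; note also that `TopicBuilder.configs(...)` sets topic-level properties sent to the broker, not client connection settings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TopicConfigRemap {

    // Hypothetical helper: flattens a Map<String, Object> producer-style
    // config into the Map<String, String> that TopicBuilder.configs(...)
    // expects, by converting each value to its string form.
    static Map<String, String> toStringMap(Map<String, Object> configs) {
        return configs.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                        e -> String.valueOf(e.getValue())));
    }

    public static void main(String[] args) {
        Map<String, Object> producerConfigs = new HashMap<>();
        producerConfigs.put("bootstrap.servers", "kafka-1.hello.com:9092");
        producerConfigs.put("retries", 3);

        Map<String, String> remapped = toStringMap(producerConfigs);
        System.out.println(remapped.get("retries"));            // prints "3"
        System.out.println(remapped.get("bootstrap.servers"));  // prints "kafka-1.hello.com:9092"
    }
}
```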

@zappee (Author) commented Jan 16, 2025:

Adding a new config line to the TopicBuilder did not help. I get the same issue as before, and my Kafka server URLs are still not used.

@Bean
public NewTopic topic() {
    log.debug("creating a new kafka topic: \"{}\"", KAFKA_TOPIC);
    return TopicBuilder.name(KAFKA_TOPIC)
            .config(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses)   // <-- did not help
            .partitions(1)
            .replicas(1)
            .build();
}

@artembilan (Member) commented:

This is also a duplicate of an SO thread: https://stackoverflow.com/questions/79362234/kafka-topic-creation-uses-wrong-configuration.

I see that you use NewTopic with a custom producer config, but you don't apply that custom config to KafkaAdmin.
Therefore KafkaAdmin is auto-configured with defaults by Spring Boot.

See whether you can rely on auto-configuration for everything, or use a custom KafkaAdmin bean: https://docs.spring.io/spring-kafka/reference/kafka/configuring-topics.html.
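
A minimal sketch of such a custom KafkaAdmin bean, assuming it is added to the KafkaProducerConfiguration class above so it can reuse the same bootstrapAddresses field (this is an illustration, not the issue author's actual fix):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaAdmin;

// Sketch: point KafkaAdmin at the same brokers as the producer, so the
// NewTopic bean is created against the intended cluster instead of the
// auto-configured localhost:9092 default.
@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
    return new KafkaAdmin(configs);
}
```

Alternatively, when relying on Spring Boot auto-configuration, setting the `spring.kafka.bootstrap-servers` property applies the same address to the auto-configured KafkaAdmin, producer, and consumer.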

Closing as a duplicate.
Let's continue the discussion on that SO thread!

@artembilan closed this as not planned (duplicate) on Jan 16, 2025