Skip to content

Commit

Permalink
jspecify Nullability changes in config package
Browse files Browse the repository at this point in the history
Signed-off-by: Soby Chacko <[email protected]>
  • Loading branch information
sobychacko committed Feb 19, 2025
1 parent 9d91a79 commit 1024c44
Show file tree
Hide file tree
Showing 17 changed files with 138 additions and 103 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,8 +41,9 @@ class Config {

@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, String>) =
ConcurrentKafkaListenerContainerFactory<Int, String>().also { it.consumerFactory = consumerFactory }

ConcurrentKafkaListenerContainerFactory<Int, String>().apply {
setConsumerFactory(consumerFactory)
}

@Bean
fun consumerFactory() = DefaultKafkaConsumerFactory<Int, String>(consumerProps)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2024 the original author or authors.
* Copyright 2014-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import java.util.regex.Pattern;

import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
Expand Down Expand Up @@ -70,49 +71,50 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); // NOSONAR

private CommonErrorHandler commonErrorHandler;
private @Nullable CommonErrorHandler commonErrorHandler;

private ConsumerFactory<? super K, ? super V> consumerFactory;
private @Nullable ConsumerFactory<? super K, ? super V> consumerFactory;

private Boolean autoStartup;
private @Nullable Boolean autoStartup;

private Integer phase;
private @Nullable Integer phase;

private RecordMessageConverter recordMessageConverter;
private @Nullable RecordMessageConverter recordMessageConverter;

private BatchMessageConverter batchMessageConverter;
private @Nullable BatchMessageConverter batchMessageConverter;

private RecordFilterStrategy<? super K, ? super V> recordFilterStrategy;
private @Nullable RecordFilterStrategy<? super K, ? super V> recordFilterStrategy;

private Boolean ackDiscarded;
private @Nullable Boolean ackDiscarded;

private Boolean batchListener;
private @Nullable Boolean batchListener;

private ApplicationEventPublisher applicationEventPublisher;
private @Nullable ApplicationEventPublisher applicationEventPublisher;

private KafkaTemplate<?, ?> replyTemplate;
private @Nullable KafkaTemplate<?, ?> replyTemplate;

private AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor;
private @Nullable AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor;

private ReplyHeadersConfigurer replyHeadersConfigurer;
private @Nullable ReplyHeadersConfigurer replyHeadersConfigurer;

private Boolean missingTopicsFatal;
private @Nullable Boolean missingTopicsFatal;

private RecordInterceptor<K, V> recordInterceptor;
private @Nullable RecordInterceptor<K, V> recordInterceptor;

private BatchInterceptor<K, V> batchInterceptor;
private @Nullable BatchInterceptor<K, V> batchInterceptor;

private BatchToRecordAdapter<K, V> batchToRecordAdapter;
private @Nullable BatchToRecordAdapter<K, V> batchToRecordAdapter;

@SuppressWarnings("NullAway.Init")
private ApplicationContext applicationContext;

private ContainerCustomizer<K, V, C> containerCustomizer;
private @Nullable ContainerCustomizer<K, V, C> containerCustomizer;

private String correlationHeaderName;
private @Nullable String correlationHeaderName;

private Boolean changeConsumerThreadName;
private @Nullable Boolean changeConsumerThreadName;

private Function<MessageListenerContainer, String> threadNameSupplier;
private @Nullable Function<MessageListenerContainer, String> threadNameSupplier;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Expand All @@ -127,7 +129,7 @@ public void setConsumerFactory(ConsumerFactory<? super K, ? super V> consumerFac
this.consumerFactory = consumerFactory;
}

public ConsumerFactory<? super K, ? super V> getConsumerFactory() {
public @Nullable ConsumerFactory<? super K, ? super V> getConsumerFactory() {
return this.consumerFactory;
}

Expand Down Expand Up @@ -190,7 +192,7 @@ public void setAckDiscarded(Boolean ackDiscarded) {
* @return true for a batch listener.
* @since 1.1
*/
public Boolean isBatchListener() {
public @Nullable Boolean isBatchListener() {
return this.batchListener;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,53 +72,55 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>

private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

private String id;
private @Nullable String id;

private String groupId;
private @Nullable String groupId;

private final Collection<String> topics = new ArrayList<>();

private Pattern topicPattern;
private @Nullable Pattern topicPattern;

private final Collection<TopicPartitionOffset> topicPartitions = new ArrayList<>();

@SuppressWarnings("NullAway.Init")
private BeanFactory beanFactory;

private BeanExpressionResolver resolver;
private @Nullable BeanExpressionResolver resolver;

private BeanExpressionContext expressionContext;
private @Nullable BeanExpressionContext expressionContext;

private BeanResolver beanResolver;
private @Nullable BeanResolver beanResolver;

private String group;
private @Nullable String group;

private RecordFilterStrategy<K, V> recordFilterStrategy;
private @Nullable RecordFilterStrategy<K, V> recordFilterStrategy;

private boolean ackDiscarded;

private Boolean batchListener;
private @Nullable Boolean batchListener;

private KafkaTemplate<?, ?> replyTemplate;
private @Nullable KafkaTemplate<?, ?> replyTemplate;

private String clientIdPrefix;
private @Nullable String clientIdPrefix;

private Integer concurrency;
private @Nullable Integer concurrency;

private Boolean autoStartup;
private @Nullable Boolean autoStartup;

private ReplyHeadersConfigurer replyHeadersConfigurer;
private @Nullable ReplyHeadersConfigurer replyHeadersConfigurer;

private Properties consumerProperties;
private @Nullable Properties consumerProperties;

private boolean splitIterables = true;

private BatchToRecordAdapter<K, V> batchToRecordAdapter;
private @Nullable BatchToRecordAdapter<K, V> batchToRecordAdapter;

@SuppressWarnings("NullAway.Init")
private byte[] listenerInfo;

private String correlationHeaderName;
private @Nullable String correlationHeaderName;

private ContainerPostProcessor<?, ?, ?> containerPostProcessor;
private @Nullable ContainerPostProcessor<?, ?, ?> containerPostProcessor;

@Nullable
private String mainListenerId;
Expand Down Expand Up @@ -244,7 +246,7 @@ public TopicPartitionOffset[] getTopicPartitionsToAssign() {
* @see #setTopicPartitions(TopicPartitionOffset...)
* @see #setTopics(String...)
*/
public void setTopicPattern(Pattern topicPattern) {
public void setTopicPattern(@Nullable Pattern topicPattern) {
this.topicPattern = topicPattern;
}

Expand All @@ -268,7 +270,7 @@ public String getGroup() {
* Set the group for the corresponding listener container.
* @param group the group.
*/
public void setGroup(String group) {
public void setGroup(@Nullable String group) {
this.group = group;
}

Expand Down Expand Up @@ -354,7 +356,7 @@ public String getClientIdPrefix() {
* @param clientIdPrefix the prefix.
* @since 2.1.1
*/
public void setClientIdPrefix(String clientIdPrefix) {
public void setClientIdPrefix(@Nullable String clientIdPrefix) {
this.clientIdPrefix = clientIdPrefix;
}

Expand All @@ -369,7 +371,7 @@ public Integer getConcurrency() {
* @param concurrency the concurrency.
* @since 2.2
*/
public void setConcurrency(Integer concurrency) {
public void setConcurrency(@Nullable Integer concurrency) {
this.concurrency = concurrency;
}

Expand All @@ -384,7 +386,7 @@ public Boolean getAutoStartup() {
* @param autoStartup the autoStartup.
* @since 2.2
*/
public void setAutoStartup(Boolean autoStartup) {
public void setAutoStartup(@Nullable Boolean autoStartup) {
this.autoStartup = autoStartup;
}

Expand Down Expand Up @@ -414,7 +416,7 @@ public Properties getConsumerProperties() {
* @see #setGroupId(String)
* @see #setClientIdPrefix(String)
*/
public void setConsumerProperties(Properties consumerProperties) {
public void setConsumerProperties(@Nullable Properties consumerProperties) {
this.consumerProperties = consumerProperties;
}

Expand All @@ -434,7 +436,7 @@ public void setSplitIterables(boolean splitIterables) {
}

@Override
@Nullable
@SuppressWarnings("NullAway") // Dataflow analysis limitation
public byte[] getListenerInfo() {
return this.listenerInfo; // NOSONAR
}
Expand All @@ -444,7 +446,7 @@ public byte[] getListenerInfo() {
* @param listenerInfo the info.
* @since 2.8.4
*/
public void setListenerInfo(@Nullable byte[] listenerInfo) { // NOSONAR
public void setListenerInfo(byte[] listenerInfo) { // NOSONAR
this.listenerInfo = listenerInfo; // NOSONAR
}

Expand Down Expand Up @@ -474,7 +476,7 @@ public void setCorrelationHeaderName(String correlationHeaderName) {
}

@Override
public ContainerPostProcessor<?, ?, ?> getContainerPostProcessor() {
public @Nullable ContainerPostProcessor<?, ?, ?> getContainerPostProcessor() {
return this.containerPostProcessor;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 the original author or authors.
* Copyright 2014-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,9 +18,12 @@

import java.util.Collection;

import org.jspecify.annotations.Nullable;

import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;

/**
* A {@link KafkaListenerContainerFactory} implementation to build a
Expand All @@ -46,7 +49,7 @@
public class ConcurrentKafkaListenerContainerFactory<K, V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {

private Integer concurrency;
private @Nullable Integer concurrency;

/**
* Specify the container concurrency.
Expand All @@ -59,13 +62,14 @@ public void setConcurrency(Integer concurrency) {

@Override
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
@Nullable TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
if (topicPartitions != null && topicPartitions.length > 0) {
ContainerProperties properties = new ContainerProperties(topicPartitions);
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
else {
Collection<String> topics = endpoint.getTopics();
Assert.state(topics != null, "'topics' must not be null");
if (!topics.isEmpty()) { // NOSONAR
ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public interface KafkaListenerEndpoint {
* Return the topics for this endpoint.
* @return the topics for this endpoint.
*/
@Nullable
Collection<String> getTopics();

/**
Expand Down Expand Up @@ -151,7 +152,7 @@ void setupListenerContainer(MessageListenerContainer listenerContainer,
* @return the info.
* @since 2.8.4
*/
@Nullable
@SuppressWarnings("NullAway") // Dataflow analysis limitation
default byte[] getListenerInfo() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ public String getGroup() {
return null;
}

@Nullable
@Override
public Collection<String> getTopics() {
public @Nullable Collection<String> getTopics() {
return Collections.emptyList();
}

Expand Down
Loading

0 comments on commit 1024c44

Please sign in to comment.