Skip to content

Commit

Permalink
feat: new config to control constants used by JMSWorker (#141)
Browse files Browse the repository at this point in the history
- New config for JMS receive timeout
- New config for JMS reconnect minimum delay
- New config for JMS reconnect maximum delay

Signed-off-by: Joel Hanson <[email protected]>
  • Loading branch information
Joel-hanson authored Dec 12, 2024
1 parent fb50376 commit 16b8500
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/BUG-REPORT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ body:
label: Version
description: What version of our software are you running?
options:
- 2.2.0 (Default)
- 2.3.0 (Default)
- 1.3.5
- older (<1.3.5)
validations:
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
| mq.message.receive.timeout | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
| mq.reconnect.delay.min.ms | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
| mq.reconnect.delay.max.ms | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |

### Using a CCDT file

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>2.2.0</version>
<version>2.3.0</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ private Map<String, String> createDefaultConnectorProperties() {
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
props.put("topic", "mytopic");
props.put("mq.message.receive.timeout", "5000");
props.put("mq.reconnect.delay.min.ms", "100");
props.put("mq.reconnect.delay.max.ms", "10000");
return props;
}

Expand Down Expand Up @@ -654,4 +657,39 @@ public void verifyEmptyTextMessage() throws Exception {

connectTask.commitRecord(kafkaMessage);
}

@Test
public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder",
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
connectorConfigProps.put("mq.message.receive.timeout", "5000");
connectorConfigProps.put("mq.reconnect.delay.min.ms", "100");
connectorConfigProps.put("mq.reconnect.delay.max.ms", "10000");

final JMSWorker shared = new JMSWorker();
shared.configure(getPropertiesConfig(connectorConfigProps));
final JMSWorker dedicated = new JMSWorker();
dedicated.configure(getPropertiesConfig(connectorConfigProps));
final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);

connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);

final List<Message> messages = createAListOfMessages(getJmsContext(), 2, "message ");
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

connectTask.poll();

final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
assertThat(stateMsgs1.size()).isEqualTo(1);
shared.attemptRollback();
assertThat(stateMsgs1.size()).isEqualTo(1);

assertEquals(5000L, shared.getReceiveTimeout());
assertEquals(100L, shared.getReconnectDelayMillisMin());
assertEquals(10000L, shared.getReconnectDelayMillisMax());
}
}
46 changes: 32 additions & 14 deletions src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,22 @@ public class JMSWorker {

private boolean connected = false; // Whether connected to MQ
private AtomicBoolean closeNow; // Whether close has been requested
private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts
private AbstractConfig config;
private long receiveTimeout; // Receive timeout for the jms consumer
private long reconnectDelayMillisMin; // Delay between repeated reconnect attempts min
private long reconnectDelayMillisMax; // Delay between repeated reconnect attempts max

private static final long RECEIVE_TIMEOUT = 2000L;
private static final long RECONNECT_DELAY_MILLIS_MIN = 64L;
private static final long RECONNECT_DELAY_MILLIS_MAX = 8192L;
long getReceiveTimeout() {
return receiveTimeout;
}

long getReconnectDelayMillisMin() {
return reconnectDelayMillisMin;
}

long getReconnectDelayMillisMax() {
return reconnectDelayMillisMax;
}

/**
* Configure this class.
Expand All @@ -87,6 +98,7 @@ public void configure(final AbstractConfig config) {
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
config);

this.config = config;
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());

Expand Down Expand Up @@ -132,6 +144,9 @@ public void configure(final AbstractConfig config) {
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);
receiveTimeout = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT);
reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
reconnectDelayMillisMax = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
} catch (JMSException | JMSRuntimeException jmse) {
log.error("JMS exception {}", jmse);
throw new JMSWorkerConnectionException("JMS connection failed", jmse);
Expand Down Expand Up @@ -230,9 +245,9 @@ public Message receive(final String queueName, final QueueConfig queueConfig, fi

Message message = null;
if (wait) {
log.debug("Waiting {} ms for message", RECEIVE_TIMEOUT);
log.debug("Waiting {} ms for message", receiveTimeout);

message = internalConsumer.receive(RECEIVE_TIMEOUT);
message = internalConsumer.receive(receiveTimeout);

if (message == null) {
log.debug("No message received");
Expand Down Expand Up @@ -364,20 +379,23 @@ private boolean maybeReconnect() throws JMSRuntimeException {
log.trace("[{}] Entry {}.maybeReconnect", Thread.currentThread().getId(), this.getClass().getName());
try {
connect();
reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
log.info("Connection to MQ established");
// Reset reconnect delay to initial minimum after successful connection
reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
log.info("Successfully reconnected to MQ.");
} catch (final JMSRuntimeException jmse) {
// Delay slightly so that repeated reconnect loops don't run too fast
log.error("Failed to reconnect to MQ: {}", jmse);
try {
Thread.sleep(reconnectDelayMillis);
log.debug("Waiting for {} ms before next reconnect attempt.", reconnectDelayMillisMin);
Thread.sleep(reconnectDelayMillisMin);
} catch (final InterruptedException ie) {
log.warn("Reconnect delay interrupted.", ie);
}

if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) {
reconnectDelayMillis = reconnectDelayMillis * 2;
// Exponential backoff: double the delay, but do not exceed the maximum limit
if (reconnectDelayMillisMin < reconnectDelayMillisMax) {
reconnectDelayMillisMin = Math.min(reconnectDelayMillisMin * 2, reconnectDelayMillisMax);
log.debug("Reconnect delay increased to {} ms.", reconnectDelayMillisMin);
}

log.error("JMS exception {}", jmse);
log.trace("[{}] Exit {}.maybeReconnect, retval=JMSRuntimeException", Thread.currentThread().getId(),
this.getClass().getName());
throw jmse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,24 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";

public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.message.receive.timeout";
public static final String CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT = "How long the connector should wait (in milliseconds) for a message to arrive if no message is available immediately";
public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "message receive timeout";
public static final long CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT = 2000L;
public static final long CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM = 1L;

public static final String CONFIG_RECONNECT_DELAY_MIN = "mq.reconnect.delay.min.ms";
public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN = "The minimum delay in milliseconds for reconnect attempts.";
public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MIN = "reconnect minimum delay";
public static final long CONFIG_RECONNECT_DELAY_MIN_DEFAULT = 64L;
public static final long CONFIG_RECONNECT_DELAY_MIN_MINIMUM = 1L;

public static final String CONFIG_RECONNECT_DELAY_MAX = "mq.reconnect.delay.max.ms";
public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX = "The maximum delay in milliseconds for reconnect attempts.";
public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MAX = "reconnect maximum delay";
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;

// Define valid reconnect options
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
Expand All @@ -175,7 +193,7 @@ public class MQSourceConnector extends SourceConnector {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
};

public static String version = "2.2.0";
public static String version = "2.3.0";

private Map<String, String> configProps;

Expand Down Expand Up @@ -265,6 +283,7 @@ public Config validate(final Map<String, String> connectorConfigs) {
final Config config = super.validate(connectorConfigs);

MQSourceConnector.validateMQClientReconnectOptions(config);
MQSourceConnector.validateRetryDelayConfig(config);
return config;
}

Expand All @@ -291,6 +310,31 @@ private static void validateMQClientReconnectOptions(final Config config) {
}
}

/**
* Validates if the retry delay max value is greater than or equal to the min value.
* Adds an error message if the validation fails.
*/
private static void validateRetryDelayConfig(final Config config) {
// Collect all configuration values
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue reconnectDelayMaxConfigValue = configValues.get(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
final ConfigValue reconnectDelayMinConfigValue = configValues.get(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);

final long maxReceiveTimeout = (long) reconnectDelayMaxConfigValue.value();
final long minReceiveTimeout = (long) reconnectDelayMinConfigValue.value();

// Validate if the max value is greater than min value
if (maxReceiveTimeout < minReceiveTimeout) {
reconnectDelayMaxConfigValue.addErrorMessage(String.format(
"The value of '%s' must be greater than or equal to the value of '%s'.",
MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX,
MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN
));
}
}

/** Null validator - indicates that any value is acceptable for this config option. */
private static final ConfigDef.Validator ANY = null;

Expand Down Expand Up @@ -531,6 +575,30 @@ null, new ReadableFile(),
CONFIG_GROUP_MQ, 25,
Width.SHORT,
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);
CONFIGDEF.define(CONFIG_MAX_RECEIVE_TIMEOUT,
Type.LONG,
CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_MAX_RECEIVE_TIMEOUT_MINIMUM),
Importance.MEDIUM,
CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT,
CONFIG_GROUP_MQ, 26,
Width.MEDIUM,
CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MIN,
Type.LONG,
CONFIG_RECONNECT_DELAY_MIN_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MIN_MINIMUM),
Importance.MEDIUM,
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN,
CONFIG_GROUP_MQ, 27,
Width.MEDIUM,
CONFIG_DISPLAY_RECONNECT_DELAY_MIN);
CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MAX,
Type.LONG,
CONFIG_RECONNECT_DELAY_MAX_DEFAULT, ConfigDef.Range.atLeast(CONFIG_RECONNECT_DELAY_MAX_MINIMUM),
Importance.MEDIUM,
CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX,
CONFIG_GROUP_MQ, 28,
Width.MEDIUM,
CONFIG_DISPLAY_RECONNECT_DELAY_MAX);

CONFIGDEF.define(CONFIG_NAME_TOPIC,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.ibm.eventstreams.connect.mqsource;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
Expand All @@ -30,11 +29,7 @@
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MQSourceConnectorTest {
@Test
Expand Down Expand Up @@ -146,4 +141,49 @@ public void testValidateMQClientReconnectOptionsWithANYOption() {
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}

@Test
public void testValidateRetryDelayConfig() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.reconnect.delay.max.ms", "10");
configProps.put("mq.reconnect.delay.min.ms", "100");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
}

@Test
public void testValidateRetryDelayConfigWithNoReconnectValues() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
.flatMap(cv -> cv.errorMessages().stream())
.allMatch(msg -> msg == null));
}

@Test
public void testValidateRetryDelayConfigWithDefaultValues() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.reconnect.delay.min.ms", "1000000");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'.")));
}
}

0 comments on commit 16b8500

Please sign in to comment.