diff --git a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
index fac14a9..72159fb 100644
--- a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
+++ b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
@@ -57,7 +57,7 @@ body:
label: Version
description: What version of our software are you running?
options:
- - 2.2.0 (Default)
+ - 2.2.1 (Default)
- 1.3.5
- older (<1.3.5)
validations:
diff --git a/README.md b/README.md
index 9745ecd..04191d8 100644
--- a/README.md
+++ b/README.md
@@ -305,6 +305,9 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
+| mq.jms.receive.timeout | The timeout in milliseconds for receiving messages from JMS Consumer. | long | 2000L | 1 or greater |
+| mq.jms.reconnect.delay.min.ms | The minimum delay in milliseconds for reconnect attempts. | long | 64L | 1 or greater |
+| mq.jms.reconnect.delay.max.ms | The maximum delay in milliseconds for reconnect attempts. | long | 8192L | 1 or greater |
### Using a CCDT file
diff --git a/pom.xml b/pom.xml
index 402a1ce..c2eb558 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
com.ibm.eventstreams.connect
kafka-connect-mq-source
jar
- 2.2.0
+ 2.2.1
kafka-connect-mq-source
IBM Corporation
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
index 8e505c0..b7b3654 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -654,4 +654,39 @@ public void verifyEmptyTextMessage() throws Exception {
connectTask.commitRecord(kafkaMessage);
}
+
+ @Test
+ public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() throws Exception {
+ connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+ final Map connectorConfigProps = createExactlyOnceConnectorProperties();
+ connectorConfigProps.put("mq.message.body.jms", "true");
+ connectorConfigProps.put("mq.record.builder",
+ "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+ connectorConfigProps.put("mq.jms.receive.timeout", "5000");
+ connectorConfigProps.put("mq.jms.reconnect.delay.min.ms", "100");
+ connectorConfigProps.put("mq.jms.reconnect.delay.max.ms", "10000");
+
+ final JMSWorker shared = new JMSWorker();
+ shared.configure(getPropertiesConfig(connectorConfigProps));
+ final JMSWorker dedicated = new JMSWorker();
+ dedicated.configure(getPropertiesConfig(connectorConfigProps));
+ final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+
+ connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
+
+ final List messages = createAListOfMessages(getJmsContext(), 2, "message ");
+ putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+ connectTask.poll();
+
+ final List stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
+ assertThat(stateMsgs1.size()).isEqualTo(1);
+ shared.attemptRollback();
+ assertThat(stateMsgs1.size()).isEqualTo(1);
+
+ assertEquals(5000L, shared.getReceiveTimeout());
+ assertEquals(100L, shared.getReconnectDelayMillisMin());
+ assertEquals(10000L, shared.getReconnectDelayMillisMax());
+ }
}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
index 8e2d4ac..4e93659 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
@@ -70,11 +70,22 @@ public class JMSWorker {
private boolean connected = false; // Whether connected to MQ
private AtomicBoolean closeNow; // Whether close has been requested
- private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts
+ private AbstractConfig config;
+ private long receiveTimeout; // Receive timeout for the jms consumer
+ private long reconnectDelayMillisMin; // Delay between repeated reconnect attempts min
+ private long reconnectDelayMillisMax; // Delay between repeated reconnect attempts max
- private static final long RECEIVE_TIMEOUT = 2000L;
- private static final long RECONNECT_DELAY_MILLIS_MIN = 64L;
- private static final long RECONNECT_DELAY_MILLIS_MAX = 8192L;
+ long getReceiveTimeout() {
+ return receiveTimeout;
+ }
+
+ long getReconnectDelayMillisMin() {
+ return reconnectDelayMillisMin;
+ }
+
+ long getReconnectDelayMillisMax() {
+ return reconnectDelayMillisMax;
+ }
/**
* Configure this class.
@@ -87,6 +98,7 @@ public void configure(final AbstractConfig config) {
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
config);
+ this.config = config;
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());
@@ -132,6 +144,9 @@ public void configure(final AbstractConfig config) {
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);
+ receiveTimeout = config.getLong(MQSourceConnector.CONFIG_MAX_RECEIVE_TIMEOUT);
+ reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
+ reconnectDelayMillisMax = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MAX);
} catch (JMSException | JMSRuntimeException jmse) {
log.error("JMS exception {}", jmse);
throw new JMSWorkerConnectionException("JMS connection failed", jmse);
@@ -230,9 +245,9 @@ public Message receive(final String queueName, final QueueConfig queueConfig, fi
Message message = null;
if (wait) {
- log.debug("Waiting {} ms for message", RECEIVE_TIMEOUT);
+ log.debug("Waiting {} ms for message", receiveTimeout);
- message = internalConsumer.receive(RECEIVE_TIMEOUT);
+ message = internalConsumer.receive(receiveTimeout);
if (message == null) {
log.debug("No message received");
@@ -364,20 +379,23 @@ private boolean maybeReconnect() throws JMSRuntimeException {
log.trace("[{}] Entry {}.maybeReconnect", Thread.currentThread().getId(), this.getClass().getName());
try {
connect();
- reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
- log.info("Connection to MQ established");
+ // Reset reconnect delay to initial minimum after successful connection
+ reconnectDelayMillisMin = config.getLong(MQSourceConnector.CONFIG_RECONNECT_DELAY_MIN);
+ log.info("Successfully reconnected to MQ.");
} catch (final JMSRuntimeException jmse) {
- // Delay slightly so that repeated reconnect loops don't run too fast
+ log.error("Failed to reconnect to MQ: {}", jmse);
try {
- Thread.sleep(reconnectDelayMillis);
+ log.debug("Waiting for {} ms before next reconnect attempt.", reconnectDelayMillisMin);
+ Thread.sleep(reconnectDelayMillisMin);
} catch (final InterruptedException ie) {
+ log.warn("Reconnect delay interrupted.", ie);
}
- if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) {
- reconnectDelayMillis = reconnectDelayMillis * 2;
+ // Exponential backoff: double the delay, but do not exceed the maximum limit
+ if (reconnectDelayMillisMin < reconnectDelayMillisMax) {
+ reconnectDelayMillisMin = Math.min(reconnectDelayMillisMin * 2, reconnectDelayMillisMax);
+ log.debug("Reconnect delay increased to {} ms.", reconnectDelayMillisMin);
}
-
- log.error("JMS exception {}", jmse);
log.trace("[{}] Exit {}.maybeReconnect, retval=JMSRuntimeException", Thread.currentThread().getId(),
this.getClass().getName());
throw jmse;
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
index 1621055..c072d70 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
@@ -163,6 +163,21 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";
+ public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.jms.receive.timeout";
+ public static final String CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT = "The timeout in milliseconds for receiving messages from JMS Consumer.";
+ public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "JMS receive timeout";
+ public static final long CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT = 2000L;
+
+ public static final String CONFIG_RECONNECT_DELAY_MIN = "mq.jms.reconnect.delay.min.ms";
+ public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN = "The minimum delay in milliseconds for reconnect attempts.";
+ public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MIN = "JMS reconnect minimum delay";
+ public static final long CONFIG_RECONNECT_DELAY_MIN_DEFAULT = 64L;
+
+ public static final String CONFIG_RECONNECT_DELAY_MAX = "mq.jms.reconnect.delay.max.ms";
+ public static final String CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX = "The maximum delay in milliseconds for reconnect attempts.";
+ public static final String CONFIG_DISPLAY_RECONNECT_DELAY_MAX = "JMS reconnect maximum delay";
+ public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
+
// Define valid reconnect options
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
@@ -175,7 +190,7 @@ public class MQSourceConnector extends SourceConnector {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
};
- public static String version = "2.2.0";
+ public static String version = "2.2.1";
private Map configProps;
@@ -531,6 +546,30 @@ null, new ReadableFile(),
CONFIG_GROUP_MQ, 25,
Width.SHORT,
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);
+ CONFIGDEF.define(CONFIG_MAX_RECEIVE_TIMEOUT,
+ Type.LONG,
+ CONFIG_MAX_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(0),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT,
+ CONFIG_GROUP_MQ, 26,
+ Width.MEDIUM,
+ CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
+ CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MIN,
+ Type.LONG,
+ CONFIG_RECONNECT_DELAY_MIN_DEFAULT, ConfigDef.Range.atLeast(0),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_RECONNECT_DELAY_MIN,
+ CONFIG_GROUP_MQ, 27,
+ Width.MEDIUM,
+ CONFIG_DISPLAY_RECONNECT_DELAY_MIN);
+ CONFIGDEF.define(CONFIG_RECONNECT_DELAY_MAX,
+ Type.LONG,
+ CONFIG_RECONNECT_DELAY_MAX_DEFAULT, ConfigDef.Range.atLeast(0),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_RECONNECT_DELAY_MAX,
+ CONFIG_GROUP_MQ, 28,
+ Width.MEDIUM,
+ CONFIG_DISPLAY_RECONNECT_DELAY_MAX);
CONFIGDEF.define(CONFIG_NAME_TOPIC,
Type.STRING,