Skip to content

Commit

Permalink
feat: Allow config for client reconnect in connector (#138)
Browse files Browse the repository at this point in the history
Contributes to: event-integration/eventstreams-planning#12716

Signed-off-by: Joel Hanson <[email protected]>
Signed-off-by: Joel Hanson <[email protected]>
Co-authored-by: Joel Hanson <[email protected]>
  • Loading branch information
Joel-hanson and Joel Hanson authored Sep 19, 2024
1 parent bbb1102 commit 7e9d940
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 8 deletions.
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/BUG-REPORT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ body:
label: Version
description: What version of our software are you running?
options:
- 2.1.0 (Default)
- 1.3.5
- 2.1.1 (Default)
- 1.3.5
- older (<1.3.5)
validations:
required: true
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |

### Using a CCDT file

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>2.1.0</version>
<version>2.1.1</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,4 @@ public void testQueueHoldsMoreThanOneMessage_twoMessageOnQueue() throws Exceptio
public void testQueueHoldsMoreThanOneMessage_queueNotFound() {
assertThrows(Exception.class, ()->jmsWorker.queueHoldsMoreThanOneMessage("QUEUE_DOES_NOT_EXIST"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -577,4 +577,81 @@ public void testRemoveDeliveredMessagesFromSourceQueueDoesNotThrowException() th
assertThatNoException()
.isThrownBy(() -> connectTask.removeDeliveredMessagesFromSourceQueue(Arrays.asList(msgIds)));
}


@Test
public void testConfigureClientReconnectOptions() throws Exception {
// setup test condition: put messages on source queue, poll once to read them
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
connectorConfigProps.put("mq.client.reconnect.options", "QMGR");

JMSWorker shared = new JMSWorker();
shared.configure(getPropertiesConfig(connectorConfigProps));
JMSWorker dedicated = new JMSWorker();
dedicated.configure(getPropertiesConfig(connectorConfigProps));
SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);

connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);

final List<Message> messages = createAListOfMessages(getJmsContext(), 2, "message ");
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

connectTask.poll();

List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
assertThat(stateMsgs1.size()).isEqualTo(1);
shared.attemptRollback();
assertThat(stateMsgs1.size()).isEqualTo(1); //state message is still there even though source message were rolled back

}

@Test
public void verifyEmptyMessage() throws Exception {
connectTask = new MQSourceTask();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder",
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

Message emptyMessage = getJmsContext().createMessage();
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage));

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(1, kafkaMessages.size());

final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertNull(kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
}

@Test
public void verifyEmptyTextMessage() throws Exception {
connectTask = new MQSourceTask();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder",
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

TextMessage emptyMessage = getJmsContext().createTextMessage();
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage));

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(1, kafkaMessages.size());

final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertNull(kafkaMessage.value());

connectTask.commitRecord(kafkaMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.net.URL;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -126,6 +127,7 @@ public void configure(final AbstractConfig config) {
mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
}
}
configureClientReconnectOptions(config, mqConnFactory);

userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
Expand All @@ -144,6 +146,31 @@ public void configure(final AbstractConfig config) {
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
}

// Configure client reconnect option based on the config
private static void configureClientReconnectOptions(final AbstractConfig config,
final MQConnectionFactory mqConnFactory) throws JMSException {
String clientReconnectOptions = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS);

clientReconnectOptions = clientReconnectOptions.toUpperCase(Locale.ENGLISH);

switch (clientReconnectOptions) {
case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY:
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
break;

case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR:
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_Q_MGR);
break;

case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED:
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_DISABLED);
break;

default:
mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_AS_DEF);
break;
}
}

/**
* Used for tests.
Expand All @@ -152,7 +179,7 @@ protected void setRecordBuilder(final RecordBuilder recordBuilder) {
this.recordBuilder = recordBuilder;
}

protected JMSContext getContext() { // used to enable testing
protected JMSContext getContext() { // used to enable testing
if (jmsCtxt == null) maybeReconnect();
return jmsCtxt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
Expand Down Expand Up @@ -152,7 +155,27 @@ public class MQSourceConnector extends SourceConnector {
+ "previous batch of messages to be delivered to Kafka before starting a new poll.";
public static final String CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS = "Max poll blocked time ms";

public static String version = "2.1.0";
public static final String CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS = "mq.client.reconnect.options";
public static final String CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS = "Options governing MQ reconnection.";
public static final String CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS = "MQ client reconnect options";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR = "QMGR";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY = "ANY";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";

// Define valid reconnect options
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF.toLowerCase(Locale.ENGLISH),
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY.toLowerCase(Locale.ENGLISH),
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.toLowerCase(Locale.ENGLISH),
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
};

public static String version = "2.1.1";

private Map<String, String> configProps;

Expand Down Expand Up @@ -237,6 +260,37 @@ public ConfigDef config() {
return CONFIGDEF;
}

@Override
public Config validate(final Map<String, String> connectorConfigs) {
final Config config = super.validate(connectorConfigs);

MQSourceConnector.validateMQClientReconnectOptions(config);
return config;
}

private static void validateMQClientReconnectOptions(final Config config) {
// Collect all configuration values
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue clientReconnectOptionsConfigValue = configValues
.get(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS);
final ConfigValue exactlyOnceStateQueueConfigValue = configValues
.get(MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE);

// Check if the exactly once state queue is configured
if (exactlyOnceStateQueueConfigValue.value() == null) {
return;
}

// Validate the client reconnect options
final Boolean isClientReconnectOptionQMGR = CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.equals(clientReconnectOptionsConfigValue.value());
if (!isClientReconnectOptionQMGR) {
clientReconnectOptionsConfigValue.addErrorMessage(
"When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided. For example: `mq.client.reconnect.options: QMGR`");
}
}

/** Null validator - indicates that any value is acceptable for this config option. */
private static final ConfigDef.Validator ANY = null;

Expand Down Expand Up @@ -468,6 +522,16 @@ null, new ReadableFile(),
null, 24, Width.MEDIUM,
CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS);

CONFIGDEF.define(CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS,
Type.STRING,
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
ConfigDef.ValidString.in(CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS),
Importance.MEDIUM,
CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS,
CONFIG_GROUP_MQ, 25,
Width.SHORT,
CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);

CONFIGDEF.define(CONFIG_NAME_TOPIC,
Type.STRING,
// user must specify the topic name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.ibm.eventstreams.connect.mqsource;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
Expand All @@ -28,7 +30,11 @@
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MQSourceConnectorTest {
@Test
Expand Down Expand Up @@ -82,5 +88,62 @@ public void testConnectorConfigSupportsExactlyOnce() {
assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", "")));
assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", null)));
}

}

@Test
public void testValidateMQClientReconnectOptions() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}

@Test
public void testValidateMQClientReconnectOptionsWithoutExactlyOnce() {
final Map<String, String> configProps = new HashMap<String, String>();
final Config config = new MQSourceConnector().validate(configProps);

assertFalse(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}

@Test
public void testValidateMQClientReconnectOptionsWithQMGROption() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
configProps.put("mq.client.reconnect.options", "QMGR");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertFalse(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}

@Test
public void testValidateMQClientReconnectOptionsWithANYOption() {
final Map<String, String> configProps = new HashMap<String, String>();
configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
configProps.put("mq.client.reconnect.options", "ANY");
configProps.put("tasks.max", "1");

final Config config = new MQSourceConnector().validate(configProps);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
}
}

0 comments on commit 7e9d940

Please sign in to comment.