From f95eb987d533f3a01a256bc7a417fc64dfc08382 Mon Sep 17 00:00:00 2001
From: Nicolas Paris
Date: Mon, 11 Sep 2023 18:04:37 +0200
Subject: [PATCH] Allow to provide custom kafka configs

---
 .../connect/s3/S3SinkConnectorConfig.java     |  2 +-
 .../connect/s3/file/KafkaFileEventConfig.java | 48 ++++-----
 .../s3/integration/S3SinkFileEventIT.java     | 99 +++++++++++--------
 3 files changed, 81 insertions(+), 68 deletions(-)

diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
index 626e1bcee..d41ecbe7b 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
@@ -843,7 +843,7 @@ public static ConfigDef newConfigDef() {
         Importance.LOW,
         "File event configuration as json format. "
             + "Mandatory Fields: bootstrap_servers, topic_name, schema_registry_url. "
-            + "Optional fields: sasl_mechanism, security_protocol, sasl_jaas_config. "
+            + "Custom fields can be added in a customs field as a map of attributes. "
             + "By default an empty json.",
         group,
         ++orderInGroup,
diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/file/KafkaFileEventConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/file/KafkaFileEventConfig.java
index e023249d6..8730295dd 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/file/KafkaFileEventConfig.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/file/KafkaFileEventConfig.java
@@ -15,10 +15,12 @@
 
 package io.confluent.connect.s3.file;
 
+import java.util.Map;
 import java.util.Properties;
 
 public class KafkaFileEventConfig extends AbstractFileEventConfig {
 
+  Map<String, Object> customs;
   private static final String KEY_SERIALIZER =
       "org.apache.kafka.common.serialization.StringSerializer";
   private static final String VALUE_SERIALIZER =
@@ -32,22 +34,17 @@ public class KafkaFileEventConfig extends AbstractFileEventConfig {
   private String saslJaasConfig;
 
   /** empty constructor for jackson */
-  public KafkaFileEventConfig() {
-  }
+  public KafkaFileEventConfig() {}
 
   public KafkaFileEventConfig(
       String topicName,
       String bootstrapServers,
       String schemaRegistryUrl,
-      String securityProtocol,
-      String saslMechanism,
-      String saslJaasConfig) {
+      Map<String, Object> customs) {
     this.topicName = topicName;
     this.bootstrapServers = bootstrapServers;
     this.schemaRegistryUrl = schemaRegistryUrl;
-    this.securityProtocol = securityProtocol;
-    this.saslMechanism = saslMechanism;
-    this.saslJaasConfig = saslJaasConfig;
+    this.customs = customs;
   }
 
   @Override
@@ -64,16 +61,15 @@ public String toJson() {
     sb.append("\"topic_name\": \"").append(topicName).append('"');
     sb.append(", \"bootstrap_servers\": \"").append(bootstrapServers).append('"');
     sb.append(", \"schema_registry_url\": \"").append(schemaRegistryUrl).append('"');
-    if (securityProtocol != null) {
-      sb.append(", \"security_protocol\": \"").append(securityProtocol).append('"');
-    }
-    if (saslMechanism != null) {
-      sb.append(", \"sasl_mechanism\": \"").append(saslMechanism).append('"');
+    sb.append(", \"customs\": {");
+    String customIncrement = "";
+    for (Map.Entry<String, Object> custom : customs.entrySet()) {
+      sb.append(
+          String.format(
+              "%s \"%s\": \"%s\"", customIncrement, custom.getKey(), custom.getValue().toString()));
+      customIncrement = ",";
     }
-    if (saslJaasConfig != null) {
-      sb.append(", \"sasl_jaas_config\": \"").append(saslJaasConfig).append('"');
-    }
-    sb.append('}');
+    sb.append("}}");
     return sb.toString();
   }
 
@@ -82,21 +78,13 @@ public Properties toProps() {
     Properties prop = new Properties();
     prop.setProperty("key.serializer", KEY_SERIALIZER);
     prop.setProperty("value.serializer", VALUE_SERIALIZER);
-    prop.setProperty("use.latest.version", "true");
-    prop.setProperty("auto.register.schemas", "false");
     // mandatory
     prop.setProperty("bootstrap.servers", bootstrapServers);
     prop.setProperty("topic.name", topicName);
     prop.setProperty("schema.registry.url", schemaRegistryUrl);
-    // optional
-    if (saslMechanism != null) {
-      prop.setProperty("sasl.mechanism", saslMechanism);
-    }
-    if (securityProtocol != null) {
-      prop.setProperty("security.protocol", securityProtocol);
-    }
-    if (saslJaasConfig != null) {
-      prop.setProperty("sasl.jaas.config", saslJaasConfig);
+    // customs
+    for (Map.Entry<String, Object> custom : customs.entrySet()) {
+      prop.setProperty(custom.getKey(), custom.getValue().toString());
     }
     return prop;
   }
@@ -124,4 +112,8 @@ public String getSaslMechanism() {
   public String getSaslJaasConfig() {
     return saslJaasConfig;
   }
+
+  public Map<String, Object> getCustoms() {
+    return customs;
+  }
 }
diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkFileEventIT.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkFileEventIT.java
index bb591d38d..ba52d1f9c 100644
--- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkFileEventIT.java
+++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkFileEventIT.java
@@ -70,11 +70,20 @@ public class S3SinkFileEventIT extends BaseConnectorIT {
   private static final String CONNECTOR_NAME = "s3-sink";
   private static final String DEFAULT_TEST_TOPIC_NAME = "TestTopic";
-  private static final List<String> KAFKA_TOPICS = Collections.singletonList(DEFAULT_TEST_TOPIC_NAME);
+  private static final List<String> KAFKA_TOPICS =
+      Collections.singletonList(DEFAULT_TEST_TOPIC_NAME);
 
   private JsonConverter jsonConverter;
   // custom producer to enable sending records with headers
   private Producer<byte[], byte[]> producer;
+  private Map<String, Object> autoCreate =
+      new HashMap<String, Object>() {
+        {
+          put("auto.register.schemas", "true");
+          put("auto.create.topics.enable", "true");
+        }
+      };
+  ;
 
   @Before
   public void before() throws InterruptedException {
     initializeJsonConverter();
@@ -82,7 +91,7 @@ public void before() throws InterruptedException {
     initializeCustomProducer();
     setupProperties();
     waitForSchemaRegistryToStart();
-    //add class specific props
+    // add class specific props
     props.put(SinkConnectorConfig.TOPICS_CONFIG, String.join(",", KAFKA_TOPICS));
     props.put(FLUSH_SIZE_CONFIG, Integer.toString(FLUSH_SIZE_STANDARD));
     props.put(FORMAT_CLASS_CONFIG, AvroFormat.class.getName());
@@ -94,7 +103,9 @@ public void before() throws InterruptedException {
     // file event
     props.put(FILE_EVENT_ENABLE, "true");
     // TimeBasedPartitioner
-    props.put(PartitionerConfig.PARTITIONER_CLASS_CONFIG, "io.confluent.connect.storage.partitioner.TimeBasedPartitioner");
+    props.put(
+        PartitionerConfig.PARTITIONER_CLASS_CONFIG,
+        "io.confluent.connect.storage.partitioner.TimeBasedPartitioner");
     props.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, "100");
     props.put(PartitionerConfig.PATH_FORMAT_CONFIG, "'event_date'=YYYY-MM-dd/'event_hour'=HH");
     props.put(PartitionerConfig.LOCALE_CONFIG, "FR_fr");
@@ -113,21 +124,18 @@ public void after() throws Exception {
     waitForFilesInBucket(TEST_BUCKET_NAME, 0);
   }
 
-
   @Test
   public void testBasicRecordsWrittenParquetAndRelatedFileEvents() throws Throwable {
     // add test specific props
     props.put(FORMAT_CLASS_CONFIG, ParquetFormat.class.getName());
     String topicFileEvent = "TopicFileEvent";
     props.put(
-            FILE_EVENT_CONFIG_JSON,
+        FILE_EVENT_CONFIG_JSON,
         new KafkaFileEventConfig(
                 topicFileEvent,
                 connect.kafka().bootstrapServers(),
                 restApp.restServer.getURI().toString(),
-                null,
-                null,
-                null)
+                this.autoCreate)
             .toJson());
     connect.kafka().createTopic(topicFileEvent);
     testBasicRecordsWrittenAndRelatedFileEvents(PARQUET_EXTENSION, topicFileEvent);
@@ -139,36 +147,45 @@ public void testFileEventPartition() {
     String fileEventTopic = "file_event_topic";
     connect.kafka().createTopic(fileEventTopic);
     KafkaFileEventConfig kafkaFileEventConfig =
-      new KafkaFileEventConfig(
-          fileEventTopic,
-          bootstrapServers,
-          restApp.restServer.getURI().toString(),
-          null,
-          null,
-          null);
+        new KafkaFileEventConfig(
+            fileEventTopic,
+            bootstrapServers,
+            restApp.restServer.getURI().toString(),
+            this.autoCreate);
     KafkaFileEventProvider fileEvent =
-      new KafkaFileEventProvider(kafkaFileEventConfig.toJson(), false);
-    fileEvent.call("baz-topic", "version/event/hour", "file1.avro", 12,
-        new DateTime(1234L), new DateTime(123L),
-        34, new DateTime(1234L).withZone(DateTimeZone.UTC));
-    fileEvent.call("foo-topic", "version/event/hour", "fil2.avro", 8,
-        new DateTime(12345L), new DateTime(1234L), 12, new DateTime(12345L));
+        new KafkaFileEventProvider(kafkaFileEventConfig.toJson(), false);
+    fileEvent.call(
+        "baz-topic",
+        "version/event/hour",
+        "file1.avro",
+        12,
+        new DateTime(1234L),
+        new DateTime(123L),
+        34,
+        new DateTime(1234L).withZone(DateTimeZone.UTC));
+    fileEvent.call(
+        "foo-topic",
+        "version/event/hour",
+        "fil2.avro",
+        8,
+        new DateTime(12345L),
+        new DateTime(1234L),
+        12,
+        new DateTime(12345L));
     // fails if two records are not present in kafka within 1s
     connect.kafka().consume(2, 1000L, fileEventTopic);
   }
 
   /**
-   * Test that the expected records are written for a given file extension
-   * Optionally, test that topics which have "*.{expectedFileExtension}*" in them are processed
-   * and written.
+   * Test that the expected records are written for a given file extension. Optionally, test that
+   * topics which have "*.{expectedFileExtension}*" in them are processed and written.
+   *
    * @param expectedFileExtension The file extension to test against
    * @param fileEventTopic The fileEvent topic name
    * @throws Throwable
    */
   private void testBasicRecordsWrittenAndRelatedFileEvents(
-      String expectedFileExtension,
-      String fileEventTopic
-  ) throws Throwable {
+      String expectedFileExtension, String fileEventTopic) throws Throwable {
     // Add an extra topic with this extension inside of the name
     // Use a TreeSet for test determinism
     Set<String> topicNames = new TreeSet<>(KAFKA_TOPICS);
@@ -176,14 +193,16 @@ private void testBasicRecordsWrittenAndRelatedFileEvents(
     // start sink connector
     connect.configureConnector(CONNECTOR_NAME, props);
     // wait for tasks to spin up
-    EmbeddedConnectUtils.waitForConnectorToStart(connect, CONNECTOR_NAME, Math.min(topicNames.size(), MAX_TASKS));
+    EmbeddedConnectUtils.waitForConnectorToStart(
+        connect, CONNECTOR_NAME, Math.min(topicNames.size(), MAX_TASKS));
 
     Schema recordValueSchema = getSampleStructSchema();
     Struct recordValueStruct = getSampleStructVal(recordValueSchema);
 
     for (String thisTopicName : topicNames) {
       // Create and send records to Kafka using the topic name in the current 'thisTopicName'
-      SinkRecord sampleRecord = getSampleTopicRecord(thisTopicName, recordValueSchema, recordValueStruct);
+      SinkRecord sampleRecord =
+          getSampleTopicRecord(thisTopicName, recordValueSchema, recordValueStruct);
       produceRecordsNoHeaders(NUM_RECORDS_INSERT, sampleRecord);
     }
 
@@ -194,13 +213,13 @@ private void testBasicRecordsWrittenAndRelatedFileEvents(
 
     Set<String> expectedTopicFilenames = new TreeSet<>();
     for (String thisTopicName : topicNames) {
-      List<String> theseFiles = getExpectedFilenames(
+      List<String> theseFiles =
+          getExpectedFilenames(
               thisTopicName,
               TOPIC_PARTITION,
               FLUSH_SIZE_STANDARD,
               NUM_RECORDS_INSERT,
-              expectedFileExtension
-      );
+              expectedFileExtension);
       assertEquals(theseFiles.size(), countPerTopic);
       expectedTopicFilenames.addAll(theseFiles);
     }
@@ -221,8 +240,8 @@ private void produceRecords(
       SinkRecord record,
       boolean withKey,
       boolean withValue,
-      boolean withHeaders
-  ) throws ExecutionException, InterruptedException {
+      boolean withHeaders)
+      throws ExecutionException, InterruptedException {
     byte[] kafkaKey = null;
     byte[] kafkaValue = null;
     Iterable<Header> headers = Collections.emptyList();
@@ -230,12 +249,13 @@ private void produceRecords(
       kafkaKey = jsonConverter.fromConnectData(topic, Schema.STRING_SCHEMA, record.key());
     }
     if (withValue) {
-      kafkaValue = jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
+      kafkaValue =
+          jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
     }
     if (withHeaders) {
      headers = sampleHeaders();
     }
-    ProducerRecord<byte[],byte[]> producerRecord =
+    ProducerRecord<byte[], byte[]> producerRecord =
         new ProducerRecord<>(topic, TOPIC_PARTITION, kafkaKey, kafkaValue, headers);
     for (long i = 0; i < recordCount; i++) {
       producer.send(producerRecord).get();
@@ -253,9 +273,11 @@ private void initializeJsonConverter() {
   private void initializeCustomProducer() {
     Map<String, Object> producerProps = new HashMap<>();
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connect.kafka().bootstrapServers());
-    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+    producerProps.put(
+        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
         org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
-    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+    producerProps.put(
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
         org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
     producer = new KafkaProducer<>(producerProps);
   }
@@ -270,5 +292,4 @@ private void setupProperties() {
     // aws credential if exists
     props.putAll(getAWSCredentialFromPath());
   }
-
 }
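
Usage sketch (illustrative, not part of the patch): assuming the Map<String, Object> signature shown above, any attribute placed in the customs map is copied verbatim into the producer Properties by toProps() and nested under a "customs" object by toJson(). The broker address, schema registry URL, and the two SASL-style keys below are placeholder examples only (they mirror the old dedicated optional fields this patch removes).

    // Hypothetical example built only from the constructor and helpers introduced by this patch.
    import io.confluent.connect.s3.file.KafkaFileEventConfig;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class FileEventConfigExample {
      public static void main(String[] args) {
        // Arbitrary client attributes; keys are examples, not required by the connector.
        Map<String, Object> customs = new HashMap<>();
        customs.put("security.protocol", "SASL_SSL");
        customs.put("sasl.mechanism", "PLAIN");

        KafkaFileEventConfig config =
            new KafkaFileEventConfig(
                "file-event-topic",            // topic_name
                "broker1:9092",                // bootstrap_servers
                "http://schema-registry:8081", // schema_registry_url
                customs);

        // JSON form, e.g. the value supplied for the file event configuration property.
        System.out.println(config.toJson());

        // Properties form used to build the file-event Kafka producer; each customs entry
        // is applied via prop.setProperty(key, value.toString()).
        Properties producerProps = config.toProps();
        System.out.println(producerProps);
      }
    }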