Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename namespace #701

Open
wants to merge 22 commits into
base: 10.0.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,24 @@
files="JestElasticsearchClient.java"
/>

<suppress
checks="PackageName"
files="io.confluent.connect.elasticsearch_2_4.*"
/>

<suppress
checks="CyclomaticComplexity"
files="io.confluent.connect.elasticsearch_2_4.*"
/>

<suppress
checks="NPathComplexity"
files="io.confluent.connect.elasticsearch_2_4.ElasticsearchWriter"
/>

<suppress
checks="ClassDataAbstractionCoupling"
files="io.confluent.connect.elasticsearch_2_4.ElasticsearchWriter"
/>

</suppressions>
9 changes: 7 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>io.confluent</groupId>
<artifactId>kafka-connect-elasticsearch</artifactId>
<version>10.0.4-SNAPSHOT</version>
<version>10.0.8</version>
<packaging>jar</packaging>
<name>kafka-connect-elasticsearch</name>
<organization>
Expand Down Expand Up @@ -89,6 +89,11 @@
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>
<dependency>
<groupId>com.instana</groupId>
<artifactId>instana-java-sdk</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
Expand Down Expand Up @@ -275,7 +280,7 @@
<configuration>
<compilerArgs>
<arg>-Xlint:all,-try</arg>
<arg>-Werror</arg>
<!-- <arg>-Werror</arg>-->
</compilerArgs>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch;
package io.confluent.connect.elasticsearch_2_4;

import io.confluent.connect.elasticsearch.bulk.BulkClient;
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import io.confluent.connect.elasticsearch_2_4.bulk.BulkClient;
import io.confluent.connect.elasticsearch_2_4.bulk.BulkRequest;
import io.confluent.connect.elasticsearch_2_4.bulk.BulkResponse;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch;
package io.confluent.connect.elasticsearch_2_4;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
Expand All @@ -33,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -44,14 +48,16 @@
import java.util.Map;
import java.util.Objects;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
import static io.confluent.connect.elasticsearch_2_4.ElasticsearchSinkConnectorConstants.MAP_KEY;
import static io.confluent.connect.elasticsearch_2_4.ElasticsearchSinkConnectorConstants.MAP_VALUE;

public class DataConverter {

private static final Logger log = LoggerFactory.getLogger(DataConverter.class);
private static final Converter JSON_CONVERTER;

private ObjectMapper objectMapper;

static {
JSON_CONVERTER = new JsonConverter();
JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
Expand All @@ -75,6 +81,7 @@ public DataConverter(boolean useCompactMapEntries, BehaviorOnNullValues behavior
this.useCompactMapEntries = useCompactMapEntries;
this.behaviorOnNullValues =
Objects.requireNonNull(behaviorOnNullValues, "behaviorOnNullValues cannot be null.");
this.objectMapper = new ObjectMapper();
}

private String convertKey(Schema keySchema, Object key) {
Expand Down Expand Up @@ -187,6 +194,49 @@ public IndexableRecord convertRecord(
return new IndexableRecord(new Key(index, type, id), payload, version);
}

public JsonNode getValueAsJson(SinkRecord record) {
if (record.value() == null) {
return null;
}

byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(
record.topic(),
record.valueSchema(),
record.value()
);
JsonNode jsonPayload;
try {
jsonPayload = objectMapper.readTree(rawJsonPayload);
} catch (IOException e) {
// Should not happen if the payload was retrieved correctly.
jsonPayload = null;
}

return jsonPayload;
}

public SchemaAndValue getSchemaAndValueFromJson(String topic, String json) {
if (json == null) {
return null;
}
JsonNode jsonPayload;
try {
jsonPayload = objectMapper.readTree(json);
} catch (IOException e) {
// Should not happen if the payload was retrieved correctly.
return JSON_CONVERTER.toConnectData(
topic,
new byte[0]
);
}

return JSON_CONVERTER.toConnectData(
topic,
jsonPayload.toPrettyString().getBytes()
);
}


private String getPayload(SinkRecord record, boolean ignoreSchema) {
if (record.value() == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch;
package io.confluent.connect.elasticsearch_2_4;

import com.google.gson.JsonObject;
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
import io.confluent.connect.elasticsearch_2_4.bulk.BulkRequest;
import io.confluent.connect.elasticsearch_2_4.bulk.BulkResponse;
import org.apache.kafka.connect.data.Schema;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch;
package io.confluent.connect.elasticsearch_2_4;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
Expand Down
Loading