Skip to content

Commit

Permalink
Upgrade to apicurio registry 3
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed Feb 6, 2025
1 parent 4e16b57 commit 1391725
Show file tree
Hide file tree
Showing 36 changed files with 91 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public void clearHttpClient() {
Field providerReference = AbstractSchemaResolver.class.getDeclaredField("vertx");
providerReference.setAccessible(true);
AtomicReference ref = (AtomicReference) providerReference.get(null);
ref.set(null);
if (ref != null) {
ref.set(null);
}
} catch (NoSuchFieldException | IllegalAccessException t) {
log.error("Failed to clear Apicurio Http Client provider", t);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.quarkus.apicurio.registry.protobuf;

import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
Expand All @@ -26,18 +25,11 @@ public void apicurioRegistryProtobuf(BuildProducer<ReflectiveClassBuildItem> ref
"io.apicurio.registry.serde.strategy.TopicIdStrategy").methods().fields()
.build());

reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.DefaultIdHandler",
"io.apicurio.registry.serde.Legacy4ByteIdHandler",
reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.Default4ByteIdHandler",
"io.apicurio.registry.serde.Legacy8ByteIdHandler",
"io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider",
"io.apicurio.registry.serde.headers.DefaultHeadersHandler").methods().fields()
.build());

String defaultSchemaResolver = "io.apicurio.registry.serde.DefaultSchemaResolver";
if (QuarkusClassLoader.isClassPresentAtRuntime(defaultSchemaResolver)) {
// Class not present after 2.2.0.Final
reflectiveClass.produce(ReflectiveClassBuildItem.builder(defaultSchemaResolver).methods()
.fields().build());
}
}

@BuildStep
Expand Down
6 changes: 3 additions & 3 deletions extensions/schema-registry/apicurio/protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
<dependencyManagement>
<dependencies>
<dependency>
<artifactId>kotlinx-serialization-core-jvm</artifactId>
<groupId>org.jetbrains.kotlinx</groupId>
<version>1.6.1</version>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-jvm</artifactId>
<version>3.10.2</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down
10 changes: 1 addition & 9 deletions extensions/schema-registry/apicurio/protobuf/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@
<dependencies>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
<artifactId>apicurio-registry-protobuf-serde-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-jdk</artifactId>
</exclusion>
<exclusion>
<artifactId>checker-qual</artifactId>
<groupId>org.checkerframework</groupId>
Expand All @@ -31,10 +27,6 @@
<artifactId>slf4j-jboss-logging</artifactId>
<groupId>org.jboss.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>okhttp</artifactId>
<groupId>com.squareup.okhttp3</groupId>
</exclusion>
</exclusions>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class DefaultSerdeDiscoveryState {
private Boolean connectorHasValueDeserializer;

private Boolean hasConfluent;
private Boolean hasApicurio1;
private Boolean hasApicurio2Avro;
private Boolean hasApicurioAvro;
private Boolean hasJsonb;

DefaultSerdeDiscoveryState(IndexView index) {
Expand Down Expand Up @@ -142,32 +141,18 @@ boolean hasConfluent() {
return hasConfluent;
}

boolean hasApicurio1() {
if (hasApicurio1 == null) {
try {
Class.forName("io.apicurio.registry.utils.serde.AvroKafkaDeserializer", false,
Thread.currentThread().getContextClassLoader());
hasApicurio1 = true;
} catch (ClassNotFoundException e) {
hasApicurio1 = false;
}
}

return hasApicurio1;
}

boolean hasApicurio2Avro() {
if (hasApicurio2Avro == null) {
boolean hasApicurioAvro() {
if (hasApicurioAvro == null) {
try {
Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false,
Thread.currentThread().getContextClassLoader());
hasApicurio2Avro = true;
hasApicurioAvro = true;
} catch (ClassNotFoundException e) {
hasApicurio2Avro = false;
hasApicurioAvro = false;
}
}

return hasApicurio2Avro;
return hasApicurioAvro;
}

boolean hasJsonb() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,8 +952,7 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T
if (isAvroGenerated || DotNames.AVRO_GENERIC_RECORD.equals(typeName)) {
int avroLibraries = 0;
avroLibraries += discovery.hasConfluent() ? 1 : 0;
avroLibraries += discovery.hasApicurio1() ? 1 : 0;
avroLibraries += discovery.hasApicurio2Avro() ? 1 : 0;
avroLibraries += discovery.hasApicurioAvro() ? 1 : 0;
if (avroLibraries > 1) {
LOGGER.debugf("Skipping Avro serde autodetection for %s, because multiple Avro serde libraries are present",
typeName);
Expand All @@ -965,12 +964,7 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T
? Result.of("io.confluent.kafka.serializers.KafkaAvroSerializer")
: Result.of("io.confluent.kafka.serializers.KafkaAvroDeserializer")
.with(isAvroGenerated, "specific.avro.reader", "true");
} else if (discovery.hasApicurio1()) {
return serializer
? Result.of("io.apicurio.registry.utils.serde.AvroKafkaSerializer")
: Result.of("io.apicurio.registry.utils.serde.AvroKafkaDeserializer")
.with(isAvroGenerated, "apicurio.registry.use-specific-avro-reader", "true");
} else if (discovery.hasApicurio2Avro()) {
} else if (discovery.hasApicurioAvro()) {
return serializer
? Result.of("io.apicurio.registry.serde.avro.AvroKafkaSerializer")
: Result.of("io.apicurio.registry.serde.avro.AvroKafkaDeserializer")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,9 @@
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>quarkus-integration-test-kafka-avro-apicurio2</artifactId>
<name>Quarkus - Integration Tests - Kafka Avro with Apicurio 2.x</name>
<description>The Apache Kafka Avro with Apicurio Registry 2.x integration tests module</description>

<!--
- This must be a separate Maven module, because adding Apicurio Registry 2.x
- libraries to the `kafka-avro` module would lead to dependency divergence.
- When we no longer care about Apicurio Registry 1.x, the `kafka-avro`
- module can be deleted, as this module is a copy and hence also includes
- the tests for Confluent schema registry.
-->
<artifactId>quarkus-integration-test-kafka-avro-apicurio3</artifactId>
<name>Quarkus - Integration Tests - Kafka Avro with Apicurio 3.x</name>
<description>The Apache Kafka Avro with Apicurio Registry 3.x integration tests module</description>

<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ protected void testAvroProducer(KafkaConsumer<Integer, Pet> consumer, String pat
.header("content-type", "application/json")
.body("{\"name\":\"neo\", \"color\":\"tricolor\"}")
.post(path);

ConsumerRecord<Integer, Pet> records = consumer.poll(Duration.ofMillis(20000)).iterator().next();

Assertions.assertEquals(records.key(), (Integer) 0);
Pet pet = records.value();
Assertions.assertEquals("neo", pet.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>quarkus-integration-test-kafka-json-schema-apicurio2</artifactId>
<name>Quarkus - Integration Tests - Kafka Json Schema with Apicurio 2.x</name>
<description>The Apache Kafka Json Schema with Apicurio Registry 2.x integration tests module</description>
<artifactId>quarkus-integration-test-kafka-json-schema-apicurio3</artifactId>
<name>Quarkus - Integration Tests - Kafka Json Schema with Apicurio 3.x</name>
<description>The Apache Kafka Json Schema with Apicurio Registry 3.x integration tests module</description>

<dependencyManagement>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,17 @@
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>quarkus-integration-test-kafka-protobuf-apicurio2</artifactId>
<name>Quarkus - Integration Tests - Kafka Protobuf with Apicurio 2.x</name>
<description>The Apache Kafka Protobuf with Apicurio Registry 2.x integration tests module</description>
<artifactId>quarkus-integration-test-kafka-protobuf-apicurio3</artifactId>
<name>Quarkus - Integration Tests - Kafka Protobuf with Apicurio 3.x</name>
<description>The Apache Kafka Protobuf with Apicurio Registry 3.x integration tests module</description>

<dependencyManagement>
<dependencies>
<dependency>
<artifactId>kotlinx-serialization-core-jvm</artifactId>
<groupId>org.jetbrains.kotlinx</groupId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio</artifactId>
<version>3.6.0</version>
<artifactId>okio-jvm</artifactId>
<version>3.10.2</version>
<scope>compile</scope>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down Expand Up @@ -213,16 +209,36 @@
<executions>
<execution>
<id>gencode</id>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
</goals>
<phase>generate-sources</phase>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf-java.version}:exe:${os.detected.classifier}</protocArtifact>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-dist</id>
<goals>
<goal>copy-resources</goal>
</goals>
<phase>prepare-package</phase>
<configuration>
<protoSourceRoot>./src/main/proto</protoSourceRoot>
<protocArtifact>
com.google.protobuf:protoc:${protobuf-java.version}:exe:${os.detected.classifier}
</protocArtifact>
<outputDirectory>${project.build.outputDirectory}</outputDirectory>
<resources>
<resource>
<directory>${project.basedir}/target/generated-test-sources/protobuf/</directory>
<filtering>false</filtering>
<excludes/>
</resource>
</resources>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.example.tutorial.PetOuterClass;

import io.vertx.core.json.JsonObject;

/**
Expand All @@ -32,29 +34,29 @@ public JsonObject getApicurio() {

@POST
@Path("/apicurio")
public void sendApicurio(io.quarkus.it.kafka.protobuf.Pet pet) {
KafkaProducer<Integer, com.example.tutorial.PetOuterClass.Pet> p = creator
public void sendApicurio(Pet pet) {
KafkaProducer<Integer, PetOuterClass.Pet> p = creator
.createApicurioProducer("test-protobuf-apicurio");
send(p, pet, "test-protobuf-apicurio-producer");
}

private JsonObject get(KafkaConsumer<Integer, com.example.tutorial.PetOuterClass.Pet> consumer) {
final ConsumerRecords<Integer, com.example.tutorial.PetOuterClass.Pet> records = consumer
private JsonObject get(KafkaConsumer<Integer, PetOuterClass.Pet> consumer) {
final ConsumerRecords<Integer, PetOuterClass.Pet> records = consumer
.poll(Duration.ofMillis(60000));
if (records.isEmpty()) {
return null;
}
ConsumerRecord<Integer, com.example.tutorial.PetOuterClass.Pet> consumerRecord = records.iterator().next();
com.example.tutorial.PetOuterClass.Pet p = consumerRecord.value();
ConsumerRecord<Integer, PetOuterClass.Pet> consumerRecord = records.iterator().next();
PetOuterClass.Pet p = consumerRecord.value();
// We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema.
JsonObject result = new JsonObject();
result.put("name", p.getName());
result.put("color", p.getColor());
return result;
}

private void send(KafkaProducer<Integer, com.example.tutorial.PetOuterClass.Pet> producer, Pet pet, String topic) {
com.example.tutorial.PetOuterClass.Pet protoPet = com.example.tutorial.PetOuterClass.Pet.newBuilder()
private void send(KafkaProducer<Integer, PetOuterClass.Pet> producer, Pet pet, String topic) {
PetOuterClass.Pet protoPet = PetOuterClass.Pet.newBuilder()
.setColor(pet.getColor())
.setName(pet.getName())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import com.example.tutorial.PetOuterClass.Pet;
import com.example.tutorial.PetOuterClass;

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.config.KafkaSerdeConfig;
import io.apicurio.registry.serde.config.SerdeConfig;
import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer;
import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer;

Expand Down Expand Up @@ -44,36 +45,36 @@ public String getApicurioRegistryUrl() {
return apicurioRegistryUrl;
}

public KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
public KafkaConsumer<Integer, PetOuterClass.Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
return createApicurioConsumer(bootstrap, getApicurioRegistryUrl(), groupdIdConfig, subscribtionName);
}

public KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
public KafkaProducer<Integer, PetOuterClass.Pet> createApicurioProducer(String clientId) {
return createApicurioProducer(bootstrap, getApicurioRegistryUrl(), clientId);
}

public static KafkaConsumer<Integer, Pet> createApicurioConsumer(String bootstrap, String apicurio,
public static KafkaConsumer<Integer, PetOuterClass.Pet> createApicurioConsumer(String bootstrap, String apicurio,
String groupdIdConfig, String subscribtionName) {
Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig);
return createConsumer(p, subscribtionName);
}

public static KafkaProducer<Integer, Pet> createApicurioProducer(String bootstrap, String apicurio,
public static KafkaProducer<Integer, PetOuterClass.Pet> createApicurioProducer(String bootstrap, String apicurio,
String clientId) {
Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId);
return createProducer(p);
}

private static KafkaConsumer<Integer, Pet> createConsumer(Properties props, String subscribtionName) {
private static KafkaConsumer<Integer, PetOuterClass.Pet> createConsumer(Properties props, String subscribtionName) {
if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) {
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
}
KafkaConsumer<Integer, Pet> consumer = new KafkaConsumer<>(props);
KafkaConsumer<Integer, PetOuterClass.Pet> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(subscribtionName));
return consumer;
}

private static KafkaProducer<Integer, Pet> createProducer(Properties props) {
private static KafkaProducer<Integer, PetOuterClass.Pet> createProducer(Properties props) {
if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) {
props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
}
Expand All @@ -84,6 +85,8 @@ public static Properties getApicurioConsumerProperties(String bootstrap, String
Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufKafkaDeserializer.class.getName());
props.put(SerdeConfig.REGISTRY_URL, apicurio);
//When a specific class is used, headers have to be enabled so that the class is passed down to the deserializer
props.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
return props;
}

Expand All @@ -104,6 +107,8 @@ private static Properties getApicurioProducerProperties(String bootstrap, String
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufKafkaSerializer.class.getName());
props.put(SerdeConfig.REGISTRY_URL, apicurio);
props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
//When a specific class is used, headers have to be enabled so that the class is passed down to the deserializer
props.put(KafkaSerdeConfig.ENABLE_HEADERS, "true");
return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN
# enable health check
quarkus.kafka.health.enabled=true

quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.4.2.Final
quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry:3.0.6
Loading

0 comments on commit 1391725

Please sign in to comment.