
Kafka Avro fails silently on serializing value for DLQ topic. #45644

Open
dcdh opened this issue Jan 16, 2025 · 10 comments
Labels
area/kafka env/windows Impacts Windows machines kind/bug Something isn't working

Comments

@dcdh
Contributor

dcdh commented Jan 16, 2025

Describe the bug

In my organization we do Kafka messaging with Avro and Apicurio.

We do not want the application to be able to create the Avro schema automatically when it is not present; that is done by another process in our continuous deployment pipeline. But sometimes we may miss it (which is what happened here).

When a message is nacked because an exception has been thrown, it is forwarded to the DLQ, and the original value is serialized to a byte array to be stored as the DLQ message value.

In my case, we got a serialization issue because the Avro schema with subject mydlqtopic-value is not present in Apicurio.

The value is not present in the DLQ message, which is not surprising given that this is a serialization issue.

But we were unaware of this null value until we did a full debug session.

Expected behavior

When a serialization issue occurs, we should be notified, either through a log entry or a header describing it.

Actual behavior

The value is null in the DLQ (expected).

The exception is swallowed: no logs, and no way to understand that there is an issue while serializing the Avro message with Apicurio.

How to Reproduce?

  1. git clone https://github.com/dcdh/avro-serialization-dlq.git
  2. put a breakpoint here
  3. run in debug

=> You will notice that a SerializationException has occurred, but no log is produced and no headers are added about it.

This reproducer produces a message on a movie topic, with the Avro schema created. The consumer then throws a RuntimeException to put the message in the DLQ.

    @Incoming("movies-from-external-service-in")
    public void process(ConsumerRecord<String, Movie> data) {
        throw new RuntimeException("Something went wrong");
    }

The DLQ publishing then fails, because schema creation is forbidden by the application through this configuration in application.properties:

mp.messaging.connector.smallrye-kafka.auto.register.schemas=false

So io.confluent.kafka.serializers.KafkaAvroSerializer will not create the schema, and a SerializationException is thrown at serialization time, mimicking my organization's setup.

You can have a look at this result.

Output of uname -a or ver

MSYS_NT ...

Output of java -version

openjdk version "21.0.3" 2024-04-16 LTS
OpenJDK Runtime Environment Temurin-21.0.3+9 (build 21.0.3+9-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.3+9 (build 21.0.3+9-LTS, mixed mode, sharing)

Quarkus version or git rev

3.17.7

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae)
Maven home: E:\projects\maven\apache-maven-3.9.6
Java version: 21.0.1, vendor: Oracle Corporation, runtime: E:\projects\jdk\64\21.0.1
Default locale: en_US, platform encoding: UTF-8
OS name: "windows 11", version: "10.0", arch: "amd64", family: "windows"

Additional information

The message will be handled by io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterSerializationHandler to enrich it with valuable data about the exception, the message offset, partition and topic...

I guess the log or message header should be added here by implementing handleSerializationFailure

@dcdh dcdh added the kind/bug Something isn't working label Jan 16, 2025
@quarkus-bot quarkus-bot bot added area/kafka env/windows Impacts Windows machines labels Jan 16, 2025

quarkus-bot bot commented Jan 16, 2025

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

@cescoffier
Member

I totally agree. It should at least be an error in the log and potentially we can write an empty record with a specific header.

@ozangunalp wdyt?

@dcdh
Contributor Author

dcdh commented Jan 16, 2025

The record is indeed present, but without the value, due to this serialization exception.

You can expect this one when following the reproducer:

{
  "topic": "internal-service-movies-dlq",
  "headers": [
    {
      "key": "dead-letter-exception-class-name",
      "value": "java.lang.RuntimeException"
    },
    {
      "key": "dead-letter-reason",
      "value": "Something went wrong"
    },
    {
      "key": "dead-letter-topic",
      "value": "movies"
    },
    {
      "key": "dead-letter-partition",
      "value": "0"
    },
    {
      "key": "dead-letter-offset",
      "value": "0"
    }
  ],
  "timestamp": 1737023582150,
  "partition": 0,
  "offset": 0
}

And I guess that with such headers in place, the consumer can rely on them to detect a serialization issue when the value is null.
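That consumer-side check could be sketched as follows (a minimal illustration only: a plain Map stands in for Kafka's Headers, and the serialization-exception header name is the kind of header being proposed in this thread, not something the connector emits today):

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Minimal sketch of a consumer-side check on a DLQ record whose value is
// null. A plain Map stands in for Kafka's Headers; "serialization-exception"
// is the kind of header proposed in this thread, not one the connector
// currently emits.
public class DlqRecordCheck {

    static String classify(byte[] value, Map<String, byte[]> headers) {
        if (value != null) {
            return "value present";
        }
        if (headers.containsKey("serialization-exception")) {
            return "DLQ serialization failed: "
                    + new String(headers.get("serialization-exception"), StandardCharsets.UTF_8);
        }
        return "value missing for unknown reason";
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = Map.of(
                "dead-letter-reason", "Something went wrong".getBytes(StandardCharsets.UTF_8),
                "serialization-exception", "schema not found".getBytes(StandardCharsets.UTF_8));
        // A null value plus the proposed header pinpoints the serialization failure.
        System.out.println(classify(null, headers));
    }
}
```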

@vsevel
Contributor

vsevel commented Jan 16, 2025

At least it would be nice to have a header like dead-letter-publish-error stating the serialization issue in that case.

@ozangunalp
Contributor

I thought DeserializationFailureHandler and KafkaDeadLetterSerializationHandler handled exactly that case.

There should also be a log from DLQ failure handler about messages being sent to the DLQ topic. Maybe the failure reason is not logged.

@dcdh don't you see headers with keys starting with deserialization-failure- inside the DLQ topic?

@dcdh
Contributor Author

dcdh commented Jan 16, 2025

@ozangunalp

I put the record from the dlq topic in my previous comment.

This is not a deserialization issue in my case, but a failure to serialize the message value (coming from the original topic) to a byte array in Avro format for the DLQ topic.

Regarding the sample provided:

  1. I receive a message on my movie topic, bound to the channel movies-from-external-service-in
  2. The channel consumption throws a RuntimeException with the message Something went wrong
  3. The dead-letter strategy (@Identifier(Strategy.DEAD_LETTER_QUEUE)) sets up the "value-serialization-failure-handler" here, to be handled by the KafkaDeadLetterSerializationHandler linked with the identifier dlq-serialization
  4. In this case the KafkaDeadLetterSerializationHandler serializes the message using Avro, with a subject composed of the DLQ topic name plus -value, like internal-service-movies-dlq-value
  5. The Avro schema does not exist => SerializationException
  6. A call to handleSerializationFailure is made
  7. Nothing is done, because handleSerializationFailure does nothing by default.

Headers regarding the exception, like dead-letter-exception-class-name ..., are indeed present.
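To make step 7 concrete, here is a self-contained sketch of that no-op default (the interface and Headers stand-in are modeled locally, since the real SerializationFailureHandler and Headers types come from SmallRye Reactive Messaging and kafka-clients; this mirrors the callback shape, not the actual library source):

```java
import java.util.HashMap;

// Stand-in for org.apache.kafka.common.header.Headers (kafka-clients),
// just enough to observe whether the handler touched the headers.
class StubHeaders extends HashMap<String, byte[]> {
}

// Sketch of the callback shape described in this thread: the default
// handleSerializationFailure returns null and adds nothing, which is why
// the failure leaves no trace in the DLQ record.
interface SerializationFailureHandlerSketch<T> {
    default byte[] handleSerializationFailure(String topic, boolean isKey, String serializer,
            T data, Exception exception, StubHeaders headers) {
        return null; // no log, no header: the DLQ record value is simply null
    }
}

public class DefaultHandlerDemo implements SerializationFailureHandlerSketch<Object> {
    public static void main(String[] args) {
        StubHeaders headers = new StubHeaders();
        byte[] fallback = new DefaultHandlerDemo().handleSerializationFailure(
                "internal-service-movies-dlq", false, "KafkaAvroSerializer",
                new Object(), new RuntimeException("schema missing"), headers);
        // The fallback value is null and the headers are untouched.
        System.out.println(fallback == null && headers.isEmpty());
    }
}
```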

@ozangunalp
Contributor

Ok, I thought the whole issue was about a deserialization error. But it is about the serialization of the DLQ record.

Indeed the default implementation of KafkaDeadLetterSerializationHandler takes care of the case where there was already a deserialization issue with the record.

You can specify a custom SerializationFailureHandler implementation and provide it to the DLQ using
mp.messaging.incoming.movies-from-external-service-in.dead-letter-queue.value-serialization-failure-handler=dlq-serialization-custom

On the upstream reactive messaging side, we can think of having the default handler add headers, logging, etc. in case of a serialization error, in the same way the DeserializationFailureHandler does.

@dcdh
Contributor Author

dcdh commented Jan 16, 2025

Ok I confirm it is working :)

Having this conf:

mp.messaging.incoming.movies-from-external-service-in.dead-letter-queue.value-serialization-failure-handler=dlq-serialization-custom

with this CustomSerializationFailureHandler:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Headers;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
@Identifier("dlq-serialization-custom")
public class CustomSerializationFailureHandler implements SerializationFailureHandler<Object> {

    @Override
    public byte[] handleSerializationFailure(String topic, boolean isKey, String serializer,
            Object data, Exception exception, Headers headers) {
        if (exception != null) {
            // record the failure so consumers can detect it when the value is null
            headers.add("serialization-exception",
                    exception.getMessage().getBytes(StandardCharsets.UTF_8));
            headers.add("serialization-exception-class",
                    exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
        }
        return null;
    }
}

I can add them inside the headers.

{
  "topic": "internal-service-movies-dlq",
  "headers": [
    {
      "key": "dead-letter-exception-class-name",
      "value": "java.lang.RuntimeException"
    },
    {
      "key": "dead-letter-reason",
      "value": "Something wrong happened"
    },
    {
      "key": "dead-letter-topic",
      "value": "movies"
    },
    {
      "key": "dead-letter-partition",
      "value": "0"
    },
    {
      "key": "dead-letter-offset",
      "value": "0"
    },
    {
      "key": "serialization-exception",
      "value": "Error retrieving Avro schema{\"type\":\"record\",\"name\":\"Movie\",\"namespace\":\"org.acme.kafka.quarkus\",\"fields\":[{\"name\":\"title\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"release_year\",\"type\":\"int\"}]}"
    },
    {
      "key": "serialization-exception-class",
      "value": "org.apache.kafka.common.errors.SerializationException"
    }
  ],
  "timestamp": 1737045518608,
  "partition": 0,
  "offset": 0
}

I thought that defining a custom serialization failure handler for the DLQ was not possible, because it is missing from the doc (https://quarkus.io/guides/kafka, in Table 1, Incoming Attributes of the 'smallrye-kafka' connector).

Maybe a paragraph regarding dead-letter-queue could be added to https://quarkus.io/guides/kafka#handling-serialization-failures

Is it possible to update the doc?

Moreover, by doing it this way I will lose all the benefits of KafkaDeadLetterSerializationHandler and its deserialization-headers behavior.

@vsevel
Contributor

vsevel commented Jan 16, 2025

Would it make sense to add the serialization-exception* headers in the existing standard handler, KafkaDeadLetterSerializationHandler?

@ozangunalp
Contributor

@dcdh you can also override the decorateSerialization method, or extend KafkaDeadLetterSerializationHandler directly.
The serialization call is wrapped in a Uni so that the serialization can easily be retried, etc.

It doesn't appear in the incoming attributes because it configures the internal producer created for the DLQ. Basically, the DLQ accepts any configuration that a producer channel accepts.

Is it possible to update the doc ?

Yes, an update to the doc is needed to clarify this.

Would it make sense to add the serialization-exception* headers in the existing standard handler, KafkaDeadLetterSerializationHandler?

@vsevel definitely, I am even thinking the default methods of SerializationFailureHandler can implement that behaviour, like the DeserializationFailureHandler does.
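What such a header-adding default might look like, as a sketch only (not the actual SmallRye implementation; a plain Map again stands in for Kafka's Headers, and the header names mirror the custom handler shown earlier in the thread):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Sketch of the behaviour proposed here: a default failure handler that
// records the serialization error in headers before falling back to a
// null value. A plain Map stands in for Kafka's Headers; the header names
// mirror the custom handler shown earlier in the thread.
public class HeaderAddingDefault {

    static byte[] handleSerializationFailure(Exception exception, Map<String, byte[]> headers) {
        if (exception != null) {
            headers.put("serialization-exception",
                    String.valueOf(exception.getMessage()).getBytes(StandardCharsets.UTF_8));
            headers.put("serialization-exception-class",
                    exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
        }
        return null; // the value stays null, but the failure is now visible
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        handleSerializationFailure(new IllegalStateException("schema missing"), headers);
        System.out.println(new String(headers.get("serialization-exception-class"),
                StandardCharsets.UTF_8));
    }
}
```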
