Kafka Avro fails silently on serializing value for DLQ topic. #45644
Comments
/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)
I totally agree. It should at least be an error in the log and potentially we can write an empty record with a specific header. @ozangunalp wdyt?
The record is indeed present, but without the value, due to this serialization exception. This is what you get when following the reproducer:
{
"topic": "internal-service-movies-dlq",
"headers": [
{
"key": "dead-letter-exception-class-name",
"value": "java.lang.RuntimeException"
},
{
"key": "dead-letter-reason",
"value": "Something went wrong"
},
{
"key": "dead-letter-topic",
"value": "movies"
},
{
"key": "dead-letter-partition",
"value": "0"
},
{
"key": "dead-letter-offset",
"value": "0"
}
],
"timestamp": 1737023582150,
"partition": 0,
"offset": 0
}
And I guess that by passing the headers, the consumer can rely on them to detect a serialization issue when the value is null.
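As an illustration of that consumer-side check, here is a minimal sketch; the channel name dlq-in, its deserializer configuration, and the handling logic are hypothetical, only the header key and the DLQ topic come from the record above:

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.header.Header;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class DlqInspector {

    // Hypothetical channel mapped to the DLQ topic (internal-service-movies-dlq).
    @Incoming("dlq-in")
    public CompletionStage<Void> consume(Message<byte[]> msg) {
        Optional<IncomingKafkaRecordMetadata> metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class);
        if (msg.getPayload() == null && metadata.isPresent()) {
            // A dead-letter header combined with a null value hints that the original
            // value could not be serialized for the DLQ record.
            Header reason = metadata.get().getHeaders().lastHeader("dead-letter-reason");
            if (reason != null) {
                // Surface the problem instead of ignoring the null value (log, alert, re-route...).
                System.err.println("DLQ record with null value, reason: "
                        + new String(reason.value(), StandardCharsets.UTF_8));
            }
        }
        return msg.ack();
    }
}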
At least it would be nice to have a header.
I thought there should also be a log from the DLQ failure handler about messages being sent to the DLQ topic. Maybe the failure reason is not logged. @dcdh don't you see headers with keys starting with dead-letter- ?
I put the record from the DLQ topic in my previous comment. This is not a deserialization issue in my case, but a failure to serialize the message value (coming from the original topic) to an Avro byte array for the DLQ topic. Regarding the sample provided:
Headers about the exception, like dead-letter-exception-class-name ..., are indeed present.
Ok, I thought the whole issue was about a deserialization error, but it is about the serialization of the DLQ record. Indeed, the default implementation does not report the serialization failure. You can specify a custom SerializationFailureHandler for the DLQ value serializer, as shown in the next comment. On the upstream reactive messaging side, we can think of having the default handler add headers, logging, etc. in case of serialization error, in the same way as for deserialization failures.
Ok I confirm it is working :)
Having this conf:
mp.messaging.incoming.movies-from-external-service-in.dead-letter-queue.value-serialization-failure-handler=dlq-serialization-custom
with this handler:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Headers;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
@Identifier("dlq-serialization-custom")
public class CustomSerializationFailureHandler implements SerializationFailureHandler<Object> {

    @Override
    public byte[] handleSerializationFailure(String topic, boolean isKey, String serializer, Object data, Exception exception, Headers headers) {
        if (exception != null) {
            // Expose the serialization failure on the DLQ record as headers.
            headers.add("serialization-exception", exception.getMessage().getBytes(StandardCharsets.UTF_8));
            headers.add("serialization-exception-class", exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
        }
        // Returning null leaves the DLQ record value empty.
        return null;
    }
}

I can add them inside the headers:
{
"topic": "internal-service-movies-dlq",
"headers": [
{
"key": "dead-letter-exception-class-name",
"value": "java.lang.RuntimeException"
},
{
"key": "dead-letter-reason",
"value": "Something wrong happened"
},
{
"key": "dead-letter-topic",
"value": "movies"
},
{
"key": "dead-letter-partition",
"value": "0"
},
{
"key": "dead-letter-offset",
"value": "0"
},
{
"key": "serialization-exception",
"value": "Error retrieving Avro schema{\"type\":\"record\",\"name\":\"Movie\",\"namespace\":\"org.acme.kafka.quarkus\",\"fields\":[{\"name\":\"title\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"release_year\",\"type\":\"int\"}]}"
},
{
"key": "serialization-exception-class",
"value": "org.apache.kafka.common.errors.SerializationException"
}
],
"timestamp": 1737045518608,
"partition": 0,
"offset": 0
}
I was thinking that defining a custom serialization failure handler was not possible, because the attribute is missing from the doc: https://quarkus.io/guides/kafka, in Table 1. Incoming Attributes of the 'smallrye-kafka' connector. Maybe a paragraph regarding dead-letter-queue could be added to https://quarkus.io/guides/kafka#handling-serialization-failures. Is it possible to update the doc?
Moreover, by doing it this way I will lose all the benefits of KafkaDeadLetterSerializationHandler and the deserialization headers behavior.
Would it make sense to add these headers by default?
@dcdh you can also implement the decorateSerialization method, or extend the KafkaDeadLetterSerializationHandler so you keep its behavior (see the sketch after this comment).
The attribute doesn't figure in the incoming attributes table because it configures the internal producer created for the DLQ. Basically, the DLQ accepts all config that a producer channel accepts.
Yes, an update to the doc is needed to clarify this.
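A minimal sketch of that suggestion, assuming KafkaDeadLetterSerializationHandler is not final, has a no-arg constructor, and can be exposed as a CDI bean with an @Identifier; the identifier name and the logging are illustrative. It logs, adds a header, and then delegates to the default handler so its enrichment, if any, is kept:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Headers;
import org.jboss.logging.Logger;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterSerializationHandler;
import jakarta.enterprise.context.ApplicationScoped;

// Hypothetical handler; it would be wired with
// mp.messaging.incoming.<channel>.dead-letter-queue.value-serialization-failure-handler=dlq-serialization-logging
@ApplicationScoped
@Identifier("dlq-serialization-logging")
public class LoggingDlqSerializationHandler extends KafkaDeadLetterSerializationHandler {

    private static final Logger LOG = Logger.getLogger(LoggingDlqSerializationHandler.class);

    @Override
    public byte[] handleSerializationFailure(String topic, boolean isKey, String serializer, Object data, Exception exception, Headers headers) {
        // Surface the failure instead of letting it pass silently.
        LOG.errorf(exception, "Failed to serialize the %s for DLQ topic %s with %s", isKey ? "key" : "value", topic, serializer);
        if (exception != null) {
            headers.add("serialization-exception-class", exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
        }
        // Delegate to the default handler so its own enrichment, if any, is preserved.
        return super.handleSerializationFailure(topic, isKey, serializer, data, exception, headers);
    }
}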
@vsevel definitely, I am even thinking the default methods of the serialization failure handler could add these headers out of the box.
Describe the bug
In my organization we do Kafka messaging with Avro and Apicurio.
We do not want the application to be able to create the Avro schema automatically when it is not present; that is done by another process in our continuous deployment.
But sometimes we may miss it (and this is what happened here).
When a message is nacked because an exception has been thrown, it is forwarded to the DLQ, and the original value is serialized to a byte array to be stored as the DLQ record value.
In my case, we get a serialization issue because the Avro schema with subject
mydlqtopic-value
is not present inside Apicurio. The value is missing from the DLQ message, which is not surprising given that this is a serialization issue.
But we were unaware of this null value until we did a full debug session.
Expected behavior
When a serialization issue occurs, we should be notified.
It could be a log entry or a header describing the failure.
Actual behavior
The value is null in the DLQ record (expected).
The exception is swallowed: no logs, and no way to understand that there was an issue while serializing the Avro message with Apicurio.
How to Reproduce?
=> You will notice that a SerializationException occurs, but no log is produced and no header is added about it.
The reproducer produces a message on a movie topic, with Avro schema creation.
Next, the consumer throws a RuntimeException so the message is sent to the DLQ.
The processing then fails because schema creation is forbidden by the application, using this configuration in application.properties:
mp.messaging.connector.smallrye-kafka.auto.register.schemas=false
So io.confluent.kafka.serializers.KafkaAvroSerializer will not create the Avro schema, and a SerializationException is produced at serialization time, mimicking my organization's behavior. You can have a look at this result.
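For illustration, the consumer in such a reproducer could look like the sketch below; the channel name movies-from-external-service-in and the DLQ topic name come from the configuration and record shown earlier in the thread, while the class itself and the failure-strategy wiring in the comments are assumptions about how the reproducer is put together:

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.acme.kafka.quarkus.Movie; // Avro-generated class from the schema shown above
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MovieConsumer {

    // Assumed wiring for the DLQ failure strategy, e.g.:
    // mp.messaging.incoming.movies-from-external-service-in.failure-strategy=dead-letter-queue
    // mp.messaging.incoming.movies-from-external-service-in.dead-letter-queue.topic=internal-service-movies-dlq
    @Incoming("movies-from-external-service-in")
    public void consume(Movie movie) {
        // Throwing here nacks the message; with the dead-letter-queue failure strategy it is
        // forwarded to the DLQ, where serializing the Avro value then fails because the
        // DLQ value subject is not registered and auto.register.schemas=false.
        throw new RuntimeException("Something went wrong");
    }
}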
Output of uname -a or ver
MSYS_NT ...
Output of java -version
openjdk version "21.0.3" 2024-04-16 LTS
OpenJDK Runtime Environment Temurin-21.0.3+9 (build 21.0.3+9-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.3+9 (build 21.0.3+9-LTS, mixed mode, sharing)
Quarkus version or git rev
3.17.7
Build tool (i.e. output of mvnw --version or gradlew --version)
Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae)
Maven home: E:\projects\maven\apache-maven-3.9.6
Java version: 21.0.1, vendor: Oracle Corporation, runtime: E:\projects\jdk\64\21.0.1
Default locale: en_US, platform encoding: UTF-8
OS name: "windows 11", version: "10.0", arch: "amd64", family: "windows"
Additional information
The message is handled by
io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterSerializationHandler
to enrich it with valuable data about the exception, the message offset, partition and topic...
I guess the log or message header should be added there, by implementing handleSerializationFailure.