From 09f2a058f460e92953555f6039ae1c5d916eeb7a Mon Sep 17 00:00:00 2001 From: Giorgi Khvedeliani <43849591+PoteRii@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:09:02 +0400 Subject: [PATCH] do not throw exception for existing cap-msg-group message header. overwrite it instead. tolerate kafka message duplicate headers (#1623) --- src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs | 2 +- .../AzureServiceBusConsumerClient.cs | 2 +- src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs | 4 ++-- src/DotNetCore.CAP.NATS/NATSConsumerClient.cs | 2 +- src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs | 2 +- src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs | 3 ++- 6 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs index ae777ce7a..fd241b737 100644 --- a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs +++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs @@ -111,7 +111,7 @@ Task Consume() var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null); - message.Headers.Add(Headers.Group, _groupId); + message.Headers[Headers.Group] = _groupId; return OnMessageCallback!(message, response.Messages[0].ReceiptHandle); } diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index dd5472d53..940026375 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -278,7 +278,7 @@ private TransportMessage ConvertMessage(ServiceBusReceivedMessage message) var headers = message.ApplicationProperties .ToDictionary(x => x.Key, y => y.Value?.ToString()); - headers.Add(Headers.Group, _subscriptionName); + headers[Headers.Group] = _subscriptionName; if (_asbOptions.CustomHeadersBuilder != null) { diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index ad1cb7f0f..e65ba551d 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -171,10 +171,10 @@ private async Task Consume(ConsumeResult consumerResult) foreach (var header in consumerResult.Message.Headers) { var val = header.GetValueBytes(); - headers.Add(header.Key, val != null ? Encoding.UTF8.GetString(val) : null); + headers[header.Key] = val != null ? Encoding.UTF8.GetString(val) : null; } - headers.Add(Headers.Group, _groupId); + headers[Headers.Group] = _groupId; if (_kafkaOptions.CustomHeadersBuilder != null) { diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs index 0e0216d66..aad1ad6a3 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs @@ -166,7 +166,7 @@ Task Consume() headers.Add(h, e.Message.Header[h]); } - headers.Add(Headers.Group, _groupName); + headers[Headers.Group] = _groupName; if (_natsOptions.CustomHeadersBuilder != null) { diff --git a/src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs b/src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs index 01c5fc15d..40fde876e 100644 --- a/src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs +++ b/src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs @@ -78,7 +78,7 @@ Task Consume() headers.Add(header.Key, header.Value); } - headers.Add(Headers.Group, _groupId); + headers[Headers.Group] = _groupId; var message = new TransportMessage(headers, consumerResult.Data); diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs index 66815e16d..12a2fba51 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Text; +using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Messages; @@ -67,7 +68,7 @@ Task Consume() headers.Add(header.Key, header.Value?.ToString()); } - headers.Add(Messages.Headers.Group, _groupName); + headers[Messages.Headers.Group] = _groupName; if (_customHeadersBuilder != null) {