From 285904f31c59a12fcf19f62681866fee3eeb08b0 Mon Sep 17 00:00:00 2001 From: Jon Abaunza Date: Thu, 26 Sep 2024 12:39:50 +0200 Subject: [PATCH 1/2] feat: provide support for Kafka message keys different than "string?" Implements #313 Signed-off-by: Jon Abaunza --- .../CloudNative.CloudEvents.Kafka.csproj | 4 +- .../KafkaExtensions.cs | 56 ++++++-- .../BinaryGuidPartitionKeyAdapter.cs | 36 +++++ .../IPartitionKeyAdapter.cs | 26 ++++ .../NullPartitionKeyAdapter.cs | 23 +++ .../StringPartitionKeyAdapter.cs | 22 +++ .../Kafka/KafkaTest.cs | 131 +++++++++++++++--- 7 files changed, 259 insertions(+), 39 deletions(-) create mode 100644 src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs create mode 100644 src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs create mode 100644 src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs create mode 100644 src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs diff --git a/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj b/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj index 2bc9947..6009427 100644 --- a/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj +++ b/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj @@ -1,10 +1,10 @@ - + netstandard2.0;netstandard2.1;net8.0 Kafka extensions for CloudNative.CloudEvents cncf;cloudnative;cloudevents;events;kafka - 8.0 + 9.0 enable diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs b/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs index c6f6858..cfa4008 100644 --- a/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs +++ b/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs @@ -1,9 +1,10 @@ -// Copyright (c) Cloud Native Foundation. +// Copyright (c) Cloud Native Foundation. // Licensed under the Apache 2.0 license. // See LICENSE file in the project root for full license information. using CloudNative.CloudEvents.Core; using CloudNative.CloudEvents.Extensions; +using CloudNative.CloudEvents.Kafka.PartitionKeyAdapters; using Confluent.Kafka; using System; using System.Collections.Generic; @@ -32,7 +33,7 @@ public static class KafkaExtensions /// /// The message to check for the presence of a CloudEvent. Must not be null. /// true, if the request is a CloudEvent - public static bool IsCloudEvent(this Message message) => + public static bool IsCloudEvent(this Message message) => GetHeaderValue(message, SpecVersionKafkaHeader) is object || MimeUtilities.IsCloudEventsContentType(GetHeaderValue(message, KafkaContentTypeAttributeName)); @@ -56,6 +57,21 @@ public static CloudEvent ToCloudEvent(this Message message, /// A reference to a validated CloudEvent instance. public static CloudEvent ToCloudEvent(this Message message, CloudEventFormatter formatter, IEnumerable? extensionAttributes) + { + return ToCloudEvent(message, formatter, extensionAttributes, new StringPartitionKeyAdapter()); + } + + /// + /// Converts this Kafka message into a CloudEvent object. + /// + /// The Kafka message to convert. Must not be null. + /// The event formatter to use to parse the CloudEvent. Must not be null. + /// The extension attributes to use when parsing the CloudEvent. May be null. + /// The PartitionKey Adapter responsible for determining wether to set the partitionKey attribute and its value. + /// The type of key of the Kafka message. + /// A reference to a validated CloudEvent instance. + public static CloudEvent ToCloudEvent(this Message message, + CloudEventFormatter formatter, IEnumerable? extensionAttributes, IPartitionKeyAdapter partitionKeyAdapter) { Validation.CheckNotNull(message, nameof(message)); Validation.CheckNotNull(formatter, nameof(formatter)); @@ -109,16 +125,11 @@ public static CloudEvent ToCloudEvent(this Message message, formatter.DecodeBinaryModeEventData(message.Value, cloudEvent); } - InitPartitioningKey(message, cloudEvent); - return Validation.CheckCloudEventArgument(cloudEvent, nameof(message)); - } - - private static void InitPartitioningKey(Message message, CloudEvent cloudEvent) - { - if (!string.IsNullOrEmpty(message.Key)) + if (partitionKeyAdapter.ConvertKeyToPartitionKeyAttributeValue(message.Key, out var partitionKeyAttributeValue)) { - cloudEvent[Partitioning.PartitionKeyAttribute] = message.Key; + cloudEvent[Partitioning.PartitionKeyAttribute] = partitionKeyAttributeValue; } + return Validation.CheckCloudEventArgument(cloudEvent, nameof(message)); } /// @@ -136,12 +147,22 @@ private static void InitPartitioningKey(Message message, CloudE /// Content mode. Structured or binary. /// The formatter to use within the conversion. Must not be null. public static Message ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter) + => ToKafkaMessage(cloudEvent, contentMode, formatter, new StringPartitionKeyAdapter()); + + /// + /// Converts a CloudEvent to a Kafka message. + /// + /// The CloudEvent to convert. Must not be null, and must be a valid CloudEvent. + /// Content mode. Structured or binary. + /// The formatter to use within the conversion. Must not be null. + /// The partition key adapter responsible for transforming the cloud event partitioning key into the desired Kafka key type. + /// The Kafka Key type to be used + public static Message ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, IPartitionKeyAdapter partitionKeyAdapter) { Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent)); Validation.CheckNotNull(formatter, nameof(formatter)); var headers = MapHeaders(cloudEvent); - string? key = (string?) cloudEvent[Partitioning.PartitionKeyAttribute]; byte[] value; string? contentTypeHeaderValue; @@ -163,12 +184,17 @@ private static void InitPartitioningKey(Message message, CloudE { headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentTypeHeaderValue)); } - return new Message + var message = new Message { Headers = headers, - Value = value, - Key = key + Value = value }; + if (partitionKeyAdapter.ConvertPartitionKeyAttributeValueToKey((string?)cloudEvent[Partitioning.PartitionKeyAttribute], out var keyValue) + && keyValue != null) + { + message.Key = keyValue; + } + return message; } private static Headers MapHeaders(CloudEvent cloudEvent) @@ -191,4 +217,4 @@ private static Headers MapHeaders(CloudEvent cloudEvent) return headers; } } -} \ No newline at end of file +} diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs new file mode 100644 index 0000000..434b1d9 --- /dev/null +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs @@ -0,0 +1,36 @@ +using System; + +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters +{ + /// + /// Partion Key Adapter that converts to and from Guids in binary representation. + /// + public class BinaryGuidPartitionKeyAdapter : IPartitionKeyAdapter + { + /// + public bool ConvertKeyToPartitionKeyAttributeValue(byte[]? keyValue, out string? attributeValue) + { + if (keyValue == null) + { + attributeValue = null; + return false; + } + + attributeValue = new Guid(keyValue).ToString(); + return true; + } + + /// + public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out byte[]? keyValue) + { + if (string.IsNullOrEmpty(attributeValue)) + { + keyValue = default; + return false; + } + + keyValue = Guid.Parse(attributeValue).ToByteArray(); + return true; + } + } +} diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs new file mode 100644 index 0000000..81b8ac1 --- /dev/null +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs @@ -0,0 +1,26 @@ +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters +{ + /// + /// Defines the methods of the adapters responsible for transforming from cloud event + /// PartitionKey Attribute to Kafka Message Key. + /// + /// + public interface IPartitionKeyAdapter + { + /// + /// Converts a Message Key to PartionKey Attribute Value. + /// + /// The key value to transform. + /// The transformed attribute value (output). + /// Whether the attribute should be set. + bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue); + + /// + /// Converts a PartitionKey Attribute value to a Message Key. + /// + /// The attribute value to transform. + /// The transformed key value (output) + /// Whether the key should be set. + bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue); + } +} diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs new file mode 100644 index 0000000..50d88b8 --- /dev/null +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs @@ -0,0 +1,23 @@ +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters +{ + /// + /// Partion Key Adapter that skips handling the key. + /// + /// The type of Kafka Message Key + public class NullPartitionKeyAdapter : IPartitionKeyAdapter + { + /// + public bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue) + { + attributeValue = null; + return false; + } + + /// + public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue) + { + keyValue = default; + return false; + } + } +} diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs new file mode 100644 index 0000000..bcd0f9c --- /dev/null +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs @@ -0,0 +1,22 @@ +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters +{ + /// + /// Partion Key Adapter that skips handling the key. + /// + public class StringPartitionKeyAdapter : IPartitionKeyAdapter + { + /// + public bool ConvertKeyToPartitionKeyAttributeValue(string? keyValue, out string? attributeValue) + { + attributeValue = keyValue; + return true; + } + + /// + public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out string? keyValue) + { + keyValue = attributeValue; + return true; + } + } +} diff --git a/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs b/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs index afe37f4..413e511 100644 --- a/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs +++ b/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs @@ -37,16 +37,9 @@ public void IsCloudEvent(string headerName, string headerValue, bool expectedRes public void IsCloudEvent_NoHeaders() => Assert.False(new Message().IsCloudEvent()); - [Fact] - public void KafkaStructuredMessageTest() + private static CloudEvent CreateTestCloudEvent() { - // Kafka doesn't provide any way to get to the message transport level to do the test properly - // and it doesn't have an embedded version of a server for .Net so the lowest we can get is - // the `Message` - - var jsonEventFormatter = new JsonEventFormatter(); - - var cloudEvent = new CloudEvent + return new CloudEvent { Type = "com.github.pull.create", Source = new Uri("https://github.com/cloudevents/spec/pull"), @@ -55,21 +48,12 @@ public void KafkaStructuredMessageTest() Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero), DataContentType = MediaTypeNames.Text.Xml, Data = "", - ["comexampleextension1"] = "value" + ["comexampleextension1"] = "value", }; + } - var message = cloudEvent.ToKafkaMessage(ContentMode.Structured, new JsonEventFormatter()); - - Assert.True(message.IsCloudEvent()); - - // Using serialization to create fully independent copy thus simulating message transport. - // The real transport will work in a similar way. - var serialized = JsonConvert.SerializeObject(message, new HeaderConverter()); - var messageCopy = JsonConvert.DeserializeObject>(serialized, new HeadersConverter(), new HeaderConverter())!; - - Assert.True(messageCopy.IsCloudEvent()); - var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter); - + private static void VerifyTestCloudEvent(CloudEvent receivedCloudEvent) + { Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion); Assert.Equal("com.github.pull.create", receivedCloudEvent.Type); Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull"), receivedCloudEvent.Source); @@ -82,6 +66,109 @@ public void KafkaStructuredMessageTest() Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]); } + private static Message? SimulateMessageTransport(Message message) + { + // Using serialization to create fully independent copy thus simulating message transport. + // The real transport will work in a similar way. + var serialized = JsonConvert.SerializeObject(message, new HeaderConverter()); + var messageCopy = JsonConvert.DeserializeObject>(serialized, new HeadersConverter(), new HeaderConverter())!; + return messageCopy; + } + + [Fact] + public void KafkaStructuredMessageTest() + { + // Kafka doesn't provide any way to get to the message transport level to do the test properly + // and it doesn't have an embedded version of a server for .Net so the lowest we can get is + // the `Message` + + var jsonEventFormatter = new JsonEventFormatter(); + var key = "Test"; + var cloudEvent = CreateTestCloudEvent(); + cloudEvent[Partitioning.PartitionKeyAttribute] = key; + + var message = cloudEvent.ToKafkaMessage(ContentMode.Structured, jsonEventFormatter); + + Assert.True(message.IsCloudEvent()); + + var messageCopy = SimulateMessageTransport(message); + + Assert.NotNull(messageCopy); + Assert.Equal(key, messageCopy.Key); + Assert.True(messageCopy.IsCloudEvent()); + var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter, null); + + VerifyTestCloudEvent(receivedCloudEvent); + } + + [Fact] + public void KafkaBinaryGuidKeyedStructuredMessageTest() + { + // In order to test the most extreme case of key management we will simulate + // using Guid Keys serialized in their binary form in kafka that are converted + // back to their string representation in the cloudEvent. + var partitionKeyAdapter = new PartitionKeyAdapters.BinaryGuidPartitionKeyAdapter(); + var jsonEventFormatter = new JsonEventFormatter(); + var key = Guid.NewGuid(); + var cloudEvent = CreateTestCloudEvent(); + cloudEvent[Partitioning.PartitionKeyAttribute] = key.ToString(); + + var message = cloudEvent.ToKafkaMessage( + ContentMode.Structured, + jsonEventFormatter, + partitionKeyAdapter); + + Assert.True(message.IsCloudEvent()); + + var messageCopy = SimulateMessageTransport(message); + + Assert.NotNull(messageCopy); + Assert.True(messageCopy.IsCloudEvent()); + + var receivedCloudEvent = messageCopy.ToCloudEvent( + jsonEventFormatter, + null, + partitionKeyAdapter); + + Assert.NotNull(message.Key); + // The key should be the original Guid in the binary representation. + Assert.Equal(key, new Guid(messageCopy.Key!)); + VerifyTestCloudEvent(receivedCloudEvent); + } + + [Fact] + public void KafkaNullKeyedStructuredMessageTest() + { + // It will test the serialization using Confluent's Confluent.Kafka.Null type for the key. + // As the default behavior without adapter is to skip the key it will work properly. + var partitionKeyAdapter = new PartitionKeyAdapters.NullPartitionKeyAdapter(); + var jsonEventFormatter = new JsonEventFormatter(); + var cloudEvent = CreateTestCloudEvent(); + // Even if the key is established in the cloud event it won't flow. + cloudEvent[Partitioning.PartitionKeyAttribute] = "Test"; + + var message = cloudEvent.ToKafkaMessage( + ContentMode.Structured, + jsonEventFormatter, + partitionKeyAdapter); + + Assert.True(message.IsCloudEvent()); + + var messageCopy = SimulateMessageTransport(message); + + Assert.NotNull(messageCopy); + Assert.True(messageCopy.IsCloudEvent()); + + var receivedCloudEvent = messageCopy.ToCloudEvent( + jsonEventFormatter, + null, + partitionKeyAdapter); + + //The Message key will continue to be null. + Assert.Null(message.Key); + VerifyTestCloudEvent(receivedCloudEvent); + } + [Fact] public void KafkaBinaryMessageTest() { From ed04c9f983b030d97459fc43969ea6abe6754f77 Mon Sep 17 00:00:00 2001 From: Jon Abaunza Date: Thu, 26 Sep 2024 13:22:35 +0200 Subject: [PATCH 2/2] fix: Add copyright notice, use file-scoped namespaces, avoid binary breaking change Signed-off-by: Jon Abaunza --- .../CloudNative.CloudEvents.Kafka.csproj | 2 +- .../KafkaExtensions.cs | 12 +++++ .../BinaryGuidPartitionKeyAdapter.cs | 53 ++++++++++--------- .../IPartitionKeyAdapter.cs | 45 ++++++++-------- .../NullPartitionKeyAdapter.cs | 39 +++++++------- .../StringPartitionKeyAdapter.cs | 41 +++++++------- .../Kafka/KafkaTest.cs | 1 - 7 files changed, 108 insertions(+), 85 deletions(-) diff --git a/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj b/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj index 6009427..776918c 100644 --- a/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj +++ b/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj @@ -4,7 +4,7 @@ netstandard2.0;netstandard2.1;net8.0 Kafka extensions for CloudNative.CloudEvents cncf;cloudnative;cloudevents;events;kafka - 9.0 + 10.0 enable diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs b/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs index cfa4008..6517ac9 100644 --- a/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs +++ b/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs @@ -25,6 +25,7 @@ public static class KafkaExtensions internal const string KafkaContentTypeAttributeName = "content-type"; private const string SpecVersionKafkaHeader = KafkaHeaderPrefix + "specversion"; + /// /// Indicates whether this message holds a single CloudEvent. /// @@ -33,6 +34,17 @@ public static class KafkaExtensions /// /// The message to check for the presence of a CloudEvent. Must not be null. /// true, if the request is a CloudEvent + public static bool IsCloudEvent(this Message message) => IsCloudEvent(message); + + /// + /// Indicates whether this message holds a single CloudEvent. + /// + /// + /// This method returns false for batch requests, as they need to be parsed differently. + /// + /// The message to check for the presence of a CloudEvent. Must not be null. + /// The type of key of the Kafka message. + /// true, if the request is a CloudEvent public static bool IsCloudEvent(this Message message) => GetHeaderValue(message, SpecVersionKafkaHeader) is object || MimeUtilities.IsCloudEventsContentType(GetHeaderValue(message, KafkaContentTypeAttributeName)); diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs index 434b1d9..a402651 100644 --- a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs @@ -1,36 +1,39 @@ +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + using System; -namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters; + +/// +/// Partion Key Adapter that converts to and from Guids in binary representation. +/// +public class BinaryGuidPartitionKeyAdapter : IPartitionKeyAdapter { - /// - /// Partion Key Adapter that converts to and from Guids in binary representation. - /// - public class BinaryGuidPartitionKeyAdapter : IPartitionKeyAdapter + /// + public bool ConvertKeyToPartitionKeyAttributeValue(byte[]? keyValue, out string? attributeValue) { - /// - public bool ConvertKeyToPartitionKeyAttributeValue(byte[]? keyValue, out string? attributeValue) + if (keyValue == null) { - if (keyValue == null) - { - attributeValue = null; - return false; - } - - attributeValue = new Guid(keyValue).ToString(); - return true; + attributeValue = null; + return false; } - /// - public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out byte[]? keyValue) - { - if (string.IsNullOrEmpty(attributeValue)) - { - keyValue = default; - return false; - } + attributeValue = new Guid(keyValue).ToString(); + return true; + } - keyValue = Guid.Parse(attributeValue).ToByteArray(); - return true; + /// + public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out byte[]? keyValue) + { + if (string.IsNullOrEmpty(attributeValue)) + { + keyValue = default; + return false; } + + keyValue = Guid.Parse(attributeValue).ToByteArray(); + return true; } } diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs index 81b8ac1..bfa6ecc 100644 --- a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs @@ -1,26 +1,29 @@ -namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters; + +/// +/// Defines the methods of the adapters responsible for transforming from cloud event +/// PartitionKey Attribute to Kafka Message Key. +/// +/// The type of Kafka Message Key. +public interface IPartitionKeyAdapter { /// - /// Defines the methods of the adapters responsible for transforming from cloud event - /// PartitionKey Attribute to Kafka Message Key. + /// Converts a Message Key to PartionKey Attribute Value. /// - /// - public interface IPartitionKeyAdapter - { - /// - /// Converts a Message Key to PartionKey Attribute Value. - /// - /// The key value to transform. - /// The transformed attribute value (output). - /// Whether the attribute should be set. - bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue); + /// The key value to transform. + /// The transformed attribute value (output). + /// Whether the attribute should be set. + bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue); - /// - /// Converts a PartitionKey Attribute value to a Message Key. - /// - /// The attribute value to transform. - /// The transformed key value (output) - /// Whether the key should be set. - bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue); - } + /// + /// Converts a PartitionKey Attribute value to a Message Key. + /// + /// The attribute value to transform. + /// The transformed key value (output) + /// Whether the key should be set. + bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue); } diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs index 50d88b8..1aca2fb 100644 --- a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs @@ -1,23 +1,26 @@ -namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters; + +/// +/// Partion Key Adapter that skips handling the key. +/// +/// The type of Kafka Message Key. +public class NullPartitionKeyAdapter : IPartitionKeyAdapter { - /// - /// Partion Key Adapter that skips handling the key. - /// - /// The type of Kafka Message Key - public class NullPartitionKeyAdapter : IPartitionKeyAdapter + /// + public bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue) { - /// - public bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue) - { - attributeValue = null; - return false; - } + attributeValue = null; + return false; + } - /// - public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue) - { - keyValue = default; - return false; - } + /// + public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue) + { + keyValue = default; + return false; } } diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs index bcd0f9c..5d82124 100644 --- a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs @@ -1,22 +1,25 @@ -namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters -{ - /// - /// Partion Key Adapter that skips handling the key. - /// - public class StringPartitionKeyAdapter : IPartitionKeyAdapter - { - /// - public bool ConvertKeyToPartitionKeyAttributeValue(string? keyValue, out string? attributeValue) - { - attributeValue = keyValue; - return true; - } +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. - /// - public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out string? keyValue) - { - keyValue = attributeValue; - return true; - } +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters; + +/// +/// Partion Key Adapter that skips handling the key. +/// +public class StringPartitionKeyAdapter : IPartitionKeyAdapter +{ + /// + public bool ConvertKeyToPartitionKeyAttributeValue(string? keyValue, out string? attributeValue) + { + attributeValue = keyValue; + return true; + } + + /// + public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out string? keyValue) + { + keyValue = attributeValue; + return true; } } diff --git a/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs b/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs index 413e511..1debc00 100644 --- a/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs +++ b/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs @@ -140,7 +140,6 @@ public void KafkaBinaryGuidKeyedStructuredMessageTest() public void KafkaNullKeyedStructuredMessageTest() { // It will test the serialization using Confluent's Confluent.Kafka.Null type for the key. - // As the default behavior without adapter is to skip the key it will work properly. var partitionKeyAdapter = new PartitionKeyAdapters.NullPartitionKeyAdapter(); var jsonEventFormatter = new JsonEventFormatter(); var cloudEvent = CreateTestCloudEvent();