Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: provide support for Kafka message keys different than "string?" #314

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<Description>Kafka extensions for CloudNative.CloudEvents</Description>
<PackageTags>cncf;cloudnative;cloudevents;events;kafka</PackageTags>
<LangVersion>8.0</LangVersion>
<LangVersion>10.0</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>

Expand Down
68 changes: 53 additions & 15 deletions src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright (c) Cloud Native Foundation.
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

using CloudNative.CloudEvents.Core;
using CloudNative.CloudEvents.Extensions;
using CloudNative.CloudEvents.Kafka.PartitionKeyAdapters;
using Confluent.Kafka;
using System;
using System.Collections.Generic;
Expand All @@ -24,6 +25,7 @@ public static class KafkaExtensions
internal const string KafkaContentTypeAttributeName = "content-type";
private const string SpecVersionKafkaHeader = KafkaHeaderPrefix + "specversion";


/// <summary>
/// Indicates whether this message holds a single CloudEvent.
/// </summary>
Expand All @@ -32,7 +34,18 @@ public static class KafkaExtensions
/// </remarks>
/// <param name="message">The message to check for the presence of a CloudEvent. Must not be null.</param>
/// <returns>true, if the request is a CloudEvent</returns>
public static bool IsCloudEvent(this Message<string?, byte[]> message) =>
public static bool IsCloudEvent(this Message<string?, byte[]> message) => IsCloudEvent<string?>(message);

/// <summary>
/// Indicates whether this message holds a single CloudEvent.
/// </summary>
/// <remarks>
/// This method returns false for batch requests, as they need to be parsed differently.
/// </remarks>
/// <param name="message">The message to check for the presence of a CloudEvent. Must not be null.</param>
/// <typeparam name="TKey">The type of key of the Kafka message.</typeparam>
/// <returns>true, if the request is a CloudEvent</returns>
public static bool IsCloudEvent<TKey>(this Message<TKey, byte[]> message) =>
GetHeaderValue(message, SpecVersionKafkaHeader) is object ||
MimeUtilities.IsCloudEventsContentType(GetHeaderValue(message, KafkaContentTypeAttributeName));

Expand All @@ -56,6 +69,21 @@ public static CloudEvent ToCloudEvent(this Message<string?, byte[]> message,
/// <returns>A reference to a validated CloudEvent instance.</returns>
public static CloudEvent ToCloudEvent(this Message<string?, byte[]> message,
CloudEventFormatter formatter, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
return ToCloudEvent(message, formatter, extensionAttributes, new StringPartitionKeyAdapter());
}

/// <summary>
/// Converts this Kafka message into a CloudEvent object.
/// </summary>
/// <param name="message">The Kafka message to convert. Must not be null.</param>
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
/// <param name="partitionKeyAdapter">The PartitionKey Adapter responsible for determining wether to set the partitionKey attribute and its value.</param>
/// <typeparam name="TKey">The type of key of the Kafka message.</typeparam>
/// <returns>A reference to a validated CloudEvent instance.</returns>
public static CloudEvent ToCloudEvent<TKey>(this Message<TKey, byte[]> message,
CloudEventFormatter formatter, IEnumerable<CloudEventAttribute>? extensionAttributes, IPartitionKeyAdapter<TKey> partitionKeyAdapter)
{
Validation.CheckNotNull(message, nameof(message));
Validation.CheckNotNull(formatter, nameof(formatter));
Expand Down Expand Up @@ -109,16 +137,11 @@ public static CloudEvent ToCloudEvent(this Message<string?, byte[]> message,
formatter.DecodeBinaryModeEventData(message.Value, cloudEvent);
}

InitPartitioningKey(message, cloudEvent);
return Validation.CheckCloudEventArgument(cloudEvent, nameof(message));
}

private static void InitPartitioningKey(Message<string?, byte[]> message, CloudEvent cloudEvent)
{
if (!string.IsNullOrEmpty(message.Key))
if (partitionKeyAdapter.ConvertKeyToPartitionKeyAttributeValue(message.Key, out var partitionKeyAttributeValue))
{
cloudEvent[Partitioning.PartitionKeyAttribute] = message.Key;
cloudEvent[Partitioning.PartitionKeyAttribute] = partitionKeyAttributeValue;
}
return Validation.CheckCloudEventArgument(cloudEvent, nameof(message));
}

/// <summary>
Expand All @@ -136,12 +159,22 @@ private static void InitPartitioningKey(Message<string?, byte[]> message, CloudE
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
public static Message<string?, byte[]> ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
=> ToKafkaMessage(cloudEvent, contentMode, formatter, new StringPartitionKeyAdapter());

/// <summary>
/// Converts a CloudEvent to a Kafka message.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
/// <param name="partitionKeyAdapter">The partition key adapter responsible for transforming the cloud event partitioning key into the desired Kafka key type.</param>
/// <typeparam name="TKey">The Kafka Key type to be used </typeparam>
public static Message<TKey, byte[]> ToKafkaMessage<TKey>(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, IPartitionKeyAdapter<TKey> partitionKeyAdapter)
{
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
Validation.CheckNotNull(formatter, nameof(formatter));

var headers = MapHeaders(cloudEvent);
string? key = (string?) cloudEvent[Partitioning.PartitionKeyAttribute];
byte[] value;
string? contentTypeHeaderValue;

Expand All @@ -163,12 +196,17 @@ private static void InitPartitioningKey(Message<string?, byte[]> message, CloudE
{
headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentTypeHeaderValue));
}
return new Message<string?, byte[]>
var message = new Message<TKey, byte[]>
{
Headers = headers,
Value = value,
Key = key
Value = value
};
if (partitionKeyAdapter.ConvertPartitionKeyAttributeValueToKey((string?)cloudEvent[Partitioning.PartitionKeyAttribute], out var keyValue)
&& keyValue != null)
{
message.Key = keyValue;
}
return message;
}

private static Headers MapHeaders(CloudEvent cloudEvent)
Expand All @@ -191,4 +229,4 @@ private static Headers MapHeaders(CloudEvent cloudEvent)
return headers;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

using System;

namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters;

/// <summary>
/// Partion Key Adapter that converts to and from Guids in binary representation.
/// </summary>
public class BinaryGuidPartitionKeyAdapter : IPartitionKeyAdapter<byte[]?>
{
/// <inheritdoc/>
public bool ConvertKeyToPartitionKeyAttributeValue(byte[]? keyValue, out string? attributeValue)
{
if (keyValue == null)
{
attributeValue = null;
return false;
}

attributeValue = new Guid(keyValue).ToString();
return true;
}

/// <inheritdoc/>
public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out byte[]? keyValue)
{
if (string.IsNullOrEmpty(attributeValue))
{
keyValue = default;
return false;
}

keyValue = Guid.Parse(attributeValue).ToByteArray();
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters;

/// <summary>
/// Defines the methods of the adapters responsible for transforming from cloud event
/// PartitionKey Attribute to Kafka Message Key.
/// </summary>
/// <typeparam name="TKey">The type of Kafka Message Key.</typeparam>
public interface IPartitionKeyAdapter<TKey>
{
/// <summary>
/// Converts a Message Key to PartionKey Attribute Value.
/// </summary>
/// <param name="keyValue">The key value to transform.</param>
/// <param name="attributeValue">The transformed attribute value (output).</param>
/// <returns>Whether the attribute should be set.</returns>
bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue);

/// <summary>
/// Converts a PartitionKey Attribute value to a Message Key.
/// </summary>
/// <param name="attributeValue">The attribute value to transform.</param>
/// <param name="keyValue">The transformed key value (output)</param>
/// <returns>Whether the key should be set.</returns>
bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters;

/// <summary>
/// Partion Key Adapter that skips handling the key.
/// </summary>
/// <typeparam name="TKey">The type of Kafka Message Key.</typeparam>
public class NullPartitionKeyAdapter<TKey> : IPartitionKeyAdapter<TKey>
{
/// <inheritdoc/>
public bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue)
{
attributeValue = null;
return false;
}

/// <inheritdoc/>
public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue)
{
keyValue = default;
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters;

/// <summary>
/// Partion Key Adapter that skips handling the key.
/// </summary>
public class StringPartitionKeyAdapter : IPartitionKeyAdapter<string?>
{
/// <inheritdoc/>
public bool ConvertKeyToPartitionKeyAttributeValue(string? keyValue, out string? attributeValue)
{
attributeValue = keyValue;
return true;
}

/// <inheritdoc/>
public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out string? keyValue)
{
keyValue = attributeValue;
return true;
}
}
Loading