diff --git a/src/KafkaFlow.Abstractions/ActivityOperationType.cs b/src/KafkaFlow.Abstractions/ActivityOperationType.cs
new file mode 100644
index 000000000..ed260fa58
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/ActivityOperationType.cs
@@ -0,0 +1,12 @@
+namespace KafkaFlow
+{
+ /// Activity operation names enum values
+ public enum ActivityOperationType
+ {
+ /// PUBLISH
+ Publish,
+
+ /// PROCESS
+ Process,
+ }
+}
diff --git a/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs b/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs
new file mode 100644
index 000000000..52bdd4c9f
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs
@@ -0,0 +1,28 @@
+using System.Diagnostics;
+using System.Reflection;
+
+namespace KafkaFlow
+{
+ ///
+ /// ActivitySource properties
+ ///
+ public static class ActivitySourceAccessor
+ {
+ internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
+
+ private const string _activityString = "otel_activity";
+
+ private static readonly ActivitySource _activitySource = new("KafkaFlow.OpenTelemetry", Version);
+
+ ///
+ /// Gets the name of the OpenTelemetry Activity that is used as a key
+ /// in MessageContext.Items dictionary
+ ///
+ public static string ActivityString => _activityString;
+
+ ///
+ /// Gets the ActivitySource name that is used in KafkaFlow
+ ///
+ public static ActivitySource ActivitySource => _activitySource;
+ }
+}
diff --git a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
index 50e3dd839..23b71c74b 100644
--- a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
+++ b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
@@ -7,4 +7,8 @@
Contains all KafkaFlow extendable interfaces
+
+
+
+
diff --git a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs
index 4a73f4f4d..495d1c995 100644
--- a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs
+++ b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs
@@ -1,5 +1,6 @@
namespace KafkaFlow.IntegrationTests.Core.Middlewares
{
+ using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow.IntegrationTests.Core.Handlers;
@@ -7,6 +8,8 @@ internal class GzipMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
+ using var activity = ActivitySourceAccessor.ActivitySource.StartActivity("integration-test", ActivityKind.Internal);
+
MessageStorage.Add((byte[]) context.Message.Value);
await next(context);
}
diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
index 855083632..6fa680a52 100644
--- a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
+++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
@@ -56,7 +56,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
await producer.ProduceAsync(null, message);
// Assert
- var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
+ var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();
Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
@@ -64,6 +64,35 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}
+ [TestMethod]
+ public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
+ {
+ // Arrange
+ var provider = await this.GetServiceProvider();
+ MessageStorage.Clear();
+
+ using var tracerProvider = Sdk.CreateTracerProviderBuilder()
+ .AddSource("KafkaFlow.OpenTelemetry")
+ .AddInMemoryExporter(this.exportedItems)
+ .Build();
+
+ var producer = provider.GetRequiredService>();
+ var message = this.fixture.Create();
+
+ // Act
+ await producer.ProduceAsync(null, message);
+
+ // Assert
+ var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();
+
+ Assert.IsNotNull(this.exportedItems);
+ Assert.IsNull(producerSpan.ParentId);
+ Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
+ Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
+ Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId);
+ Assert.AreEqual(internalSpan.ParentSpanId, consumerSpan.SpanId);
+ }
+
[TestMethod]
public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer()
{
@@ -97,7 +126,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
await producer.ProduceAsync(null, message);
// Assert
- var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
+ var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();
Assert.IsNotNull(this.exportedItems);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
@@ -181,9 +210,9 @@ await Policy
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}
- private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync()
+ private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync()
{
- Activity producerSpan = null, consumerSpan = null;
+ Activity producerSpan = null, consumerSpan = null, internalSpan = null;
await Policy
.HandleResult(isAvailable => !isAvailable)
@@ -192,11 +221,12 @@ await Policy
{
producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);
+ internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal);
return Task.FromResult(producerSpan != null && consumerSpan != null);
});
- return (producerSpan, consumerSpan);
+ return (producerSpan, consumerSpan, internalSpan);
}
}
}
diff --git a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs b/src/KafkaFlow.OpenTelemetry/ActivityAccessor.cs
similarity index 71%
rename from src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs
rename to src/KafkaFlow.OpenTelemetry/ActivityAccessor.cs
index 9a965dee5..fde551cbf 100644
--- a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs
+++ b/src/KafkaFlow.OpenTelemetry/ActivityAccessor.cs
@@ -5,21 +5,15 @@ namespace KafkaFlow.OpenTelemetry
using System;
using System.Collections.Generic;
using System.Diagnostics;
- using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;
- internal static class ActivitySourceAccessor
+ internal static class ActivityAccessor
{
- internal const string ActivityString = "otel_activity";
internal const string ExceptionEventKey = "exception";
internal const string MessagingSystemId = "kafka";
internal const string AttributeMessagingOperation = "messaging.operation";
internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key";
internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset";
- internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName();
- internal static readonly string ActivitySourceName = AssemblyName.Name;
- internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
- internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version);
public static void SetGenericTags(Activity activity)
{
diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
index 10fa794e9..f83317823 100644
--- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
+++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
@@ -20,25 +20,23 @@ public static Task OnConsumeStarted(IMessageContext context)
{
try
{
- var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context?.ConsumerContext.Topic} {ProcessString}" : ProcessString;
-
// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, ExtractTraceContextIntoBasicProperties);
Baggage.Current = parentContext.Baggage;
- // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
- // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
- // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
- var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);
+ var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity;
+
+ if (parentContext.ActivityContext.IsValid())
+ {
+ activity.SetParentId(parentContext.ActivityContext.TraceId, parentContext.ActivityContext.SpanId, parentContext.ActivityContext.TraceFlags);
+ }
foreach (var item in Baggage.Current)
{
activity?.AddBaggage(item.Key, item.Value);
}
- context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);
-
- ActivitySourceAccessor.SetGenericTags(activity);
+ ActivityAccessor.SetGenericTags(activity);
if (activity != null && activity.IsAllDataRequested)
{
@@ -57,7 +55,7 @@ public static Task OnConsumeCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
- activity?.Dispose();
+ activity?.Stop();
}
return Task.CompletedTask;
@@ -67,11 +65,11 @@ public static Task OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
- var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);
+ var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex);
activity?.AddEvent(exceptionEvent);
- activity?.Dispose();
+ activity?.Stop();
}
return Task.CompletedTask;
@@ -86,11 +84,11 @@ private static void SetConsumerTags(IMessageContext context, Activity activity)
{
var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]);
- activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString);
+ activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process.ToString().ToLower());
activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic);
activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId);
- activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, messageKey);
- activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
+ activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, messageKey);
+ activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition);
}
}
diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
index 3630157c1..9610a5fe7 100644
--- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
+++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
@@ -10,7 +10,6 @@
internal static class OpenTelemetryProducerEventsHandler
{
- private const string PublishString = "publish";
private const string AttributeMessagingDestinationName = "messaging.destination.name";
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
@@ -19,12 +18,7 @@ public static Task OnProducerStarted(IMessageContext context)
{
try
{
- var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context?.ProducerContext.Topic} {PublishString}" : PublishString;
-
- // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
- // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
- // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
- var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Producer);
+ var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity;
// Depending on Sampling (and whether a listener is registered or not), the
// activity above may not be created.
@@ -34,8 +28,6 @@ public static Task OnProducerStarted(IMessageContext context)
if (activity != null)
{
- context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);
-
contextToInject = activity.Context;
}
else if (Activity.Current != null)
@@ -48,7 +40,7 @@ public static Task OnProducerStarted(IMessageContext context)
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties);
- ActivitySourceAccessor.SetGenericTags(activity);
+ ActivityAccessor.SetGenericTags(activity);
if (activity != null && activity.IsAllDataRequested)
{
@@ -67,7 +59,7 @@ public static Task OnProducerCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
- activity?.Dispose();
+ activity?.Stop();
}
return Task.CompletedTask;
@@ -77,11 +69,11 @@ public static Task OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
- var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);
+ var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex);
activity?.AddEvent(exceptionEvent);
- activity?.Dispose();
+ activity?.Stop();
}
return Task.CompletedTask;
@@ -97,11 +89,11 @@ private static void InjectTraceContextIntoBasicProperties(IMessageContext contex
private static void SetProducerTags(IMessageContext context, Activity activity)
{
- activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, PublishString);
+ activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish.ToString().ToLower());
activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic);
activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition);
- activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
- activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset);
+ activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
+ activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset);
}
}
}
diff --git a/src/KafkaFlow/ActivityFactory.cs b/src/KafkaFlow/ActivityFactory.cs
new file mode 100644
index 000000000..8b006e0bc
--- /dev/null
+++ b/src/KafkaFlow/ActivityFactory.cs
@@ -0,0 +1,17 @@
+using System.Diagnostics;
+
+namespace KafkaFlow
+{
+ internal static class ActivityFactory
+ {
+ public static Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind)
+ {
+ var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {activityOperationType}" : activityOperationType.ToString().ToLower();
+
+ // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
+ // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
+ return ActivitySourceAccessor.ActivitySource.StartActivity(activityName, activityKind);
+ }
+ }
+}
diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs
index cce7308ea..1add26ac1 100644
--- a/src/KafkaFlow/Consumers/ConsumerWorker.cs
+++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs
@@ -1,6 +1,7 @@
namespace KafkaFlow.Consumers
{
using System;
+ using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -106,6 +107,8 @@ public void OnTaskCompleted(Action handler)
private async Task ProcessMessageAsync(ConsumeResult message, CancellationToken cancellationToken)
{
+ var activity = ActivityFactory.Start(message.Topic, ActivityOperationType.Process, ActivityKind.Consumer);
+
try
{
var context = new MessageContext(
@@ -119,6 +122,8 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca
this.Id),
null);
+ context.Items.Add(ActivitySourceAccessor.ActivityString, activity);
+
try
{
var scope = this.dependencyResolver.CreateScope();
diff --git a/src/KafkaFlow/KafkaFlow.csproj b/src/KafkaFlow/KafkaFlow.csproj
index 1aa6b346b..09c7843cd 100644
--- a/src/KafkaFlow/KafkaFlow.csproj
+++ b/src/KafkaFlow/KafkaFlow.csproj
@@ -8,6 +8,7 @@
+
diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs
index 85fd3096e..d378630cb 100644
--- a/src/KafkaFlow/Producers/MessageProducer.cs
+++ b/src/KafkaFlow/Producers/MessageProducer.cs
@@ -1,6 +1,7 @@
namespace KafkaFlow.Producers
{
using System;
+ using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
@@ -40,8 +41,12 @@ public async Task> ProduceAsync(
using var scope = this.dependencyResolver.CreateScope();
+ var activity = ActivityFactory.Start(topic, ActivityOperationType.Publish, ActivityKind.Producer);
+
var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers);
+ this.AddActivityToMessageContext(messageContext, activity);
+
await this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext));
try
@@ -99,8 +104,12 @@ public void Produce(
{
var scope = this.dependencyResolver.CreateScope();
+ var activity = ActivityFactory.Start(topic, ActivityOperationType.Publish, ActivityKind.Producer);
+
var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers);
+ this.AddActivityToMessageContext(messageContext, activity);
+
this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext));
this.middlewareExecutor
@@ -354,5 +363,13 @@ private MessageContext CreateMessageContext(
null,
new ProducerContext(topic));
}
+
+ private void AddActivityToMessageContext(MessageContext messageContext, Activity activity)
+ {
+ if(activity != null)
+ {
+ messageContext.Items.Add(ActivitySourceAccessor.ActivityString, activity);
+ }
+ }
}
}