diff --git a/src/KafkaFlow.Abstractions/ActivityOperationType.cs b/src/KafkaFlow.Abstractions/ActivityOperationType.cs new file mode 100644 index 000000000..ed260fa58 --- /dev/null +++ b/src/KafkaFlow.Abstractions/ActivityOperationType.cs @@ -0,0 +1,12 @@ +namespace KafkaFlow +{ + /// Activity operation names enum values + public enum ActivityOperationType + { + /// PUBLISH + Publish, + + /// PROCESS + Process, + } +} diff --git a/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs b/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs new file mode 100644 index 000000000..52bdd4c9f --- /dev/null +++ b/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs @@ -0,0 +1,28 @@ +using System.Diagnostics; +using System.Reflection; + +namespace KafkaFlow +{ + /// + /// ActivitySource properties + /// + public static class ActivitySourceAccessor + { + internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString(); + + private const string _activityString = "otel_activity"; + + private static readonly ActivitySource _activitySource = new("KafkaFlow.OpenTelemetry", Version); + + /// + /// Gets the name of the OpenTelemetry Activity that is used as a key + /// in MessageContext.Items dictionary + /// + public static string ActivityString => _activityString; + + /// + /// Gets the ActivitySource name that is used in KafkaFlow + /// + public static ActivitySource ActivitySource => _activitySource; + } +} diff --git a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj index 50e3dd839..23b71c74b 100644 --- a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj +++ b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj @@ -7,4 +7,8 @@ Contains all KafkaFlow extendable interfaces + + + + diff --git a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs index 4a73f4f4d..495d1c995 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs @@ -1,5 +1,6 @@ namespace KafkaFlow.IntegrationTests.Core.Middlewares { + using System.Diagnostics; using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Handlers; @@ -7,6 +8,8 @@ internal class GzipMiddleware : IMessageMiddleware { public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { + using var activity = ActivitySourceAccessor.ActivitySource.StartActivity("integration-test", ActivityKind.Internal); + MessageStorage.Add((byte[]) context.Message.Value); await next(context); } diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs index 855083632..6fa680a52 100644 --- a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs +++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs @@ -56,7 +56,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans await producer.ProduceAsync(null, message); // Assert - var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); Assert.IsNull(producerSpan.ParentId); @@ -64,6 +64,35 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); } + [TestMethod] + public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity() + { + // Arrange + var provider = await this.GetServiceProvider(); + MessageStorage.Clear(); + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource("KafkaFlow.OpenTelemetry") + .AddInMemoryExporter(this.exportedItems) + .Build(); + + var producer = provider.GetRequiredService>(); + var message = this.fixture.Create(); + + // Act + await producer.ProduceAsync(null, message); + + // Assert + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); + + Assert.IsNotNull(this.exportedItems); + Assert.IsNull(producerSpan.ParentId); + Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); + Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(internalSpan.ParentSpanId, consumerSpan.SpanId); + } + [TestMethod] public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer() { @@ -97,7 +126,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp await producer.ProduceAsync(null, message); // Assert - var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); @@ -181,9 +210,9 @@ await Policy .ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned)); } - private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync() + private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync() { - Activity producerSpan = null, consumerSpan = null; + Activity producerSpan = null, consumerSpan = null, internalSpan = null; await Policy .HandleResult(isAvailable => !isAvailable) @@ -192,11 +221,12 @@ await Policy { producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer); consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer); + internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal); return Task.FromResult(producerSpan != null && consumerSpan != null); }); - return (producerSpan, consumerSpan); + return (producerSpan, consumerSpan, internalSpan); } } } diff --git a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs b/src/KafkaFlow.OpenTelemetry/ActivityAccessor.cs similarity index 71% rename from src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs rename to src/KafkaFlow.OpenTelemetry/ActivityAccessor.cs index 9a965dee5..fde551cbf 100644 --- a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs +++ b/src/KafkaFlow.OpenTelemetry/ActivityAccessor.cs @@ -5,21 +5,15 @@ namespace KafkaFlow.OpenTelemetry using System; using System.Collections.Generic; using System.Diagnostics; - using System.Reflection; using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; - internal static class ActivitySourceAccessor + internal static class ActivityAccessor { - internal const string ActivityString = "otel_activity"; internal const string ExceptionEventKey = "exception"; internal const string MessagingSystemId = "kafka"; internal const string AttributeMessagingOperation = "messaging.operation"; internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key"; internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset"; - internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName(); - internal static readonly string ActivitySourceName = AssemblyName.Name; - internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString(); - internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version); public static void SetGenericTags(Activity activity) { diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index 10fa794e9..f83317823 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -20,25 +20,23 @@ public static Task OnConsumeStarted(IMessageContext context) { try { - var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context?.ConsumerContext.Topic} {ProcessString}" : ProcessString; - // Extract the PropagationContext of the upstream parent from the message headers. var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, ExtractTraceContextIntoBasicProperties); Baggage.Current = parentContext.Baggage; - // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. - // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md - var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext); + var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity; + + if (parentContext.ActivityContext.IsValid()) + { + activity.SetParentId(parentContext.ActivityContext.TraceId, parentContext.ActivityContext.SpanId, parentContext.ActivityContext.TraceFlags); + } foreach (var item in Baggage.Current) { activity?.AddBaggage(item.Key, item.Value); } - context?.Items.Add(ActivitySourceAccessor.ActivityString, activity); - - ActivitySourceAccessor.SetGenericTags(activity); + ActivityAccessor.SetGenericTags(activity); if (activity != null && activity.IsAllDataRequested) { @@ -57,7 +55,7 @@ public static Task OnConsumeCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - activity?.Dispose(); + activity?.Stop(); } return Task.CompletedTask; @@ -67,11 +65,11 @@ public static Task OnConsumeError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex); + var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex); activity?.AddEvent(exceptionEvent); - activity?.Dispose(); + activity?.Stop(); } return Task.CompletedTask; @@ -86,11 +84,11 @@ private static void SetConsumerTags(IMessageContext context, Activity activity) { var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString); + activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process.ToString().ToLower()); activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic); activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, messageKey); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset); + activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, messageKey); + activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset); activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition); } } diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs index 3630157c1..9610a5fe7 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -10,7 +10,6 @@ internal static class OpenTelemetryProducerEventsHandler { - private const string PublishString = "publish"; private const string AttributeMessagingDestinationName = "messaging.destination.name"; private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; @@ -19,12 +18,7 @@ public static Task OnProducerStarted(IMessageContext context) { try { - var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context?.ProducerContext.Topic} {PublishString}" : PublishString; - - // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. - // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md - var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Producer); + var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity; // Depending on Sampling (and whether a listener is registered or not), the // activity above may not be created. @@ -34,8 +28,6 @@ public static Task OnProducerStarted(IMessageContext context) if (activity != null) { - context?.Items.Add(ActivitySourceAccessor.ActivityString, activity); - contextToInject = activity.Context; } else if (Activity.Current != null) @@ -48,7 +40,7 @@ public static Task OnProducerStarted(IMessageContext context) // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties); - ActivitySourceAccessor.SetGenericTags(activity); + ActivityAccessor.SetGenericTags(activity); if (activity != null && activity.IsAllDataRequested) { @@ -67,7 +59,7 @@ public static Task OnProducerCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - activity?.Dispose(); + activity?.Stop(); } return Task.CompletedTask; @@ -77,11 +69,11 @@ public static Task OnProducerError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex); + var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex); activity?.AddEvent(exceptionEvent); - activity?.Dispose(); + activity?.Stop(); } return Task.CompletedTask; @@ -97,11 +89,11 @@ private static void InjectTraceContextIntoBasicProperties(IMessageContext contex private static void SetProducerTags(IMessageContext context, Activity activity) { - activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, PublishString); + activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish.ToString().ToLower()); activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic); activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset); + activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key); + activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset); } } } diff --git a/src/KafkaFlow/ActivityFactory.cs b/src/KafkaFlow/ActivityFactory.cs new file mode 100644 index 000000000..8b006e0bc --- /dev/null +++ b/src/KafkaFlow/ActivityFactory.cs @@ -0,0 +1,17 @@ +using System.Diagnostics; + +namespace KafkaFlow +{ + internal static class ActivityFactory + { + public static Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind) + { + var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {activityOperationType}" : activityOperationType.ToString().ToLower(); + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + return ActivitySourceAccessor.ActivitySource.StartActivity(activityName, activityKind); + } + } +} diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index cce7308ea..1add26ac1 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Consumers { using System; + using System.Diagnostics; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -106,6 +107,8 @@ public void OnTaskCompleted(Action handler) private async Task ProcessMessageAsync(ConsumeResult message, CancellationToken cancellationToken) { + var activity = ActivityFactory.Start(message.Topic, ActivityOperationType.Process, ActivityKind.Consumer); + try { var context = new MessageContext( @@ -119,6 +122,8 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca this.Id), null); + context.Items.Add(ActivitySourceAccessor.ActivityString, activity); + try { var scope = this.dependencyResolver.CreateScope(); diff --git a/src/KafkaFlow/KafkaFlow.csproj b/src/KafkaFlow/KafkaFlow.csproj index 1aa6b346b..09c7843cd 100644 --- a/src/KafkaFlow/KafkaFlow.csproj +++ b/src/KafkaFlow/KafkaFlow.csproj @@ -8,6 +8,7 @@ + diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index 85fd3096e..d378630cb 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Producers { using System; + using System.Diagnostics; using System.Text; using System.Threading.Tasks; using Confluent.Kafka; @@ -40,8 +41,12 @@ public async Task> ProduceAsync( using var scope = this.dependencyResolver.CreateScope(); + var activity = ActivityFactory.Start(topic, ActivityOperationType.Publish, ActivityKind.Producer); + var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers); + this.AddActivityToMessageContext(messageContext, activity); + await this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext)); try @@ -99,8 +104,12 @@ public void Produce( { var scope = this.dependencyResolver.CreateScope(); + var activity = ActivityFactory.Start(topic, ActivityOperationType.Publish, ActivityKind.Producer); + var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers); + this.AddActivityToMessageContext(messageContext, activity); + this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext)); this.middlewareExecutor @@ -354,5 +363,13 @@ private MessageContext CreateMessageContext( null, new ProducerContext(topic)); } + + private void AddActivityToMessageContext(MessageContext messageContext, Activity activity) + { + if(activity != null) + { + messageContext.Items.Add(ActivitySourceAccessor.ActivityString, activity); + } + } } }