From 8172b54c65a765923e1dcb4f3d82d4ea7411c8ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Thu, 9 Nov 2023 10:39:45 +0000 Subject: [PATCH] fix: opentelemetry broken traces Activity.Current is a static variable that uses AsyncLocal internally, which means that it flows into async calls, but not back out into the caller (message consumer and producer). As a result, Activity.Current is null after a message is produced or consumed, which means that every span created afterwards starts a new trace because the context is not being propagated. --- .../ActivityOperationType.cs | 12 ++++++ .../ActivitySourceAccessor.cs | 28 +++++++++++++ .../KafkaFlow.Abstractions.csproj | 4 ++ .../Core/Middlewares/GzipMiddleware.cs | 3 ++ .../OpenTelemetryTests.cs | 40 ++++++++++++++++--- ...ySourceAccessor.cs => ActivityAccessor.cs} | 8 +--- .../OpenTelemetryConsumerEventsHandler.cs | 28 ++++++------- .../OpenTelemetryProducerEventsHandler.cs | 24 ++++------- src/KafkaFlow/ActivityFactory.cs | 17 ++++++++ src/KafkaFlow/Consumers/ConsumerWorker.cs | 5 +++ src/KafkaFlow/KafkaFlow.csproj | 1 + src/KafkaFlow/Producers/MessageProducer.cs | 17 ++++++++ 12 files changed, 144 insertions(+), 43 deletions(-) create mode 100644 src/KafkaFlow.Abstractions/ActivityOperationType.cs create mode 100644 src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs rename src/KafkaFlow.OpenTelemetry/{ActivitySourceAccessor.cs => ActivityAccessor.cs} (71%) create mode 100644 src/KafkaFlow/ActivityFactory.cs diff --git a/src/KafkaFlow.Abstractions/ActivityOperationType.cs b/src/KafkaFlow.Abstractions/ActivityOperationType.cs new file mode 100644 index 000000000..ed260fa58 --- /dev/null +++ b/src/KafkaFlow.Abstractions/ActivityOperationType.cs @@ -0,0 +1,12 @@ +namespace KafkaFlow +{ + /// Activity operation names enum values + public enum ActivityOperationType + { + /// PUBLISH + Publish, + + /// PROCESS + Process, + } +} diff --git
a/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs b/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs new file mode 100644 index 000000000..52bdd4c9f --- /dev/null +++ b/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs @@ -0,0 +1,28 @@ +using System.Diagnostics; +using System.Reflection; + +namespace KafkaFlow +{ + /// + /// ActivitySource properties + /// + public static class ActivitySourceAccessor + { + internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString(); + + private const string _activityString = "otel_activity"; + + private static readonly ActivitySource _activitySource = new("KafkaFlow.OpenTelemetry", Version); + + /// + /// Gets the name of the OpenTelemetry Activity that is used as a key + /// in MessageContext.Items dictionary + /// + public static string ActivityString => _activityString; + + /// + /// Gets the ActivitySource name that is used in KafkaFlow + /// + public static ActivitySource ActivitySource => _activitySource; + } +} diff --git a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj index 50e3dd839..23b71c74b 100644 --- a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj +++ b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj @@ -7,4 +7,8 @@ Contains all KafkaFlow extendable interfaces + + + + diff --git a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs index 4a73f4f4d..495d1c995 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs @@ -1,5 +1,6 @@ namespace KafkaFlow.IntegrationTests.Core.Middlewares { + using System.Diagnostics; using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Handlers; @@ -7,6 +8,8 @@ internal class GzipMiddleware : IMessageMiddleware { public async Task Invoke(IMessageContext 
context, MiddlewareDelegate next) { + using var activity = ActivitySourceAccessor.ActivitySource.StartActivity("integration-test", ActivityKind.Internal); + MessageStorage.Add((byte[]) context.Message.Value); await next(context); } diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs index 855083632..6fa680a52 100644 --- a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs +++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs @@ -56,7 +56,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans await producer.ProduceAsync(null, message); // Assert - var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); Assert.IsNull(producerSpan.ParentId); @@ -64,6 +64,35 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); } + [TestMethod] + public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity() + { + // Arrange + var provider = await this.GetServiceProvider(); + MessageStorage.Clear(); + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource("KafkaFlow.OpenTelemetry") + .AddInMemoryExporter(this.exportedItems) + .Build(); + + var producer = provider.GetRequiredService>(); + var message = this.fixture.Create(); + + // Act + await producer.ProduceAsync(null, message); + + // Assert + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); + + Assert.IsNotNull(this.exportedItems); + Assert.IsNull(producerSpan.ParentId); + Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); + Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(internalSpan.ParentSpanId, 
consumerSpan.SpanId); + } + [TestMethod] public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer() { @@ -97,7 +126,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp await producer.ProduceAsync(null, message); // Assert - var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); @@ -181,9 +210,9 @@ await Policy .ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned)); } - private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync() + private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync() { - Activity producerSpan = null, consumerSpan = null; + Activity producerSpan = null, consumerSpan = null, internalSpan = null; await Policy .HandleResult(isAvailable => !isAvailable) @@ -192,11 +221,12 @@ await Policy { producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer); consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer); + internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal); return Task.FromResult(producerSpan != null && consumerSpan != null); }); - return (producerSpan, consumerSpan); + return (producerSpan, consumerSpan, internalSpan); } } } diff --git a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs b/src/KafkaFlow.OpenTelemetry/ActivityAccessor.cs similarity index 71% rename from src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs rename to src/KafkaFlow.OpenTelemetry/ActivityAccessor.cs index 9a965dee5..fde551cbf 100644 --- a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs +++ b/src/KafkaFlow.OpenTelemetry/ActivityAccessor.cs @@ -5,21 +5,15 @@ namespace KafkaFlow.OpenTelemetry using System; using 
System.Collections.Generic; using System.Diagnostics; - using System.Reflection; using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; - internal static class ActivitySourceAccessor + internal static class ActivityAccessor { - internal const string ActivityString = "otel_activity"; internal const string ExceptionEventKey = "exception"; internal const string MessagingSystemId = "kafka"; internal const string AttributeMessagingOperation = "messaging.operation"; internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key"; internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset"; - internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName(); - internal static readonly string ActivitySourceName = AssemblyName.Name; - internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString(); - internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version); public static void SetGenericTags(Activity activity) { diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index 10fa794e9..f83317823 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -20,25 +20,23 @@ public static Task OnConsumeStarted(IMessageContext context) { try { - var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context?.ConsumerContext.Topic} {ProcessString}" : ProcessString; - // Extract the PropagationContext of the upstream parent from the message headers. 
var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, ExtractTraceContextIntoBasicProperties); Baggage.Current = parentContext.Baggage; - // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. - // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md - var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext); + var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity; + + if (parentContext.ActivityContext.IsValid()) + { + activity.SetParentId(parentContext.ActivityContext.TraceId, parentContext.ActivityContext.SpanId, parentContext.ActivityContext.TraceFlags); + } foreach (var item in Baggage.Current) { activity?.AddBaggage(item.Key, item.Value); } - context?.Items.Add(ActivitySourceAccessor.ActivityString, activity); - - ActivitySourceAccessor.SetGenericTags(activity); + ActivityAccessor.SetGenericTags(activity); if (activity != null && activity.IsAllDataRequested) { @@ -57,7 +55,7 @@ public static Task OnConsumeCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - activity?.Dispose(); + activity?.Stop(); } return Task.CompletedTask; @@ -67,11 +65,11 @@ public static Task OnConsumeError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex); + var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex); activity?.AddEvent(exceptionEvent); - activity?.Dispose(); + activity?.Stop(); } return 
Task.CompletedTask; @@ -86,11 +84,11 @@ private static void SetConsumerTags(IMessageContext context, Activity activity) { var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString); + activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process.ToString().ToLower()); activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic); activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, messageKey); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset); + activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, messageKey); + activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset); activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition); } } diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs index 3630157c1..9610a5fe7 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -10,7 +10,6 @@ internal static class OpenTelemetryProducerEventsHandler { - private const string PublishString = "publish"; private const string AttributeMessagingDestinationName = "messaging.destination.name"; private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; @@ -19,12 +18,7 @@ public static Task OnProducerStarted(IMessageContext context) { try { - var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? 
$"{context?.ProducerContext.Topic} {PublishString}" : PublishString; - - // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. - // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md - var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Producer); + var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity; // Depending on Sampling (and whether a listener is registered or not), the // activity above may not be created. @@ -34,8 +28,6 @@ public static Task OnProducerStarted(IMessageContext context) if (activity != null) { - context?.Items.Add(ActivitySourceAccessor.ActivityString, activity); - contextToInject = activity.Context; } else if (Activity.Current != null) @@ -48,7 +40,7 @@ public static Task OnProducerStarted(IMessageContext context) // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. 
Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties); - ActivitySourceAccessor.SetGenericTags(activity); + ActivityAccessor.SetGenericTags(activity); if (activity != null && activity.IsAllDataRequested) { @@ -67,7 +59,7 @@ public static Task OnProducerCompleted(IMessageContext context) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - activity?.Dispose(); + activity?.Stop(); } return Task.CompletedTask; @@ -77,11 +69,11 @@ public static Task OnProducerError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex); + var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex); activity?.AddEvent(exceptionEvent); - activity?.Dispose(); + activity?.Stop(); } return Task.CompletedTask; @@ -97,11 +89,11 @@ private static void InjectTraceContextIntoBasicProperties(IMessageContext contex private static void SetProducerTags(IMessageContext context, Activity activity) { - activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, PublishString); + activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish.ToString().ToLower()); activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic); activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key); - activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset); + activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key); + activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset); } } } diff 
--git a/src/KafkaFlow/ActivityFactory.cs b/src/KafkaFlow/ActivityFactory.cs new file mode 100644 index 000000000..8b006e0bc --- /dev/null +++ b/src/KafkaFlow/ActivityFactory.cs @@ -0,0 +1,17 @@ +using System.Diagnostics; + +namespace KafkaFlow +{ + internal static class ActivityFactory + { + public static Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind) + { + var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {activityOperationType}" : activityOperationType.ToString().ToLower(); + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + return ActivitySourceAccessor.ActivitySource.StartActivity(activityName, activityKind); + } + } +} diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index cce7308ea..1add26ac1 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Consumers { using System; + using System.Diagnostics; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -106,6 +107,8 @@ public void OnTaskCompleted(Action handler) private async Task ProcessMessageAsync(ConsumeResult message, CancellationToken cancellationToken) { + var activity = ActivityFactory.Start(message.Topic, ActivityOperationType.Process, ActivityKind.Consumer); + try { var context = new MessageContext( @@ -119,6 +122,8 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca this.Id), null); + context.Items.Add(ActivitySourceAccessor.ActivityString, activity); + try { var scope = this.dependencyResolver.CreateScope(); diff --git 
a/src/KafkaFlow/KafkaFlow.csproj b/src/KafkaFlow/KafkaFlow.csproj index 1aa6b346b..09c7843cd 100644 --- a/src/KafkaFlow/KafkaFlow.csproj +++ b/src/KafkaFlow/KafkaFlow.csproj @@ -8,6 +8,7 @@ + diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index 85fd3096e..d378630cb 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Producers { using System; + using System.Diagnostics; using System.Text; using System.Threading.Tasks; using Confluent.Kafka; @@ -40,8 +41,12 @@ public async Task> ProduceAsync( using var scope = this.dependencyResolver.CreateScope(); + var activity = ActivityFactory.Start(topic, ActivityOperationType.Publish, ActivityKind.Producer); + var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers); + this.AddActivityToMessageContext(messageContext, activity); + await this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext)); try @@ -99,8 +104,12 @@ public void Produce( { var scope = this.dependencyResolver.CreateScope(); + var activity = ActivityFactory.Start(topic, ActivityOperationType.Publish, ActivityKind.Producer); + var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers); + this.AddActivityToMessageContext(messageContext, activity); + this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext)); this.middlewareExecutor @@ -354,5 +363,13 @@ private MessageContext CreateMessageContext( null, new ProducerContext(topic)); } + + private void AddActivityToMessageContext(MessageContext messageContext, Activity activity) + { + if(activity != null) + { + messageContext.Items.Add(ActivitySourceAccessor.ActivityString, activity); + } + } } }