fix: broken traces #467

Closed
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/ActivityOperationType.cs
@@ -0,0 +1,12 @@
namespace KafkaFlow
{
/// <summary>Activity operation names enum values</summary>
public enum ActivityOperationType
{
/// <summary>PUBLISH</summary>
Publish,

/// <summary>PROCESS</summary>
Process,
}
}
28 changes: 28 additions & 0 deletions src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs
@@ -0,0 +1,28 @@
using System.Diagnostics;

using System.Reflection;

namespace KafkaFlow
{
/// <summary>
/// ActivitySource properties
/// </summary>
public static class ActivitySourceAccessor
{
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();

private const string _activityString = "otel_activity";

private static readonly ActivitySource _activitySource = new("KafkaFlow.OpenTelemetry", Version);

/// <summary>
/// Gets the key under which the OpenTelemetry Activity is stored
/// in the MessageContext.Items dictionary
/// </summary>
public static string ActivityString => _activityString;

/// <summary>
/// Gets the ActivitySource instance that is used in KafkaFlow
/// </summary>
public static ActivitySource ActivitySource => _activitySource;
}
}
4 changes: 4 additions & 0 deletions src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
@@ -7,4 +7,8 @@
<Description>Contains all KafkaFlow extendable interfaces</Description>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.1" />
</ItemGroup>

</Project>
@@ -1,12 +1,15 @@
namespace KafkaFlow.IntegrationTests.Core.Middlewares
{
using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow.IntegrationTests.Core.Handlers;

internal class GzipMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
using var activity = ActivitySourceAccessor.ActivitySource.StartActivity("integration-test", ActivityKind.Internal);

MessageStorage.Add((byte[]) context.Message.Value);
await next(context);
}
40 changes: 35 additions & 5 deletions src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
@@ -56,14 +56,43 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
await producer.ProduceAsync(null, message);

// Assert
-var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
+var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
{
// Arrange
var provider = await this.GetServiceProvider();
MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow.OpenTelemetry")
.AddInMemoryExporter(this.exportedItems)
.Build();

var producer = provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = this.fixture.Create<byte[]>();

// Act
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(internalSpan.ParentSpanId, consumerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer()
{
@@ -97,7 +126,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
await producer.ProduceAsync(null, message);

// Assert
-var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
+var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
@@ -181,9 +210,9 @@ await Policy
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}

-private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync()
+private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync()
{
-Activity producerSpan = null, consumerSpan = null;
+Activity producerSpan = null, consumerSpan = null, internalSpan = null;

await Policy
.HandleResult<bool>(isAvailable => !isAvailable)
@@ -192,11 +221,12 @@ await Policy
{
producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);
internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal);

return Task.FromResult(producerSpan != null && consumerSpan != null);
});

-return (producerSpan, consumerSpan);
+return (producerSpan, consumerSpan, internalSpan);
}
}
}
@@ -5,21 +5,15 @@ namespace KafkaFlow.OpenTelemetry
using System;
using System.Collections.Generic;
using System.Diagnostics;
-using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;

-internal static class ActivitySourceAccessor
+internal static class ActivityAccessor
{
-internal const string ActivityString = "otel_activity";
internal const string ExceptionEventKey = "exception";
internal const string MessagingSystemId = "kafka";
internal const string AttributeMessagingOperation = "messaging.operation";
internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key";
internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset";
-internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName();
-internal static readonly string ActivitySourceName = AssemblyName.Name;
-internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
-internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version);

public static void SetGenericTags(Activity activity)
{
28 changes: 13 additions & 15 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
@@ -20,25 +20,23 @@ public static Task OnConsumeStarted(IMessageContext context)
{
try
{
-var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context?.ConsumerContext.Topic} {ProcessString}" : ProcessString;
-
// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, ExtractTraceContextIntoBasicProperties);
Baggage.Current = parentContext.Baggage;

-// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
-// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
-// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
-var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);
+var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity;
+
+if (parentContext.ActivityContext.IsValid())
+{
+activity.SetParentId(parentContext.ActivityContext.TraceId, parentContext.ActivityContext.SpanId, parentContext.ActivityContext.TraceFlags);
+}

foreach (var item in Baggage.Current)
{
activity?.AddBaggage(item.Key, item.Value);
}

-context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);
-
-ActivitySourceAccessor.SetGenericTags(activity);
+ActivityAccessor.SetGenericTags(activity);

if (activity != null && activity.IsAllDataRequested)
{
@@ -57,7 +55,7 @@ public static Task OnConsumeCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
-activity?.Dispose();
+activity?.Stop();
}

return Task.CompletedTask;
@@ -67,11 +65,11 @@ public static Task OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
-var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);
+var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);

-activity?.Dispose();
+activity?.Stop();
}

return Task.CompletedTask;
@@ -86,11 +84,11 @@ private static void SetConsumerTags(IMessageContext context, Activity activity)
{
var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]);

-activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString);
+activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process.ToString().ToLower());
activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic);
activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId);
-activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, messageKey);
-activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
+activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, messageKey);
+activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition);
}
}
24 changes: 8 additions & 16 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
@@ -10,7 +10,6 @@

internal static class OpenTelemetryProducerEventsHandler
{
-private const string PublishString = "publish";
private const string AttributeMessagingDestinationName = "messaging.destination.name";
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
@@ -19,12 +18,7 @@ public static Task OnProducerStarted(IMessageContext context)
{
try
{
-var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context?.ProducerContext.Topic} {PublishString}" : PublishString;
-
-// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
-// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
-// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
-var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Producer);
+var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity;

// Depending on Sampling (and whether a listener is registered or not), the
// activity above may not be created.
@@ -34,8 +28,6 @@ public static Task OnProducerStarted(IMessageContext context)

if (activity != null)
{
-context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);
-
contextToInject = activity.Context;
}
else if (Activity.Current != null)
@@ -48,7 +40,7 @@ public static Task OnProducerStarted(IMessageContext context)
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties);

-ActivitySourceAccessor.SetGenericTags(activity);
+ActivityAccessor.SetGenericTags(activity);

if (activity != null && activity.IsAllDataRequested)
{
@@ -67,7 +59,7 @@ public static Task OnProducerCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
-activity?.Dispose();
+activity?.Stop();
}

return Task.CompletedTask;
@@ -77,11 +69,11 @@ public static Task OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
-var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);
+var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);

-activity?.Dispose();
+activity?.Stop();
}

return Task.CompletedTask;
@@ -97,11 +89,11 @@ private static void InjectTraceContextIntoBasicProperties(IMessageContext contex

private static void SetProducerTags(IMessageContext context, Activity activity)
{
-activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, PublishString);
+activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish.ToString().ToLower());
activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic);
activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition);
-activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
-activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset);
+activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
+activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset);
}
}
}
17 changes: 17 additions & 0 deletions src/KafkaFlow/ActivityFactory.cs
@@ -0,0 +1,17 @@
using System.Diagnostics;

namespace KafkaFlow
{
internal static class ActivityFactory
{
public static Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind)
{
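// Lower-case the operation once so the topic-prefixed name ("<topic> process") matches the lower-case fallback.
var operationName = activityOperationType.ToString().ToLower();
var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {operationName}" : operationName;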

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
return ActivitySourceAccessor.ActivitySource.StartActivity(activityName, activityKind);
}
}
}
5 changes: 5 additions & 0 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
@@ -1,6 +1,7 @@
namespace KafkaFlow.Consumers
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -106,6 +107,8 @@ public void OnTaskCompleted(Action handler)

private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, CancellationToken cancellationToken)
{
var activity = ActivityFactory.Start(message.Topic, ActivityOperationType.Process, ActivityKind.Consumer);

try
{
var context = new MessageContext(
@@ -119,6 +122,8 @@ private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, Ca
this.Id),
null);

context.Items.Add(ActivitySourceAccessor.ActivityString, activity);

try
{
var scope = this.dependencyResolver.CreateScope();
1 change: 1 addition & 0 deletions src/KafkaFlow/KafkaFlow.csproj
@@ -8,6 +8,7 @@

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.1.1" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.1" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
</ItemGroup>
