Skip to content

Commit

Permalink
Complete the implementation of the EventTracingConnectionLifetime cla…
Browse files Browse the repository at this point in the history
…ss and supporting classes.
  • Loading branch information
SeanFarrow committed Mar 30, 2024
1 parent 7fdc788 commit 9abe946
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 9 deletions.
25 changes: 25 additions & 0 deletions src/Marten/Internal/OpenTelemetry/MartenTracing.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System.Collections.Generic;
using System.Diagnostics;

namespace Marten.Internal.OpenTelemetry;

internal static class MartenTracing
{
// See https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/messaging/ for more information
public static string MartenTenantId = "MartenTentantId";
public static string MartenCorelationId = "MartenCorelationId";

internal static ActivitySource ActivitySource { get; } = new(
"Marten",
typeof(MartenTracing).Assembly.GetName().Version!.ToString());

public static Activity? StartConnectionActivity(Activity? parentActivity =null, IEnumerable<KeyValuePair<string, object?>>? tags =null)

Check warning on line 16 in src/Marten/Internal/OpenTelemetry/MartenTracing.cs

View workflow job for this annotation

GitHub Actions / build

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.

Check warning on line 16 in src/Marten/Internal/OpenTelemetry/MartenTracing.cs

View workflow job for this annotation

GitHub Actions / build

The annotation for nullable reference types should only be used in code within a '#nullable' annotations context.
{
return StartActivity("connection", parentActivity, tags);
}

public static Activity StartActivity(string spanName, Activity? parentActivity =null, IEnumerable<KeyValuePair<string, object?>>? tags =null, ActivityKind activityKind =ActivityKind.Internal)
{
return ActivitySource.CreateActivity(spanName, activityKind, parentActivity?.ParentId, tags);
}
}
211 changes: 202 additions & 9 deletions src/Marten/Internal/Sessions/EventTracingConnectionLifetime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@
using System.Collections.Generic;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Marten.Services;
using Npgsql;

namespace Marten.Internal.Sessions;

internal class EventTracingConnectionLifetime : IConnectionLifetime
internal class EventTracingConnectionLifetime :
IConnectionLifetime
{
private const string MartenCommandExecutionStarted = "MartenCommandExecutionStarted";
private const string MartenCommandExecutionFailed = "MartenCommandExecutionFailed";
private const string MartenBatchExecutionStarted = "MartenBatchExecutionStarted";
private const string MartenBatchExecutionFailed = "MartenBatchExecutionFailed";
private const string MartenBatchPagesExecutionStarted = "MartenBatchPagesExecutionStarted";
private const string MartenBatchPagesExecutionFailed = "MartenBatchPagesExecutionFailed";

private readonly IConnectionLifetime _innerConnectionLifetime;
private readonly OpenTelemetryOptions _openTelemetryOptions;
private readonly Activity _databaseActivity;
Expand Down Expand Up @@ -53,41 +62,225 @@ public void Dispose()
public int CommandTimeout { get; }
public int Execute(NpgsqlCommand cmd)
{
return _innerConnectionLifetime.Execute(cmd);
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenCommandExecutionStarted));
}

try
{
return _innerConnectionLifetime.Execute(cmd);
}
catch (Exception e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenCommandExecutionFailed, DateTimeOffset.UtcNow, new ActivityTagsCollection(new KeyValuePair<string, object>[] {new KeyValuePair<string, object>("ExceptionType", e.GetType()) })));
}

throw;
}
}

public Task<int> ExecuteAsync(NpgsqlCommand command, CancellationToken token = new CancellationToken())
{
return _innerConnectionLifetime.ExecuteAsync(command, token);
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent("Database command execution started"));
}

try
{
return _innerConnectionLifetime.ExecuteAsync(command, token);
}
catch (Exception e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenCommandExecutionFailed, DateTimeOffset.UtcNow,
new ActivityTagsCollection(new[]
{
new KeyValuePair<string, object>("ExceptionType", e.GetType())
})));
}

throw;
}
}

public DbDataReader ExecuteReader(NpgsqlCommand command)
{
return _innerConnectionLifetime.ExecuteReader(command);
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent("Database command execution started"));
}

try
{
return _innerConnectionLifetime.ExecuteReader(command);
}
catch (Exception e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenCommandExecutionFailed, DateTimeOffset.UtcNow,
new ActivityTagsCollection(new[]
{
new KeyValuePair<string, object>("ExceptionType", e.GetType())
})));
}

throw;
}
}

public Task<DbDataReader> ExecuteReaderAsync(NpgsqlCommand command, CancellationToken token = default)
{
return _innerConnectionLifetime.ExecuteReaderAsync(command, token);
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent("Database command execution started"));
}

try
{
return _innerConnectionLifetime.ExecuteReaderAsync(command, token);
}
catch (Exception e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenCommandExecutionFailed, DateTimeOffset.UtcNow,
new ActivityTagsCollection(new[]
{
new KeyValuePair<string, object>("ExceptionType", e.GetType())
})));
}

throw;
}
}

public DbDataReader ExecuteReader(NpgsqlBatch batch)
{
return _innerConnectionLifetime.ExecuteReader(batch);
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchExecutionStarted));
}

try
{
return _innerConnectionLifetime.ExecuteReader(batch);
}
catch (Exception e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchExecutionFailed, DateTimeOffset.UtcNow, new ActivityTagsCollection(new [] { new KeyValuePair<string, object>("ExceptionType", e.GetType())})));
}

throw;
}
}

public Task<DbDataReader> ExecuteReaderAsync(NpgsqlBatch batch, CancellationToken token = default)
{
return _innerConnectionLifetime.ExecuteReaderAsync(batch, token);
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchExecutionStarted));
}

try
{
return _innerConnectionLifetime.ExecuteReaderAsync(batch, token);
}
catch (Exception e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchExecutionFailed, DateTimeOffset.UtcNow, new ActivityTagsCollection(new[] { new KeyValuePair<string, object>("ExceptionType", e.GetType()) })));
}

throw;
}
}

public void ExecuteBatchPages(IReadOnlyList<OperationPage> pages, List<Exception> exceptions)
{
_innerConnectionLifetime.ExecuteBatchPages(pages, exceptions);
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchPagesExecutionStarted));
}

try
{
_innerConnectionLifetime.ExecuteBatchPages(pages, exceptions);
}
catch (AggregateException e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
var innerExceptionTypes = e.InnerExceptions.Select(t => t.GetType());
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchPagesExecutionFailed, DateTimeOffset.UtcNow,
new ActivityTagsCollection(new[]
{
new KeyValuePair<string, object>("ExceptionTypes", innerExceptionTypes)
})));
}

throw;
}
catch (Exception e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchPagesExecutionFailed, DateTimeOffset.UtcNow,
new ActivityTagsCollection(new[]
{
new KeyValuePair<string, object>("ExceptionType", e.GetType())
})));
}

throw;
}
}

public Task ExecuteBatchPagesAsync(IReadOnlyList<OperationPage> pages, List<Exception> exceptions, CancellationToken token)
{
return _innerConnectionLifetime.ExecuteBatchPagesAsync(pages, exceptions, token);
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchPagesExecutionStarted));
}

try
{
return _innerConnectionLifetime.ExecuteBatchPagesAsync(pages, exceptions, token);
}
catch (AggregateException e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
var innerExceptionTypes = e.InnerExceptions.Select(t => t.GetType());
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchPagesExecutionFailed, DateTimeOffset.UtcNow,
new ActivityTagsCollection(new[]
{
new KeyValuePair<string, object>("ExceptionTypes", innerExceptionTypes)
})));
}

throw;
}
catch (Exception e)
{
if (_openTelemetryOptions.TrackConnectionEvents)
{
_databaseActivity.AddEvent(new ActivityEvent(MartenBatchPagesExecutionFailed, DateTimeOffset.UtcNow,
new ActivityTagsCollection(new[]
{
new KeyValuePair<string, object>("ExceptionType", e.GetType())
})));
}

throw;
}
}
}
3 changes: 3 additions & 0 deletions src/Marten/Marten.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,8 @@
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="All" />
</ItemGroup>
<ItemGroup>
<Folder Include="Internal\OpenTelemetry\" />
</ItemGroup>
<Import Project="../../Analysis.Build.props" />
</Project>
21 changes: 21 additions & 0 deletions src/Marten/Services/SessionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Marten.Exceptions;
using Marten.Internal.OpenTelemetry;
using Marten.Internal.Sessions;
using Marten.Storage;
using Npgsql;
using static Marten.Internal.OpenTelemetry.MartenTracing;
using IsolationLevel = System.Data.IsolationLevel;

namespace Marten.Services;
Expand Down Expand Up @@ -98,6 +101,23 @@ internal IConnectionLifetime Initialize(DocumentStore store, CommandRunnerMode m
throw new DefaultTenantUsageDisabledException();
}

var innerConnectionLifetime =GetInnerConnectionLifetime(store, mode);

var corelationId = Activity.Current is not null ? Activity.Current.RootId : Guid.NewGuid().ToString();
var tags = new Dictionary<string, object>
{
{ MartenCorelationId, corelationId },
{ MartenTenantId, Tenant.TenantId }
};

return OpenTelemetryOptions.TrackConnectionEvents
? new EventTracingConnectionLifetime(innerConnectionLifetime, OpenTelemetryOptions,
StartConnectionActivity(Activity.Current, tags))
: innerConnectionLifetime;
}

private IConnectionLifetime GetInnerConnectionLifetime(DocumentStore store, CommandRunnerMode mode)
{
if (!OwnsTransactionLifecycle && mode != CommandRunnerMode.ReadOnly)
{
Mode = CommandRunnerMode.External;
Expand All @@ -118,6 +138,7 @@ internal IConnectionLifetime Initialize(DocumentStore store, CommandRunnerMode m
{
return new TransactionalConnection(this){CommandTimeout = Timeout ?? store.Options.CommandTimeout};
}

{
return new AutoClosingLifetime(this, store.Options);
}
Expand Down

0 comments on commit 9abe946

Please sign in to comment.