Skip to content

Commit

Permalink
modified log messages to write plugin name because ESDB does not use …
Browse files Browse the repository at this point in the history
…source context

improved plugin version handling
improved diagnostics listener
  • Loading branch information
RagingKore committed Jun 11, 2024
1 parent 2be3496 commit 927f1bb
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 104 deletions.
96 changes: 43 additions & 53 deletions src/EventStore.Plugins/Diagnostics/DiagnosticsListeners.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace EventStore.Plugins.Diagnostics;
Expand All @@ -13,7 +14,7 @@ public MultiSourceDiagnosticsListener(string[] sources, int capacity = 10, OnSou
foreach (var source in sources)
Listeners.TryAdd(source, new(source, capacity, data => onEvent?.Invoke(source, data)));
}

Dictionary<string, SingleSourceDiagnosticsListener> Listeners { get; } = new();

public IEnumerable<object> CollectedEvents(string source) =>
Expand All @@ -26,7 +27,7 @@ public void ClearCollectedEvents(string source) {
if (Listeners.TryGetValue(source, out var listener))
listener.ClearCollectedEvents();
}

public void ClearAllCollectedEvents() {
foreach (var listener in Listeners.Values)
listener.ClearCollectedEvents();
Expand All @@ -35,17 +36,17 @@ public void ClearAllCollectedEvents() {
public void Dispose() {
foreach (var listener in Listeners.Values)
listener.Dispose();

Listeners.Clear();
}

public static MultiSourceDiagnosticsListener Start(OnSourceEvent onEvent, params string[] sources) =>
new(sources, 10, onEvent);

public static MultiSourceDiagnosticsListener Start(OnSourceEvent onEvent, int capacity, params string[] sources) =>
new(sources, capacity, onEvent);
public static MultiSourceDiagnosticsListener Start(params string[] sources) =>

public static MultiSourceDiagnosticsListener Start(params string[] sources) =>
new(sources);

public static MultiSourceDiagnosticsListener Start(int capacity, params string[] sources) =>
Expand All @@ -62,62 +63,62 @@ public SingleSourceDiagnosticsListener(string source, int capacity = 10, Action<
onEvent?.Invoke(data.Value);
});
}

GenericDiagnosticsListener Listener { get; }

List<object> ValidEvents => Listener.CollectedEvents
.Where(x => x.Value is not null)
.Select(x => x.Value!)
.ToList();

public string Source => Listener.Source;
public int Capacity => Listener.Capacity;

public IReadOnlyList<object> CollectedEvents => ValidEvents;

public bool HasCollectedEvents => Listener.HasCollectedEvents;

public void ClearCollectedEvents() => Listener.ClearCollectedEvents();

public IEnumerator<object> GetEnumerator() => ValidEvents.GetEnumerator();

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

public void Dispose() => Listener.Dispose();
public static SingleSourceDiagnosticsListener Start(string source, int capacity) =>

public static SingleSourceDiagnosticsListener Start(string source, int capacity) =>
new(source, capacity);
public static SingleSourceDiagnosticsListener Start(string source) =>

public static SingleSourceDiagnosticsListener Start(string source) =>
new(source);

public static SingleSourceDiagnosticsListener Start(Action<object> onEvent, string source) =>
new(source, 10, onEvent);

public static SingleSourceDiagnosticsListener Start(Action<object> onEvent, int capacity, string source) =>
new(source, capacity, onEvent);
}

/// <summary>
/// Generic listener that also collects the last N events and can be used to subscribe to a single source.
/// </summary>
class GenericDiagnosticsListener : IDisposable, IEnumerable<KeyValuePair<string, object?>> {
class GenericDiagnosticsListener : IDisposable {
static readonly object Locker = new();

public GenericDiagnosticsListener(string source, int capacity = 10, Action<KeyValuePair<string, object?>>? onEvent = null) {
if (string.IsNullOrWhiteSpace(source))
throw new ArgumentException("Source cannot be null or whitespace.", nameof(source));

ArgumentOutOfRangeException.ThrowIfNegative(capacity);

Source = source;
Capacity = capacity;
Queue = new(capacity);

var observer = new GenericObserver<KeyValuePair<string, object?>>(data => {
if (capacity > 0)
Queue.Enqueue(data);

try {
onEvent?.Invoke(data);
}
Expand All @@ -140,63 +141,52 @@ void OnNewListener(DiagnosticListener listener) {
}
}
}

FixedSizedQueue<KeyValuePair<string, object?>> Queue { get; }


FixedSizedConcurrentQueue<KeyValuePair<string, object?>> Queue { get; }
IDisposable? ListenerSubscription { get; }
IDisposable? NetworkSubscription { get; set; }

public string Source { get; }
public int Capacity { get; }

public IReadOnlyList<KeyValuePair<string, object?>> CollectedEvents => Queue.ToList();
public bool HasCollectedEvents => Queue.Count != 0;
public IReadOnlyList<KeyValuePair<string, object?>> CollectedEvents => Queue.ToArray();

public bool HasCollectedEvents => Queue.TryPeek(out _);

public void ClearCollectedEvents() => Queue.Clear();

public void Dispose() {
NetworkSubscription?.Dispose();
ListenerSubscription?.Dispose();
ClearCollectedEvents();
}

public IEnumerator<KeyValuePair<string, object?>> GetEnumerator() => Queue.ToList().GetEnumerator();

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

class GenericObserver<T>(Action<T>? onNext, Action? onCompleted = null) : IObserver<T> {
public void OnNext(T value) => _onNext(value);
public void OnCompleted() => _onCompleted();

public void OnError(Exception error) { }

readonly Action<T> _onNext = onNext ?? (_ => { });
readonly Action _onCompleted = onCompleted ?? (() => { });
}
class FixedSizedQueue<T>(int maxSize) : Queue<T> {

class FixedSizedConcurrentQueue<T>(int maxSize) : ConcurrentQueue<T> {
readonly object _locker = new();

public new void Enqueue(T item) {
lock (_locker) {
base.Enqueue(item);
if (Count > maxSize)
Dequeue(); // Throw away
}
}

public new void Clear() {
lock (_locker) {
base.Clear();
TryDequeue(out _); // Throw away
}
}
}

public static GenericDiagnosticsListener Start(string source, int capacity = 10, Action<KeyValuePair<string, object?>>? onEvent = null) =>
new(source, capacity, onEvent);

public static GenericDiagnosticsListener Start(string source, Action<KeyValuePair<string, object?>>? onEvent = null) =>
new(source, 10, onEvent);
}

public static GenericDiagnosticsListener Start(string source, int capacity = 10, Action<KeyValuePair<string, object?>>? onEvent = null) =>
new(source, capacity, onEvent);

public static GenericDiagnosticsListener Start(string source, Action<KeyValuePair<string, object?>>? onEvent = null) =>
new(source, 10, onEvent);
}
69 changes: 40 additions & 29 deletions src/EventStore.Plugins/Plugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ protected Plugin(
.Replace("Subsystems", "", OrdinalIgnoreCase)
.Replace("Subsystem", "", OrdinalIgnoreCase);

Version = version
?? pluginType.Assembly.GetName().Version?.ToString()
?? "1.0.0.0-preview";
Version = GetPluginVersion(version, pluginType);

LicensePublicKey = licensePublicKey;

Expand All @@ -47,6 +45,19 @@ protected Plugin(

IsEnabledResult = (true, "");
Configuration = null!;

return;

static string GetPluginVersion(string? pluginVersion, Type pluginType) {
const string emptyVersion = "0.0.0.0";
const string defaultVersion = "1.0.0.0-preview";

var version = pluginVersion
?? pluginType.Assembly.GetName().Version?.ToString()
?? emptyVersion;

return version != emptyVersion ? version : defaultVersion;
}
}

protected Plugin(PluginOptions options) : this(
Expand Down Expand Up @@ -75,7 +86,7 @@ protected Plugin(PluginOptions options) : this(

/// <inheritdoc />
public KeyValuePair<string, object?>[] DiagnosticsTags { get; }

/// <inheritdoc />
public bool Enabled => IsEnabledResult.Enabled;

Expand All @@ -89,67 +100,67 @@ public virtual void ConfigureApplication(IApplicationBuilder app, IConfiguration
/// </summary>
/// <param name="configuration">The configuration of the application.<br/></param>
public virtual (bool Enabled, string EnableInstructions) IsEnabled(IConfiguration configuration) => IsEnabledResult;

public void Disable(string reason) => IsEnabledResult = (false, reason);

void IPlugableComponent.ConfigureServices(IServiceCollection services, IConfiguration configuration) {
Configuration = configuration;
IsEnabledResult = IsEnabled(configuration);

if (Enabled)
ConfigureServices(services, configuration);

PublishDiagnosticsData(new() { ["enabled"] = Enabled }, Partial);
}

void IPlugableComponent.ConfigureApplication(IApplicationBuilder app, IConfiguration configuration) {
var logger = app.ApplicationServices.GetRequiredService<ILoggerFactory>().CreateLogger(GetType());

// if the plugin is disabled, just get out
if (!Enabled) {
logger.LogInformation(
"{Version} plugin disabled. {EnableInstructions}",
Version, IsEnabledResult.EnableInstructions
"{PluginName} {Version} plugin disabled. {EnableInstructions}",
Name, Version, IsEnabledResult.EnableInstructions
);

return;
}

// if the plugin is enabled, but the license is invalid, throw an exception and effectivly disable the plugin
var license = app.ApplicationServices.GetService<License>();
if (Enabled && LicensePublicKey is not null && (license is null || !license.IsValid(LicensePublicKey))) {
var ex = new PluginLicenseException(Name);

IsEnabledResult = (false, ex.Message);

PublishDiagnosticsData(new() { ["enabled"] = Enabled }, Partial);

logger.LogInformation(
"{Version} plugin disabled. {EnableInstructions}",
Version, IsEnabledResult.EnableInstructions
"{PluginName} {Version} plugin disabled. {EnableInstructions}",
Name, Version, IsEnabledResult.EnableInstructions
);

throw ex;
}

// there is still a chance to disable the plugin when configuring the application
// this is useful when the plugin is enabled, but some conditions that can only be checked here are not met
ConfigureApplication(app, Configuration);

// at this point we know if the plugin is enabled and configured or not
if (Enabled)
logger.LogInformation("{Version} plugin enabled.", Version);
logger.LogInformation("{PluginName} {Version} plugin enabled.", Name, Version);
else {
logger.LogInformation(
"{Version} plugin disabled. {EnableInstructions}",
Version, IsEnabledResult.EnableInstructions
"{PluginName} {Version} plugin disabled. {EnableInstructions}",
Name, Version, IsEnabledResult.EnableInstructions
);
}

// finally publish diagnostics data
PublishDiagnosticsData(new() { ["enabled"] = Enabled }, Partial);
}

/// <summary>
/// Publishes diagnostics data as a snapshot.<br/>
/// Uses the <see cref="PluginDiagnosticsData"/> container.<br/>
Expand All @@ -164,7 +175,7 @@ protected internal void PublishDiagnosticsData(Dictionary<string, object?> event
Data = eventData,
CollectionMode = mode
};

DiagnosticListener.Write(nameof(PluginDiagnosticsData), value);
}

Expand All @@ -177,9 +188,9 @@ protected internal void PublishDiagnosticsData(Dictionary<string, object?> event
/// <param name="eventData">The data to publish.</param>
/// <param name="mode">The mode of data collection for a plugin event.</param>
protected internal void PublishDiagnosticsData(string eventName, Dictionary<string, object?> eventData, PluginDiagnosticsDataCollectionMode mode = Event) {
if (eventName == nameof(PluginDiagnosticsData))
if (eventName == nameof(PluginDiagnosticsData))
throw new ArgumentException("Event name cannot be PluginDiagnosticsData", nameof(eventName));

DiagnosticListener.Write(
eventName,
new PluginDiagnosticsData{
Expand All @@ -190,7 +201,7 @@ protected internal void PublishDiagnosticsData(string eventName, Dictionary<stri
}
);
}

/// <summary>
/// Publishes diagnostics events. <br/>
/// </summary>
Expand All @@ -201,4 +212,4 @@ protected internal void PublishDiagnosticsEvent<T>(T pluginEvent) =>

/// <inheritdoc />
public void Dispose() => DiagnosticListener.Dispose();
}
}
Loading

0 comments on commit 927f1bb

Please sign in to comment.