Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DEV-63] Fix Diagnostics dispatching and collection #45

Merged
merged 7 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/EventStore.Plugins/ConfigParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace EventStore.Plugins;

// move this to ComercialHA
public static class ConfigParser {
/// <summary>
/// Deserializes a section of configuration from a given config file into the provided settings type
Expand Down
202 changes: 202 additions & 0 deletions src/EventStore.Plugins/Diagnostics/DiagnosticsListeners.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
using System.Collections;
using System.Diagnostics;

namespace EventStore.Plugins.Diagnostics;

public delegate void OnSourceEvent(string source, object data);

/// <summary>
/// Generic listener that can subscribe to multiple sources, ignores the default diagnostics model and always returns just the value and only if its not null.
/// </summary>
public class MultiSourceDiagnosticsListener : IDisposable {
public MultiSourceDiagnosticsListener(string[] sources, int capacity = 10, OnSourceEvent? onEvent = null) {
foreach (var source in sources)
Listeners.TryAdd(source, new(source, capacity, data => onEvent?.Invoke(source, data)));
}

Dictionary<string, SingleSourceDiagnosticsListener> Listeners { get; } = new();

public IEnumerable<object> CollectedEvents(string source) =>
Listeners.TryGetValue(source, out var listener) ? (IEnumerable<object>)listener : [];

public bool HasCollectedEvents(string source) =>
Listeners.TryGetValue(source, out var listener) && listener.HasCollectedEvents;

public void ClearCollectedEvents(string source) {
if (Listeners.TryGetValue(source, out var listener))
listener.ClearCollectedEvents();
}

public void ClearAllCollectedEvents() {
foreach (var listener in Listeners.Values)
listener.ClearCollectedEvents();
}

public void Dispose() {
foreach (var listener in Listeners.Values)
listener.Dispose();

Listeners.Clear();
}

public static MultiSourceDiagnosticsListener Start(OnSourceEvent onEvent, params string[] sources) =>
new(sources, 10, onEvent);

public static MultiSourceDiagnosticsListener Start(OnSourceEvent onEvent, int capacity, params string[] sources) =>
new(sources, capacity, onEvent);

public static MultiSourceDiagnosticsListener Start(params string[] sources) =>
new(sources);

public static MultiSourceDiagnosticsListener Start(int capacity, params string[] sources) =>
new(sources, capacity);
}

/// <summary>
/// Generic listener that ignores the default diagnostics model and always returns just the value and only if its not null.
/// </summary>
public class SingleSourceDiagnosticsListener : IEnumerable<object>, IDisposable {
public SingleSourceDiagnosticsListener(string source, int capacity = 10, Action<object>? onEvent = null) {
Listener = new(source, capacity, data => {
if (data.Value is not null)
onEvent?.Invoke(data.Value);
});
}

GenericDiagnosticsListener Listener { get; }

List<object> ValidEvents => Listener.CollectedEvents
.Where(x => x.Value is not null)
.Select(x => x.Value!)
.ToList();

public string Source => Listener.Source;
public int Capacity => Listener.Capacity;

public IReadOnlyList<object> CollectedEvents => ValidEvents;

public bool HasCollectedEvents => Listener.HasCollectedEvents;

public void ClearCollectedEvents() => Listener.ClearCollectedEvents();

public IEnumerator<object> GetEnumerator() => ValidEvents.GetEnumerator();

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

public void Dispose() => Listener.Dispose();

public static SingleSourceDiagnosticsListener Start(string source, int capacity) =>
new(source, capacity);

public static SingleSourceDiagnosticsListener Start(string source) =>
new(source);

public static SingleSourceDiagnosticsListener Start(Action<object> onEvent, string source) =>
new(source, 10, onEvent);

public static SingleSourceDiagnosticsListener Start(Action<object> onEvent, int capacity, string source) =>
new(source, capacity, onEvent);
}

/// <summary>
/// Generic listener that also collects the last N events and can be used to subscribe to a single source.
/// </summary>
class GenericDiagnosticsListener : IDisposable, IEnumerable<KeyValuePair<string, object?>> {
static readonly object Locker = new();

public GenericDiagnosticsListener(string source, int capacity = 10, Action<KeyValuePair<string, object?>>? onEvent = null) {
if (string.IsNullOrWhiteSpace(source))
throw new ArgumentException("Source cannot be null or whitespace.", nameof(source));

ArgumentOutOfRangeException.ThrowIfNegative(capacity);

Source = source;
Capacity = capacity;
Queue = new(capacity);

var observer = new GenericObserver<KeyValuePair<string, object?>>(data => {
if (capacity > 0)
Queue.Enqueue(data);

try {
onEvent?.Invoke(data);
}
catch {
// stay on target
}
});

ListenerSubscription = DiagnosticListener.AllListeners
.Subscribe(new GenericObserver<DiagnosticListener>(OnNewListener));

return;

void OnNewListener(DiagnosticListener listener) {
if (listener.Name != source) return;

lock (Locker) {
NetworkSubscription?.Dispose();
NetworkSubscription = listener.Subscribe(observer);
}
}
}

FixedSizedQueue<KeyValuePair<string, object?>> Queue { get; }
IDisposable? ListenerSubscription { get; }
IDisposable? NetworkSubscription { get; set; }

public string Source { get; }
public int Capacity { get; }

public IReadOnlyList<KeyValuePair<string, object?>> CollectedEvents => Queue.ToList();

public bool HasCollectedEvents => Queue.Count != 0;

public void ClearCollectedEvents() => Queue.Clear();

public void Dispose() {
NetworkSubscription?.Dispose();
ListenerSubscription?.Dispose();
ClearCollectedEvents();
}

public IEnumerator<KeyValuePair<string, object?>> GetEnumerator() => Queue.ToList().GetEnumerator();

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

class GenericObserver<T>(Action<T>? onNext, Action? onCompleted = null) : IObserver<T> {
public void OnNext(T value) => _onNext(value);
public void OnCompleted() => _onCompleted();

public void OnError(Exception error) { }

readonly Action<T> _onNext = onNext ?? (_ => { });
readonly Action _onCompleted = onCompleted ?? (() => { });
}

class FixedSizedQueue<T>(int maxSize) : Queue<T> {
readonly object _locker = new();

public new void Enqueue(T item) {
lock (_locker) {
base.Enqueue(item);
if (Count > maxSize)
Dequeue(); // Throw away
}
}

public new void Clear() {
lock (_locker) {
base.Clear();
}
}
}

public static GenericDiagnosticsListener Start(string source, int capacity = 10, Action<KeyValuePair<string, object?>>? onEvent = null) =>
new(source, capacity, onEvent);

public static GenericDiagnosticsListener Start(string source, Action<KeyValuePair<string, object?>>? onEvent = null) =>
new(source, 10, onEvent);
}


98 changes: 86 additions & 12 deletions src/EventStore.Plugins/Diagnostics/PluginDiagnosticsData.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,91 @@
namespace EventStore.Plugins.Diagnostics;

/// <summary>
/// Represents the mode of data collection for a plugin event.
/// </summary>
public enum PluginDiagnosticsDataCollectionMode {
/// <summary>
/// Appends multiple events regardless or their type.
/// </summary>
Event,

/// <summary>
/// Override previously collected event data.
/// </summary>
Snapshot,

/// <summary>
/// Merges with previously collected event data.
/// </summary>
Partial
}

/// <summary>
/// Represents diagnostic data of a plugin.
/// By default it is a snapshot and will override previously collected data, by event name.
/// </summary>
/// <param name="Source">The source of the event that matches the DiagnosticsName</param>
/// <param name="EventName">The name of the event. The default is PluginDiagnosticsData.</param>
/// <param name="Data">The data associated with the event in the form of a dictionary.</param>
/// <param name="Timestamp">When the event occurred.</param>
/// <param name="IsSnapshot">Whether the event is a snapshot and should override previously collected data, by event name. Default value is true.</param>
public readonly record struct PluginDiagnosticsData(
string Source,
string EventName,
Dictionary<string, object?> Data,
DateTimeOffset Timestamp,
bool IsSnapshot = true
);
public readonly record struct PluginDiagnosticsData() : IComparable<PluginDiagnosticsData>, IComparable {
public static PluginDiagnosticsData None { get; } = new() { Data = null! };

/// <summary>
/// The source of the event that matches the DiagnosticsName.
/// </summary>
public string Source { get; init; } = string.Empty;

/// <summary>
/// The name of the event. The default is PluginDiagnosticsData.
/// </summary>
public string EventName { get; init; } = nameof(PluginDiagnosticsData);

/// <summary>
/// The data associated with the event in the form of a dictionary.
/// </summary>
public required Dictionary<string, object?> Data { get; init; }

/// <summary>
/// When the event occurred.
/// </summary>
public DateTimeOffset Timestamp { get; init; } = DateTimeOffset.UtcNow;

/// <summary>
/// Represents the mode of data collection for a plugin event.
/// </summary>
public PluginDiagnosticsDataCollectionMode CollectionMode { get; init; } = PluginDiagnosticsDataCollectionMode.Event;

/// <summary>
/// Gets the value associated with the specified key.
/// </summary>
public T GetValue<T>(string key, T defaultValue) =>
Data.TryGetValue(key, out var value) &&
value is T typedValue ? typedValue : defaultValue;

/// <summary>
/// Gets the value associated with the specified key.
/// </summary>
public T? GetValue<T>(string key) =>
Data.TryGetValue(key, out var value) ? (T?)value : default;

public int CompareTo(PluginDiagnosticsData other) {
var sourceComparison = string.Compare(Source, other.Source, StringComparison.Ordinal);
if (sourceComparison != 0) return sourceComparison;

var eventNameComparison = string.Compare(EventName, other.EventName, StringComparison.Ordinal);
if (eventNameComparison != 0) return eventNameComparison;

return Timestamp.CompareTo(other.Timestamp);
}

public int CompareTo(object? obj) {
if (ReferenceEquals(null, obj)) return 1;

return obj is PluginDiagnosticsData other ? CompareTo(other) : throw new ArgumentException($"Object must be of type {nameof(PluginDiagnosticsData)}");
}

public static bool operator <(PluginDiagnosticsData left, PluginDiagnosticsData right) => left.CompareTo(right) < 0;

public static bool operator >(PluginDiagnosticsData left, PluginDiagnosticsData right) => left.CompareTo(right) > 0;

public static bool operator <=(PluginDiagnosticsData left, PluginDiagnosticsData right) => left.CompareTo(right) <= 0;

public static bool operator >=(PluginDiagnosticsData left, PluginDiagnosticsData right) => left.CompareTo(right) >= 0;
}
Loading
Loading