Skip to content

Commit

Permalink
Merge branch 'master' into db-777-encryption-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
timothycoleman committed Jun 10, 2024
2 parents b997425 + ea86e9b commit 7cf9733
Show file tree
Hide file tree
Showing 11 changed files with 673 additions and 140 deletions.
1 change: 0 additions & 1 deletion src/EventStore.Plugins/ConfigParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace EventStore.Plugins;

// move this to ComercialHA
public static class ConfigParser {
/// <summary>
/// Deserializes a section of configuration from a given config file into the provided settings type
Expand Down
202 changes: 202 additions & 0 deletions src/EventStore.Plugins/Diagnostics/DiagnosticsListeners.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
using System.Collections;
using System.Diagnostics;

namespace EventStore.Plugins.Diagnostics;

public delegate void OnSourceEvent(string source, object data);

/// <summary>
/// Generic listener that can subscribe to multiple sources, ignores the default diagnostics model and always returns just the value and only if its not null.
/// </summary>
public class MultiSourceDiagnosticsListener : IDisposable {
public MultiSourceDiagnosticsListener(string[] sources, int capacity = 10, OnSourceEvent? onEvent = null) {
foreach (var source in sources)
Listeners.TryAdd(source, new(source, capacity, data => onEvent?.Invoke(source, data)));
}

Dictionary<string, SingleSourceDiagnosticsListener> Listeners { get; } = new();

public IEnumerable<object> CollectedEvents(string source) =>
Listeners.TryGetValue(source, out var listener) ? (IEnumerable<object>)listener : [];

public bool HasCollectedEvents(string source) =>
Listeners.TryGetValue(source, out var listener) && listener.HasCollectedEvents;

public void ClearCollectedEvents(string source) {
if (Listeners.TryGetValue(source, out var listener))
listener.ClearCollectedEvents();
}

public void ClearAllCollectedEvents() {
foreach (var listener in Listeners.Values)
listener.ClearCollectedEvents();
}

public void Dispose() {
foreach (var listener in Listeners.Values)
listener.Dispose();

Listeners.Clear();
}

public static MultiSourceDiagnosticsListener Start(OnSourceEvent onEvent, params string[] sources) =>
new(sources, 10, onEvent);

public static MultiSourceDiagnosticsListener Start(OnSourceEvent onEvent, int capacity, params string[] sources) =>
new(sources, capacity, onEvent);

public static MultiSourceDiagnosticsListener Start(params string[] sources) =>
new(sources);

public static MultiSourceDiagnosticsListener Start(int capacity, params string[] sources) =>
new(sources, capacity);
}

/// <summary>
/// Generic listener that ignores the default diagnostics model and always returns just the value and only if its not null.
/// </summary>
public class SingleSourceDiagnosticsListener : IEnumerable<object>, IDisposable {
public SingleSourceDiagnosticsListener(string source, int capacity = 10, Action<object>? onEvent = null) {
Listener = new(source, capacity, data => {
if (data.Value is not null)
onEvent?.Invoke(data.Value);
});
}

GenericDiagnosticsListener Listener { get; }

List<object> ValidEvents => Listener.CollectedEvents
.Where(x => x.Value is not null)
.Select(x => x.Value!)
.ToList();

public string Source => Listener.Source;
public int Capacity => Listener.Capacity;

public IReadOnlyList<object> CollectedEvents => ValidEvents;

public bool HasCollectedEvents => Listener.HasCollectedEvents;

public void ClearCollectedEvents() => Listener.ClearCollectedEvents();

public IEnumerator<object> GetEnumerator() => ValidEvents.GetEnumerator();

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

public void Dispose() => Listener.Dispose();

public static SingleSourceDiagnosticsListener Start(string source, int capacity) =>
new(source, capacity);

public static SingleSourceDiagnosticsListener Start(string source) =>
new(source);

public static SingleSourceDiagnosticsListener Start(Action<object> onEvent, string source) =>
new(source, 10, onEvent);

public static SingleSourceDiagnosticsListener Start(Action<object> onEvent, int capacity, string source) =>
new(source, capacity, onEvent);
}

/// <summary>
/// Generic listener that also collects the last N events and can be used to subscribe to a single source.
/// </summary>
class GenericDiagnosticsListener : IDisposable, IEnumerable<KeyValuePair<string, object?>> {
static readonly object Locker = new();

public GenericDiagnosticsListener(string source, int capacity = 10, Action<KeyValuePair<string, object?>>? onEvent = null) {
if (string.IsNullOrWhiteSpace(source))
throw new ArgumentException("Source cannot be null or whitespace.", nameof(source));

ArgumentOutOfRangeException.ThrowIfNegative(capacity);

Source = source;
Capacity = capacity;
Queue = new(capacity);

var observer = new GenericObserver<KeyValuePair<string, object?>>(data => {
if (capacity > 0)
Queue.Enqueue(data);

try {
onEvent?.Invoke(data);
}
catch {
// stay on target
}
});

ListenerSubscription = DiagnosticListener.AllListeners
.Subscribe(new GenericObserver<DiagnosticListener>(OnNewListener));

return;

void OnNewListener(DiagnosticListener listener) {
if (listener.Name != source) return;

lock (Locker) {
NetworkSubscription?.Dispose();
NetworkSubscription = listener.Subscribe(observer);
}
}
}

FixedSizedQueue<KeyValuePair<string, object?>> Queue { get; }
IDisposable? ListenerSubscription { get; }
IDisposable? NetworkSubscription { get; set; }

public string Source { get; }
public int Capacity { get; }

public IReadOnlyList<KeyValuePair<string, object?>> CollectedEvents => Queue.ToList();

public bool HasCollectedEvents => Queue.Count != 0;

public void ClearCollectedEvents() => Queue.Clear();

public void Dispose() {
NetworkSubscription?.Dispose();
ListenerSubscription?.Dispose();
ClearCollectedEvents();
}

public IEnumerator<KeyValuePair<string, object?>> GetEnumerator() => Queue.ToList().GetEnumerator();

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

class GenericObserver<T>(Action<T>? onNext, Action? onCompleted = null) : IObserver<T> {
public void OnNext(T value) => _onNext(value);
public void OnCompleted() => _onCompleted();

public void OnError(Exception error) { }

readonly Action<T> _onNext = onNext ?? (_ => { });
readonly Action _onCompleted = onCompleted ?? (() => { });
}

class FixedSizedQueue<T>(int maxSize) : Queue<T> {
readonly object _locker = new();

public new void Enqueue(T item) {
lock (_locker) {
base.Enqueue(item);
if (Count > maxSize)
Dequeue(); // Throw away
}
}

public new void Clear() {
lock (_locker) {
base.Clear();
}
}
}

public static GenericDiagnosticsListener Start(string source, int capacity = 10, Action<KeyValuePair<string, object?>>? onEvent = null) =>
new(source, capacity, onEvent);

public static GenericDiagnosticsListener Start(string source, Action<KeyValuePair<string, object?>>? onEvent = null) =>
new(source, 10, onEvent);
}


98 changes: 86 additions & 12 deletions src/EventStore.Plugins/Diagnostics/PluginDiagnosticsData.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,91 @@
namespace EventStore.Plugins.Diagnostics;

/// <summary>
/// Represents the mode of data collection for a plugin event.
/// </summary>
public enum PluginDiagnosticsDataCollectionMode {
/// <summary>
/// Appends multiple events regardless or their type.
/// </summary>
Event,

/// <summary>
/// Override previously collected event data.
/// </summary>
Snapshot,

/// <summary>
/// Merges with previously collected event data.
/// </summary>
Partial
}

/// <summary>
/// Represents diagnostic data of a plugin.
/// By default it is a snapshot and will override previously collected data, by event name.
/// </summary>
/// <param name="Source">The source of the event that matches the DiagnosticsName</param>
/// <param name="EventName">The name of the event. The default is PluginDiagnosticsData.</param>
/// <param name="Data">The data associated with the event in the form of a dictionary.</param>
/// <param name="Timestamp">When the event occurred.</param>
/// <param name="IsSnapshot">Whether the event is a snapshot and should override previously collected data, by event name. Default value is true.</param>
public readonly record struct PluginDiagnosticsData(
string Source,
string EventName,
Dictionary<string, object?> Data,
DateTimeOffset Timestamp,
bool IsSnapshot = true
);
public readonly record struct PluginDiagnosticsData() : IComparable<PluginDiagnosticsData>, IComparable {
public static PluginDiagnosticsData None { get; } = new() { Data = null! };

/// <summary>
/// The source of the event that matches the DiagnosticsName.
/// </summary>
public string Source { get; init; } = string.Empty;

/// <summary>
/// The name of the event. The default is PluginDiagnosticsData.
/// </summary>
public string EventName { get; init; } = nameof(PluginDiagnosticsData);

/// <summary>
/// The data associated with the event in the form of a dictionary.
/// </summary>
public required Dictionary<string, object?> Data { get; init; }

/// <summary>
/// When the event occurred.
/// </summary>
public DateTimeOffset Timestamp { get; init; } = DateTimeOffset.UtcNow;

/// <summary>
/// Represents the mode of data collection for a plugin event.
/// </summary>
public PluginDiagnosticsDataCollectionMode CollectionMode { get; init; } = PluginDiagnosticsDataCollectionMode.Event;

/// <summary>
/// Gets the value associated with the specified key.
/// </summary>
public T GetValue<T>(string key, T defaultValue) =>
Data.TryGetValue(key, out var value) &&
value is T typedValue ? typedValue : defaultValue;

/// <summary>
/// Gets the value associated with the specified key.
/// </summary>
public T? GetValue<T>(string key) =>
Data.TryGetValue(key, out var value) ? (T?)value : default;

public int CompareTo(PluginDiagnosticsData other) {
var sourceComparison = string.Compare(Source, other.Source, StringComparison.Ordinal);
if (sourceComparison != 0) return sourceComparison;

var eventNameComparison = string.Compare(EventName, other.EventName, StringComparison.Ordinal);
if (eventNameComparison != 0) return eventNameComparison;

return Timestamp.CompareTo(other.Timestamp);
}

public int CompareTo(object? obj) {
if (ReferenceEquals(null, obj)) return 1;

return obj is PluginDiagnosticsData other ? CompareTo(other) : throw new ArgumentException($"Object must be of type {nameof(PluginDiagnosticsData)}");
}

public static bool operator <(PluginDiagnosticsData left, PluginDiagnosticsData right) => left.CompareTo(right) < 0;

public static bool operator >(PluginDiagnosticsData left, PluginDiagnosticsData right) => left.CompareTo(right) > 0;

public static bool operator <=(PluginDiagnosticsData left, PluginDiagnosticsData right) => left.CompareTo(right) <= 0;

public static bool operator >=(PluginDiagnosticsData left, PluginDiagnosticsData right) => left.CompareTo(right) >= 0;
}
Loading

0 comments on commit 7cf9733

Please sign in to comment.