From 379a7ea2da3140f1b1aff4313143729a5773513e Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 25 Jan 2023 18:12:46 +0100 Subject: [PATCH] Fix #3 add readmes that are included in the nuget package (#9) --- README.md | 16 +-- elastic-ingest-dotnet.sln | 7 ++ src/Directory.Build.props | 3 +- src/Elastic.Channels/README.md | 84 ++++++++++++++++ src/Elastic.Ingest.Apm/README.md | 11 +++ src/Elastic.Ingest.Elasticsearch/README.md | 97 +++++++++++++++++++ .../CustomOtlpTraceExporter.cs | 9 +- .../Elastic.Ingest.OpenTelemetry.csproj | 7 +- src/Elastic.Ingest.OpenTelemetry/README.md | 7 ++ src/Elastic.Ingest.Transport/README.md | 7 ++ 10 files changed, 231 insertions(+), 17 deletions(-) create mode 100644 src/Elastic.Channels/README.md create mode 100644 src/Elastic.Ingest.Apm/README.md create mode 100644 src/Elastic.Ingest.Elasticsearch/README.md create mode 100644 src/Elastic.Ingest.OpenTelemetry/README.md create mode 100644 src/Elastic.Ingest.Transport/README.md diff --git a/README.md b/README.md index 186d991..0481d7d 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,16 @@ -# `Elastic.Ingest` +# `Elastic.Ingest.*` +This repository houses various `Elastic.Ingest.*` packages that utilize `Elastic.Channels` to send bulk data to various (Elastic) endpoints. -## Usage - +### Projects -```c# -``` +* [Elastic.Channels](src/Elastic.Channels/README.md) - core library that implements a batching `System.Threading.Channels.ChannelWriter` +* [Elastic.Ingest.Transport](src/Elastic.Ingest.Transport/README.md) - core library that ships common setup for pushing data utilizing [Elastic.Transport](https://github.com/elastic/elastic-transport-net) +* [Elastic.Ingest.Elasticsearch](src/Elastic.Ingest.Elasticsearch/README.md) - exposes `DataStreamChannel` and `IndexChannel` to push data to Elasticsearch with great ease. +#### in development +* [Elastic.Ingest.APM](src/Elastic.Ingest.Apm/README.md) - Pushes APM data to apm-server over the V2 intake API. Still under development. -### Projects +#### No plans of releasing +* [Elastic.Ingest.OpenTelemetry](src/Elastic.Ingest.OpenTelemetry/README.md) - a toy implementation of `Elastic.Channels` that pushes `Activities` over `OTLP` diff --git a/elastic-ingest-dotnet.sln b/elastic-ingest-dotnet.sln index c6147da..249d421 100644 --- a/elastic-ingest-dotnet.sln +++ b/elastic-ingest-dotnet.sln @@ -49,6 +49,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{B6 EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Channels.Example", "examples\Elastic.Channels.Example\Elastic.Channels.Example.csproj", "{D584C7ED-A2F5-472F-9DEE-5F36B88B558E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Ingest.OpenTelemetry", "src\Elastic.Ingest.OpenTelemetry\Elastic.Ingest.OpenTelemetry.csproj", "{92F87F85-3028-4E98-A1C7-6CCEC4392AB4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -64,6 +66,7 @@ Global {8AFDD165-F5B1-4555-97E3-A376B30236D3} = {B284B3C8-2592-4B5D-B287-207285E4B7F9} {DF02EDE5-1DBD-487B-BFA3-006407B6392D} = {8A402CB0-CB84-4F7D-9F97-EFEC23F36814} {D584C7ED-A2F5-472F-9DEE-5F36B88B558E} = {B67CBB46-74C1-47EB-9E41-D55C5E0E0D85} + {92F87F85-3028-4E98-A1C7-6CCEC4392AB4} = {A60DDBBB-4BF4-4B3B-A13A-E0B409917433} EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {9F529525-E8E3-463D-A920-4D6E34150FC5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU @@ -102,5 +105,9 @@ Global {D584C7ED-A2F5-472F-9DEE-5F36B88B558E}.Debug|Any CPU.Build.0 = Debug|Any CPU {D584C7ED-A2F5-472F-9DEE-5F36B88B558E}.Release|Any CPU.ActiveCfg = Release|Any CPU {D584C7ED-A2F5-472F-9DEE-5F36B88B558E}.Release|Any CPU.Build.0 = Release|Any CPU + {92F87F85-3028-4E98-A1C7-6CCEC4392AB4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {92F87F85-3028-4E98-A1C7-6CCEC4392AB4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {92F87F85-3028-4E98-A1C7-6CCEC4392AB4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {92F87F85-3028-4E98-A1C7-6CCEC4392AB4}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 190c5d5..a55a2d4 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -17,10 +17,11 @@ nuget-icon.png True - + README.md + nuget-icon.png True diff --git a/src/Elastic.Channels/README.md b/src/Elastic.Channels/README.md new file mode 100644 index 0000000..07ccfb1 --- /dev/null +++ b/src/Elastic.Channels/README.md @@ -0,0 +1,84 @@ +# Elastic.Channels + +Provides an specialized `System.Threading.Channels.ChannelWriter` implementation that makes it easy +to consume data pushed to that thread in batches. + +The batches will emit either when a certain maximum is hit or when a batch's lifecycle exceeds a certain age. + +This allows data of various rates to pushed in the same manner while different implementations to send the batched data to receivers can be implemented. + +This package serves mainly as a core library with abstract classes +and does not ship any useful implementations. + +It ships with a `NoopBufferedChannel` implementation that does nothing in its `Send` implementation for unit test and benchmark purposes. + + +## BufferedChannelBase<> + +An abstract class that requires implementers to implement: + +```csharp +protected abstract Task Send(IReadOnlyCollection buffer); +``` + +Any implementation allows data to pushed to it through: + +```csharp +var e = new TEvent(); +if (await channel.WaitToWriteAsync(e)) + written++; +``` + +## ChannelOptionsBase<> + +Implementers of `BufferedChannelBase<>` must also create their own implementation of `ChannelOptionsBase<>`. This to ensure each channel implementation creates an appropriately named options class. + + +## Quick minimal implementation + +```chsarp + +public class Event { } +public class Response { } + +public class NoopChannelOptions + : ChannelOptionsBase { } + +public class NoopBufferedChannel + : BufferedChannelBase +{ + + public NoopBufferedChannel(NoopChannelOptions options) + : base(options) { } + + protected override Task Send(IReadOnlyCollection buffer) + { + return Task.FromResult(new Response()); + } +} +``` + +Now once we instantiate an `NoopBufferedChannel` we can use it push data to it. + +```csharp +var e = new Event(); +if (await noopChannel.WaitToWriteAsync(e)) + written++; +``` + + +## BufferOptions + +Each `ChannelOptionsBase<>` implementation takes and exposes a `BufferOptions` instance. This controls the buffering behavior of `BufferedChannelBase<>`. + + +| Option | Description | +|-----------------------------|------------------------------------------------------------------------------------------------------------------------------| +| `MaxInFlightMessages` | The maximum number of in flight instances that can be queued in memory. If this threshold is reached, events will be dropped | +| `MaxConsumerBufferSize` | The number of events a local buffer should reach before sending the events in a single call to Elasticsearch. | +| `MaxRetries` | The maximum number of retries over `Send` | +| `MaxConsumerBufferLifetime` | The maximum age of buffer before its flushed | +| `ConcurrentConsumers` | Controls how many concurrent `Send` operations may occur | +| `BackOfPeriod` | Func that calculates an appropriate backoff time for a retry | +| `BufferFlushCallback` | Called `once` whenever a buffer is flushed, excluding retries | +| `WaitHandle` | Inject a waithandle that will be signalled after each flush, excluding retries. | diff --git a/src/Elastic.Ingest.Apm/README.md b/src/Elastic.Ingest.Apm/README.md new file mode 100644 index 0000000..5c64051 --- /dev/null +++ b/src/Elastic.Ingest.Apm/README.md @@ -0,0 +1,11 @@ +# Elastic.Ingest.APM + +A `Elastic.Channel` implementation of `BufferedChannelBase` that allows APM data to be written to `apm-server` over the V2 intake API. + + +Utilizes `Elastic.Transport` through `Elastic.Ingest.Transport`. + + +This project is currently still under development and not pushed to Nuget. + +We are still working on finishing this implementation as a possible replacement for the PayloadSender that's currently part of `Elastic.Apm` \ No newline at end of file diff --git a/src/Elastic.Ingest.Elasticsearch/README.md b/src/Elastic.Ingest.Elasticsearch/README.md new file mode 100644 index 0000000..e1a5989 --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/README.md @@ -0,0 +1,97 @@ +# Elastic.Ingest.Elasticsearch + +`Elastic.Channels` implementations of `BufferedChannelBase` that allows data to pushed to either indices or data streams + + +## `DataStreamChannel` + +A channel that specializes to writing data with a timestamp to Elasticsearch data streams. E.g given the following document. + +```csharp +public class TimeSeriesDocument +{ + [JsonPropertyName("@timestamp")] + public DateTimeOffset Timestamp { get; set; } + + [JsonPropertyName("message")] + public string Message { get; set; } +} + +``` + +A channel can be created to push data to the `logs-dotnet-default` data stream. + +```csharp +var dataStream = new DataStreamName("logs", "dotnet"); +var bufferOptions = new BufferOptions { } +var options = new DataStreamChannelOptions(transport) +{ + DataStream = dataStream, + BufferOptions = bufferOptions +}; +var channel = new DataStreamChannel(options); +``` + +NOTE: read more about Elastic's data stream naming convention here: +https://www.elastic.co/blog/an-introduction-to-the-elastic-data-stream-naming-scheme + +we can now push data to Elasticsearch using the `DataStreamChannel` +```csharp +var doc = new TimeSeriesDocument +{ + Timestamp = DateTimeOffset.Now, + Message = "Hello World!", +} +channel.TryWrite(doc); +``` + +# `IndexChannel` + +A channel that specializes in writing catalog data to Elastic indices. +Catalog data is typically data that has an id of sorts. + +Given the following minimal document + +```csharp +public class CatalogDocument +{ + [JsonPropertyName("id")] + public string Id { get; set; } + + [JsonPropertyName("title")] + public string Title { get; set; } + + [JsonPropertyName("created")] + public DateTimeOffset Created { get; set; } +} +``` + +We can create an `IndexChannel<>` to push `CatalogDocument` instances. + +```csharp +var options = new IndexChannelOptions(transport) +{ + IndexFormat = "catalog-data-{0:yyyy.MM.dd}", + BulkOperationIdLookup = c => c.Id, + TimestampLookup = c => c.Created, +}; +var channel = new IndexChannel(options); +``` + +now we can push data using: + +```csharp +var doc = new CatalogDocument +{ + Created = date, + Title = "Hello World!", + Id = "hello-world" +} +channel.TryWrite(doc); +``` + +This will push data to `catalog-data-2023.01.1` because `TimestampLookup` yields `Created` to `IndexFormat`. + +`IndexFormat` can also simply be a fixed string to write to an Elasticsearch alias/index. + +`BulkOperationIdLookup` determines if the document should be pushed to Elasticsearch using a `create` or `index` operation. diff --git a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs index 4cc8886..491ce8f 100644 --- a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs +++ b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs @@ -10,6 +10,7 @@ using OpenTelemetry; using OpenTelemetry.Exporter; using OpenTelemetry.Resources; +using Elastic.Channels; namespace Elastic.Ingest.OpenTelemetry { @@ -33,11 +34,7 @@ public CustomOtlpTraceExporter(OtlpExporterOptions options, TraceChannelOptions } - public class TraceBufferOptions : BufferOptions - { - } - - public class TraceChannelOptions : ChannelOptionsBase + public class TraceChannelOptions : ChannelOptionsBase { public string? ServiceName { get; set; } public Uri? Endpoint { get; set; } @@ -49,7 +46,7 @@ public class TraceExportResult public ExportResult Result { get; internal set; } } - public class TraceChannel : ChannelBase + public class TraceChannel : BufferedChannelBase { public TraceChannel(TraceChannelOptions options) : base(options) { var o = new OtlpExporterOptions(); diff --git a/src/Elastic.Ingest.OpenTelemetry/Elastic.Ingest.OpenTelemetry.csproj b/src/Elastic.Ingest.OpenTelemetry/Elastic.Ingest.OpenTelemetry.csproj index 180d21e..5369c6b 100644 --- a/src/Elastic.Ingest.OpenTelemetry/Elastic.Ingest.OpenTelemetry.csproj +++ b/src/Elastic.Ingest.OpenTelemetry/Elastic.Ingest.OpenTelemetry.csproj @@ -2,16 +2,15 @@ netstandard2.1 - Elasticsearch Buffer backed data shipper - TODO - TODO + Offers an easy to use ChannelWriter implementation to push activities over OTLP to OpenTelemetry endpoints + elastic, channels, apm, ingest, opentelemetry latest enable false - + diff --git a/src/Elastic.Ingest.OpenTelemetry/README.md b/src/Elastic.Ingest.OpenTelemetry/README.md new file mode 100644 index 0000000..cb35018 --- /dev/null +++ b/src/Elastic.Ingest.OpenTelemetry/README.md @@ -0,0 +1,7 @@ +# Elastic.Ingest.OpenTelemetry + +A `Elastic.Channel` implementation of `BufferedChannelBase` that allows OpenTelemetry data to be written over OTLP. + +This is not currently published to NuGet with no current plans to ever do so. + +This project currently only exists as a proof of concept to validate the concepts of `Elastic.Channels` \ No newline at end of file diff --git a/src/Elastic.Ingest.Transport/README.md b/src/Elastic.Ingest.Transport/README.md new file mode 100644 index 0000000..8790f75 --- /dev/null +++ b/src/Elastic.Ingest.Transport/README.md @@ -0,0 +1,7 @@ +# Elastic.Ingest.Transport + +An abstract `Elastic.Channels` implementation of `BufferedChannelBase` that allows implementes to quickly utilize `Elastic.Transport` to send data over HTTP(S) to one or many receiving endpoints. + +This is a core library that does not ship any useful implementation. + +See e.g `Elastic.Ingest.Elasticsearch` for a concrete implementation to push data to Elasticsearch \ No newline at end of file