Skip to content

Commit

Permalink
Prefer index name on URL for fixed index names and data streams (#35)
Browse files Browse the repository at this point in the history
* Refactor and optimize index and data stream bulk request data

* Add benchmarks

* Rename class

* Add templated index name benchmark

* Remove comparison code

* Fix URLs

* Fixup

* Add tests

* Fix stock data sample array length

* Finalise benchmarks and fix results

* Fix BOM

* Fix formatting
  • Loading branch information
stevejgordon authored May 5, 2023
1 parent feb057d commit e2b11b0
Show file tree
Hide file tree
Showing 16 changed files with 524 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Elastic.Ingest.Elasticsearch.Benchmarks;

[SimpleJob(RunStrategy.Monitoring, invocationCount: 10, id: "BulkIngestionJob")]
public class BulkIngestion
public class BulkIngestionBenchmarks
{
private static readonly int MaxExportSize = 5_000;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Serialization;
using Performance.Common;

namespace Elastic.Ingest.Elasticsearch.Benchmarks.Benchmarks;

/// <summary>
/// Benchmarks creating the bulk request bytes for a <see cref="DataStreamChannelOptions{TEvent}"/>
/// configuration and writing them to a reusable in-memory stream.
/// </summary>
public class BulkRequestCreationForDataStreamBenchmarks
{
    // Number of sample documents created in Setup and serialized by each benchmark invocation.
    private static readonly int DocumentsToIndex = 1_000;

    private DataStreamChannelOptions<StockData>? _options;
    private HttpTransport? _transport;
    private TransportConfiguration? _transportConfiguration;
    private StockData[] _data = Array.Empty<StockData>();

    // A single header instance is handed back for every event by the header factory lambda below.
    private readonly BulkOperationHeader _bulkOperationHeader = new CreateOperation();

    /// <summary>
    /// Destination stream for the serialized request body; rewound at the start of every invocation.
    /// </summary>
    public Stream MemoryStream { get; } = new MemoryStream();

    /// <summary>
    /// Builds the transport configuration (backed by an <c>InMemoryConnection</c> seeded with sample
    /// response bytes), the channel options and the sample documents used by the benchmark.
    /// </summary>
    [GlobalSetup]
    public void Setup()
    {
        var nodePool = new SingleNodePool(new("http://localhost:9200"));
        var cannedResponse = StockData.CreateSampleDataSuccessWithFilterPathResponseBytes(DocumentsToIndex);
        var configuration = new TransportConfiguration(nodePool, new InMemoryConnection(cannedResponse));
        HttpTransport transport = new DefaultHttpTransport(configuration);

        _transportConfiguration = configuration;
        _transport = transport;

        var bufferOptions = new Channels.BufferOptions
        {
            OutboundBufferMaxSize = DocumentsToIndex
        };
        _options = new DataStreamChannelOptions<StockData>(transport)
        {
            BufferOptions = bufferOptions
        };

        _data = StockData.CreateSampleData(DocumentsToIndex);
    }

    /// <summary>
    /// Serializes the sample documents into bulk request bytes and writes them to <see cref="MemoryStream"/>.
    /// </summary>
    [Benchmark(Baseline = true)]
    public async Task WriteToStreamAsync()
    {
        // Rewind so every invocation overwrites the previous payload instead of appending to it.
        MemoryStream.Position = 0;

        // The lambda intentionally reads the instance field (not a local) so the delegate shape is unchanged.
        var payload = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader);
        var body = PostData.ReadOnlyMemory(payload);
        var streamFactory = ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory;
        var request = new RequestData(
            Elastic.Transport.HttpMethod.POST,
            "/_bulk",
            body,
            _transportConfiguration!,
            null!,
            streamFactory);

        await request.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Elastic.Ingest.Elasticsearch.Serialization;
using Performance.Common;

namespace Elastic.Ingest.Elasticsearch.Benchmarks.Benchmarks;

/// <summary>
/// Benchmarks creating the bulk request bytes for an <see cref="IndexChannelOptions{TEvent}"/>
/// configured with the literal index format <c>stock-data-v8</c>, and writing them to a reusable
/// in-memory stream.
/// </summary>
public class BulkRequestCreationWithFixedIndexNameBenchmarks
{
    // Number of sample documents created in Setup and serialized by each benchmark invocation.
    private static readonly int DocumentsToIndex = 1_000;

    private IndexChannelOptions<StockData>? _options;
    private HttpTransport? _transport;
    private TransportConfiguration? _transportConfiguration;
    private StockData[] _data = Array.Empty<StockData>();

    /// <summary>
    /// Destination stream for the serialized request body; rewound at the start of every invocation.
    /// </summary>
    public Stream MemoryStream { get; } = new MemoryStream();

    /// <summary>
    /// Builds the transport configuration (backed by an <c>InMemoryConnection</c> seeded with sample
    /// response bytes), the channel options and the sample documents used by the benchmark.
    /// </summary>
    [GlobalSetup]
    public void Setup()
    {
        var nodePool = new SingleNodePool(new("http://localhost:9200"));
        var cannedResponse = StockData.CreateSampleDataSuccessWithFilterPathResponseBytes(DocumentsToIndex);
        var configuration = new TransportConfiguration(nodePool, new InMemoryConnection(cannedResponse));
        HttpTransport transport = new DefaultHttpTransport(configuration);

        _transportConfiguration = configuration;
        _transport = transport;

        var bufferOptions = new Channels.BufferOptions
        {
            OutboundBufferMaxSize = DocumentsToIndex
        };
        _options = new IndexChannelOptions<StockData>(transport)
        {
            BufferOptions = bufferOptions,
            IndexFormat = "stock-data-v8"
        };

        _data = StockData.CreateSampleData(DocumentsToIndex);
    }

    /// <summary>
    /// Serializes the sample documents into bulk request bytes and writes them to <see cref="MemoryStream"/>.
    /// </summary>
    [Benchmark(Baseline = true)]
    public async Task WriteToStreamAsync()
    {
        // Rewind so every invocation overwrites the previous payload instead of appending to it.
        MemoryStream.Position = 0;

        // NOTE(review): the trailing `true` presumably marks the index name as fixed - confirm against
        // BulkRequestDataFactory.CreateBulkOperationHeaderForIndex.
        var payload = BulkRequestDataFactory.GetBytes(
            _data,
            _options!,
            e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true));
        var body = PostData.ReadOnlyMemory(payload);
        var streamFactory = ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory;
        var request = new RequestData(
            Elastic.Transport.HttpMethod.POST,
            "/_bulk",
            body,
            _transportConfiguration!,
            null!,
            streamFactory);

        await request.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Elastic.Ingest.Elasticsearch.Serialization;
using Performance.Common;

namespace Elastic.Ingest.Elasticsearch.Benchmarks.Benchmarks;

/// <summary>
/// Benchmarks creating the bulk request bytes for an <see cref="IndexChannelOptions{TEvent}"/>
/// that leaves <c>IndexFormat</c> at its default, and writing them to a reusable in-memory stream.
/// </summary>
public class BulkRequestCreationWithTemplatedIndexNameBenchmarks
{
    // Number of sample documents created in Setup and serialized by each benchmark invocation.
    private static readonly int DocumentsToIndex = 1_000;

    private IndexChannelOptions<StockData>? _options;
    private HttpTransport? _transport;
    private TransportConfiguration? _transportConfiguration;
    private StockData[] _data = Array.Empty<StockData>();

    /// <summary>
    /// Destination stream for the serialized request body; rewound at the start of every invocation.
    /// </summary>
    public Stream MemoryStream { get; } = new MemoryStream();

    /// <summary>
    /// Builds the transport configuration (backed by an <c>InMemoryConnection</c> seeded with sample
    /// response bytes), the channel options and the sample documents used by the benchmark.
    /// </summary>
    [GlobalSetup]
    public void Setup()
    {
        var nodePool = new SingleNodePool(new("http://localhost:9200"));
        var cannedResponse = StockData.CreateSampleDataSuccessWithFilterPathResponseBytes(DocumentsToIndex);
        var configuration = new TransportConfiguration(nodePool, new InMemoryConnection(cannedResponse));
        HttpTransport transport = new DefaultHttpTransport(configuration);

        _transportConfiguration = configuration;
        _transport = transport;

        var bufferOptions = new Channels.BufferOptions
        {
            OutboundBufferMaxSize = DocumentsToIndex
        };
        _options = new IndexChannelOptions<StockData>(transport)
        {
            BufferOptions = bufferOptions
        };

        _data = StockData.CreateSampleData(DocumentsToIndex);
    }

    /// <summary>
    /// Serializes the sample documents into bulk request bytes and writes them to <see cref="MemoryStream"/>.
    /// </summary>
    [Benchmark(Baseline = true)]
    public async Task DynamicIndexName_WriteToStreamAsync()
    {
        // Rewind so every invocation overwrites the previous payload instead of appending to it.
        MemoryStream.Position = 0;

        // NOTE(review): the trailing `false` presumably requests a per-event (templated) index name -
        // confirm against BulkRequestDataFactory.CreateBulkOperationHeaderForIndex.
        var payload = BulkRequestDataFactory.GetBytes(
            _data,
            _options!,
            e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false));
        var body = PostData.ReadOnlyMemory(payload);
        var streamFactory = ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory;
        var request = new RequestData(
            Elastic.Transport.HttpMethod.POST,
            "/_bulk",
            body,
            _transportConfiguration!,
            null!,
            streamFactory);

        await request.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<Optimize>true</Optimize>
<DebugType>pdbonly</DebugType>
<DebugSymbols>true</DebugSymbols>
</PropertyGroup>
Expand Down
16 changes: 15 additions & 1 deletion benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,31 @@
using BenchmarkDotNet.Reports;
using BenchmarkDotNet.Running;
using System.Globalization;
using Elastic.Ingest.Elasticsearch.Benchmarks.Benchmarks;

#if DEBUG
// MANUALLY RUN A BENCHMARKED METHOD DURING DEBUGGING

//var bm = new BulkIngestionBenchmarks();
//bm.Setup();
//await bm.BulkAllAsync();
//Console.WriteLine("DONE");

// Run the fixed-index-name benchmark once, outside BenchmarkDotNet, so the generated
// bulk request body can be inspected in a debugger.
var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks();
bm.Setup();

// BulkRequestCreationWithFixedIndexNameBenchmarks exposes WriteToStreamAsync only; the former
// WriteToStreamAsync_OLD comparison method is not defined on it, so calling it breaks DEBUG builds.
await bm.WriteToStreamAsync();

// `length` and `json` are not used by the program; they exist to be inspected at a breakpoint.
var length = bm.MemoryStream.Length;

// Rewind before reading back what the benchmark method wrote.
bm.MemoryStream.Position = 0;
var sr = new StreamReader(bm.MemoryStream);
var json = sr.ReadToEnd();

// Keep the console window open until a key is pressed.
Console.ReadKey();
#else
var config = ManualConfig.Create(DefaultConfig.Instance);
config.SummaryStyle = new SummaryStyle(CultureInfo.CurrentCulture, true, BenchmarkDotNet.Columns.SizeUnit.B, null!, ratioStyle: BenchmarkDotNet.Columns.RatioStyle.Percentage);
config.AddDiagnoser(MemoryDiagnoser.Default);

BenchmarkRunner.Run<BulkIngestion>(config);
BenchmarkRunner.Run<BulkRequestCreationWithTemplatedIndexNameBenchmarks>(config);
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,38 @@ Baseline for commit 8eef0e1dfbee8c5198ec0ab975b5d826e2356fcd (from 2023-05-02) i
| BulkAll | False | True | 116.5 ms | 47.78 ms | 31.60 ms | 11600.0000 | 2400.0000 | 2000.0000 | 97934119 B |
| BulkAll | True | False | 150.9 ms | 78.36 ms | 51.83 ms | 34800.0000 | 5400.0000 | 1700.0000 | 82012232 B |
| BulkAll | True | True | 150.3 ms | 66.08 ms | 43.71 ms | 20900.0000 | 3700.0000 | 2000.0000 | 91253012 B |


BulkRequestCreationWithFixedIndexNameBenchmarks
-----------------------------------------------

| Method | Mean [ms] | Error [ms] | StdDev [ms] | Ratio | RatioSD | Gen0 | Gen1 | Gen2 | Allocated [B] | Alloc Ratio |
|----------------------- |----------:|-----------:|------------:|---------:|--------:|---------:|---------:|---------:|--------------:|------------:|
| WriteToStreamAsync_OLD | 2.666 ms | 0.0553 ms | 0.1578 ms | baseline | | 121.0938 | 121.0938 | 121.0938 | 653139 B | |
| WriteToStreamAsync | 2.447 ms | 0.0487 ms | 0.1148 ms | -8% | 7.5% | 121.0938 | 121.0938 | 121.0938 | 573139 B | -12% |

Previous Request Body = 205000B
New Request Body = 181000B
Saves 24B per operation in the request body

BulkRequestCreationWithFixedIndexNameBenchmarks (1k events)
-----------------------------------------------------------

| Method | Mean [ms] | Error [ms] | StdDev [ms] | Gen0 | Gen1 | Gen2 | Allocated [B] |
|---------------------------------- |----------:|-----------:|------------:|---------:|---------:|---------:|--------------:|
| FixedIndexName_WriteToStreamAsync | 2.351 ms | 0.0481 ms | 0.1364 ms | 121.0938 | 121.0938 | 121.0938 | 573139 B |

BulkRequestCreationWithTemplatedIndexNameBenchmarks (1k events)
---------------------------------------------------------------

| Method | Mean [ms] | Error [ms] | StdDev [ms] | Gen0 | Gen1 | Gen2 | Allocated [B] |
|------------------------------------ |----------:|-----------:|------------:| ---------:|---------:|---------:|--------------:|
| DynamicIndexName_WriteToStreamAsync | 2.836 ms | 0.0564 ms | 0.1002 ms | 121.0938 | 121.0938 | 121.0938 | 661139 B |


BulkRequestCreationForDataStreamBenchmarks (1k events)
------------------------------------------------------

| Method | Mean [ms] | Error [ms] | StdDev [ms] | Gen0 | Gen1 | Gen2 | Allocated [B] |
|------------------- |----------:|-----------:|------------:|---------:|---------:|---------:|--------------:|
| WriteToStreamAsync | 2.261 ms | 0.0442 ms | 0.1041 ms | 121.0938 | 121.0938 | 121.0938 | 525139 B |
6 changes: 3 additions & 3 deletions benchmarks/Performance.Common/StockData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static StockData ParseFromFileLine(string dataLine)

public static StockData[] CreateSampleData(long count)
{
var data = new StockData[100_000];
var data = new StockData[count];

for (var i = 0; i < count; i++)
{
Expand All @@ -75,11 +75,11 @@ public static byte[] CreateSampleDataSuccessWithFilterPathResponseBytes(long cou
if (i < count - 1)
{
responseBytes[offset + FilterPathItemResponseBytes.Length] = Comma;
offset += (FilterPathItemResponseBytes.Length + 1);
offset += FilterPathItemResponseBytes.Length + 1;
}
else
{
offset += (FilterPathItemResponseBytes.Length);
offset += FilterPathItemResponseBytes.Length;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

using System.Collections.Generic;
using System.Linq;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
Expand All @@ -15,15 +14,19 @@ namespace Elastic.Ingest.Elasticsearch.DataStreams;
public class DataStreamChannel<TEvent> : ElasticsearchChannelBase<TEvent, DataStreamChannelOptions<TEvent>>
{
private readonly CreateOperation _fixedHeader;
private readonly string _url;

/// <inheritdoc cref="DataStreamChannel{TEvent}"/>
public DataStreamChannel(DataStreamChannelOptions<TEvent> options) : this(options, null) { }

/// <inheritdoc cref="DataStreamChannel{TEvent}"/>
public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners) : base(options, callbackListeners)
{
var target = Options.DataStream.ToString();
_fixedHeader = new CreateOperation { Index = target };
var dataStream = Options.DataStream.ToString();

_url = $"/{dataStream}{base.BulkUrl}";

_fixedHeader = new CreateOperation();
}

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.CreateBulkOperationHeader"/>
Expand All @@ -34,6 +37,9 @@ public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<I
/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.TemplateWildcard"/>
protected override string TemplateWildcard => Options.DataStream.GetNamespaceWildcard();

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent, TChannelOptions}.BulkUrl"/>
protected override string BulkUrl => _url;

/// <summary>
/// Gets a default index template for the current <see cref="DataStreamChannel{TEvent}"/>
/// </summary>
Expand Down
Loading

0 comments on commit e2b11b0

Please sign in to comment.