Skip to content

Commit

Permalink
Add initial benchmarks and profiling projects
Browse files Browse the repository at this point in the history
  • Loading branch information
stevejgordon committed May 3, 2023
1 parent 8eef0e1 commit 66b07e7
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using BenchmarkDotNet.Engines;
using Performance.Common;

namespace Elastic.Ingest.Elasticsearch.Benchmarks;

[SimpleJob(RunStrategy.Monitoring, invocationCount: 20, id: "BulkIngestionJob")]
public class BulkIngestion
{
private static readonly int MaxExportSize = 5_000;

private readonly ManualResetEvent _waitHandle = new(false);
private StockData[] _data = Array.Empty<StockData>();
private IndexChannelOptions<StockData>? _options;

#if DEBUG
private long _responses;
#endif

//[Params(100_000)]
public int DocumentsToIndex { get; set; } = 100_000;

[ParamsAllValues]
public bool DisableDiagnostics { get; set; }

[GlobalSetup]
public void Setup()
{
_data = StockData.CreateSampleData(DocumentsToIndex);

var transport = new DefaultHttpTransport(
new TransportConfiguration(
new SingleNodePool(new("http://localhost:9200")),
new InMemoryConnection(StockData.CreateSampleDataSuccessWithFilterPathResponseBytes(MaxExportSize))));

_options = new IndexChannelOptions<StockData>(transport)
{
BufferOptions = new Channels.BufferOptions
{
OutboundBufferMaxSize = MaxExportSize
},
DisableDiagnostics = DisableDiagnostics,
IndexFormat = "stock-data-v8",
OutboundChannelExitedCallback = () => _waitHandle.Set(),
#if DEBUG
ExportResponseCallback = (response, a) =>
{
Interlocked.Add(ref _responses, a.Count);
Console.WriteLine(_responses);
},
PublishToOutboundChannelCallback = () => Console.WriteLine("PUBLISHED")
#endif
};
}

[Benchmark]
public void BulkAll()
{
var channel = new IndexChannel<StockData>(_options!);

channel.TryWriteMany(_data);
channel.TryComplete();

_waitHandle.WaitOne();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<Optimize>true</Optimize>
<DebugType>pdbonly</DebugType>
<DebugSymbols>true</DebugSymbols>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.5" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Elastic.Ingest.Elasticsearch\Elastic.Ingest.Elasticsearch.csproj" />
<ProjectReference Include="..\Performance.Common\Performance.Common.csproj" />
</ItemGroup>

</Project>
28 changes: 28 additions & 0 deletions benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

global using Elastic.Transport;
global using Elastic.Ingest.Elasticsearch.Benchmarks;
global using Elastic.Ingest.Elasticsearch.Indices;
global using BenchmarkDotNet.Attributes;

using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Reports;
using BenchmarkDotNet.Running;
using System.Globalization;

#if DEBUG
// MANUALLY RUN A BENCHMARKED METHOD DURING DEBUGGING
//var bm = new BulkIngestion();
//bm.Setup();
//await bm.BulkAllAsync();
//Console.WriteLine("DONE");
#else
var config = ManualConfig.Create(DefaultConfig.Instance);
config.SummaryStyle = new SummaryStyle(CultureInfo.CurrentCulture, true, BenchmarkDotNet.Columns.SizeUnit.B, null!, ratioStyle: BenchmarkDotNet.Columns.RatioStyle.Percentage);
config.AddDiagnoser(MemoryDiagnoser.Default);

BenchmarkRunner.Run<BulkIngestion>(config);
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
FUTURE: Automate benchmarks and export to Elasticsearch as part of CI (GitHub actions) such that we can identify and flag any potential regressions and track improvements over time.
FUTURE: Introduce micro benchmarks for specific areas of ingestion as we target optimisations.

BulkIngestion.BulkAll
---------------------

An end-to-end (in-memory) macro benchmark of indexing many events, including channel creation.
This has some variance between runs and is configured with the monitoring strategy.

Baseline for commit 8eef0e1dfbee8c5198ec0ab975b5d826e2356fcd (from 2023-05-02) in main.

50k events

| Method | DisableDiagnostics | Mean [ms] | Error [ms] | StdDev [ms] | Gen0 | Gen1 | Gen2 | Allocated [B] |
|-------- |------------------- |----------:|-----------:|------------:|----------:|----------:|---------:|--------------:|
| BulkAll | False | 91.34 ms | 4.409 ms | 12.79 ms | 6187.5000 | 2562.5000 | 437.5000 | 57612712 B |
| BulkAll | True | 92.43 ms | 5.235 ms | 15.27 ms | 6875.0000 | 2750.0000 | 625.0000 | 63323152 B |

| Method | DisableDiagnostics | Mean [ms] | Error [ms] | StdDev [ms] | Gen0 | Gen1 | Gen2 | Allocated [B] |
|-------- |------------------- |----------:|-----------:|------------:|----------:|----------:|---------:|--------------:|
| BulkAll | False | 68.40 ms | 18.58 ms | 12.29 ms | 7300.0000 | 2900.0000 | 800.0000 | 65352481 B |
| BulkAll | True | 65.23 ms | 20.96 ms | 13.86 ms | 6800.0000 | 3000.0000 | 700.0000 | 61077358 B |

100k events

| Method | DisableDiagnostics | Mean [ms] | Error [ms] | StdDev [ms] | Gen0 | Gen1 | Gen2 | Allocated [B] |
|-------- |------------------- |----------:|-----------:|------------:|----------:|----------:|---------:|--------------:|
| BulkAll | False | 83.54 ms | 15.04 ms | 9.947 ms | 7600.0000 | 2800.0000 | 300.0000 | 78001143 B |
| BulkAll | True | 74.30 ms | 11.69 ms | 7.734 ms | 7500.0000 | 2800.0000 | 300.0000 | 76354498 B |

| Method | DisableDiagnostics | Mean [ms] | Error [ms] | StdDev [ms] | Gen0 | Gen1 | Gen2 | Allocated [B] |
|-------- |------------------- |----------:|-----------:|------------:|----------:|----------:|---------:|--------------:|
| BulkAll | False | 86.79 ms | 18.78 ms | 12.423 ms | 8150.0000 | 3250.0000 | 300.0000 | 83103930 B |
| BulkAll | True | 94.95 ms | 14.55 ms | 9.627 ms | 7250.0000 | 2950.0000 | 250.0000 | 74070402 B |
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="JetBrains.Profiler.Api" Version="1.3.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Elastic.Ingest.Elasticsearch\Elastic.Ingest.Elasticsearch.csproj" />
<ProjectReference Include="..\Performance.Common\Performance.Common.csproj" />
</ItemGroup>

</Project>
82 changes: 82 additions & 0 deletions benchmarks/Elastic.Ingest.Elasticsearch.Profiling/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Elastic.Channels;
using Elastic.Ingest.Elasticsearch.Indices;
using Performance.Common;
using Elastic.Transport;
using JetBrains.Profiler.Api;

#if DEBUG
long responses = 0;
#endif

var disableDiagnostics = true;

var waitHandle = new ManualResetEvent(false);
var stockData = StockData.CreateSampleData(100_000);

var cloudId = Environment.GetEnvironmentVariable("APM_CLOUD_ID");
var elasticPassword = Environment.GetEnvironmentVariable("APM_CLOUD_PASSWORD");

if (string.IsNullOrEmpty(cloudId) || string.IsNullOrEmpty(elasticPassword))
throw new Exception("Missing environment variables for cloud connection");

MemoryProfiler.ForceGc();
MemoryProfiler.CollectAllocations(true);

var configuration = new TransportConfiguration(cloudId, new BasicAuthentication("elastic", elasticPassword))
.ServerCertificateValidationCallback((a,b,c,d) => true); // Trust the local certificate if we're passing through Fiddler with SSL decryption

var transport = new DefaultHttpTransport(configuration);

MemoryProfiler.GetSnapshot("Before");

var channelOptions = new IndexChannelOptions<StockData>(transport)
{
BufferOptions = new BufferOptions { OutboundBufferMaxSize = 5_000 },
DisableDiagnostics = disableDiagnostics,
IndexFormat = "stock-data-v8",
OutboundChannelExitedCallback = () =>
{
waitHandle.Set();
},
#if DEBUG
ExportResponseCallback = (response, a) =>
{
Interlocked.Add(ref responses, a.Count);
Console.WriteLine(responses);
},
PublishToOutboundChannelCallback = () => Console.WriteLine("PUBLISHED")
#endif
};

var indexChannel = new IndexChannel<StockData>(channelOptions);

MemoryProfiler.GetSnapshot("Before write");

#if DEBUG
Console.WriteLine("Write data.");
#endif

indexChannel.TryWriteMany(stockData);
indexChannel.TryComplete();

#if DEBUG
Console.WriteLine("Awaiting completion.");
#endif

waitHandle.WaitOne();

#if DEBUG
Console.WriteLine("Completed.");
#endif

MemoryProfiler.GetSnapshot("After write and flush");

if (!disableDiagnostics)
{
var diagnostics = indexChannel.ToString();
MemoryProfiler.GetSnapshot("After diagnostics");
}
9 changes: 9 additions & 0 deletions benchmarks/Performance.Common/Performance.Common.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

</Project>
90 changes: 90 additions & 0 deletions benchmarks/Performance.Common/StockData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Text;

namespace Performance.Common;

public sealed class StockData
{
private static readonly byte[] FilterPathStartResponseBytes = Encoding.UTF8.GetBytes("{\"items\":[");
private static readonly byte[] FilterPathItemResponseBytes = Encoding.UTF8.GetBytes("{\"create\":{\"status\":201}}");
private static readonly byte Comma = (byte)',';
private static readonly byte[] EndResponseBytes = Encoding.UTF8.GetBytes("]}");

public DateTime Date { get; init; }
public double Open { get; init; }
public double Close { get; init; }
public double High { get; init; }
public double Low { get; init; }
public int Volume { get; init; }
public string? Symbol { get; init; }

public static StockData ParseFromFileLine(string dataLine)
{
var columns = dataLine.Split(',', StringSplitOptions.TrimEntries);

var date = DateTime.Parse(columns[0]);

_ = float.TryParse(columns[1], out var open);
_ = float.TryParse(columns[1], out var high);
_ = float.TryParse(columns[1], out var low);
_ = float.TryParse(columns[1], out var close);

var volume = int.Parse(columns[5]);
var symbol = columns[6];

return new StockData
{
Date = date,
Open = open,
Close = close,
High = high,
Low = low,
Volume = volume,
Symbol = symbol
};
}

public static StockData[] CreateSampleData(long count)
{
var data = new StockData[100_000];

for (var i = 0; i < count; i++)
{
data[i] = ParseFromFileLine("2013-02-08,15.07,15.12,14.63,14.75,8407500,AAL");
}

return data;
}

public static byte[] CreateSampleDataSuccessWithFilterPathResponseBytes(long count)
{
var responseBytesSize = ((FilterPathItemResponseBytes.Length + 1) * count) - 1 + FilterPathStartResponseBytes.Length + EndResponseBytes.Length;
var responseBytes = new byte[responseBytesSize];

FilterPathStartResponseBytes.CopyTo(responseBytes, 0);

var offset = FilterPathStartResponseBytes.Length;

for (var i = 0; i < count; i++)
{
FilterPathItemResponseBytes.CopyTo(responseBytes, offset);

if (i < count - 1)
{
responseBytes[offset + FilterPathItemResponseBytes.Length] = Comma;
offset += (FilterPathItemResponseBytes.Length + 1);
}
else
{
offset += (FilterPathItemResponseBytes.Length);
}
}

EndResponseBytes.CopyTo(responseBytes, offset);

return responseBytes;
}
}
Loading

0 comments on commit 66b07e7

Please sign in to comment.