Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix against TimeSpan.Zero for max lifetime #72

Merged
merged 9 commits into from
Sep 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ public void BulkAll()
channel.TryWriteMany(_data);
channel.TryComplete();

_waitHandle.WaitOne();
// _waitHandle.WaitOne();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean for this to remain uncommented?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted changes to the benchmark project, wasn't needed in this PR 👍

}
}
23 changes: 1 addition & 22 deletions benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,8 @@
using System.Globalization;
using Elastic.Ingest.Elasticsearch.Benchmarks.Benchmarks;

#if DEBUG
stevejgordon marked this conversation as resolved.
Show resolved Hide resolved
// MANUALLY RUN A BENCHMARKED METHOD DURING DEBUGGING

//var bm = new BulkIngestion();
//bm.Setup();
//await bm.BulkAllAsync();
//Console.WriteLine("DONE");

var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks();
bm.Setup();
await bm.WriteToStreamAsync();

var length = bm.MemoryStream.Length;

bm.MemoryStream.Position = 0;
var sr = new StreamReader(bm.MemoryStream);
var json = sr.ReadToEnd();

Console.ReadKey();
#else
var config = ManualConfig.Create(DefaultConfig.Instance);
config.SummaryStyle = new SummaryStyle(CultureInfo.CurrentCulture, true, BenchmarkDotNet.Columns.SizeUnit.B, null!, ratioStyle: BenchmarkDotNet.Columns.RatioStyle.Percentage);
config.AddDiagnoser(MemoryDiagnoser.Default);

BenchmarkRunner.Run<BulkRequestCreationWithTemplatedIndexNameBenchmarks>(config);
#endif
BenchmarkRunner.Run<BulkIngestionBenchmarks>(config);
30 changes: 16 additions & 14 deletions examples/Elastic.Channels.Continuous/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Threading.Channels;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;

Expand All @@ -13,30 +14,31 @@

var options = new NoopBufferedChannel.NoopChannelOptions
{
BufferOptions = new BufferOptions()
//TrackConcurrency = true,
BufferOptions = new BufferOptions
{
OutboundBufferMaxSize = 10_000,
InboundBufferMaxSize = 10_000_000,
OutboundBufferMaxLifetime = TimeSpan.Zero,
InboundBufferMaxSize = 1_000_000,
OutboundBufferMaxSize = 1_000_000
},
ExportBufferCallback = () => Console.Write("."),
ExportExceptionCallback = e => Console.Write("!"),
PublishToInboundChannelFailureCallback = () => Console.Write("I"),
PublishToOutboundChannelFailureCallback = () => Console.Write("O"),
ExportExceptionCallback = e => Console.Write("!")

};
Console.WriteLine("2");
var channel = new DiagnosticsBufferedChannel(options);
Console.WriteLine($"Begin: ({channel.OutboundStarted}) {channel.MaxConcurrency} {channel.BatchExportOperations} -> {channel.InflightEvents}");
await Parallel.ForEachAsync(Enumerable.Range(0, int.MaxValue), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }, async (i, ctx) =>
{
var e = new NoopBufferedChannel.NoopEvent { Id = i };
var written = false;
//Console.Write('.');
var ready = await channel.WaitToWriteAsync(ctx);
if (ready) written = channel.TryWrite(e);
if (!written)
if (await channel.WaitToWriteAsync(e))
{
Console.WriteLine();

}

if (i % 10_000 == 0)
{
Console.Clear();
Console.WriteLine(channel);
Console.WriteLine(i);
Environment.Exit(1);
}
});
11 changes: 10 additions & 1 deletion src/Elastic.Channels/BufferOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ public class BufferOptions
/// </summary>
public int OutboundBufferMaxSize { get; set; } = 1_000;

private TimeSpan _outboundBufferMaxLifetime = TimeSpan.FromSeconds(5);
private readonly TimeSpan _outboundBufferMinLifetime = TimeSpan.FromSeconds(1);


/// <summary>
/// The maximum lifetime of a buffer to export to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/>.
/// If a buffer is older then the configured <see cref="OutboundBufferMaxLifetime"/> it will be flushed to
/// <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/> regardless of it's current size
/// <para>Defaults to <c>5 seconds</c></para>
/// <para>Any value less than <c>1 second</c> will be rounded back up to <c>1 second</c></para>
/// </summary>
public TimeSpan OutboundBufferMaxLifetime { get; set; } = TimeSpan.FromSeconds(5);
public TimeSpan OutboundBufferMaxLifetime
{
get => _outboundBufferMaxLifetime;
set => _outboundBufferMaxLifetime = value >= _outboundBufferMinLifetime ? value : _outboundBufferMaxLifetime;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensuring we can never set max life time to any thing less then 1s.

Otherwise each publish will cause an export, akin to setting OutboundBufferMaxSize to 1.

}

/// <summary>
/// The maximum number of consumers allowed to poll for new events on the channel.
Expand Down
Loading