Skip to content

Commit

Permalink
fix #1 include better integration tests for IndexChannel and DataStre…
Browse files Browse the repository at this point in the history
…amChannel (#2)

* fix #1 include better integration tests for IndexChannel and DataStreamChannel

* include shared Elastic DotSettings file

* update license headers

* remove unwanted linefeed in shared settings to apply standard license header
  • Loading branch information
Mpdreamz authored Jan 23, 2023
1 parent 67c2f62 commit 8e13878
Show file tree
Hide file tree
Showing 39 changed files with 761 additions and 91 deletions.
Empty file modified .github/add-license-headers.sh
100644 → 100755
Empty file.
2 changes: 1 addition & 1 deletion elastic-ingest-dotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Ingest.Transport",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Ingest.Tests", "tests\Elastic.Ingest.Tests\Elastic.Ingest.Tests.csproj", "{F6A52569-8F07-4E1E-935E-7AE9D664EF77}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Ingest.IntegrationTests", "tests\Elastic.Ingest.IntegrationTests\Elastic.Ingest.IntegrationTests.csproj", "{E09CEEE6-DD3A-4603-890D-6364C7E4639F}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Ingest.Elasticsearch.IntegrationTests", "tests\Elastic.Ingest.Elasticsearch.IntegrationTests\Elastic.Ingest.Elasticsearch.IntegrationTests.csproj", "{E09CEEE6-DD3A-4603-890D-6364C7E4639F}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "scripts", "build\scripts\scripts.fsproj", "{8AFDD165-F5B1-4555-97E3-A376B30236D3}"
EndProject
Expand Down
430 changes: 430 additions & 0 deletions elastic-ingest-dotnet.sln.DotSettings

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/Elastic.Ingest.Apm/ApmChannelOptions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using Elastic.Ingest.Apm.Model;
using Elastic.Ingest.Transport;
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Ingest.Apm/Helpers/Time.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;

namespace Elastic.Ingest.Apm.Helpers
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Ingest.Apm/Model/IngestResponse.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System.Collections.Generic;
using System.Text.Json.Serialization;
using Elastic.Transport;
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Ingest.Apm/Model/Transaction.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System.Collections.Generic;
using System.Text.Json.Serialization;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System.Collections.Generic;
using Elastic.Ingest.Elasticsearch.Serialization;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using Elastic.Transport;

namespace Elastic.Ingest.Elasticsearch.DataStreams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Linq;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using Elastic.Ingest.Elasticsearch.Serialization;

namespace Elastic.Ingest.Elasticsearch
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.IO;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
using Elastic.Transport;
Expand Down
5 changes: 4 additions & 1 deletion src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using Elastic.Ingest.Elasticsearch.Serialization;

Expand All @@ -12,7 +15,7 @@ protected override BulkOperationHeader CreateBulkOperationHeader(TEvent @event)
var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now;
if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value);

var index = string.Format(Options.Index, indexTime);
var index = string.Format(Options.IndexFormat, indexTime);
var id = Options.BulkOperationIdLookup?.Invoke(@event);
return
!string.IsNullOrWhiteSpace(id)
Expand Down
17 changes: 16 additions & 1 deletion src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using Elastic.Transport;

Expand All @@ -10,17 +13,29 @@ public IndexChannelOptions(HttpTransport transport) : base(transport) { }
/// <summary>
/// Gets or sets the format string for the Elastic search index. The current <c>DateTimeOffset</c> is passed as parameter
/// 0.
/// <para> Defaults to "dotnet-{0:yyyy.MM.dd}"</para>
/// <para> If no {0} parameter is defined the index name is effectively fixed</para>
/// </summary>
public string Index { get; set; } = "dotnet-{0:yyyy.MM.dd}";
public string IndexFormat { get; set; } = "dotnet-{0:yyyy.MM.dd}";

/// <summary>
/// Gets or sets the offset to use for the index <c>DateTimeOffset</c>. Default value is null, which uses the system local
/// offset. Use "00:00" for UTC.
/// </summary>
public TimeSpan? IndexOffset { get; set; }

/// <summary>
/// Provide a per document <c>DateTimeOffset</c> to be used as the date passed as parameter 0 to <see cref="IndexFormat"/>
/// </summary>
public Func<TEvent, DateTimeOffset?> TimestampLookup { get; set; } = null!;

/// <summary>
/// If the document provides an Id this allows you to set a per document `_id`.
/// <para>If an `_id` is defined an `_index` bulk operation will be created.</para>
/// <para>Otherwise (the default) `_create` bulk operation will be issued for the document.</para>
/// <para>Read more about bulk operations here:</para>
/// <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-request-body</para>
/// </summary>
public Func<TEvent, string> BulkOperationIdLookup { get; set; } = null!;
}
}
3 changes: 3 additions & 0 deletions src/Elastic.Ingest.Elasticsearch/IsExternalInit.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
// ReSharper disable once CheckNamespace
namespace System.Runtime.CompilerServices
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Text.Json;
using System.Text.Json.Serialization;
Expand Down
5 changes: 4 additions & 1 deletion src/Elastic.Ingest.OpenTelemetry/CustomActivityExporter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System.Diagnostics;
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System.Diagnostics;
using OpenTelemetry;

namespace Elastic.Ingest.OpenTelemetry
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Ingest.Transport/TransportChannelOptionsBase.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using Elastic.Transport;

namespace Elastic.Ingest.Transport
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Ingest/ChannelBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Ingest/ChannelOptionsBase.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.IO;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Clients.Elasticsearch.IndexManagement;
using Elastic.Elasticsearch.Managed;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Transport;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Elastic.Ingest.Elasticsearch.IntegrationTests
{
public class DataStreamIngestionTests : IntegrationTestBase
{
public DataStreamIngestionTests(IngestionCluster cluster, ITestOutputHelper output) : base(cluster, output)
{
}

[Fact]
public async Task EnsureDocumentsEndUpInDataStream()
{
// logs-* will use data streams by default in Elasticsearch.
var targetDataStream = new DataStreamName("logs", "dotnet");
var slim = new CountdownEvent(1);
var options = new DataStreamChannelOptions<TimeSeriesDocument>(Client.Transport)
{
DataStream = targetDataStream,
BufferOptions = new ElasticsearchBufferOptions<TimeSeriesDocument>
{
WaitHandle = slim, MaxConsumerBufferSize = 1,
}
};
var ecsChannel = new DataStreamChannel<TimeSeriesDocument>(options);

var dataStream =
await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString()));
dataStream.DataStreams.Should().BeNullOrEmpty();

ecsChannel.TryWrite(new TimeSeriesDocument { Timestamp = DateTimeOffset.Now, Message = "hello-world" });
if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)))
throw new Exception("ecs document was not persisted within 10 seconds");

var refreshResult = await Client.Indices.RefreshAsync(targetDataStream.ToString());
refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation);
var searchResult = await Client.SearchAsync<TimeSeriesDocument>(s => s.Indices(targetDataStream.ToString()));
searchResult.Total.Should().Be(1);

var storedDocument = searchResult.Documents.First();
storedDocument.Message.Should().Be("hello-world");

var hit = searchResult.Hits.First();
hit.Index.Should().StartWith($".ds-{targetDataStream}-");

// the following throws in the 8.0.4 version of the client
// The JSON value could not be converted to Elastic.Clients.Elasticsearch.HealthStatus. Path: $.data_stre...
// await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString())
var getDataStream =
await Client.Transport.RequestAsync<StringResponse>(HttpMethod.GET, $"/_data_stream/{targetDataStream}");

getDataStream.ApiCallDetails.HttpStatusCode.Should()
.Be(200, "{0}", getDataStream.ApiCallDetails.DebugInformation);

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Clients.Elasticsearch;
using Elastic.Clients.Elasticsearch.IndexManagement;
using Elastic.Elasticsearch.Managed;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Transport;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Elastic.Ingest.Elasticsearch.IntegrationTests
{
public class IndexIngestionTests : IntegrationTestBase
{
public IndexIngestionTests(IngestionCluster cluster, ITestOutputHelper output) : base(cluster, output)
{
}

[Fact]
public async Task EnsureDocumentsEndUpInIndex()
{
var indexPrefix = "catalog-data-";
var slim = new CountdownEvent(1);
var options = new IndexChannelOptions<CatalogDocument>(Client.Transport)
{
IndexFormat = indexPrefix + "{0:yyyy.MM.dd}",
BulkOperationIdLookup = c => c.Id,
TimestampLookup = c => c.Created,
BufferOptions = new ElasticsearchBufferOptions<CatalogDocument>
{
WaitHandle = slim, MaxConsumerBufferSize = 1,
}
};
var ecsChannel = new IndexChannel<CatalogDocument>(options);

var date = DateTimeOffset.Now;
var indexName = string.Format(options.IndexFormat, date);

var index = await Client.Indices.GetAsync(new GetIndexRequest(indexName));
index.Indices.Should().BeNullOrEmpty();

ecsChannel.TryWrite(new CatalogDocument { Created = date, Title = "Hello World!", Id = "hello-world" });
if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)))
throw new Exception("ecs document was not persisted within 10 seconds");

var refreshResult = await Client.Indices.RefreshAsync(indexName);
refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation);
var searchResult = await Client.SearchAsync<CatalogDocument>(s => s.Indices(indexName));
searchResult.Total.Should().Be(1);

var storedDocument = searchResult.Documents.First();
storedDocument.Id.Should().Be("hello-world");
storedDocument.Title.Should().Be("Hello World!");

var hit = searchResult.Hits.First();
hit.Index.Should().Be(indexName);

index = await Client.Indices.GetAsync(new GetIndexRequest(indexName));
index.Indices.Should().NotBeNullOrEmpty();

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Linq;
using Elastic.Clients.Elasticsearch;
using Elastic.Elasticsearch.Xunit;
using Elastic.Transport;
using Xunit;
using Xunit.Abstractions;

[assembly: TestFramework("Elastic.Elasticsearch.Xunit.Sdk.ElasticTestFramework", "Elastic.Elasticsearch.Xunit")]

namespace Elastic.Ingest.Elasticsearch.IntegrationTests
{
/// <summary> Declare our cluster that we want to inject into our test classes </summary>
public class IngestionCluster : XunitClusterBase
{
public IngestionCluster() : base(new XunitClusterConfiguration("8.3.1") { StartingPortNumber = 9202 }) { }

public ElasticsearchClient CreateClient(ITestOutputHelper output) =>
this.GetOrAddClient(c =>
{
var hostName = (System.Diagnostics.Process.GetProcessesByName("mitmproxy").Any()
? "ipv4.fiddler"
: "localhost");
var nodes = NodesUris(hostName);
var connectionPool = new StaticNodePool(nodes);
var settings = new ElasticsearchClientSettings(connectionPool)
.Proxy(new Uri("http://ipv4.fiddler:8080"), (string)null, (string)null)
.OnRequestCompleted(d =>
{
try { output.WriteLine(d.DebugInformation);}
catch { }
})
.EnableDebugMode();
return new ElasticsearchClient(settings);
});
}
}
Loading

0 comments on commit 8e13878

Please sign in to comment.