Skip to content

Commit

Permalink
Update Elastic.Transport to 0.5.2 (#8405)
Browse files Browse the repository at this point in the history
  • Loading branch information
flobernd authored and github-actions[bot] committed Nov 14, 2024
1 parent 7a9d63e commit 3e09fd9
Show file tree
Hide file tree
Showing 16 changed files with 100 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ public ElasticsearchClientProductRegistration(Type markerType) : base(markerType

public override string ServiceIdentifier => "esv";

public override string DefaultMimeType => null; // Prevent base 'ElasticsearchProductRegistration' from sending the compatibility header
public override string? DefaultContentType => null; // Prevent base 'ElasticsearchProductRegistration' from sending the compatibility header

public override MetaHeaderProvider MetaHeaderProvider => _metaHeaderProvider;

/// <summary>
/// Elastic.Clients.Elasticsearch handles 404 in its <see cref="ElasticsearchResponse.IsValidResponse" />, we do not want the low level client throwing
/// exceptions
/// when <see cref="ITransportConfiguration.ThrowExceptions" /> is enabled for 404's. The client is in charge of
/// when <see cref="IRequestConfiguration.ThrowExceptions" /> is enabled for 404's. The client is in charge of
/// composing paths
/// so a 404 never signals a wrong URL but a missing entity.
/// </summary>
Expand Down Expand Up @@ -77,7 +77,7 @@ public class ApiVersionMetaHeaderProducer : MetaHeaderProducer

public override string HeaderName => "Elastic-Api-Version";

public override string ProduceHeaderValue(RequestData requestData) => _apiVersion;
public override string ProduceHeaderValue(RequestData requestData, bool isAsync) => _apiVersion;

public ApiVersionMetaHeaderProducer(VersionInfo version)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<Nullable>annotations</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Elastic.Transport" Version="0.4.26" />
<PackageReference Include="Elastic.Transport" Version="0.5.2" />
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="Tests" Key="$(ExposedPublicKey)" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public ElasticsearchClientProductRegistration(Type markerType) : base(markerType
/// <summary>
/// Elastic.Clients.Elasticsearch handles 404 in its <see cref="ElasticsearchResponse.IsValidResponse" />, we do not want the low level client throwing
/// exceptions
/// when <see cref="ITransportConfiguration.ThrowExceptions" /> is enabled for 404's. The client is in charge of
/// when <see cref="IRequestConfiguration.ThrowExceptions" /> is enabled for 404's. The client is in charge of
/// composing paths
/// so a 404 never signals a wrong URL but a missing entity.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<Nullable>annotations</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Elastic.Transport" Version="0.4.26" />
<PackageReference Include="Elastic.Transport" Version="0.5.2" />
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="Tests" Key="$(ExposedPublicKey)" />
Expand Down
20 changes: 14 additions & 6 deletions src/Elastic.Clients.Elasticsearch/_Shared/Api/BulkRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ namespace Elastic.Clients.Elasticsearch;

public partial class BulkRequest : IStreamSerializable
{
internal Request Self => this;
private static readonly IRequestConfiguration RequestConfigSingleton = new RequestConfiguration
{
Accept = "application/json",
ContentType = "application/x-ndjson"
};

public BulkOperationsCollection Operations { get; set; }
internal Request Self => this;

internal override string ContentType => "application/x-ndjson";
protected internal override IRequestConfiguration RequestConfig => RequestConfigSingleton;

internal override string Accept => "application/json";
public BulkOperationsCollection Operations { get; set; }

public void Serialize(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting = SerializationFormatting.None)
{
Expand Down Expand Up @@ -87,9 +91,13 @@ public async Task SerializeAsync(Stream stream, IElasticsearchClientSettings set

public sealed partial class BulkRequestDescriptor : IStreamSerializable
{
internal override string ContentType => "application/x-ndjson";
private static readonly IRequestConfiguration RequestConfigSingleton = new RequestConfiguration
{
Accept = "application/json",
ContentType = "application/x-ndjson"
};

internal override string Accept => "application/json";
protected internal override IRequestConfiguration RequestConfig => RequestConfigSingleton;

private readonly BulkOperationsCollection _operations = new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@
#if ELASTICSEARCH_SERVERLESS
namespace Elastic.Clients.Elasticsearch.Serverless.Esql;
#else

namespace Elastic.Clients.Elasticsearch.Esql;
#endif

internal sealed class EsqlResponseBuilder : CustomResponseBuilder
internal sealed class EsqlResponseBuilder : TypedResponseBuilder<EsqlQueryResponse>
{
public override object DeserializeResponse(Serializer serializer, ApiCallDetails response, Stream stream)
protected override EsqlQueryResponse? Build(ApiCallDetails apiCallDetails, RequestData requestData,
Stream responseStream,
string contentType, long contentLength)
{
var bytes = stream switch
var bytes = responseStream switch
{
MemoryStream ms => ms.ToArray(),
_ => BytesFromStream(stream)
_ => BytesFromStream(responseStream)
};

return new EsqlQueryResponse { Data = bytes };
Expand All @@ -37,13 +38,14 @@ static byte[] BytesFromStream(Stream stream)
}
}

public override async Task<object> DeserializeResponseAsync(Serializer serializer, ApiCallDetails response, Stream stream,
CancellationToken ctx = new CancellationToken())
protected override async Task<EsqlQueryResponse?> BuildAsync(ApiCallDetails apiCallDetails, RequestData requestData,
Stream responseStream,
string contentType, long contentLength, CancellationToken cancellationToken = default)
{
var bytes = stream switch
var bytes = responseStream switch
{
MemoryStream ms => ms.ToArray(),
_ => await BytesFromStreamAsync(stream, ctx).ConfigureAwait(false)
_ => await BytesFromStreamAsync(responseStream, cancellationToken).ConfigureAwait(false)
};

return new EsqlQueryResponse { Data = bytes };
Expand All @@ -62,10 +64,3 @@ static async Task<byte[]> BytesFromStreamAsync(Stream stream, CancellationToken
}
}
}

public sealed partial class EsqlQueryRequestParameters
{
private static readonly EsqlResponseBuilder ResponseBuilder = new();

public EsqlQueryRequestParameters() => CustomResponseBuilder = ResponseBuilder;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#if ELASTICSEARCH_SERVERLESS
namespace Elastic.Clients.Elasticsearch.Serverless;
#else

namespace Elastic.Clients.Elasticsearch;
#endif

Expand Down Expand Up @@ -165,14 +164,12 @@ private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestPar

ValueTask<TResponse> SendRequest()
{
var (resolvedUrl, _, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request, forceConfiguration);
var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
var openTelemetryData = PrepareOpenTelemetryData<TRequest, TRequestParameters>(request, resolvedRouteValues);

return isAsync
? new ValueTask<TResponse>(_transport
.RequestAsync<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken))
: new ValueTask<TResponse>(_transport
.Request<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData));
? new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(endpointPath, postData, in openTelemetryData, request.RequestConfig, cancellationToken))
: new ValueTask<TResponse>(_transport.Request<TResponse>(endpointPath, postData, in openTelemetryData, request.RequestConfig));
}

async ValueTask<TResponse> SendRequestWithProductCheck()
Expand All @@ -198,73 +195,59 @@ async ValueTask<TResponse> SendRequestWithProductCheckCore()
{
// Attach product check header

var hadRequestConfig = false;
HeadersList? originalHeaders = null;

if (request.RequestParameters.RequestConfiguration is null)
request.RequestParameters.RequestConfiguration = new RequestConfiguration();
else
{
originalHeaders = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse;
hadRequestConfig = true;
}

request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = request.RequestParameters.RequestConfiguration.ResponseHeadersToParse.Count == 0
? new HeadersList("x-elastic-product")
: new HeadersList(request.RequestParameters.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product");
// TODO: The copy constructor should accept null values
var requestConfig = (request.RequestConfig is null)
? new RequestConfiguration()
{
ResponseHeadersToParse = new HeadersList("x-elastic-product")
}
: new RequestConfiguration(request.RequestConfig)
{
ResponseHeadersToParse = (request.RequestConfig.ResponseHeadersToParse is { Count: > 0 })
? new HeadersList(request.RequestConfig.ResponseHeadersToParse, "x-elastic-product")
: new HeadersList("x-elastic-product")
};

// Send request

var (resolvedUrl, _, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request, forceConfiguration);
var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
var openTelemetryData = PrepareOpenTelemetryData<TRequest, TRequestParameters>(request, resolvedRouteValues);

TResponse response;

if (isAsync)
{
response = await _transport
.RequestAsync<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData, cancellationToken)
.RequestAsync<TResponse>(endpointPath, postData, in openTelemetryData, requestConfig, cancellationToken)
.ConfigureAwait(false);
}
else
{
response = _transport
.Request<TResponse>(request.HttpMethod, resolvedUrl, postData, request.RequestParameters, in openTelemetryData);
response = _transport.Request<TResponse>(endpointPath, postData, in openTelemetryData, requestConfig);
}

// Evaluate product check result

var hasSuccessStatusCode = response.ApiCallDetails.HttpStatusCode is >= 200 and <= 299;
if (hasSuccessStatusCode)
{
var productCheckSucceeded = response.ApiCallDetails.TryGetHeader("x-elastic-product", out var values) &&
values.FirstOrDefault(x => x.Equals("Elasticsearch", StringComparison.Ordinal)) is not null;

_productCheckStatus = productCheckSucceeded
? (int)ProductCheckStatus.Succeeded
: (int)ProductCheckStatus.Failed;

if (_productCheckStatus == (int)ProductCheckStatus.Failed)
throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);
}

if (request.RequestParameters.RequestConfiguration is null)
return response;

// Reset request configuration

if (!hadRequestConfig)
request.RequestParameters.RequestConfiguration = null;
else if (originalHeaders is { Count: > 0 })
request.RequestParameters.RequestConfiguration.ResponseHeadersToParse = originalHeaders.Value;

if (!hasSuccessStatusCode)
{
// The product check is unreliable for non success status codes.
// We have to re-try on the next request.
_productCheckStatus = (int)ProductCheckStatus.NotChecked;

return response;
}

var productCheckSucceeded = response.ApiCallDetails.TryGetHeader("x-elastic-product", out var values) &&
values.FirstOrDefault(x => x.Equals("Elasticsearch", StringComparison.Ordinal)) is not null;

_productCheckStatus = productCheckSucceeded
? (int)ProductCheckStatus.Succeeded
: (int)ProductCheckStatus.Failed;

if (_productCheckStatus == (int)ProductCheckStatus.Failed)
throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);

return response;
}
}
Expand Down Expand Up @@ -304,69 +287,21 @@ private static OpenTelemetryData PrepareOpenTelemetryData<TRequest, TRequestPara
return openTelemetryData;
}

private (string resolvedUrl, string urlTemplate, Dictionary<string, string>? resolvedRouteValues, PostData data) PrepareRequest<TRequest, TRequestParameters>(TRequest request,
Action<IRequestConfiguration>? forceConfiguration)
private (EndpointPath endpointPath, Dictionary<string, string>? resolvedRouteValues, PostData data) PrepareRequest<TRequest, TRequestParameters>(TRequest request)
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
request.ThrowIfNull(nameof(request), "A request is required.");

if (forceConfiguration is not null)
ForceConfiguration(request, forceConfiguration);

if (request.ContentType is not null)
ForceContentType<TRequest, TRequestParameters>(request, request.ContentType);

if (request.Accept is not null)
ForceAccept<TRequest, TRequestParameters>(request, request.Accept);

var (resolvedUrl, urlTemplate, routeValues) = request.GetUrl(ElasticsearchClientSettings);
var (resolvedUrl, _, routeValues) = request.GetUrl(ElasticsearchClientSettings);
var pathAndQuery = request.RequestParameters.CreatePathWithQueryStrings(resolvedUrl, ElasticsearchClientSettings);

var postData =
request.HttpMethod == HttpMethod.GET ||
request.HttpMethod == HttpMethod.HEAD || !request.SupportsBody
? null
: PostData.Serializable(request);

return (resolvedUrl, urlTemplate, routeValues, postData);
}

private static void ForceConfiguration<TRequestParameters>(Request<TRequestParameters> request, Action<IRequestConfiguration> forceConfiguration)
where TRequestParameters : RequestParameters, new()
{
var configuration = request.RequestParameters.RequestConfiguration ?? new RequestConfiguration();
forceConfiguration(configuration);
request.RequestParameters.RequestConfiguration = configuration;
}

private static void ForceContentType<TRequest, TRequestParameters>(TRequest request, string contentType)
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
var configuration = request.RequestParameters.RequestConfiguration ?? new RequestConfiguration();
configuration.Accept = contentType;
configuration.ContentType = contentType;
request.RequestParameters.RequestConfiguration = configuration;
}

private static void ForceAccept<TRequest, TRequestParameters>(TRequest request, string acceptType)
where TRequest : Request<TRequestParameters>
where TRequestParameters : RequestParameters, new()
{
var configuration = request.RequestParameters.RequestConfiguration ?? new RequestConfiguration();
configuration.Accept = acceptType;
request.RequestParameters.RequestConfiguration = configuration;
}

internal static void ForceJson(IRequestConfiguration requestConfiguration)
{
requestConfiguration.Accept = RequestData.DefaultMimeType;
requestConfiguration.ContentType = RequestData.DefaultMimeType;
}

internal static void ForceTextPlain(IRequestConfiguration requestConfiguration)
{
requestConfiguration.Accept = RequestData.MimeTypeTextPlain;
requestConfiguration.ContentType = RequestData.MimeTypeTextPlain;
return (new EndpointPath(request.HttpMethod, pathAndQuery), routeValues, postData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
using System.Reflection;

#if ELASTICSEARCH_SERVERLESS
using Elastic.Clients.Elasticsearch.Serverless.Esql;
using Elastic.Clients.Elasticsearch.Serverless.Fluent;
#else
using Elastic.Clients.Elasticsearch.Esql;
using Elastic.Clients.Elasticsearch.Fluent;
#endif

Expand Down Expand Up @@ -104,9 +106,9 @@ public ElasticsearchClientSettings(
/// <inheritdoc cref="IElasticsearchClientSettings" />
[Browsable(false)]
[EditorBrowsable(EditorBrowsableState.Never)]
public abstract class
ElasticsearchClientSettingsBase<TConnectionSettings> : ConnectionConfigurationBase<TConnectionSettings>,
IElasticsearchClientSettings
public abstract class ElasticsearchClientSettingsBase<TConnectionSettings> :
ConnectionConfigurationBase<TConnectionSettings>,
IElasticsearchClientSettings
where TConnectionSettings : ElasticsearchClientSettingsBase<TConnectionSettings>, IElasticsearchClientSettings
{
private readonly FluentDictionary<Type, string> _defaultIndices;
Expand Down Expand Up @@ -381,19 +383,21 @@ public ConnectionConfiguration(NodePool nodePool, IRequestInvoker requestInvoker
/// <inheritdoc cref="TransportClientConfigurationValues" />
[Browsable(false)]
[EditorBrowsable(EditorBrowsableState.Never)]
public abstract class
ConnectionConfigurationBase<TConnectionConfiguration> : TransportConfigurationBase<TConnectionConfiguration>,
TransportClientConfigurationValues
where TConnectionConfiguration : ConnectionConfigurationBase<TConnectionConfiguration>,
public abstract class ConnectionConfigurationBase<TConnectionConfiguration> :
TransportConfigurationDescriptorBase<TConnectionConfiguration>,
TransportClientConfigurationValues
where TConnectionConfiguration : ConnectionConfigurationBase<TConnectionConfiguration>, TransportClientConfigurationValues
{
private bool _includeServerStackTraceOnError;

protected ConnectionConfigurationBase(NodePool nodePool, IRequestInvoker requestInvoker,
Serializer? serializer,
ProductRegistration registration = null)
: base(nodePool, requestInvoker, serializer, registration ?? new ElasticsearchProductRegistration(typeof(ElasticsearchClient))) =>
UserAgent(ConnectionConfiguration.DefaultUserAgent);
: base(nodePool, requestInvoker, serializer, registration ?? new ElasticsearchProductRegistration(typeof(ElasticsearchClient)))
{
UserAgent(ConnectionConfiguration.DefaultUserAgent);
ResponseBuilder(new EsqlResponseBuilder());
}

bool TransportClientConfigurationValues.IncludeServerStackTraceOnError => _includeServerStackTraceOnError;

Expand Down
Loading

0 comments on commit 3e09fd9

Please sign in to comment.