Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(elasticsearch sink): Support Amazon OpenSearch Serverless #21676

Merged
merged 27 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/16252_aws_opensearch_serverless.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The Elasticsearch sink can now write to Amazon OpenSearch Serverless.

authors: handlerbot AvihaiSam
14 changes: 12 additions & 2 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use aws_config::{
};
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
use aws_sigv4::{
http_request::{SignableBody, SignableRequest, SigningSettings},
http_request::{PayloadChecksumKind, SignableBody, SignableRequest, SigningSettings},
sign::v4,
};
use aws_smithy_async::rt::sleep::TokioSleep;
Expand Down Expand Up @@ -255,6 +255,7 @@ pub async fn sign_request(
request: &mut http::Request<Bytes>,
credentials_provider: &SharedCredentialsProvider,
region: &Option<Region>,
payload_checksum_sha256: bool,
) -> crate::Result<()> {
let headers = request
.headers()
Expand All @@ -276,12 +277,21 @@ pub async fn sign_request(

let credentials = credentials_provider.provide_credentials().await?;
let identity = Identity::new(credentials, None);

let mut signing_settings = SigningSettings::default();

// Include the x-amz-content-sha256 header when calculating the AWS v4 signature;
// this is required by some AWS services, e.g. S3 and OpenSearch Serverless
if payload_checksum_sha256 {
signing_settings.payload_checksum_kind = PayloadChecksumKind::XAmzSha256;
}

let signing_params_builder = v4::SigningParams::builder()
.identity(&identity)
.region(region.as_ref().map(|r| r.as_ref()).unwrap_or(""))
.name(service_name)
.time(SystemTime::now())
.settings(SigningSettings::default());
.settings(signing_settings);

let signing_params = signing_params_builder
.build()
Expand Down
82 changes: 64 additions & 18 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::{
http::{HttpClient, MaybeAuth},
sinks::{
elasticsearch::{
ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig,
OpenSearchServiceType, ParseError,
},
util::auth::Auth,
util::{http::RequestConfig, UriSerde},
Expand All @@ -31,6 +32,7 @@ pub struct ElasticsearchCommon {
pub base_url: String,
pub bulk_uri: Uri,
pub auth: Option<Auth>,
pub service_type: OpenSearchServiceType,
pub mode: ElasticsearchCommonMode,
pub request_builder: ElasticsearchRequestBuilder,
pub tls_settings: TlsSettings,
Expand Down Expand Up @@ -87,6 +89,13 @@ impl ElasticsearchCommon {
None => None,
};

if config.opensearch_service_type == OpenSearchServiceType::Serverless {
match &config.auth {
Some(ElasticsearchAuthConfig::Aws(_)) => (),
_ => return Err(ParseError::OpenSearchServerlessRequiresAwsAuth.into()),
}
}

let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();

let mode = config.common_mode()?;
Expand Down Expand Up @@ -143,15 +152,32 @@ impl ElasticsearchCommon {
metric_config.metric_tag_values,
);

let version = if let Some(version) = *version {
let service_type = config.opensearch_service_type;

let version = if service_type == OpenSearchServiceType::Serverless {
if config.api_version != ElasticsearchApiVersion::Auto {
return Err(ParseError::ServerlessElasticsearchApiVersionMustBeAuto.into());
}
// Amazon OpenSearch Serverless does not support the cluster-version API; hardcode
// well-known API version
8
handlerbot marked this conversation as resolved.
Show resolved Hide resolved
} else if let Some(version) = *version {
version
} else {
let ver = match config.api_version {
ElasticsearchApiVersion::V6 => 6,
ElasticsearchApiVersion::V7 => 7,
ElasticsearchApiVersion::V8 => 8,
ElasticsearchApiVersion::Auto => {
match get_version(&base_url, &auth, &request, &tls_settings, proxy_config).await
match get_version(
&base_url,
&auth,
&service_type,
&request,
&tls_settings,
proxy_config,
)
.await
{
Ok(version) => {
debug!(message = "Auto-detected Elasticsearch API version.", %version);
Expand Down Expand Up @@ -202,6 +228,7 @@ impl ElasticsearchCommon {

Ok(Self {
auth,
service_type,
base_url,
bulk_uri,
mode,
Expand Down Expand Up @@ -250,34 +277,52 @@ impl ElasticsearchCommon {
}

pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
match get(
&self.base_url,
&self.auth,
&self.request,
client,
"/_cluster/health",
)
.await?
.status()
{
StatusCode::OK => Ok(()),
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
if self.service_type == OpenSearchServiceType::Serverless {
warn!(message = "Amazon OpenSearch Serverless does not support healthchecks. Skipping healthcheck...");
Ok(())
} else {
match get(
&self.base_url,
&self.auth,
&self.service_type,
&self.request,
client,
"/_cluster/health",
)
.await?
.status()
{
StatusCode::OK => Ok(()),
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
}
}
}
}

#[cfg(feature = "aws-core")]
pub async fn sign_request(
service_type: &OpenSearchServiceType,
request: &mut http::Request<Bytes>,
credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
region: &Option<aws_types::region::Region>,
) -> crate::Result<()> {
crate::aws::sign_request("es", request, credentials_provider, region).await
// Amazon OpenSearch Serverless requires the x-amz-content-sha256 header when calculating
// the AWS v4 signature:
// https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-clients.html#serverless-signing
crate::aws::sign_request(
service_type.as_str(),
request,
credentials_provider,
region,
*service_type == OpenSearchServiceType::Serverless,
)
.await
}

async fn get_version(
base_url: &str,
auth: &Option<Auth>,
service_type: &OpenSearchServiceType,
request: &RequestConfig,
tls_settings: &TlsSettings,
proxy_config: &ProxyConfig,
Expand All @@ -292,7 +337,7 @@ async fn get_version(
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let response = get(base_url, auth, request, client, "/")
let response = get(base_url, auth, service_type, request, client, "/")
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;

Expand All @@ -316,6 +361,7 @@ async fn get_version(
async fn get(
base_url: &str,
auth: &Option<Auth>,
service_type: &OpenSearchServiceType,
request: &RequestConfig,
client: HttpClient,
path: &str,
Expand All @@ -338,7 +384,7 @@ async fn get(
region,
} => {
let region = region.clone();
sign_request(&mut request, provider, &Some(region)).await?;
sign_request(service_type, &mut request, provider, &Some(region)).await?;
}
}
}
Expand Down
35 changes: 35 additions & 0 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,34 @@ use vrl::value::Kind;
/// The field name for the timestamp required by data stream mode
pub const DATA_STREAM_TIMESTAMP_KEY: &str = "@timestamp";

/// The Amazon OpenSearch service type, either managed or serverless; primarily, selects the
/// correct AWS service to use when calculating the AWS v4 signature + disables features
/// unsupported by serverless: Elasticsearch API version autodetection, health checks
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum OpenSearchServiceType {
/// Elasticsearch or OpenSearch Managed domain
Managed,
/// OpenSearch Serverless collection
Serverless,
}

impl OpenSearchServiceType {
pub const fn as_str(&self) -> &'static str {
match self {
OpenSearchServiceType::Managed => "es",
OpenSearchServiceType::Serverless => "aoss",
}
}
}

impl Default for OpenSearchServiceType {
fn default() -> Self {
Self::Managed
}
}

/// Configuration for the `elasticsearch` sink.
#[configurable_component(sink("elasticsearch", "Index observability events in Elasticsearch."))]
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -75,6 +103,8 @@ pub struct ElasticsearchConfig {
pub doc_type: String,

/// The API version of Elasticsearch.
///
/// Amazon OpenSearch Serverless requires this option to be set to `auto` (the default).
#[serde(default)]
#[configurable(derived)]
pub api_version: ElasticsearchApiVersion,
Expand Down Expand Up @@ -152,6 +182,10 @@ pub struct ElasticsearchConfig {
#[cfg(feature = "aws-core")]
pub aws: Option<crate::aws::RegionOrEndpoint>,

/// Amazon OpenSearch service type
#[serde(default)]
pub opensearch_service_type: OpenSearchServiceType,

#[serde(default)]
#[configurable(derived)]
pub tls: Option<TlsConfig>,
Expand Down Expand Up @@ -214,6 +248,7 @@ impl Default for ElasticsearchConfig {
query: None,
#[cfg(feature = "aws-core")]
aws: None,
opensearch_service_type: Default::default(),
tls: None,
endpoint_health: None,
bulk: BulkConfig::default(), // the default mode is Bulk
Expand Down
10 changes: 9 additions & 1 deletion src/sinks/elasticsearch/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,15 @@ impl ElasticsearchCommon {
Auth::Aws {
credentials_provider: provider,
region,
} => sign_request(&mut request, provider, &Some(region.clone())).await?,
} => {
sign_request(
&OpenSearchServiceType::Managed,
&mut request,
provider,
&Some(region.clone()),
)
.await?
}
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/sinks/elasticsearch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use crate::{
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
#[configurable(metadata(docs::enum_tag_description = "The authentication strategy to use."))]
#[configurable(metadata(
docs::enum_tag_description = "The authentication strategy to use.\n\nAmazon OpenSearch Serverless requires this option to be set to `aws`."
))]
pub enum ElasticsearchAuthConfig {
/// HTTP Basic Authentication.
Basic {
Expand Down Expand Up @@ -319,4 +321,8 @@ pub enum ParseError {
ExternalVersioningWithoutDocumentID,
#[snafu(display("Your version field will be ignored because you use internal versioning"))]
ExternalVersionIgnoredWithInternalVersioning,
#[snafu(display("Amazon OpenSearch Serverless requires `api_version` value to be `auto`"))]
ServerlessElasticsearchApiVersionMustBeAuto,
#[snafu(display("Amazon OpenSearch Serverless requires `auth.strategy` value to be `aws`"))]
OpenSearchServerlessRequiresAwsAuth,
}
5 changes: 4 additions & 1 deletion src/sinks/elasticsearch/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl ElasticsearchService {
pub struct HttpRequestBuilder {
pub bulk_uri: Uri,
pub auth: Option<Auth>,
pub service_type: crate::sinks::elasticsearch::OpenSearchServiceType,
pub compression: Compression,
pub http_request_config: RequestConfig,
}
Expand All @@ -101,9 +102,10 @@ impl HttpRequestBuilder {
pub fn new(common: &ElasticsearchCommon, config: &ElasticsearchConfig) -> HttpRequestBuilder {
HttpRequestBuilder {
bulk_uri: common.bulk_uri.clone(),
http_request_config: config.request.clone(),
auth: common.auth.clone(),
service_type: common.service_type.clone(),
compression: config.compression,
http_request_config: config.request.clone(),
}
}

Expand Down Expand Up @@ -142,6 +144,7 @@ impl HttpRequestBuilder {
region,
} => {
crate::sinks::elasticsearch::sign_request(
&self.service_type,
&mut request,
provider,
&Some(region.clone()),
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/prometheus/remote_write/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn sign_request(
credentials_provider: &SharedCredentialsProvider,
region: &Option<Region>,
) -> crate::Result<()> {
crate::aws::sign_request("aps", request, credentials_provider, region).await
crate::aws::sign_request("aps", request, credentials_provider, region, false).await
}

pub(super) async fn build_request(
Expand Down
27 changes: 23 additions & 4 deletions website/cue/reference/components/sinks/base/elasticsearch.cue
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ base: components: sinks: elasticsearch: configuration: {
}
}
api_version: {
description: "The API version of Elasticsearch."
required: false
description: """
The API version of Elasticsearch.

Amazon OpenSearch Serverless requires this option to be set to `auto` (the default).
"""
required: false
type: string: {
default: "auto"
enum: {
Expand Down Expand Up @@ -166,8 +170,12 @@ base: components: sinks: elasticsearch: configuration: {
type: string: examples: ["wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"]
}
strategy: {
description: "The authentication strategy to use."
required: true
description: """
The authentication strategy to use.

Amazon OpenSearch Serverless requires this option to be set to `aws`.
"""
required: true
type: string: enum: {
aws: "Amazon OpenSearch Service-specific authentication."
basic: "HTTP Basic Authentication."
Expand Down Expand Up @@ -550,6 +558,17 @@ base: components: sinks: elasticsearch: configuration: {
}
}
}
opensearch_service_type: {
description: "Amazon OpenSearch service type"
required: false
type: string: {
default: "managed"
enum: {
managed: "Elasticsearch or OpenSearch Managed domain"
serverless: "OpenSearch Serverless collection"
}
}
}
pipeline: {
description: "The name of the pipeline to apply."
required: false
Expand Down
Loading