Skip to content

Commit

Permalink
Adding ES Security (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankita10r authored Aug 24, 2022
1 parent e9e3b1e commit 3253508
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@
import io.zeebe.exporter.ElasticsearchMetrics;
import io.zeebe.util.VersionUtil;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -29,10 +37,14 @@
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
Expand All @@ -59,6 +71,18 @@ public class ElasticsearchClient {
@Value("${reporting.enabled}")
private Boolean reportingEnabled;

@Value("${elasticsearch.security.enabled}")
private Boolean securityEnabled;

@Value("${elasticsearch.sslVerification}")
private Boolean sslVerify;

@Value("${elasticsearch.username}")
private String username;

@Value("${elasticsearch.password}")
private String password;

@Autowired
private TaskScheduler taskScheduler;

Expand Down Expand Up @@ -260,42 +284,59 @@ private boolean putIndexTemplate(PutIndexTemplateRequest putIndexTemplateRequest
.indices()
.putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT)
.isAcknowledged();
} catch (IOException e) {
}catch (ElasticsearchException exception){
throw new ElasticsearchExporterException("Failed to Connect ES", exception);
}
catch (IOException e) {
throw new ElasticsearchExporterException("Failed to put index template", e);
}
}

private RestHighLevelClient createClient() {
HttpHost httpHost = urlToHttpHost(elasticUrl);

// use single thread for rest client
RestClientBuilder builder =
RestClient.builder(httpHost).setHttpClientConfigCallback(this::setHttpClientConfigCallback);

return new RestHighLevelClient(builder);
RestClientBuilder builder;
SSLContext sslContext = null;
if(securityEnabled) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
if (sslVerify) {
SSLContextBuilder sslBuilder = null;
try {
sslBuilder = SSLContexts.custom().loadTrustMaterial(null, (x509Certificates, s) -> true);
sslContext = sslBuilder.build();
} catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
e.printStackTrace();
}
HttpHost httpHost = urlToHttpHost(elasticUrl);
SSLContext finalSslContext = sslContext;
builder = RestClient.builder(httpHost)
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setSSLContext(finalSslContext)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setDefaultCredentialsProvider(credentialsProvider));
} else {
HttpHost httpHost = urlToHttpHost(elasticUrl);
builder = RestClient.builder(httpHost)
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider);
}
});
}
} else {
HttpHost httpHost = urlToHttpHost(elasticUrl);
builder =
RestClient.builder(httpHost).setHttpClientConfigCallback(this::setHttpClientConfigCallback);
}
return new RestHighLevelClient(builder);
}

private HttpAsyncClientBuilder setHttpClientConfigCallback(HttpAsyncClientBuilder builder) {
builder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());

// TODO authentication
// if (configuration.authentication.isPresent()) {
// setupBasicAuthentication(builder);
// }

return builder;
}

// private void setupBasicAuthentication(HttpAsyncClientBuilder builder) {
// CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(
// AuthScope.ANY,
// new UsernamePasswordCredentials(
// configuration.authentication.username, configuration.authentication.password));
//
// builder.setDefaultCredentialsProvider(credentialsProvider);
// }

private static HttpHost urlToHttpHost(String url) {
URI uri;
try {
Expand Down
12 changes: 10 additions & 2 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@ importer:
kafka:
topic: "zeebe-export"
elasticsearch:
url: "http://ph-ee-elasticsearch:9200/"
url: "https://ph-ee-elasticsearch:9200/"
bulk-size: 20
index-prefix: "zeebe-record"


elasticsearch:
security:
enabled: false
sslVerification: false
username: "elastic"
password: "somepassword"

logging:
level:
ROOT: ERROR
ROOT: INFO
pattern:
console: "%clr(%d{dd-MM-yyyy HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([%35.35t]){faint} %clr(%-28.28logger{28}){cyan} %clr(:){faint}%X{BUSINESS-LOG} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"

Expand Down

0 comments on commit 3253508

Please sign in to comment.