From 4105917cba60ade8248ee8c707661a9738ac8105 Mon Sep 17 00:00:00 2001 From: QU3B1M Date: Wed, 22 Jan 2025 10:57:53 -0300 Subject: [PATCH] Move generic search-related code from SearchThread to Search util class --- .../jobscheduler/SearchThread.java | 152 +++--------------- .../wazuh/commandmanager/utils/Search.java | 137 +++++++++++++--- 2 files changed, 141 insertions(+), 148 deletions(-) diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java index 3bbbfef3..ba5a12e7 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/jobscheduler/SearchThread.java @@ -20,32 +20,23 @@ import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.search.CreatePitRequest; -import org.opensearch.action.search.CreatePitResponse; -import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.time.DateUtils; import org.opensearch.common.unit.TimeValue; -import org.opensearch.core.action.ActionListener; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; import org.opensearch.search.builder.PointInTimeBuilder; -import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.sort.SortOrder; import java.time.ZonedDateTime; -import java.util.*; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import java.util.Map; import java.util.concurrent.TimeUnit; import com.wazuh.commandmanager.CommandManagerPlugin; import com.wazuh.commandmanager.model.*; +import com.wazuh.commandmanager.utils.Search; /** * The class in charge of searching and managing commands in {@link Status#PENDING} status and of @@ -55,7 +46,6 @@ public class SearchThread implements Runnable { public static final String COMMAND_STATUS_FIELD = Command.COMMAND + "." + Command.STATUS; public static final String DELIVERY_TIMESTAMP_FIELD = Order.DELIVERY_TIMESTAMP; private static final Logger log = LogManager.getLogger(SearchThread.class); - private final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); private final Client client; private SearchResponse currentPage = null; @@ -68,40 +58,11 @@ public SearchThread(Client client) { this.client = client; } - /** - * Retrieves a nested value from a {@code Map} in a (somewhat) safe way. - * - * @param map The parent map to look at. - * @param key The key our nested object is found under. - * @param type The type we expect the nested object to be of. - * @param The type of the nested object. - * @return the nested object cast into the proper type. - */ - public static T getNestedObject(Map map, String key, Class type) { - final Object value = map.get(key); - if (value == null) { - return null; - } - if (type.isInstance(value)) { - // Make a defensive copy for supported types like Map or List - if (value instanceof Map) { - return type.cast(new HashMap<>((Map) value)); - } else if (value instanceof List) { - return type.cast(new ArrayList<>((List) value)); - } - // Return the value directly if it is immutable (e.g., String, Integer) - return type.cast(value); - } else { - throw new ClassCastException( - "Expected " + type.getName() + " but found " + value.getClass().getName()); - } - } - /** * Iterates over search results, updating their status field to {@link Status#FAILURE} if their - * delivery timestamps are earlier than the current time + * delivery timestamps are earlier than the current time. * - * @param searchResponse The search results page + * @param searchResponse The search results page. * @throws IllegalStateException from setFailureStatus() * @throws OpenSearchTimeoutException from setFailureStatus() */ @@ -132,7 +93,7 @@ public void handlePage(SearchResponse searchResponse) private void setFailureStatus(SearchHit hit) throws IllegalStateException, OpenSearchTimeoutException { final Map commandMap = - getNestedObject( + Search.getNestedObject( hit.getSourceAsMap(), CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, Map.class); @@ -162,27 +123,17 @@ private void setFailureStatus(SearchHit hit) */ public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] searchAfter) throws IllegalStateException, OpenSearchTimeoutException { - final SearchRequest searchRequest = new SearchRequest(CommandManagerPlugin.INDEX_NAME); - final TermQueryBuilder termQueryBuilder = - QueryBuilders.termQuery(SearchThread.COMMAND_STATUS_FIELD, Status.PENDING); - final TimeValue timeout = - TimeValue.timeValueSeconds(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS); - - this.searchSourceBuilder - .query(termQueryBuilder) - .size(CommandManagerPlugin.PAGE_SIZE) - .trackTotalHits(true) - .timeout(timeout) - .pointInTimeBuilder(pointInTimeBuilder); - if (this.searchSourceBuilder.sorts() == null) { - this.searchSourceBuilder.sort(SearchThread.DELIVERY_TIMESTAMP_FIELD, SortOrder.ASC); - } - if (searchAfter.length > 0) { - this.searchSourceBuilder.searchAfter(searchAfter); - } - searchRequest.source(this.searchSourceBuilder); - - return this.client.search(searchRequest).actionGet(timeout); + return Search.executePitQuery( + this.client, + CommandManagerPlugin.INDEX_NAME, + SearchThread.COMMAND_STATUS_FIELD, + String.valueOf(Status.PENDING), + pointInTimeBuilder, + searchAfter, + TimeValue.timeValueSeconds(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS), + CommandManagerPlugin.PAGE_SIZE, + DELIVERY_TIMESTAMP_FIELD, + SortOrder.ASC); } @Override @@ -190,13 +141,17 @@ public void run() { log.debug("Running scheduled job"); long consumableHits = 0L; boolean firstPage = true; - final PointInTimeBuilder pointInTimeBuilder = buildPit(); + final PointInTimeBuilder pointInTimeBuilder = + Search.buildPit( + client, + CommandManagerPlugin.PIT_KEEP_ALIVE_SECONDS, + CommandManagerPlugin.INDEX_NAME); try { do { this.currentPage = pitQuery( pointInTimeBuilder, - getSearchAfter(this.currentPage).orElse(new Object[0])); + Search.getSearchAfter(this.currentPage).orElse(new Object[0])); if (firstPage) { log.info("Query returned {} hits.", totalHits()); consumableHits = totalHits(); @@ -235,67 +190,4 @@ private long totalHits() { return 0; } } - - /** - * Gets the sort values of the last hit of a page. It is used by a PIT search to get the next - * page of results. - * - * @param searchResponse The current page - * @return The values of the sort fields of the last hit of a page whenever present. Otherwise, - * an empty Optional. - */ - private Optional getSearchAfter(SearchResponse searchResponse) { - if (searchResponse == null) { - return Optional.empty(); - } - try { - final List hits = Arrays.asList(searchResponse.getHits().getHits()); - if (hits.isEmpty()) { - log.warn("Empty hits page, not getting searchAfter values"); - return Optional.empty(); - } - return Optional.ofNullable(hits.get(hits.size() - 1).getSortValues()); - } catch (NullPointerException | NoSuchElementException e) { - log.error("Could not get the page's searchAfter values: {}", e.getMessage()); - return Optional.empty(); - } - } - - /** - * Prepares a PointInTimeBuilder object to be used in a search. - * - * @return a PointInTimeBuilder or null. - */ - private PointInTimeBuilder buildPit() { - final CompletableFuture future = new CompletableFuture<>(); - final ActionListener actionListener = - new ActionListener<>() { - @Override - public void onResponse(CreatePitResponse createPitResponse) { - future.complete(createPitResponse); - } - - @Override - public void onFailure(Exception e) { - log.error(e.getMessage()); - future.completeExceptionally(e); - } - }; - this.client.createPit( - new CreatePitRequest( - CommandManagerPlugin.PIT_KEEP_ALIVE_SECONDS, - false, - CommandManagerPlugin.INDEX_NAME), - actionListener); - try { - return new PointInTimeBuilder(future.get().getId()); - } catch (CancellationException e) { - log.error("Building PIT was cancelled: {}", e.getMessage()); - } catch (ExecutionException e) { - log.error("Error building PIT: {}", e.getMessage()); - } catch (InterruptedException e) { - log.error("Building PIT was interrupted: {}", e.getMessage()); - } - return null; - } } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/Search.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/Search.java index 227cc25f..4d6d8893 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/Search.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/Search.java @@ -18,20 +18,27 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.node.NodeClient; +import org.opensearch.client.Client; +import org.opensearch.client.support.AbstractClient; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.sort.SortOrder; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; /** Utility class for performing search operations. */ public class Search { @@ -40,42 +47,42 @@ public class Search { /** * Executes a synchronous search query on the specified index using a term query. * - * @param client the NodeClient used to execute the search query. + * @param client the AbstractClient used to execute the search query. * @param index the name of the index to search. * @param field the field to query. * @param value the value to search for in the specified field. * @return SearchHits object containing the search results. */ public static SearchHits syncSearch( - NodeClient client, String index, String field, String value) { + AbstractClient client, String index, String field, String value) { BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().must(QueryBuilders.termQuery(field, value)); - return executeSyncSearch(client, index, boolQuery); + return executeSearch(client, index, boolQuery, true); } /** * Executes a synchronous search query on the specified index using a boolean query. * - * @param client the NodeClient used to execute the search query. + * @param client the AbstractClient used to execute the search query. * @param index the name of the index to search. * @param boolQuery the boolean query to execute. * @return SearchHits object containing the search results. */ public static SearchHits syncSearch( - NodeClient client, String index, BoolQueryBuilder boolQuery) { - return executeSyncSearch(client, index, boolQuery); + AbstractClient client, String index, BoolQueryBuilder boolQuery) { + return executeSearch(client, index, boolQuery, true); } /** * Executes a synchronous search query on the specified index. * - * @param client the NodeClient used to execute the search query. + * @param client the AbstractClient used to execute the search query. * @param index the name of the index to search. * @param boolQuery the boolean query to execute. * @return SearchHits object containing the search results. */ - private static SearchHits executeSyncSearch( - NodeClient client, String index, BoolQueryBuilder boolQuery) { + private static SearchHits executeSearch( + AbstractClient client, String index, BoolQueryBuilder boolQuery, Boolean sync) { SearchRequest searchRequest = new SearchRequest(index); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(boolQuery); @@ -99,14 +106,108 @@ public void onFailure(Exception e) { latch.countDown(); } }); + if (sync) { + try { + latch.await(); // Wait for the search to complete + } catch (InterruptedException e) { + log.error("Interrupted while waiting for search to complete", e); + } + } + + return searchHits[0]; + } + /** + * Executes a PIT style query on the specified index using a term query. + * + * @param client the Client used to execute the search query. + * @param index the name of the index to search. + * @param field the field to query. + * @param value the value to search for in the specified field. + * @param pitBuilder the PointInTimeBuilder used for the query. + * @param searchAfter an array of objects containing the last page's values of the sort fields. + * @param timeout the timeout value for the search query. + * @param pageSize the size of each page of results. + * @return the SearchResponse object containing the search results. + */ + public static SearchResponse executePitQuery( + Client client, + String index, + String field, + String value, + PointInTimeBuilder pitBuilder, + Object[] searchAfter, + TimeValue timeout, + int pageSize, + String sortField, + SortOrder sortOrder) { + + SearchRequest searchRequest = new SearchRequest(index); + BoolQueryBuilder boolQuery = + QueryBuilders.boolQuery().must(QueryBuilders.termQuery(field, value)); + + SearchSourceBuilder searchSourceBuilder = + new SearchSourceBuilder() + .query(boolQuery) + .size(pageSize) + .trackTotalHits(true) + .timeout(timeout) + .pointInTimeBuilder(pitBuilder); + + if (searchSourceBuilder.sorts() == null) { + searchSourceBuilder.sort(sortField, sortOrder); + } + if (searchAfter.length > 0) { + searchSourceBuilder.searchAfter(searchAfter); + } + searchRequest.source(searchSourceBuilder); + + return client.search(searchRequest).actionGet(timeout); + } + + public static Optional getSearchAfter(SearchResponse searchResponse) { + if (searchResponse == null) { + return Optional.empty(); + } try { - latch.await(); // Wait for the search to complete - } catch (InterruptedException e) { - log.error("Interrupted while waiting for search to complete", e); + final List hits = List.of(searchResponse.getHits().getHits()); + if (hits.isEmpty()) { + log.warn("Empty hits page, not getting searchAfter values."); + return Optional.empty(); + } + return Optional.ofNullable(hits.get(hits.size() - 1).getSortValues()); + } catch (NullPointerException | NoSuchElementException e) { + log.error("Could not get the page's searchAfter values: {}", e.getMessage()); + return Optional.empty(); } + } - return searchHits[0]; + public static PointInTimeBuilder buildPit(Client client, TimeValue pitKeepAlive, String index) { + final CompletableFuture future = new CompletableFuture<>(); + final ActionListener actionListener = + new ActionListener<>() { + @Override + public void onResponse(CreatePitResponse createPitResponse) { + future.complete(createPitResponse); + } + + @Override + public void onFailure(Exception e) { + log.error(e.getMessage()); + future.completeExceptionally(e); + } + }; + client.createPit(new CreatePitRequest(pitKeepAlive, false, index), actionListener); + try { + return new PointInTimeBuilder(future.get().getId()); + } catch (CancellationException e) { + log.error("Building PIT was cancelled: {}", e.getMessage()); + } catch (ExecutionException e) { + log.error("Error building PIT: {}", e.getMessage()); + } catch (InterruptedException e) { + log.error("Building PIT was interrupted: {}", e.getMessage()); + } + return null; } /**