Skip to content

Commit

Permalink
Move generic search-related code from SearchThread to Search util class
Browse files Browse the repository at this point in the history
  • Loading branch information
QU3B1M committed Jan 22, 2025
1 parent c9a6f26 commit 4105917
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,23 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.time.DateUtils;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;

import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.model.*;
import com.wazuh.commandmanager.utils.Search;

/**
* The class in charge of searching and managing commands in {@link Status#PENDING} status and of
Expand All @@ -55,7 +46,6 @@ public class SearchThread implements Runnable {
public static final String COMMAND_STATUS_FIELD = Command.COMMAND + "." + Command.STATUS;
public static final String DELIVERY_TIMESTAMP_FIELD = Order.DELIVERY_TIMESTAMP;
private static final Logger log = LogManager.getLogger(SearchThread.class);
private final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
private final Client client;
private SearchResponse currentPage = null;

Expand All @@ -68,40 +58,11 @@ public SearchThread(Client client) {
this.client = client;
}

/**
* Retrieves a nested value from a {@code Map<String, Object>} in a (somewhat) safe way.
*
* @param map The parent map to look at.
* @param key The key our nested object is found under.
* @param type The type we expect the nested object to be of.
* @param <T> The type of the nested object.
* @return the nested object cast into the proper type.
*/
public static <T> T getNestedObject(Map<String, Object> map, String key, Class<T> type) {
final Object value = map.get(key);
if (value == null) {
return null;
}
if (type.isInstance(value)) {
// Make a defensive copy for supported types like Map or List
if (value instanceof Map) {
return type.cast(new HashMap<>((Map<?, ?>) value));
} else if (value instanceof List) {
return type.cast(new ArrayList<>((List<?>) value));
}
// Return the value directly if it is immutable (e.g., String, Integer)
return type.cast(value);
} else {
throw new ClassCastException(
"Expected " + type.getName() + " but found " + value.getClass().getName());
}
}

/**
* Iterates over search results, updating their status field to {@link Status#FAILURE} if their
* delivery timestamps are earlier than the current time
* delivery timestamps are earlier than the current time.
*
* @param searchResponse The search results page
* @param searchResponse The search results page.
* @throws IllegalStateException from setFailureStatus()
* @throws OpenSearchTimeoutException from setFailureStatus()
*/
Expand Down Expand Up @@ -132,7 +93,7 @@ public void handlePage(SearchResponse searchResponse)
private void setFailureStatus(SearchHit hit)
throws IllegalStateException, OpenSearchTimeoutException {
final Map<String, Object> commandMap =
getNestedObject(
Search.getNestedObject(
hit.getSourceAsMap(),
CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME,
Map.class);
Expand Down Expand Up @@ -162,41 +123,35 @@ private void setFailureStatus(SearchHit hit)
*/
public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] searchAfter)
throws IllegalStateException, OpenSearchTimeoutException {
final SearchRequest searchRequest = new SearchRequest(CommandManagerPlugin.INDEX_NAME);
final TermQueryBuilder termQueryBuilder =
QueryBuilders.termQuery(SearchThread.COMMAND_STATUS_FIELD, Status.PENDING);
final TimeValue timeout =
TimeValue.timeValueSeconds(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS);

this.searchSourceBuilder
.query(termQueryBuilder)
.size(CommandManagerPlugin.PAGE_SIZE)
.trackTotalHits(true)
.timeout(timeout)
.pointInTimeBuilder(pointInTimeBuilder);
if (this.searchSourceBuilder.sorts() == null) {
this.searchSourceBuilder.sort(SearchThread.DELIVERY_TIMESTAMP_FIELD, SortOrder.ASC);
}
if (searchAfter.length > 0) {
this.searchSourceBuilder.searchAfter(searchAfter);
}
searchRequest.source(this.searchSourceBuilder);

return this.client.search(searchRequest).actionGet(timeout);
return Search.executePitQuery(
this.client,
CommandManagerPlugin.INDEX_NAME,
SearchThread.COMMAND_STATUS_FIELD,
String.valueOf(Status.PENDING),
pointInTimeBuilder,
searchAfter,
TimeValue.timeValueSeconds(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS),
CommandManagerPlugin.PAGE_SIZE,
DELIVERY_TIMESTAMP_FIELD,
SortOrder.ASC);
}

@Override
public void run() {
log.debug("Running scheduled job");
long consumableHits = 0L;
boolean firstPage = true;
final PointInTimeBuilder pointInTimeBuilder = buildPit();
final PointInTimeBuilder pointInTimeBuilder =
Search.buildPit(
client,
CommandManagerPlugin.PIT_KEEP_ALIVE_SECONDS,
CommandManagerPlugin.INDEX_NAME);
try {
do {
this.currentPage =
pitQuery(
pointInTimeBuilder,
getSearchAfter(this.currentPage).orElse(new Object[0]));
Search.getSearchAfter(this.currentPage).orElse(new Object[0]));
if (firstPage) {
log.info("Query returned {} hits.", totalHits());
consumableHits = totalHits();
Expand Down Expand Up @@ -235,67 +190,4 @@ private long totalHits() {
return 0;
}
}

/**
* Gets the sort values of the last hit of a page. It is used by a PIT search to get the next
* page of results.
*
* @param searchResponse The current page
* @return The values of the sort fields of the last hit of a page whenever present. Otherwise,
* an empty Optional.
*/
private Optional<Object[]> getSearchAfter(SearchResponse searchResponse) {
if (searchResponse == null) {
return Optional.empty();
}
try {
final List<SearchHit> hits = Arrays.asList(searchResponse.getHits().getHits());
if (hits.isEmpty()) {
log.warn("Empty hits page, not getting searchAfter values");
return Optional.empty();
}
return Optional.ofNullable(hits.get(hits.size() - 1).getSortValues());
} catch (NullPointerException | NoSuchElementException e) {
log.error("Could not get the page's searchAfter values: {}", e.getMessage());
return Optional.empty();
}
}

/**
* Prepares a PointInTimeBuilder object to be used in a search.
*
* @return a PointInTimeBuilder or null.
*/
private PointInTimeBuilder buildPit() {
final CompletableFuture<CreatePitResponse> future = new CompletableFuture<>();
final ActionListener<CreatePitResponse> actionListener =
new ActionListener<>() {
@Override
public void onResponse(CreatePitResponse createPitResponse) {
future.complete(createPitResponse);
}

@Override
public void onFailure(Exception e) {
log.error(e.getMessage());
future.completeExceptionally(e);
}
};
this.client.createPit(
new CreatePitRequest(
CommandManagerPlugin.PIT_KEEP_ALIVE_SECONDS,
false,
CommandManagerPlugin.INDEX_NAME),
actionListener);
try {
return new PointInTimeBuilder(future.get().getId());
} catch (CancellationException e) {
log.error("Building PIT was cancelled: {}", e.getMessage());
} catch (ExecutionException e) {
log.error("Error building PIT: {}", e.getMessage());
} catch (InterruptedException e) {
log.error("Building PIT was interrupted: {}", e.getMessage());
}
return null;
}
}
Loading

0 comments on commit 4105917

Please sign in to comment.