Skip to content

Commit

Permalink
Change the logic of SearchThread to manage the failed commands
Browse files Browse the repository at this point in the history
  • Loading branch information
mcasas993 committed Jan 9, 2025
1 parent bf1bba8 commit 95671bc
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,4 @@ private Instant parseInstantValue(XContentParser parser) throws IOException {
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

/**
* Close the resources opened by this plugin.
*
* @throws IOException if the plugin failed to close its resources
*/
@Override
public void close() throws IOException {
super.close();
HttpRestClient.getInstance().stopHttpAsyncClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package com.wazuh.commandmanager.jobscheduler;

import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
Expand All @@ -27,11 +25,9 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.time.DateUtils;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.SearchHit;
Expand All @@ -40,19 +36,14 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.model.*;
import com.wazuh.commandmanager.settings.PluginSettings;

/**
* The class in charge of searching and managing commands in {@link Status#PENDING} status and of
Expand Down Expand Up @@ -106,54 +97,58 @@ public static <T> T getNestedObject(Map<String, Object> map, String key, Class<T
}

/**
* Iterates over search results, updating their status field and submitting them to the
* destination
* Iterates over search results, updating their status field to {@link Status#FAILURE}
* if their delivery timestamps are older than the current time,
* and submitting them to the destination
*
* @param searchResponse The search results page
* @throws IllegalStateException Rethrown from setSentStatus()
*/
public void handlePage(SearchResponse searchResponse) throws IllegalStateException {
SearchHits searchHits = searchResponse.getHits();
log.debug("IN HANDLE PAGEEEE.");

Orders orders = new Orders();
orders = Orders.fromSearchHits(searchHits);
SearchHits searchHits = searchResponse.getHits();

ZonedDateTime current_time = DateUtils.nowWithMillisResolution();
log.debug("IN HANDLE PAGEEEE. Total hits: {}", searchHits.getTotalHits().value);

Status status = Status.FAILURE;
if (List.of(RestStatus.CREATED, RestStatus.ACCEPTED, RestStatus.OK)
.contains(RestStatus.fromCode(response.getCode()))) {
status = Status.SENT;
}
for (SearchHit hit : searchHits) {
this.setSentStatus(hit, status);
log.debug("IN HANDLE PAGEEEE. Hit id: {}", hit.getId());

Document document = Document.fromSearchHit(hit);
if (document != null && document.getDeliveryTimestamp().isBefore(current_time)){
this.setFailureStatus(hit);
}
}
}


/**
* Retrieves the hit's contents and updates the {@link Status} field to {@link Status#SENT}.
* Retrieves the hit's contents and updates the {@link Status} field to {@link Status#FAILURE}.
*
* @param hit The page's result we are to update.
* @throws IllegalStateException Raised by {@link ActionFuture#actionGet(long)}.
*/
@SuppressWarnings("unchecked")
private void setSentStatus(SearchHit hit, Status status) throws IllegalStateException {
private void setFailureStatus(SearchHit hit) throws IllegalStateException {
final Map<String, Object> commandMap =
getNestedObject(
hit.getSourceAsMap(),
CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME,
Map.class);
commandMap.put(Command.STATUS, status);
hit.getSourceAsMap()
.put(CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, commandMap);
final IndexRequest indexRequest =
new IndexRequest()
.index(CommandManagerPlugin.INDEX_NAME)
.source(hit.getSourceAsMap())
.id(hit.getId());
this.client
.index(indexRequest)
.actionGet(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS * 1000);

if(commandMap != null){
commandMap.put(Command.STATUS, Status.FAILURE);
hit.getSourceAsMap()
.put(CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, commandMap);
final IndexRequest indexRequest =
new IndexRequest()
.index(CommandManagerPlugin.INDEX_NAME)
.source(hit.getSourceAsMap())
.id(hit.getId());
this.client
.index(indexRequest)
.actionGet(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS * 1000);
}
}

/**
Expand All @@ -171,6 +166,8 @@ public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] s
QueryBuilders.termQuery(SearchThread.COMMAND_STATUS_FIELD, Status.PENDING);
final TimeValue timeout =
TimeValue.timeValueSeconds(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS);
log.debug("IN PIT query.");

this.searchSourceBuilder
.query(termQueryBuilder)
.size(CommandManagerPlugin.PAGE_SIZE)
Expand All @@ -184,6 +181,9 @@ public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] s
this.searchSourceBuilder.searchAfter(searchAfter);
}
searchRequest.source(this.searchSourceBuilder);

log.debug("FINISHING IN PIT query.");

return this.client.search(searchRequest).actionGet(timeout);
}

Expand All @@ -195,6 +195,8 @@ public void run() {
final PointInTimeBuilder pointInTimeBuilder = buildPit();
try {
do {
log.debug("In the do-while loop.");

this.currentPage =
pitQuery(
pointInTimeBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@ public String getUser() {
return this.user;
}

/**
* Retrieves the status of this command.
*
* @return the status of the command.
* @see Status
*/
public Status getStatus() {
return this.status;
}

@Override
public String toString() {
return "Command{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package com.wazuh.commandmanager.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.UUIDs;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.time.DateUtils;
import org.opensearch.common.time.FormatNames;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.*;
import org.opensearch.search.SearchHit;

import java.io.IOException;
import java.time.ZonedDateTime;
Expand All @@ -40,6 +43,8 @@ public class Document implements ToXContentObject {
private final ZonedDateTime timestamp;
private final ZonedDateTime deliveryTimestamp;

private static final Logger log = LogManager.getLogger(Document.class);

/**
* Default constructor.
*
Expand All @@ -54,6 +59,20 @@ public Document(Agent agent, Command command) {
this.deliveryTimestamp = timestamp.plusSeconds(command.getTimeout());
}

/**
* Custom constructor for existent documents.
*
* @param agent "agent" nested fields.
* @param command "command" nested fields.
*/
public Document(String id, Agent agent, Command command, ZonedDateTime timestamp, ZonedDateTime deliveryTimestamp) {
this.id = id;
this.agent = agent;
this.command = command;
this.timestamp = timestamp;
this.deliveryTimestamp = deliveryTimestamp;
}

/**
* Parses data from an XContentParser into this model.
*
Expand All @@ -78,6 +97,63 @@ public static Document parse(XContentParser parser) throws IOException {
return new Document(agent, command);
}

public static Document fromSearchHit(SearchHit hit) {
try {
XContentParser parser =
XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.IGNORE_DEPRECATIONS,
hit.getSourceRef(),
XContentType.JSON);

Command command = null;
Agent agent = null;
ZonedDateTime deliveryTimestamp = null;
ZonedDateTime timestamp = null;

// Iterate over the JsonXContentParser's JsonToken until we hit null,
// which corresponds to end of data
while (parser.nextToken() != null) {
// Look for FIELD_NAME JsonToken s
if (parser.currentToken().equals(XContentParser.Token.FIELD_NAME)) {
String fieldName = parser.currentName();
switch (fieldName) {
case Document.DELIVERY_TIMESTAMP:
deliveryTimestamp = ZonedDateTime.from(DATE_FORMATTER.parse(parser.text()));
break;

case Document.TIMESTAMP:
timestamp = ZonedDateTime.from(DATE_FORMATTER.parse(parser.text()));
break;

case Agent.AGENT:
// Parse Agent
agent = Agent.parse(parser);
break;

case Command.COMMAND:
// Parse Command
command = Command.parse(parser);
break;

default:
parser.skipChildren();
}
}
}
// Create a new Document object with the Command's fields
return new Document(hit.getId(), agent, command, timestamp, deliveryTimestamp);

} catch (IOException e) {
log.error("Document could not be parsed: {}", e.getMessage());
} catch (NullPointerException e) {
log.error(
"Could not create Document object. One or more of the constructor's arguments was null: {}",
e.getMessage());
}
return null;
}

/**
* Returns the document's "_id".
*
Expand All @@ -87,6 +163,24 @@ public String getId() {
return this.id;
}

/**
* Returns the Command object associated with this Document.
*
* @return Command object
*/
public Command getCommand() {
return this.command;
}

/**
* Returns the timestamp at which the Command was delivered to the Agent.
*
* @return ZonedDateTime object representing the delivery timestamp
*/
public ZonedDateTime getDeliveryTimestamp() {
return this.deliveryTimestamp;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -69,4 +71,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
return builder.endArray();
}

public static Documents fromSearchHits(SearchHits searchHits) {
Documents documents = new Documents();
// Iterate over search results
for (SearchHit hit : searchHits) {
// Parse the hit's document
Document document = Document.fromSearchHit(hit);
documents.addDocument(document);
}
return documents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public Orders() {
* @param searchHits the commands search result
* @return A json string payload with an array of orders to be processed
*/

/**
* Static builder method that initializes an instance of Orders from a SearchHits instance.
*
Expand Down
Loading

0 comments on commit 95671bc

Please sign in to comment.