Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove delivery of commands and change JobSchedule #204

Merged
merged 8 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions plugins/command-manager/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,6 @@ testClusters.integTest {
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'
}

// add customized keystore
keystore 'm_api.auth.username', 'wazuh'
keystore 'm_api.auth.password', 'wazuh'
keystore 'm_api.uri', 'https://127.0.0.1:55000' // base URI of the M_API
}

run {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.*;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
Expand All @@ -42,7 +41,6 @@
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ReloadablePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.*;
import org.opensearch.script.ScriptService;
Expand All @@ -51,7 +49,6 @@

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -63,8 +60,6 @@
import com.wazuh.commandmanager.jobscheduler.CommandManagerJobRunner;
import com.wazuh.commandmanager.jobscheduler.JobDocument;
import com.wazuh.commandmanager.rest.RestPostCommandAction;
import com.wazuh.commandmanager.settings.PluginSettings;
import com.wazuh.commandmanager.utils.httpclient.HttpRestClient;

/**
* The Command Manager plugin exposes an HTTP API with a single endpoint to receive raw commands
Expand All @@ -75,8 +70,7 @@
*
* <p>The Command Manager plugin is also a JobScheduler extension plugin.
*/
public class CommandManagerPlugin extends Plugin
implements ActionPlugin, ReloadablePlugin, JobSchedulerExtension {
public class CommandManagerPlugin extends Plugin implements ActionPlugin, JobSchedulerExtension {
public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_command_manager";
public static final String COMMANDS_URI = COMMAND_MANAGER_BASE_URI + "/commands";
public static final String INDEX_NAME = ".commands";
Expand Down Expand Up @@ -111,9 +105,6 @@ public Collection<Object> createComponents(
// Command index repository initialization.
this.commandIndex = new CommandIndex(client, clusterService, threadPool);

// Plugin settings initialization.
PluginSettings.getInstance(environment.settings());

// Scheduled job initialization
// NOTE it's very likely that client and thread pool may not be required as the command
// index
Expand Down Expand Up @@ -171,20 +162,6 @@ public List<RestHandler> getRestHandlers(
return Collections.singletonList(new RestPostCommandAction(this.commandIndex));
}

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
// Register API settings
PluginSettings.M_API_AUTH_USERNAME,
PluginSettings.M_API_AUTH_PASSWORD,
PluginSettings.M_API_URI);
}

@Override
public void reload(Settings settings) {
// TODO
}

@Override
public String getJobType() {
return CommandManagerPlugin.JOB_TYPE;
Expand Down Expand Up @@ -253,15 +230,4 @@ private Instant parseInstantValue(XContentParser parser) throws IOException {
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

/**
* Close the resources opened by this plugin.
*
* @throws IOException if the plugin failed to close its resources
*/
@Override
public void close() throws IOException {
super.close();
HttpRestClient.getInstance().stopHttpAsyncClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package com.wazuh.commandmanager.jobscheduler;

import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
Expand All @@ -27,11 +25,9 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.time.DateUtils;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.SearchHit;
Expand All @@ -40,20 +36,14 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.model.*;
import com.wazuh.commandmanager.settings.PluginSettings;
import com.wazuh.commandmanager.utils.httpclient.AuthHttpRestClient;

/**
* The class in charge of searching and managing commands in {@link Status#PENDING} status and of
Expand Down Expand Up @@ -107,92 +97,54 @@ public static <T> T getNestedObject(Map<String, Object> map, String key, Class<T
}

/**
* Iterates over search results, updating their status field and submitting them to the
* destination
* Iterates over search results, updating their status field to {@link Status#FAILURE} if their
* delivery timestamps are older than the current time, and submitting them to the destination
mcasas993 marked this conversation as resolved.
Show resolved Hide resolved
*
* @param searchResponse The search results page
* @throws IllegalStateException Rethrown from setSentStatus()
*/
public void handlePage(SearchResponse searchResponse) throws IllegalStateException {
SearchHits searchHits = searchResponse.getHits();
String payload;
try {
payload =
Orders.fromSearchHits(searchHits)
.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)
.toString();
} catch (IOException e) {
log.error("Error parsing orders from search hits, due to {}", e.getMessage());
return;
}
final SimpleHttpResponse response = this.deliverOrders(payload);
if (response == null) {
log.error("No reply from server.");
return;
}
log.info("Server replied with {}. Updating orders' status.", response.getCode());
Status status = Status.FAILURE;
if (List.of(RestStatus.CREATED, RestStatus.ACCEPTED, RestStatus.OK)
.contains(RestStatus.fromCode(response.getCode()))) {
status = Status.SENT;
}
for (SearchHit hit : searchHits) {
this.setSentStatus(hit, status);
}
}

/**
* Send the command order over HTTP
*
* @param orders The list of order to send.
*/
private SimpleHttpResponse deliverOrders(String orders) {
SimpleHttpResponse response = null;
try {
final PluginSettings settings = PluginSettings.getInstance();
final URI host =
new URIBuilder(settings.getUri() + SearchThread.ORDERS_ENDPOINT).build();
ZonedDateTime current_time = DateUtils.nowWithMillisResolution();
mcasas993 marked this conversation as resolved.
Show resolved Hide resolved

response =
AccessController.doPrivileged(
(PrivilegedAction<SimpleHttpResponse>)
() -> {
final AuthHttpRestClient httpClient =
new AuthHttpRestClient();
return httpClient.post(host, orders, null);
});
} catch (URISyntaxException e) {
log.error("Invalid URI: {}", e.getMessage());
} catch (Exception e) {
log.error("Error sending data: {}", e.getMessage());
for (SearchHit hit : searchHits) {
ZonedDateTime deliveryTimestampFromSearchHit =
mcasas993 marked this conversation as resolved.
Show resolved Hide resolved
Document.deliveryTimestampFromSearchHit(hit);
if (deliveryTimestampFromSearchHit != null
&& deliveryTimestampFromSearchHit.isBefore(current_time)) {
this.setFailureStatus(hit);
}
}
return response;
}

/**
* Retrieves the hit's contents and updates the {@link Status} field to {@link Status#SENT}.
* Retrieves the hit's contents and updates the {@link Status} field to {@link Status#FAILURE}.
*
* @param hit The page's result we are to update.
* @throws IllegalStateException Raised by {@link ActionFuture#actionGet(long)}.
*/
@SuppressWarnings("unchecked")
private void setSentStatus(SearchHit hit, Status status) throws IllegalStateException {
private void setFailureStatus(SearchHit hit) throws IllegalStateException {
final Map<String, Object> commandMap =
getNestedObject(
hit.getSourceAsMap(),
CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME,
Map.class);
commandMap.put(Command.STATUS, status);
hit.getSourceAsMap()
.put(CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, commandMap);
final IndexRequest indexRequest =
new IndexRequest()
.index(CommandManagerPlugin.INDEX_NAME)
.source(hit.getSourceAsMap())
.id(hit.getId());
this.client
.index(indexRequest)
.actionGet(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS * 1000);

if (commandMap != null) {
commandMap.put(Command.STATUS, Status.FAILURE);
hit.getSourceAsMap()
.put(CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, commandMap);
final IndexRequest indexRequest =
new IndexRequest()
.index(CommandManagerPlugin.INDEX_NAME)
.source(hit.getSourceAsMap())
.id(hit.getId());
this.client
.index(indexRequest)
.actionGet(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS * 1000);
}
}

/**
Expand All @@ -210,6 +162,7 @@ public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] s
QueryBuilders.termQuery(SearchThread.COMMAND_STATUS_FIELD, Status.PENDING);
final TimeValue timeout =
TimeValue.timeValueSeconds(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS);

this.searchSourceBuilder
.query(termQueryBuilder)
.size(CommandManagerPlugin.PAGE_SIZE)
Expand All @@ -223,12 +176,12 @@ public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] s
this.searchSourceBuilder.searchAfter(searchAfter);
}
searchRequest.source(this.searchSourceBuilder);

return this.client.search(searchRequest).actionGet(timeout);
}

@Override
public void run() {
log.debug("Running scheduled job");
mcasas993 marked this conversation as resolved.
Show resolved Hide resolved
long consumableHits = 0L;
boolean firstPage = true;
final PointInTimeBuilder pointInTimeBuilder = buildPit();
Expand All @@ -239,7 +192,7 @@ public void run() {
pointInTimeBuilder,
getSearchAfter(this.currentPage).orElse(new Object[0]));
if (firstPage) {
log.debug("Query returned {} hits.", totalHits());
log.info("Query returned {} hits.", totalHits());
consumableHits = totalHits();
firstPage = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@ public String getUser() {
return this.user;
}

/**
* Retrieves the status of this command.
*
* @return the status of the command.
* @see Status
*/
public Status getStatus() {
return this.status;
}

@Override
public String toString() {
return "Command{"
Expand Down
Loading