Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Command Manager settings on opensearch.yml #213

Merged
merged 26 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d2082cf
Implement plugin settings class to handle command manager configuration
QU3B1M Jan 13, 2025
a18a76c
Initialize settings on main Command Manager class
QU3B1M Jan 13, 2025
f9fbfea
Make PluginSetting singleton thread-safe adding syncrhonization
QU3B1M Jan 13, 2025
dc8ddc9
Replace constants usage with PluginSettings corresponding getters
QU3B1M Jan 13, 2025
c94c182
Register settings in CommandManager main class
QU3B1M Jan 13, 2025
747d3ee
Add logs to validate settings registering
QU3B1M Jan 13, 2025
d959c8d
Implement RealoadablePlugin interface to CommandManager plugin
QU3B1M Jan 13, 2025
d4c14d4
Apply spotless formatting
QU3B1M Jan 13, 2025
b06a775
Replace setting job.index.name for a constant
QU3B1M Jan 13, 2025
0136b8a
Apply getter functions for job index name and job type
QU3B1M Jan 14, 2025
1947e26
Add unit tests for PluginSettings class
QU3B1M Jan 14, 2025
6cb00b7
Add docstrings to PluginSettings functions
QU3B1M Jan 14, 2025
6bcb6b9
Merge branch 'master' into enhancement/180-command-manager-configuration
QU3B1M Jan 14, 2025
7635e26
Remove unnecesary debug logs
QU3B1M Jan 14, 2025
2bbf3a8
Convert indexes, templates, and API URI related configuration fields …
QU3B1M Jan 15, 2025
f240284
Update tests with new constant values
QU3B1M Jan 15, 2025
853aa18
Merge branch 'master' into enhancement/180-command-manager-configuration
QU3B1M Jan 15, 2025
ec0e85a
Rename timeout setting to client.timeout for clearer naming
QU3B1M Jan 15, 2025
14f83dc
Update docstrings
QU3B1M Jan 15, 2025
364c4bb
Make constant settings static
AlexRuiz7 Jan 16, 2025
6b76309
Log settings on plugin's start (#235)
AlexRuiz7 Jan 21, 2025
0f69aac
Constrict job scheduler configuration options (#236)
f-galland Jan 22, 2025
f4d0712
Merge branch 'master' into enhancement/180-command-manager-configuration
AlexRuiz7 Jan 22, 2025
d501d72
Fix import
AlexRuiz7 Jan 22, 2025
4c4b8eb
Fix old configuration references
QU3B1M Jan 23, 2025
c5e601f
Remove volatile from PluginSettings instance
AlexRuiz7 Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.settings.*;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
Expand All @@ -41,6 +37,7 @@
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ReloadablePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.*;
import org.opensearch.script.ScriptService;
Expand All @@ -49,6 +46,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -60,6 +58,7 @@
import com.wazuh.commandmanager.jobscheduler.CommandManagerJobRunner;
import com.wazuh.commandmanager.jobscheduler.JobDocument;
import com.wazuh.commandmanager.rest.RestPostCommandAction;
import com.wazuh.commandmanager.settings.PluginSettings;

/**
* The Command Manager plugin exposes an HTTP API with a single endpoint to receive raw commands
Expand All @@ -70,21 +69,11 @@
*
* <p>The Command Manager plugin is also a JobScheduler extension plugin.
*/
public class CommandManagerPlugin extends Plugin implements ActionPlugin, JobSchedulerExtension {
public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_command_manager";
public static final String COMMANDS_URI = COMMAND_MANAGER_BASE_URI + "/commands";
public static final String INDEX_NAME = ".commands";
public static final String INDEX_TEMPLATE_NAME = "index-template-commands";
public class CommandManagerPlugin extends Plugin
implements ActionPlugin, JobSchedulerExtension, ReloadablePlugin {
public static final String COMMAND_DOCUMENT_PARENT_OBJECT_NAME = "command";
public static final String JOB_INDEX_NAME = ".scheduled-commands";
public static final String JOB_INDEX_TEMPLATE_NAME = "index-template-scheduled-commands";
public static final Integer JOB_PERIOD_MINUTES = 1;
public static final Integer PAGE_SIZE = 100;
public static final Long DEFAULT_TIMEOUT_SECONDS = 20L;
public static final TimeValue PIT_KEEP_ALIVE_SECONDS = TimeValue.timeValueSeconds(30L);

private static final Logger log = LogManager.getLogger(CommandManagerPlugin.class);
public static final String JOB_TYPE = "command_manager_scheduler_extension";

private CommandIndex commandIndex;
private JobDocument jobDocument;
Expand All @@ -104,6 +93,7 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier) {
// Command index repository initialization.
this.commandIndex = new CommandIndex(client, clusterService, threadPool);
PluginSettings.getInstance(environment.settings());

// Scheduled job initialization
// NOTE it's very likely that client and thread pool may not be required as the command
Expand Down Expand Up @@ -139,7 +129,7 @@ private void scheduleCommandJob(
threadPool,
UUIDs.base64UUID(),
getJobType(),
JOB_PERIOD_MINUTES);
PluginSettings.getInstance().getJobSchedule());
indexResponseCompletableFuture.thenAccept(
indexResponse -> {
log.info(
Expand All @@ -162,14 +152,29 @@ public List<RestHandler> getRestHandlers(
return Collections.singletonList(new RestPostCommandAction(this.commandIndex));
}

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
// Register API settings
PluginSettings.TIMEOUT,
PluginSettings.JOB_PAGE_SIZE,
PluginSettings.JOB_SCHEDULE,
PluginSettings.JOB_KEEP_ALIVE,
PluginSettings.JOB_INDEX_TEMPLATE,
PluginSettings.API_PREFIX,
PluginSettings.API_ENDPOINT,
PluginSettings.INDEX_NAME,
PluginSettings.INDEX_TEMPLATE);
}

@Override
public String getJobType() {
return CommandManagerPlugin.JOB_TYPE;
return PluginSettings.getJobType();
}

@Override
public String getJobIndex() {
return CommandManagerPlugin.JOB_INDEX_NAME;
return PluginSettings.getJobIndexName();
}

@Override
Expand Down Expand Up @@ -230,4 +235,9 @@ private Instant parseInstantValue(XContentParser parser) throws IOException {
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

@Override
public void reload(Settings settings) {
// TODO
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.model.Document;
import com.wazuh.commandmanager.settings.PluginSettings;
import com.wazuh.commandmanager.utils.IndexTemplateUtils;

/** Class to manage the Command Manager index and index template. */
Expand Down Expand Up @@ -63,11 +63,13 @@ public CommandIndex(Client client, ClusterService clusterService, ThreadPool thr
/**
* Checks if the command index exists.
*
* @return whether the internal {@value
* com.wazuh.commandmanager.CommandManagerPlugin#INDEX_NAME} exists.
* @return whether the internal Command Manager's index exists.
*/
public boolean indexExists() {
return this.clusterService.state().routingTable().hasIndex(CommandManagerPlugin.INDEX_NAME);
return this.clusterService
.state()
.routingTable()
.hasIndex(PluginSettings.getInstance().getIndexName());
}

/**
Expand Down Expand Up @@ -97,15 +99,15 @@ public CompletableFuture<RestStatus> asyncBulkCreate(ArrayList<Document> documen
() -> {
try (ThreadContext.StoredContext ignored =
this.threadPool.getThreadContext().stashContext()) {
String indexTemplateName = PluginSettings.getInstance().getIndexTemplate();
// Create index template if it does not exist.
if (IndexTemplateUtils.isMissingIndexTemplate(
this.clusterService, CommandManagerPlugin.INDEX_TEMPLATE_NAME)) {
IndexTemplateUtils.putIndexTemplate(
this.client, CommandManagerPlugin.INDEX_TEMPLATE_NAME);
this.clusterService, indexTemplateName)) {
IndexTemplateUtils.putIndexTemplate(this.client, indexTemplateName);
} else {
log.info(
"Index template {} already exists. Skipping creation.",
CommandManagerPlugin.INDEX_TEMPLATE_NAME);
indexTemplateName);
}

final RestStatus restStatus = client.bulk(bulkRequest).actionGet().status();
Expand All @@ -127,7 +129,7 @@ public CompletableFuture<RestStatus> asyncBulkCreate(ArrayList<Document> documen
*/
private IndexRequest createIndexRequest(Document document) throws IOException {
return new IndexRequest()
.index(CommandManagerPlugin.INDEX_NAME)
.index(PluginSettings.getInstance().getIndexName())
.source(document.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(document.getId())
.create(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.threadpool.ThreadPool;

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.index.CommandIndex;
import com.wazuh.commandmanager.settings.PluginSettings;

/**
* Implements the ScheduledJobRunner interface, which exposes the runJob() method, which executes
Expand Down Expand Up @@ -67,7 +67,7 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
if (!this.indexManager.indexExists()) {
log.info(
"{} index not yet created, not running command manager jobs",
CommandManagerPlugin.INDEX_NAME);
PluginSettings.getInstance().getIndexName());
return;
}
final SearchThread searchThread = new SearchThread(this.client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.settings.PluginSettings;
import com.wazuh.commandmanager.utils.IndexTemplateUtils;

/** Indexes the command job to the Jobs index. */
Expand Down Expand Up @@ -79,7 +79,7 @@ public CompletableFuture<IndexResponse> create(
try {
IndexRequest indexRequest =
new IndexRequest()
.index(CommandManagerPlugin.JOB_INDEX_NAME)
.index(PluginSettings.getInstance().getJobIndexName())
.id(id)
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
.create(true);
Expand All @@ -88,13 +88,14 @@ public CompletableFuture<IndexResponse> create(
try (ThreadContext.StoredContext ignored =
threadPool.getThreadContext().stashContext()) {
if (IndexTemplateUtils.isMissingIndexTemplate(
clusterService, CommandManagerPlugin.JOB_INDEX_TEMPLATE_NAME)) {
clusterService,
PluginSettings.getInstance().getJobIndexTemplate())) {
IndexTemplateUtils.putIndexTemplate(
client, CommandManagerPlugin.JOB_INDEX_TEMPLATE_NAME);
client, PluginSettings.getInstance().getJobIndexTemplate());
} else {
log.info(
"Index template {} already exists. Skipping creation.",
CommandManagerPlugin.JOB_INDEX_NAME);
PluginSettings.getInstance().getJobIndexName());
}
IndexResponse indexResponse = client.index(indexRequest).actionGet();
completableFuture.complete(indexResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.model.*;
import com.wazuh.commandmanager.settings.PluginSettings;

/**
* The class in charge of searching and managing commands in {@link Status#PENDING} status and of
Expand Down Expand Up @@ -96,8 +97,8 @@ public static <T> T getNestedObject(Map<String, Object> map, String key, Class<T
}

/**
* Iterates over search results, updating their status field to {@link Status#FAILURE}
* if their delivery timestamps are earlier than the current time
* Iterates over search results, updating their status field to {@link Status#FAILURE} if their
* delivery timestamps are earlier than the current time
*
* @param searchResponse The search results page
* @throws IllegalStateException Rethrown from setSentStatus()
Expand Down Expand Up @@ -137,12 +138,12 @@ private void setFailureStatus(SearchHit hit) throws IllegalStateException {
.put(CommandManagerPlugin.COMMAND_DOCUMENT_PARENT_OBJECT_NAME, commandMap);
final IndexRequest indexRequest =
new IndexRequest()
.index(CommandManagerPlugin.INDEX_NAME)
.index(PluginSettings.getInstance().getIndexName())
.source(hit.getSourceAsMap())
.id(hit.getId());
this.client
.index(indexRequest)
.actionGet(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS * 1000);
.actionGet(PluginSettings.getInstance().getTimeout() * 1000);
}
}

Expand All @@ -156,15 +157,16 @@ private void setFailureStatus(SearchHit hit) throws IllegalStateException {
*/
public SearchResponse pitQuery(PointInTimeBuilder pointInTimeBuilder, Object[] searchAfter)
throws IllegalStateException {
final SearchRequest searchRequest = new SearchRequest(CommandManagerPlugin.INDEX_NAME);
final SearchRequest searchRequest =
new SearchRequest(PluginSettings.getInstance().getIndexName());
final TermQueryBuilder termQueryBuilder =
QueryBuilders.termQuery(SearchThread.COMMAND_STATUS_FIELD, Status.PENDING);
final TimeValue timeout =
TimeValue.timeValueSeconds(CommandManagerPlugin.DEFAULT_TIMEOUT_SECONDS);
TimeValue.timeValueSeconds(PluginSettings.getInstance().getTimeout());

this.searchSourceBuilder
.query(termQueryBuilder)
.size(CommandManagerPlugin.PAGE_SIZE)
.size(PluginSettings.getInstance().getJobPageSize())
.trackTotalHits(true)
.timeout(timeout)
.pointInTimeBuilder(pointInTimeBuilder);
Expand Down Expand Up @@ -275,9 +277,9 @@ public void onFailure(Exception e) {
};
this.client.createPit(
new CreatePitRequest(
CommandManagerPlugin.PIT_KEEP_ALIVE_SECONDS,
new TimeValue(PluginSettings.getInstance().getJobKeepAlive()),
false,
CommandManagerPlugin.INDEX_NAME),
PluginSettings.getInstance().getIndexName()),
actionListener);
try {
return new PointInTimeBuilder(future.get().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,17 @@
import java.util.Locale;
import java.util.concurrent.CompletableFuture;

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.index.CommandIndex;
import com.wazuh.commandmanager.model.Agent;
import com.wazuh.commandmanager.model.Command;
import com.wazuh.commandmanager.model.Document;
import com.wazuh.commandmanager.model.Documents;
import com.wazuh.commandmanager.settings.PluginSettings;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestRequest.Method.POST;

/**
* Handles HTTP requests to the POST {@value
* com.wazuh.commandmanager.CommandManagerPlugin#COMMANDS_URI} endpoint.
*/
/** Handles HTTP requests to the POST the Commands API endpoint. */
public class RestPostCommandAction extends BaseRestHandler {

public static final String POST_COMMAND_ACTION_REQUEST_DETAILS =
Expand All @@ -71,7 +68,11 @@ public String getName() {
public List<Route> routes() {
return List.of(
new Route(
POST, String.format(Locale.ROOT, "%s", CommandManagerPlugin.COMMANDS_URI)));
POST,
String.format(
Locale.ROOT,
"%s",
PluginSettings.getInstance().getApiCommandsUri())));
}

@Override
Expand Down Expand Up @@ -158,7 +159,8 @@ private RestChannelConsumer handlePost(RestRequest request) throws IOException {
restStatus -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("_index", CommandManagerPlugin.INDEX_NAME);
builder.field(
"_index", PluginSettings.getInstance().getIndexName());
documents.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.field("result", restStatus.name());
builder.endObject();
Expand Down
Loading