Skip to content

Commit

Permalink
Move discovery.* settings to new setting infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
Yannick Welsch committed Jan 22, 2016
1 parent 4c7f3d4 commit 296b48b
Show file tree
Hide file tree
Showing 37 changed files with 172 additions and 135 deletions.
12 changes: 6 additions & 6 deletions core/src/main/java/org/elasticsearch/common/Randomness.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.common;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

import java.lang.reflect.Method;
Expand All @@ -40,7 +41,7 @@
* setting a reproducible seed. When running the Elasticsearch server
* process, non-reproducible sources of randomness are provided (unless
* a setting is provided for a module that exposes a seed setting (e.g.,
* DiscoveryService#SETTING_DISCOVERY_SEED)).
* DiscoveryService#DISCOVERY_SEED_SETTING)).
*/
public final class Randomness {
private static final Method currentMethod;
Expand Down Expand Up @@ -68,13 +69,12 @@ private Randomness() {}
* seed in the settings with the key setting.
*
* @param settings the settings containing the seed
* @param setting the key to access the seed
* @param setting the setting to access the seed
* @return a reproducible source of randomness
*/
public static Random get(Settings settings, String setting) {
Long maybeSeed = settings.getAsLong(setting, null);
if (maybeSeed != null) {
return new Random(maybeSeed);
public static Random get(Settings settings, Setting<Long> setting) {
if (setting.exists(settings)) {
return new Random(setting.get(settings));
} else {
return get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStoreConfig;
Expand Down Expand Up @@ -194,6 +198,26 @@ public boolean isLoggerSetting(String key) {
Environment.PATH_REPO_SETTING,
Environment.PATH_SCRIPTS_SETTING,
Environment.PATH_SHARED_DATA_SETTING,
Environment.PIDFILE_SETTING
)));
Environment.PIDFILE_SETTING,
DiscoveryService.DISCOVERY_SEED_SETTING,
DiscoveryService.INITIAL_STATE_TIMEOUT_SETTING,
DiscoveryModule.DISCOVERY_TYPE_SETTING,
DiscoveryModule.ZEN_MASTER_SERVICE_TYPE_SETTING,
FaultDetection.PING_RETRIES_SETTING,
FaultDetection.PING_TIMEOUT_SETTING,
FaultDetection.REGISTER_CONNECTION_LISTENER_SETTING,
FaultDetection.PING_INTERVAL_SETTING,
FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING,
ZenDiscovery.PING_TIMEOUT_SETTING,
ZenDiscovery.JOIN_TIMEOUT_SETTING,
ZenDiscovery.JOIN_RETRY_ATTEMPTS_SETTING,
ZenDiscovery.JOIN_RETRY_DELAY_SETTING,
ZenDiscovery.MAX_PINGS_FROM_ANOTHER_MASTER_SETTING,
ZenDiscovery.SEND_LEAVE_REQUEST_SETTING,
ZenDiscovery.MASTER_ELECTION_FILTER_CLIENT_SETTING,
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
ZenDiscovery.MASTER_ELECTION_FILTER_DATA_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING
)));
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.discovery.local.LocalDiscovery;
Expand All @@ -36,14 +37,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* A module for loading classes for node discovery.
*/
public class DiscoveryModule extends AbstractModule {

public static final String DISCOVERY_TYPE_KEY = "discovery.type";
public static final String ZEN_MASTER_SERVICE_TYPE_KEY = "discovery.zen.masterservice.type";
public static final Setting<String> DISCOVERY_TYPE_SETTING = new Setting<>("discovery.type",
settings -> DiscoveryNode.localNode(settings) ? "local" : "zen", Function.identity(), false, Setting.Scope.CLUSTER);
public static final Setting<String> ZEN_MASTER_SERVICE_TYPE_SETTING = new Setting<>("discovery.zen.masterservice.type",
"zen", Function.identity(), false, Setting.Scope.CLUSTER);

private final Settings settings;
private final List<Class<? extends UnicastHostsProvider>> unicastHostProviders = new ArrayList<>();
Expand Down Expand Up @@ -93,15 +97,14 @@ public void addZenPing(Class<? extends ZenPing> clazz) {

@Override
protected void configure() {
String defaultType = DiscoveryNode.localNode(settings) ? "local" : "zen";
String discoveryType = settings.get(DISCOVERY_TYPE_KEY, defaultType);
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
Class<? extends Discovery> discoveryClass = discoveryTypes.get(discoveryType);
if (discoveryClass == null) {
throw new IllegalArgumentException("Unknown Discovery type [" + discoveryType + "]");
}

if (discoveryType.equals("local") == false) {
String masterServiceTypeKey = settings.get(ZEN_MASTER_SERVICE_TYPE_KEY, "zen");
String masterServiceTypeKey = ZEN_MASTER_SERVICE_TYPE_SETTING.get(settings);
final Class<? extends ElectMasterService> masterService = masterServiceType.get(masterServiceTypeKey);
if (masterService == null) {
throw new IllegalArgumentException("Unknown master service type [" + masterServiceTypeKey + "]");
Expand All @@ -121,4 +124,4 @@ protected void configure() {
bind(Discovery.class).to(discoveryClass).asEagerSingleton();
bind(DiscoveryService.class).asEagerSingleton();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

Expand All @@ -39,8 +40,8 @@
*/
public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryService> {

public static final String SETTING_INITIAL_STATE_TIMEOUT = "discovery.initial_state_timeout";
public static final String SETTING_DISCOVERY_SEED = "discovery.id.seed";
public static final Setting<TimeValue> INITIAL_STATE_TIMEOUT_SETTING = Setting.positiveTimeSetting("discovery.initial_state_timeout", TimeValue.timeValueSeconds(30), false, Setting.Scope.CLUSTER);
public static final Setting<Long> DISCOVERY_SEED_SETTING = Setting.longSetting("discovery.id.seed", 0l, Long.MIN_VALUE, false, Setting.Scope.CLUSTER);

private static class InitialStateListener implements InitialStateDiscoveryListener {

Expand Down Expand Up @@ -71,7 +72,7 @@ public DiscoveryService(Settings settings, DiscoverySettings discoverySettings,
super(settings);
this.discoverySettings = discoverySettings;
this.discovery = discovery;
this.initialStateTimeout = settings.getAsTime(SETTING_INITIAL_STATE_TIMEOUT, TimeValue.timeValueSeconds(30));
this.initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(settings);
}

public ClusterBlock getNoMasterBlock() {
Expand Down Expand Up @@ -132,7 +133,7 @@ public void publish(ClusterChangedEvent clusterChangedEvent, Discovery.AckListen
}

public static String generateNodeId(Settings settings) {
Random random = Randomness.get(settings, DiscoveryService.SETTING_DISCOVERY_SEED);
Random random = Randomness.get(settings, DiscoveryService.DISCOVERY_SEED_SETTING);
return Strings.randomBase64UUID(random);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,17 @@
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, PingContextProvider {

public final static Setting<Boolean> REJOIN_ON_MASTER_GONE_SETTING = Setting.boolSetting("discovery.zen.rejoin_on_master_gone", true, true, Setting.Scope.CLUSTER);
public final static String SETTING_PING_TIMEOUT = "discovery.zen.ping_timeout";
public final static String SETTING_JOIN_TIMEOUT = "discovery.zen.join_timeout";
public final static String SETTING_JOIN_RETRY_ATTEMPTS = "discovery.zen.join_retry_attempts";
public final static String SETTING_JOIN_RETRY_DELAY = "discovery.zen.join_retry_delay";
public final static String SETTING_MAX_PINGS_FROM_ANOTHER_MASTER = "discovery.zen.max_pings_from_another_master";
public final static String SETTING_SEND_LEAVE_REQUEST = "discovery.zen.send_leave_request";
public final static String SETTING_MASTER_ELECTION_FILTER_CLIENT = "discovery.zen.master_election.filter_client";
public final static String SETTING_MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT = "discovery.zen.master_election.wait_for_joins_timeout";
public final static String SETTING_MASTER_ELECTION_FILTER_DATA = "discovery.zen.master_election.filter_data";
public final static Setting<TimeValue> PING_TIMEOUT_SETTING = Setting.positiveTimeSetting("discovery.zen.ping_timeout", timeValueSeconds(3), false, Setting.Scope.CLUSTER);
public final static Setting<TimeValue> JOIN_TIMEOUT_SETTING = Setting.timeSetting("discovery.zen.join_timeout",
settings -> TimeValue.timeValueMillis(PING_TIMEOUT_SETTING.get(settings).millis() * 20).toString(), TimeValue.timeValueMillis(0), false, Setting.Scope.CLUSTER);
public final static Setting<Integer> JOIN_RETRY_ATTEMPTS_SETTING = Setting.intSetting("discovery.zen.join_retry_attempts", 3, 1, false, Setting.Scope.CLUSTER);
public final static Setting<TimeValue> JOIN_RETRY_DELAY_SETTING = Setting.positiveTimeSetting("discovery.zen.join_retry_delay", TimeValue.timeValueMillis(100), false, Setting.Scope.CLUSTER);
public final static Setting<Integer> MAX_PINGS_FROM_ANOTHER_MASTER_SETTING = Setting.intSetting("discovery.zen.max_pings_from_another_master", 3, 1, false, Setting.Scope.CLUSTER);
public final static Setting<Boolean> SEND_LEAVE_REQUEST_SETTING = Setting.boolSetting("discovery.zen.send_leave_request", true, false, Setting.Scope.CLUSTER);
public final static Setting<Boolean> MASTER_ELECTION_FILTER_CLIENT_SETTING = Setting.boolSetting("discovery.zen.master_election.filter_client", true, false, Setting.Scope.CLUSTER);
public final static Setting<TimeValue> MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING = Setting.timeSetting("discovery.zen.master_election.wait_for_joins_timeout",
settings -> TimeValue.timeValueMillis(JOIN_TIMEOUT_SETTING.get(settings).millis() / 2).toString(), TimeValue.timeValueMillis(0), false, Setting.Scope.CLUSTER);
public final static Setting<Boolean> MASTER_ELECTION_FILTER_DATA_SETTING = Setting.boolSetting("discovery.zen.master_election.filter_data", false, false, Setting.Scope.CLUSTER);

public static final String DISCOVERY_REJOIN_ACTION_NAME = "internal:discovery/zen/rejoin";

Expand Down Expand Up @@ -164,26 +166,19 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
this.discoverySettings = discoverySettings;
this.pingService = pingService;
this.electMaster = electMasterService;
this.pingTimeout = settings.getAsTime(SETTING_PING_TIMEOUT, timeValueSeconds(3));
this.pingTimeout = PING_TIMEOUT_SETTING.get(settings);

this.joinTimeout = settings.getAsTime(SETTING_JOIN_TIMEOUT, TimeValue.timeValueMillis(this.pingTimeout.millis() * 20));
this.joinRetryAttempts = settings.getAsInt(SETTING_JOIN_RETRY_ATTEMPTS, 3);
this.joinRetryDelay = settings.getAsTime(SETTING_JOIN_RETRY_DELAY, TimeValue.timeValueMillis(100));
this.maxPingsFromAnotherMaster = settings.getAsInt(SETTING_MAX_PINGS_FROM_ANOTHER_MASTER, 3);
this.sendLeaveRequest = settings.getAsBoolean(SETTING_SEND_LEAVE_REQUEST, true);
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinRetryAttempts = JOIN_RETRY_ATTEMPTS_SETTING.get(settings);
this.joinRetryDelay = JOIN_RETRY_DELAY_SETTING.get(settings);
this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings);
this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);

this.masterElectionFilterClientNodes = settings.getAsBoolean(SETTING_MASTER_ELECTION_FILTER_CLIENT, true);
this.masterElectionFilterDataNodes = settings.getAsBoolean(SETTING_MASTER_ELECTION_FILTER_DATA, false);
this.masterElectionWaitForJoinsTimeout = settings.getAsTime(SETTING_MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT, TimeValue.timeValueMillis(joinTimeout.millis() / 2));
this.masterElectionFilterClientNodes = MASTER_ELECTION_FILTER_CLIENT_SETTING.get(settings);
this.masterElectionFilterDataNodes = MASTER_ELECTION_FILTER_DATA_SETTING.get(settings);
this.masterElectionWaitForJoinsTimeout = MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.get(settings);
this.rejoinOnMasterGone = REJOIN_ON_MASTER_GONE_SETTING.get(settings);

if (this.joinRetryAttempts < 1) {
throw new IllegalArgumentException("'" + SETTING_JOIN_RETRY_ATTEMPTS + "' must be a positive number. got [" + SETTING_JOIN_RETRY_ATTEMPTS + "]");
}
if (this.maxPingsFromAnotherMaster < 1) {
throw new IllegalArgumentException("'" + SETTING_MAX_PINGS_FROM_ANOTHER_MASTER + "' must be a positive number. got [" + this.maxPingsFromAnotherMaster + "]");
}

logger.debug("using ping_timeout [{}], join.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", this.pingTimeout, joinTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);

clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, this::handleMinimumMasterNodesChanged, (value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Scope;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -35,11 +37,11 @@
*/
public abstract class FaultDetection extends AbstractComponent {

public static final String SETTING_CONNECT_ON_NETWORK_DISCONNECT = "discovery.zen.fd.connect_on_network_disconnect";
public static final String SETTING_PING_INTERVAL = "discovery.zen.fd.ping_interval";
public static final String SETTING_PING_TIMEOUT = "discovery.zen.fd.ping_timeout";
public static final String SETTING_PING_RETRIES = "discovery.zen.fd.ping_retries";
public static final String SETTING_REGISTER_CONNECTION_LISTENER = "discovery.zen.fd.register_connection_listener";
public static final Setting<Boolean> CONNECT_ON_NETWORK_DISCONNECT_SETTING = Setting.boolSetting("discovery.zen.fd.connect_on_network_disconnect", false, false, Scope.CLUSTER);
public static final Setting<TimeValue> PING_INTERVAL_SETTING = Setting.positiveTimeSetting("discovery.zen.fd.ping_interval", timeValueSeconds(1), false, Scope.CLUSTER);
public static final Setting<TimeValue> PING_TIMEOUT_SETTING = Setting.timeSetting("discovery.zen.fd.ping_timeout", timeValueSeconds(30), false, Scope.CLUSTER);
public static final Setting<Integer> PING_RETRIES_SETTING = Setting.intSetting("discovery.zen.fd.ping_retries", 3, false, Scope.CLUSTER);
public static final Setting<Boolean> REGISTER_CONNECTION_LISTENER_SETTING = Setting.boolSetting("discovery.zen.fd.register_connection_listener", true, false, Scope.CLUSTER);

protected final ThreadPool threadPool;
protected final ClusterName clusterName;
Expand All @@ -60,11 +62,11 @@ public FaultDetection(Settings settings, ThreadPool threadPool, TransportService
this.transportService = transportService;
this.clusterName = clusterName;

this.connectOnNetworkDisconnect = settings.getAsBoolean(SETTING_CONNECT_ON_NETWORK_DISCONNECT, false);
this.pingInterval = settings.getAsTime(SETTING_PING_INTERVAL, timeValueSeconds(1));
this.pingRetryTimeout = settings.getAsTime(SETTING_PING_TIMEOUT, timeValueSeconds(30));
this.pingRetryCount = settings.getAsInt(SETTING_PING_RETRIES, 3);
this.registerConnectionListener = settings.getAsBoolean(SETTING_REGISTER_CONNECTION_LISTENER, true);
this.connectOnNetworkDisconnect = CONNECT_ON_NETWORK_DISCONNECT_SETTING.get(settings);
this.pingInterval = PING_INTERVAL_SETTING.get(settings);
this.pingRetryTimeout = PING_TIMEOUT_SETTING.get(settings);
this.pingRetryCount = PING_RETRIES_SETTING.get(settings);
this.registerConnectionListener = REGISTER_CONNECTION_LISTENER_SETTING.get(settings);

this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -58,6 +59,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -72,6 +74,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
Expand All @@ -83,7 +86,8 @@
public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implements ZenPing {

public static final String ACTION_NAME = "internal:discovery/zen/unicast";
public static final String DISCOVERY_ZEN_PING_UNICAST_HOSTS = "discovery.zen.ping.unicast.hosts";
public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING = Setting.listSetting("discovery.zen.ping.unicast.hosts", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
public static final Setting<Integer> DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, false, Setting.Scope.CLUSTER);

// these limits are per-address
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
Expand Down Expand Up @@ -135,13 +139,8 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
}
}

this.concurrentConnects = this.settings.getAsInt("discovery.zen.ping.unicast.concurrent_connects", 10);
String[] hostArr = this.settings.getAsArray(DISCOVERY_ZEN_PING_UNICAST_HOSTS);
// trim the hosts
for (int i = 0; i < hostArr.length; i++) {
hostArr[i] = hostArr[i].trim();
}
List<String> hosts = CollectionUtils.arrayAsArrayList(hostArr);
this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
final int limitPortCounts;
if (hosts.isEmpty()) {
// if unicast hosts are not specified, fill with simple defaults on the local machine
Expand Down
Loading

0 comments on commit 296b48b

Please sign in to comment.