Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reap previously launched logviewer tasks #240

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions storm/src/main/storm/mesos/MesosNimbus.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.Protos.TaskState;
import org.apache.mesos.Protos.TaskStatus;
import org.apache.mesos.Protos.Value.Range;
import org.apache.mesos.Protos.Value.Ranges;
import org.apache.mesos.Protos.Value.Scalar;
import org.apache.mesos.Protos.TaskState;
import org.apache.mesos.SchedulerDriver;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
Expand All @@ -60,8 +60,8 @@
import storm.mesos.resources.ResourceEntry;
import storm.mesos.resources.ResourceNotAvailableException;
import storm.mesos.resources.ResourceType;
import storm.mesos.schedulers.StormSchedulerImpl;
import storm.mesos.schedulers.IMesosStormScheduler;
import storm.mesos.schedulers.StormSchedulerImpl;
import storm.mesos.shims.CommandLineShimFactory;
import storm.mesos.shims.ICommandLineShim;
import storm.mesos.shims.LocalStateShim;
Expand Down Expand Up @@ -91,8 +91,8 @@
import java.util.TimerTask;

import static storm.mesos.util.PrettyProtobuf.offerIDListToString;
import static storm.mesos.util.PrettyProtobuf.offerToString;
import static storm.mesos.util.PrettyProtobuf.offerMapToString;
import static storm.mesos.util.PrettyProtobuf.offerToString;
import static storm.mesos.util.PrettyProtobuf.taskInfoListToString;
import static storm.mesos.util.PrettyProtobuf.taskStatusListToTaskIDsString;

Expand Down Expand Up @@ -222,30 +222,17 @@ void initializeMesosStormConf(Map conf, String localDir) {
_disallowedHosts = listIntoSet((List<String>) conf.get(CONF_MESOS_DISALLOWED_HOSTS));
_enabledLogviewerSidecar = MesosCommon.enabledLogviewerSidecar(conf);

if (_enabledLogviewerSidecar) {
Set<String> zkServerSet = listIntoSet((List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS));
String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT));
_logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers";
LOG.info("Logviewer information will be stored under {}", _logviewerZkDir);

if (zkPort == null || zkServerSet == null) {
throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT);
} else {
List<String> zkConnectionList = new ArrayList<>();
for (String server : zkServerSet) {
zkConnectionList.add(String.format("%s:%s", server, zkPort));
}
_zkClient = new ZKClient(StringUtils.join(zkConnectionList, ','));
}
}
initializeZkClient(conf);
_logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers";
LOG.info("Logviewer ZK path: {}", _logviewerZkDir);

Boolean preferReservedResources = (Boolean) conf.get(CONF_MESOS_PREFER_RESERVED_RESOURCES);
if (preferReservedResources != null) {
_preferReservedResources = preferReservedResources;
}

_container = Optional.fromNullable((String) conf.get(CONF_MESOS_CONTAINER_DOCKER_IMAGE));
_mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir);
_mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir, _enabledLogviewerSidecar);

// Generate YAML to be served up to clients
_generatedConfPath = Paths.get(
Expand All @@ -272,6 +259,21 @@ void initializeMesosStormConf(Map conf, String localDir) {
}
}

private void initializeZkClient(Map conf) {
Set<String> zkServerSet = listIntoSet((List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS));
String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT));

if (zkPort == null || zkServerSet == null) {
throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT);
} else {
List<String> zkConnectionList = new ArrayList<>();
for (String server : zkServerSet) {
zkConnectionList.add(String.format("%s:%s", server, zkPort));
}
_zkClient = new ZKClient(StringUtils.join(zkConnectionList, ','));
}
}

@SuppressWarnings("unchecked")
protected void startLocalHttpServer() throws Exception {
createLocalServerPort();
Expand All @@ -286,34 +288,32 @@ public void doRegistration(final SchedulerDriver driver, Protos.FrameworkID id)
_state.put(FRAMEWORK_ID, id.getValue());
_offers = new HashMap<Protos.OfferID, Protos.Offer>();

if (_enabledLogviewerSidecar) {

_timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
// performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks
// in the framework scheduler's statusUpdate() method
List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
List<String> logviewerPaths = _zkClient.getChildren(_logviewerZkDir);
if (logviewerPaths == null) {
_driver.reconcileTasks(taskStatuses);
return;
}
for (String path : logviewerPaths) {
TaskID logviewerTaskId = TaskID.newBuilder()
.setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path))))
.build();
TaskStatus logviewerTaskStatus = TaskStatus.newBuilder()
.setTaskId(logviewerTaskId)
.setState(TaskState.TASK_RUNNING)
.build();
taskStatuses.add(logviewerTaskStatus);
}
_timer.scheduleAtFixedRate(new TimerTask() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

align this with the rest of the code please

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually aligned with the rest of the code. The diff makes it seem like it isn't. I'm taking the timer thread out of the if (_enabledLogviewerSidecar) block and scheduling it unconditionally. See https://github.com/srikanth-viswanathan/mesos-storm/blob/66dcbbe502aa3438b2a1b107d146e25de058f82c/storm/src/main/storm/mesos/MesosNimbus.java#L291 for the full file.

@Override
public void run() {
// performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks
// in the framework scheduler's statusUpdate() method
List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
List<String> logviewerPaths = _zkClient.getChildren(_logviewerZkDir);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the line of code that is generating the log line that you mention as the visible side-effect of this change?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. The more I think about this (and having seen the code run on our systems), I think this trace will be a nuisance, given how frequently the timer task runs (every 5 mins). I'm in favor of adding an explicit check to first test that the logviewer ZNode exists (without the error trace) before trying to get its children. Thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the incredibly slow response. I like this solution, please implement.

if (logviewerPaths == null || !_enabledLogviewerSidecar) {
_driver.reconcileTasks(taskStatuses);
LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses));
return;
}
}, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes
}

for (String path : logviewerPaths) {
TaskID logviewerTaskId = TaskID.newBuilder()
.setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path))))
.build();
TaskStatus logviewerTaskStatus = TaskStatus.newBuilder()
.setTaskId(logviewerTaskId)
.setState(TaskState.TASK_RUNNING)
.build();
taskStatuses.add(logviewerTaskStatus);
}
_driver.reconcileTasks(taskStatuses);
LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses));
}
}, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes
}

public void shutdown() throws Exception {
Expand Down
43 changes: 36 additions & 7 deletions storm/src/main/storm/mesos/NimbusMesosScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ public class NimbusMesosScheduler implements Scheduler {
private ZKClient zkClient;
private String logviewerZkDir;
private CountDownLatch _registeredLatch = new CountDownLatch(1);
private boolean enableLogViewers;
public static final Logger LOG = LoggerFactory.getLogger(MesosNimbus.class);

public NimbusMesosScheduler(MesosNimbus mesosNimbus, ZKClient zkClient, String logviewerZkDir) {
public NimbusMesosScheduler(MesosNimbus mesosNimbus, ZKClient zkClient, String logviewerZkDir, boolean enableLogViewers) {
this.mesosNimbus = mesosNimbus;
this.zkClient = zkClient;
this.logviewerZkDir = logviewerZkDir;
this.enableLogViewers = enableLogViewers;
}

public void waitUntilRegistered() throws InterruptedException {
Expand Down Expand Up @@ -127,23 +129,29 @@ private void updateLogviewerState(TaskStatus status) {
}
String nodeId = taskId.split("\\" + MesosCommon.MESOS_COMPONENT_ID_DELIMITER)[1];
String logviewerZKPath = String.format("%s/%s", logviewerZkDir, nodeId);
if (!enableLogViewers) {
LOG.info("Logviewers are disabled. Reaping existing logviewer task {}", taskId);
reapLogviewerTask(logviewerZKPath, status);
return;
}
switch (status.getState()) {
case TASK_STAGING:
checkRunningLogviewerState(logviewerZKPath);
ensureLogviewerZNodeExists(logviewerZKPath);
return;
case TASK_STARTING:
checkRunningLogviewerState(logviewerZKPath);
ensureLogviewerZNodeExists(logviewerZKPath);
return;
case TASK_RUNNING:
checkRunningLogviewerState(logviewerZKPath);
ensureLogviewerZNodeExists(logviewerZKPath);
return;
case TASK_LOST:
// this status update can be triggered by the explicit kill and isn't terminal, do not kill again
break;
default:
// explicitly kill the logviewer task to ensure logviewer is terminated
// explicitly kill the logviewer task to ensure it is terminated
mesosNimbus._driver.killTask(status.getTaskId());
}

// if it gets to this point it means logviewer terminated; update ZK with new logviewer state
if (zkClient.nodeExists(logviewerZKPath)) {
LOG.info("updateLogviewerState: Remove logviewer state in zk at {} for logviewer task {}", logviewerZKPath, taskId);
Expand All @@ -154,13 +162,34 @@ private void updateLogviewerState(TaskStatus status) {
}
}

private void checkRunningLogviewerState(String logviewerZKPath) {
private void ensureLogviewerZNodeExists(String logviewerZKPath) {
if (!zkClient.nodeExists(logviewerZKPath)) {
LOG.error("checkRunningLogviewerState: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper");
LOG.warn("ensureLogviewerZNodeExists: Running mesos logviewer task exists for logviewer that isn't tracked in ZooKeeper");
zkClient.createNode(logviewerZKPath);
}
}

private void reapLogviewerTask(String logviewerZKPath, TaskStatus status) {
String taskId = status.getTaskId().getValue();
if (zkClient.nodeExists(logviewerZKPath)) {
LOG.info("reapLogviewerTask: Remove logviewer state in zk at {} for logviewer task {}", logviewerZKPath, taskId);
zkClient.deleteNode(logviewerZKPath);
}

switch (status.getState()) {
case TASK_FAILED:
case TASK_FINISHED:
case TASK_KILLED:
case TASK_LOST:
// terminal states
break;
default:
// explicitly kill the logviewer task to ensure it is terminated
LOG.info("reapLogviewerTask: Killing logviewer mesos task {}", logviewerZKPath, taskId);
mesosNimbus._driver.killTask(status.getTaskId());
}
}

@Override
public void frameworkMessage(SchedulerDriver driver, ExecutorID executorId, SlaveID slaveId, byte[] data) {
}
Expand Down