-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reap previously launched logviewer tasks #240
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,11 +43,11 @@ | |
import org.apache.mesos.Protos.Resource; | ||
import org.apache.mesos.Protos.TaskID; | ||
import org.apache.mesos.Protos.TaskInfo; | ||
import org.apache.mesos.Protos.TaskState; | ||
import org.apache.mesos.Protos.TaskStatus; | ||
import org.apache.mesos.Protos.Value.Range; | ||
import org.apache.mesos.Protos.Value.Ranges; | ||
import org.apache.mesos.Protos.Value.Scalar; | ||
import org.apache.mesos.Protos.TaskState; | ||
import org.apache.mesos.SchedulerDriver; | ||
import org.json.simple.JSONValue; | ||
import org.slf4j.Logger; | ||
|
@@ -60,8 +60,8 @@ | |
import storm.mesos.resources.ResourceEntry; | ||
import storm.mesos.resources.ResourceNotAvailableException; | ||
import storm.mesos.resources.ResourceType; | ||
import storm.mesos.schedulers.StormSchedulerImpl; | ||
import storm.mesos.schedulers.IMesosStormScheduler; | ||
import storm.mesos.schedulers.StormSchedulerImpl; | ||
import storm.mesos.shims.CommandLineShimFactory; | ||
import storm.mesos.shims.ICommandLineShim; | ||
import storm.mesos.shims.LocalStateShim; | ||
|
@@ -91,8 +91,8 @@ | |
import java.util.TimerTask; | ||
|
||
import static storm.mesos.util.PrettyProtobuf.offerIDListToString; | ||
import static storm.mesos.util.PrettyProtobuf.offerToString; | ||
import static storm.mesos.util.PrettyProtobuf.offerMapToString; | ||
import static storm.mesos.util.PrettyProtobuf.offerToString; | ||
import static storm.mesos.util.PrettyProtobuf.taskInfoListToString; | ||
import static storm.mesos.util.PrettyProtobuf.taskStatusListToTaskIDsString; | ||
|
||
|
@@ -222,30 +222,17 @@ void initializeMesosStormConf(Map conf, String localDir) { | |
_disallowedHosts = listIntoSet((List<String>) conf.get(CONF_MESOS_DISALLOWED_HOSTS)); | ||
_enabledLogviewerSidecar = MesosCommon.enabledLogviewerSidecar(conf); | ||
|
||
if (_enabledLogviewerSidecar) { | ||
Set<String> zkServerSet = listIntoSet((List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS)); | ||
String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT)); | ||
_logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers"; | ||
LOG.info("Logviewer information will be stored under {}", _logviewerZkDir); | ||
|
||
if (zkPort == null || zkServerSet == null) { | ||
throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT); | ||
} else { | ||
List<String> zkConnectionList = new ArrayList<>(); | ||
for (String server : zkServerSet) { | ||
zkConnectionList.add(String.format("%s:%s", server, zkPort)); | ||
} | ||
_zkClient = new ZKClient(StringUtils.join(zkConnectionList, ',')); | ||
} | ||
} | ||
initializeZkClient(conf); | ||
_logviewerZkDir = Optional.fromNullable((String) conf.get(Config.STORM_ZOOKEEPER_ROOT)).or("") + "/storm-mesos/logviewers"; | ||
LOG.info("Logviewer ZK path: {}", _logviewerZkDir); | ||
|
||
Boolean preferReservedResources = (Boolean) conf.get(CONF_MESOS_PREFER_RESERVED_RESOURCES); | ||
if (preferReservedResources != null) { | ||
_preferReservedResources = preferReservedResources; | ||
} | ||
|
||
_container = Optional.fromNullable((String) conf.get(CONF_MESOS_CONTAINER_DOCKER_IMAGE)); | ||
_mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir); | ||
_mesosScheduler = new NimbusMesosScheduler(this, _zkClient, _logviewerZkDir, _enabledLogviewerSidecar); | ||
|
||
// Generate YAML to be served up to clients | ||
_generatedConfPath = Paths.get( | ||
|
@@ -272,6 +259,21 @@ void initializeMesosStormConf(Map conf, String localDir) { | |
} | ||
} | ||
|
||
private void initializeZkClient(Map conf) { | ||
Set<String> zkServerSet = listIntoSet((List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS)); | ||
String zkPort = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_PORT)); | ||
|
||
if (zkPort == null || zkServerSet == null) { | ||
throw new RuntimeException("ZooKeeper configs are not found in storm.yaml: " + Config.STORM_ZOOKEEPER_SERVERS + ", " + Config.STORM_ZOOKEEPER_PORT); | ||
} else { | ||
List<String> zkConnectionList = new ArrayList<>(); | ||
for (String server : zkServerSet) { | ||
zkConnectionList.add(String.format("%s:%s", server, zkPort)); | ||
} | ||
_zkClient = new ZKClient(StringUtils.join(zkConnectionList, ',')); | ||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
protected void startLocalHttpServer() throws Exception { | ||
createLocalServerPort(); | ||
|
@@ -286,34 +288,32 @@ public void doRegistration(final SchedulerDriver driver, Protos.FrameworkID id) | |
_state.put(FRAMEWORK_ID, id.getValue()); | ||
_offers = new HashMap<Protos.OfferID, Protos.Offer>(); | ||
|
||
if (_enabledLogviewerSidecar) { | ||
|
||
_timer.scheduleAtFixedRate(new TimerTask() { | ||
@Override | ||
public void run() { | ||
// performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks | ||
// in the framework scheduler's statusUpdate() method | ||
List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>(); | ||
List<String> logviewerPaths = _zkClient.getChildren(_logviewerZkDir); | ||
if (logviewerPaths == null) { | ||
_driver.reconcileTasks(taskStatuses); | ||
return; | ||
} | ||
for (String path : logviewerPaths) { | ||
TaskID logviewerTaskId = TaskID.newBuilder() | ||
.setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path)))) | ||
.build(); | ||
TaskStatus logviewerTaskStatus = TaskStatus.newBuilder() | ||
.setTaskId(logviewerTaskId) | ||
.setState(TaskState.TASK_RUNNING) | ||
.build(); | ||
taskStatuses.add(logviewerTaskStatus); | ||
} | ||
_timer.scheduleAtFixedRate(new TimerTask() { | ||
@Override | ||
public void run() { | ||
// performing "explicit" reconciliation; master will respond with the latest state for all logviewer tasks | ||
// in the framework scheduler's statusUpdate() method | ||
List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>(); | ||
List<String> logviewerPaths = _zkClient.getChildren(_logviewerZkDir); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the line of code that is generating the log line that you mention as the visible side-effect of this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. The more I think about this (and having seen the code run on our systems), I think this trace will be a nuisance, given how frequently the timer task runs (every 5 mins). I'm in favor of adding an explicit check to first test that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for the incredibly slow response. I like this solution, please implement. |
||
if (logviewerPaths == null || !_enabledLogviewerSidecar) { | ||
_driver.reconcileTasks(taskStatuses); | ||
LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses)); | ||
return; | ||
} | ||
}, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes | ||
} | ||
|
||
for (String path : logviewerPaths) { | ||
TaskID logviewerTaskId = TaskID.newBuilder() | ||
.setValue(new String(_zkClient.getNodeData(String.format("%s/%s", _logviewerZkDir, path)))) | ||
.build(); | ||
TaskStatus logviewerTaskStatus = TaskStatus.newBuilder() | ||
.setTaskId(logviewerTaskId) | ||
.setState(TaskState.TASK_RUNNING) | ||
.build(); | ||
taskStatuses.add(logviewerTaskStatus); | ||
} | ||
_driver.reconcileTasks(taskStatuses); | ||
LOG.info("Performing task reconciliation between scheduler and master on following tasks: {}", taskStatusListToTaskIDsString(taskStatuses)); | ||
} | ||
}, 0, TASK_RECONCILIATION_INTERVAL); // reconciliation performed every 5 minutes | ||
} | ||
|
||
public void shutdown() throws Exception { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
align this with the rest of the code please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually aligned with the rest of the code. The diff makes it seem like it isn't. I'm taking the timer thread out of the
if (_enabledLogviewerSidecar)
block and scheduling it unconditionally. See https://github.com/srikanth-viswanathan/mesos-storm/blob/66dcbbe502aa3438b2a1b107d146e25de058f82c/storm/src/main/storm/mesos/MesosNimbus.java#L291 for the full file.