Skip to content

Commit

Permalink
[HOPSWORKS-1022] Fix bug with log aggregation status NOT_START (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
kouzant authored and tkakantousis committed Apr 3, 2019
1 parent 8f3d401 commit 74feebf
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
Expand Down Expand Up @@ -104,7 +108,8 @@ public static void writeLog(DistributedFileSystemOps dfs, String dst,
public static void copyAggregatedYarnLogs(DistributedFileSystemOps dfs, String src, String dst,
String[] desiredLogTypes, YarnMonitor monitor) throws YarnException, IOException, InterruptedException {

LogAggregationStatus logAggregationStatus = waitForLogAggregation(monitor);
LogAggregationStatus logAggregationStatus = waitForLogAggregation(monitor.getYarnClient(),
monitor.getApplicationId());


PrintStream writer = null;
Expand Down Expand Up @@ -148,21 +153,33 @@ public static void copyAggregatedYarnLogs(DistributedFileSystemOps dfs, String s
}
}

public static LogAggregationStatus waitForLogAggregation(YarnMonitor monitor) throws InterruptedException,
YarnException, IOException {
LogAggregationStatus logAggregationStatus = monitor.getLogAggregationStatus();
public static LogAggregationStatus waitForLogAggregation(YarnClient yarnClient, ApplicationId appId)
throws InterruptedException, YarnException, IOException {
LogAggregationStatus logAggregationStatus = yarnClient.getApplicationReport(appId)
.getLogAggregationStatus();

int not_startRetries = 0;
while (!isFinal(logAggregationStatus)) {
Thread.sleep(1000);
logAggregationStatus = monitor.getLogAggregationStatus();
TimeUnit.SECONDS.sleep(1);
logAggregationStatus = yarnClient.getApplicationReport(appId).getLogAggregationStatus();
// NOT_START LogAggregation status might happen in two cases:
// (a) Application has failed very early and status didn't change to FAILED
// (b) Application has succeeded but the moment we probe for status,
// log aggregation hasn't started yet.
if (logAggregationStatus.equals(LogAggregationStatus.NOT_START)) {
if (++not_startRetries > 5) {
break;
}
}
}
return logAggregationStatus;
}

public static boolean isFinal(LogAggregationStatus status){
switch(status){
private static boolean isFinal(LogAggregationStatus status){
switch(status) {
case RUNNING:
case RUNNING_WITH_FAILURE:
case NOT_START:
return false;
default :
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;

Expand Down Expand Up @@ -130,4 +131,8 @@ public void cancelJob(String appid) throws YarnException, IOException {
ApplicationId applicationId = ConverterUtils.toApplicationId(appid);
yarnClientWrapper.getYarnClient().killApplication(applicationId);
}

/**
 * Exposes the YARN client held by the underlying client wrapper.
 *
 * @return the {@link YarnClient} returned by {@code yarnClientWrapper.getYarnClient()}
 */
public YarnClient getYarnClient() {
  final YarnClient wrappedClient = yarnClientWrapper.getYarnClient();
  return wrappedClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
Expand Down Expand Up @@ -1409,12 +1408,7 @@ private void waitForJobLogs(List<ApplicationReport> projectsApps, YarnClient cli
appReport = client.getApplicationReport(appReport.getApplicationId());
finalState = appReport.getFinalApplicationStatus();
}
LogAggregationStatus logAggregationState = appReport.getLogAggregationStatus();
while (!YarnLogUtil.isFinal(logAggregationState)) {
Thread.sleep(500);
appReport = client.getApplicationReport(appReport.getApplicationId());
logAggregationState = appReport.getLogAggregationStatus();
}
YarnLogUtil.waitForLogAggregation(client, appReport.getApplicationId());
}
}

Expand Down Expand Up @@ -2091,12 +2085,7 @@ public void removeMemberFromTeam(Project project, Users user, String toRemoveEma
appReport = client.getApplicationReport(appReport.getApplicationId());
finalState = appReport.getFinalApplicationStatus();
}
LogAggregationStatus logAggregationState = appReport.getLogAggregationStatus();
while (!YarnLogUtil.isFinal(logAggregationState)) {
Thread.sleep(500);
appReport = client.getApplicationReport(appReport.getApplicationId());
logAggregationState = appReport.getLogAggregationStatus();
}
YarnLogUtil.waitForLogAggregation(client, appReport.getApplicationId());
}
} catch (YarnException | IOException | InterruptedException e) {
throw new ProjectException(RESTCodes.ProjectErrorCode.KILL_MEMBER_JOBS, Level.SEVERE,
Expand Down

0 comments on commit 74feebf

Please sign in to comment.