Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabling Dr Elephant to work with YARN in HTTPS mode #475

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,17 @@
*/
public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
private static final Logger logger = Logger.getLogger(AnalyticJobGeneratorHadoop2.class);
private static final String RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.webapp.address";
private static final String RM_HTTP_POLICY = "yarn.http.policy";
private static final String IS_RM_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
private static final String RESOURCE_MANAGER_IDS = "yarn.resourcemanager.ha.rm-ids";
private static final String RM_NODE_STATE_URL = "http://%s/ws/v1/cluster/info";
private static final String FETCH_INITIAL_WINDOW_MS = "drelephant.analysis.fetch.initial.windowMillis";


//Assigning URLs a header of http:// or https:// depending on whether YARN https is enabled or not
private static String RM_URL_HEADER;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

http:// or https:// is a URL scheme, not a header. Rename the variable accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating object name from RM_URL_HEADER to RM_URL_SCHEME

private static String RESOURCE_MANAGER_ADDRESS;
Copy link
Contributor

@varunsaxena varunsaxena Dec 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason to make these 3 variables static? In fact, RESOURCE_MANAGER_ADDRESS may not even need to be a member variable.

private static String RM_NODE_STATE_URL;
shahrukhkhan489 marked this conversation as resolved.
Show resolved Hide resolved

private static Configuration configuration;

// We provide one minute job fetch delay due to the job sending lag from AM/NM to JobHistoryServer HDFS
Expand All @@ -67,6 +72,31 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
private final ArrayList<AnalyticJob> _secondRetryQueue = new ArrayList<AnalyticJob>();

public void updateResourceManagerAddresses() {

shahrukhkhan489 marked this conversation as resolved.
Show resolved Hide resolved
String rm_http_policy = configuration.get(RM_HTTP_POLICY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the naming convention.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move your code to a separate method, as it makes this method really long.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


if (rm_http_policy == null) {
throw new RuntimeException(
"Cannot get YARN HTTP Policy [" + rm_http_policy + "] from Hadoop Configuration property: [" + RM_HTTP_POLICY
+ "].");
}

if (rm_http_policy.equals("HTTP_ONLY")) {
RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.webapp.address";
RM_URL_HEADER = "http://";
RM_NODE_STATE_URL = RM_URL_HEADER + "%s/ws/v1/cluster/info";
} else if (rm_http_policy.equals("HTTPS_ONLY")) {
RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.webapp.https.address";
RM_URL_HEADER = "https://";
RM_NODE_STATE_URL = RM_URL_HEADER + "%s/ws/v1/cluster/info";
}

if (RESOURCE_MANAGER_ADDRESS == null) {
throw new RuntimeException(
"RESOURCE_MANAGER_ADDRESS is not assigned - [" + RESOURCE_MANAGER_ADDRESS
+ "] as [" + RM_HTTP_POLICY + "] = [" + rm_http_policy + "].");
}

if (Boolean.valueOf(configuration.get(IS_RM_HA_ENABLED))) {
String resourceManagers = configuration.get(RESOURCE_MANAGER_IDS);
if (resourceManagers != null) {
Expand Down Expand Up @@ -139,7 +169,7 @@ public List<AnalyticJob> fetchAnalyticJobs()
+ ", and current time: " + _currentTime);

// Fetch all succeeded apps
URL succeededAppsURL = new URL(new URL("http://" + _resourceManagerAddress), String.format(
URL succeededAppsURL = new URL(new URL(RM_URL_HEADER + _resourceManagerAddress), String.format(
"/ws/v1/cluster/apps?finalStatus=SUCCEEDED&finishedTimeBegin=%s&finishedTimeEnd=%s",
String.valueOf(_lastTime + 1), String.valueOf(_currentTime)));
logger.info("The succeeded apps URL is " + succeededAppsURL);
Expand All @@ -149,7 +179,7 @@ public List<AnalyticJob> fetchAnalyticJobs()
// Fetch all failed apps
// state: Application Master State
// finalStatus: Status of the Application as reported by the Application Master
URL failedAppsURL = new URL(new URL("http://" + _resourceManagerAddress), String.format(
URL failedAppsURL = new URL(new URL(RM_URL_HEADER + _resourceManagerAddress), String.format(
"/ws/v1/cluster/apps?finalStatus=FAILED&state=FINISHED&finishedTimeBegin=%s&finishedTimeEnd=%s",
String.valueOf(_lastTime + 1), String.valueOf(_currentTime)));
List<AnalyticJob> failedApps = readApps(failedAppsURL);
Expand Down
38 changes: 30 additions & 8 deletions app/com/linkedin/drelephant/tez/fetchers/TezFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,49 @@
public class TezFetcher implements ElephantFetcher<TezApplicationData> {

private static final Logger logger = Logger.getLogger(TezFetcher.class);

private static final String TIMELINE_SERVER_URL = "yarn.timeline-service.webapp.address";
private static final String RM_HTTP_POLICY = "yarn.http.policy";

private URLFactory _urlFactory;
private JSONFactory _jsonFactory;
private String _timelineWebAddr;

private FetcherConfigurationData _fetcherConfigurationData;
private static String RM_URL_HEADER;
Copy link
Contributor

@varunsaxena varunsaxena Dec 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be TIMELINE_SERVER_URL_PREFIX/SCHEME

private static String TIMELINE_SERVER_URL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be static?


public TezFetcher(FetcherConfigurationData fetcherConfData) throws IOException {

String rm_http_policy = new Configuration().get(RM_HTTP_POLICY);

if (rm_http_policy == null) {
throw new RuntimeException(
"Cannot get YARN HTTP Policy [" + rm_http_policy + "] from Hadoop Configuration property: [" + RM_HTTP_POLICY
+ "].");
}

if (rm_http_policy.equals("HTTP_ONLY")) {
TIMELINE_SERVER_URL = "yarn.timeline-service.webapp.address";
RM_URL_HEADER = "http://";
} else if (rm_http_policy.equals("HTTPS_ONLY")) {
TIMELINE_SERVER_URL = "yarn.timeline-service.webapp.https.address";
RM_URL_HEADER = "https://";
}

if (TIMELINE_SERVER_URL == null) {
throw new RuntimeException(
"TIMELINE_SERVER_URL is not assigned - [" + TIMELINE_SERVER_URL
+ "] as [" + RM_HTTP_POLICY + "] = [" + rm_http_policy + "].");
}

public TezFetcher(FetcherConfigurationData fetcherConfData) throws IOException {
this._fetcherConfigurationData = fetcherConfData;
final String applicationHistoryAddr = new Configuration().get(TIMELINE_SERVER_URL);

//Connection validity checked using method verifyURL(_timelineWebAddr) inside URLFactory constructor;
_urlFactory = new URLFactory(applicationHistoryAddr);
_urlFactory = new URLFactory(applicationHistoryAddr,RM_URL_HEADER);
logger.info("Connection success.");

_jsonFactory = new JSONFactory();
_timelineWebAddr = "http://" + _timelineWebAddr + "/ws/v1/timeline/";
_timelineWebAddr = RM_URL_HEADER + _timelineWebAddr + "/ws/v1/timeline/";

}

Expand Down Expand Up @@ -145,8 +167,8 @@ private class URLFactory {

private String _timelineWebAddr;

private URLFactory(String hserverAddr) throws IOException {
_timelineWebAddr = "http://" + hserverAddr + "/ws/v1/timeline";
private URLFactory(String hserverAddr,String RM_URL_HEADER) throws IOException {
Copy link
Contributor

@varunsaxena varunsaxena Dec 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kindly follow the naming convention for method parameters.

_timelineWebAddr = RM_URL_HEADER + hserverAddr + "/ws/v1/timeline";
verifyURL(_timelineWebAddr);
}

Expand Down Expand Up @@ -397,4 +419,4 @@ else if (!isMapTask && name.equals("SHUFFLE_PHASE_TIME")){
return time;
}
}
}
}