-
Notifications
You must be signed in to change notification settings - Fork 859
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enabling Dr Elephant to work with YARN in HTTPS mode #475
base: master
Are you sure you want to change the base?
Changes from 3 commits
d99939e
3466364
931f880
2bad70c
88256b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,12 +37,17 @@ | |
*/ | ||
public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator { | ||
private static final Logger logger = Logger.getLogger(AnalyticJobGeneratorHadoop2.class); | ||
private static final String RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.webapp.address"; | ||
private static final String RM_HTTP_POLICY = "yarn.http.policy"; | ||
private static final String IS_RM_HA_ENABLED = "yarn.resourcemanager.ha.enabled"; | ||
private static final String RESOURCE_MANAGER_IDS = "yarn.resourcemanager.ha.rm-ids"; | ||
private static final String RM_NODE_STATE_URL = "http://%s/ws/v1/cluster/info"; | ||
private static final String FETCH_INITIAL_WINDOW_MS = "drelephant.analysis.fetch.initial.windowMillis"; | ||
|
||
|
||
//Assigning URL's a header of http:// or https:// depending if YARN https is enabled or not | ||
private static String RM_URL_HEADER; | ||
private static String RESOURCE_MANAGER_ADDRESS; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any specific reason to make these 3 variables static? In fact, RESOURCE_MANAGER_ADDRESS may not even need to be a member variable. |
||
private static String RM_NODE_STATE_URL; | ||
shahrukhkhan489 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private static Configuration configuration; | ||
|
||
// We provide one minute job fetch delay due to the job sending lag from AM/NM to JobHistoryServer HDFS | ||
|
@@ -67,6 +72,31 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator { | |
private final ArrayList<AnalyticJob> _secondRetryQueue = new ArrayList<AnalyticJob>(); | ||
|
||
public void updateResourceManagerAddresses() { | ||
|
||
shahrukhkhan489 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
String rm_http_policy = configuration.get(RM_HTTP_POLICY); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please follow the naming convention. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please move your code to a separate method, as it makes this method really long. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
if (rm_http_policy == null) { | ||
throw new RuntimeException( | ||
"Cannot get YARN HTTP Policy [" + rm_http_policy + "] from Hadoop Configuration property: [" + RM_HTTP_POLICY | ||
+ "]."); | ||
} | ||
|
||
if (rm_http_policy.equals("HTTP_ONLY")) { | ||
RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.webapp.address"; | ||
RM_URL_HEADER = "http://"; | ||
RM_NODE_STATE_URL = RM_URL_HEADER + "%s/ws/v1/cluster/info"; | ||
} else if (rm_http_policy.equals("HTTPS_ONLY")) { | ||
RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.webapp.https.address"; | ||
RM_URL_HEADER = "https://"; | ||
RM_NODE_STATE_URL = RM_URL_HEADER + "%s/ws/v1/cluster/info"; | ||
} | ||
|
||
if (RESOURCE_MANAGER_ADDRESS == null) { | ||
throw new RuntimeException( | ||
"RESOURCE_MANAGER_ADDRESS is not assigned - [" + RESOURCE_MANAGER_ADDRESS | ||
+ "] as [" + RM_HTTP_POLICY + "] = [" + rm_http_policy + "]."); | ||
} | ||
|
||
if (Boolean.valueOf(configuration.get(IS_RM_HA_ENABLED))) { | ||
String resourceManagers = configuration.get(RESOURCE_MANAGER_IDS); | ||
if (resourceManagers != null) { | ||
|
@@ -139,7 +169,7 @@ public List<AnalyticJob> fetchAnalyticJobs() | |
+ ", and current time: " + _currentTime); | ||
|
||
// Fetch all succeeded apps | ||
URL succeededAppsURL = new URL(new URL("http://" + _resourceManagerAddress), String.format( | ||
URL succeededAppsURL = new URL(new URL(RM_URL_HEADER + _resourceManagerAddress), String.format( | ||
"/ws/v1/cluster/apps?finalStatus=SUCCEEDED&finishedTimeBegin=%s&finishedTimeEnd=%s", | ||
String.valueOf(_lastTime + 1), String.valueOf(_currentTime))); | ||
logger.info("The succeeded apps URL is " + succeededAppsURL); | ||
|
@@ -149,7 +179,7 @@ public List<AnalyticJob> fetchAnalyticJobs() | |
// Fetch all failed apps | ||
// state: Application Master State | ||
// finalStatus: Status of the Application as reported by the Application Master | ||
URL failedAppsURL = new URL(new URL("http://" + _resourceManagerAddress), String.format( | ||
URL failedAppsURL = new URL(new URL(RM_URL_HEADER + _resourceManagerAddress), String.format( | ||
"/ws/v1/cluster/apps?finalStatus=FAILED&state=FINISHED&finishedTimeBegin=%s&finishedTimeEnd=%s", | ||
String.valueOf(_lastTime + 1), String.valueOf(_currentTime))); | ||
List<AnalyticJob> failedApps = readApps(failedAppsURL); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,27 +40,49 @@ | |
public class TezFetcher implements ElephantFetcher<TezApplicationData> { | ||
|
||
private static final Logger logger = Logger.getLogger(TezFetcher.class); | ||
|
||
private static final String TIMELINE_SERVER_URL = "yarn.timeline-service.webapp.address"; | ||
private static final String RM_HTTP_POLICY = "yarn.http.policy"; | ||
|
||
private URLFactory _urlFactory; | ||
private JSONFactory _jsonFactory; | ||
private String _timelineWebAddr; | ||
|
||
private FetcherConfigurationData _fetcherConfigurationData; | ||
private static String RM_URL_HEADER; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be TIMELINE_SERVER_URL_PREFIX/SCHEME |
||
private static String TIMELINE_SERVER_URL; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should these be static? |
||
|
||
public TezFetcher(FetcherConfigurationData fetcherConfData) throws IOException { | ||
|
||
String rm_http_policy = new Configuration().get(RM_HTTP_POLICY); | ||
|
||
if (rm_http_policy == null) { | ||
throw new RuntimeException( | ||
"Cannot get YARN HTTP Policy [" + rm_http_policy + "] from Hadoop Configuration property: [" + RM_HTTP_POLICY | ||
+ "]."); | ||
} | ||
|
||
if (rm_http_policy.equals("HTTP_ONLY")) { | ||
TIMELINE_SERVER_URL = "yarn.timeline-service.webapp.address"; | ||
RM_URL_HEADER = "http://"; | ||
} else if (rm_http_policy.equals("HTTPS_ONLY")) { | ||
TIMELINE_SERVER_URL = "yarn.timeline-service.webapp.https.address"; | ||
RM_URL_HEADER = "https://"; | ||
} | ||
|
||
if (TIMELINE_SERVER_URL == null) { | ||
throw new RuntimeException( | ||
"TIMELINE_SERVER_URL is not assigned - [" + TIMELINE_SERVER_URL | ||
+ "] as [" + RM_HTTP_POLICY + "] = [" + rm_http_policy + "]."); | ||
} | ||
|
||
public TezFetcher(FetcherConfigurationData fetcherConfData) throws IOException { | ||
this._fetcherConfigurationData = fetcherConfData; | ||
final String applicationHistoryAddr = new Configuration().get(TIMELINE_SERVER_URL); | ||
|
||
//Connection validity checked using method verifyURL(_timelineWebAddr) inside URLFactory constructor; | ||
_urlFactory = new URLFactory(applicationHistoryAddr); | ||
_urlFactory = new URLFactory(applicationHistoryAddr,RM_URL_HEADER); | ||
logger.info("Connection success."); | ||
|
||
_jsonFactory = new JSONFactory(); | ||
_timelineWebAddr = "http://" + _timelineWebAddr + "/ws/v1/timeline/"; | ||
_timelineWebAddr = RM_URL_HEADER + _timelineWebAddr + "/ws/v1/timeline/"; | ||
|
||
} | ||
|
||
|
@@ -145,8 +167,8 @@ private class URLFactory { | |
|
||
private String _timelineWebAddr; | ||
|
||
private URLFactory(String hserverAddr) throws IOException { | ||
_timelineWebAddr = "http://" + hserverAddr + "/ws/v1/timeline"; | ||
private URLFactory(String hserverAddr,String RM_URL_HEADER) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Kindly follow the naming convention for method parameters. |
||
_timelineWebAddr = RM_URL_HEADER + hserverAddr + "/ws/v1/timeline"; | ||
verifyURL(_timelineWebAddr); | ||
} | ||
|
||
|
@@ -397,4 +419,4 @@ else if (!isMapTask && name.equals("SHUFFLE_PHASE_TIME")){ | |
return time; | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
http:// or https:// refers to URL scheme instead of header. Rename the variable name accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updating object name from
RM_URL_HEADER
to RM_URL_SCHEME