Job.java 的 waitForCompletion 方法:
/**
 * Submits the job if it is still being defined, then waits for it to finish.
 *
 * @param verbose when true, waiting is delegated to monitorAndPrintJob() (presumably it
 *                returns once the job completes); when false, isComplete() is polled,
 *                sleeping the configured completion poll interval between checks
 * @return the value of isSuccessful() once waiting has ended
 */
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
  // Submit lazily: only a job still in DEFINE state has not been handed to the cluster.
  if (state == JobState.DEFINE) {
    submit();
  }
  if (!verbose) {
    final int pollMillis = Job.getCompletionPollInterval(cluster.getConf());
    for (; !isComplete(); ) {
      Thread.sleep(pollMillis);
    }
  } else {
    monitorAndPrintJob();
  }
  return isSuccessful();
}
/**
 * Submits this job to the cluster and switches it to the RUNNING state.
 *
 * <p>The job must still be in DEFINE state. After connecting, a JobSubmitter is built from
 * the cluster's file system and client. The submission runs as {@code ugi} through
 * {@code doAs}, and the returned JobStatus is stored in {@code status}. Finally, the
 * tracking URL is logged.
 */
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter jobSubmitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  // Declared with an explicit type so doAs resolves to its PrivilegedExceptionAction overload.
  final PrivilegedExceptionAction<JobStatus> submitAction = new PrivilegedExceptionAction<JobStatus>() {
    @Override
    public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
      return jobSubmitter.submitJobInternal(Job.this, cluster);
    }
  };
  status = ugi.doAs(submitAction);
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
JobSubmitter.java 的 submitJobInternal 方法:
/**
 * Internal submission path reached from Job.submit() through the JobSubmitter (excerpt).
 *
 * <p>Only the RPC hand-off is visible here. The job is submitted through
 * submitClient.submitJob(...) with the job id, the submission directory and the job's
 * credentials, and the resulting JobStatus is assigned to {@code status}.
 * NOTE(review): jobId, submitJobDir and the return statement live in the elided parts;
 * confirm against the full JobSubmitter source.
 */
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {
... ...
// Hand the job to the submission client; on YARN this is presumably YARNRunner.submitJob — TODO confirm.
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
... ...
}
YARNRunner.java 的 submitJob 方法:
/**
 * Submits the job to the YARN ResourceManager and returns its initial status.
 *
 * <p>Adds the history token to {@code ts} and builds the ApplicationSubmissionContext for the
 * MR ApplicationMaster. It then submits the context through {@code resMgrDelegate} and fails
 * fast if the returned application report is missing or in state FAILED or KILLED.
 *
 * @param jobId        id of the job being submitted
 * @param jobSubmitDir job submission (staging) directory passed to the context builder
 * @param ts           credentials shipped with the job; a history token is added to it
 * @return the status reported for {@code jobId} by the client obtained from {@code clientCache}
 * @throws IOException if the application report is null, the application is FAILED or
 *                     KILLED, or a ResourceManager call fails with a YarnException (wrapped)
 */
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {
  addHistoryToken(ts);
  ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts);
  // resMgrDelegate.submitApplication declares the checked YarnException, which this
  // method's signature does not; wrap it so callers keep seeing only IOException.
  try {
    ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
    ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);
    String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " + diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
/**
 * Builds the ApplicationSubmissionContext used to launch the MR ApplicationMaster (excerpt).
 *
 * @param jobConf      job configuration the pieces below are derived from
 * @param jobSubmitDir job submission directory used to set up the AM's local resources
 * @param ts           job credentials turned into the AM's security-token buffer
 * @return the populated submission context (its construction is in the elided part)
 * @throws IOException as declared; presumably from resource or token setup — confirm
 */
public ApplicationSubmissionContext createApplicationSubmissionContext(Configuration jobConf, String jobSubmitDir, Credentials ts) throws IOException {
// New application id obtained from the ResourceManager delegate.
ApplicationId applicationId = resMgrDelegate.getApplicationId();
// Local resources for the AM container, derived from the submission directory.
Map<String, LocalResource> localResources = setupLocalResources(jobConf, jobSubmitDir);
// Security tokens for the AM, in ByteBuffer form (presumably serialized credentials).
ByteBuffer securityTokens = setupSecurityTokens(jobConf, ts);
// Command line (java binary, JVM options, AM main class, stdout/stderr redirects).
List<String> vargs = setupAMCommand(jobConf);
// Combine the above into the AM container's launch context.
ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(jobConf, localResources, securityTokens, vargs);
... ...
return appContext;
}
YARNRunner.java 的 setupAMCommand 方法:
/**
 * Builds the command-line tokens that launch the MR ApplicationMaster JVM (excerpt).
 *
 * <p>Order of the visible tokens:
 * <ol>
 *   <li>the java binary under JAVA_HOME;</li>
 *   <li>a java.io.tmpdir under the container working directory;</li>
 *   <li>log4j system properties;</li>
 *   <li>admin AM options;</li>
 *   <li>user AM options;</li>
 *   <li>the AM main class;</li>
 *   <li>stdout/stderr redirects into the container log directory.</li>
 * </ol>
 *
 * <p>NOTE(review): the tmpdir and log4j lines read the field {@code conf}, not the
 * {@code jobConf} parameter. In the visible caller both are the same object, but confirm
 * before passing a different configuration.
 *
 * @param jobConf job configuration supplying JAVA_HOME and the AM command options
 * @return mutable list of command tokens
 */
private List<String> setupAMCommand(Configuration jobConf) {
List<String> vargs = new ArrayList<>(8);
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME) + "/bin/java");
Path amTmpDir = new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + amTmpDir);
MRApps.addLog4jSystemProperties(null, vargs, conf);
... ...
// Admin options are appended before user options, so user options come later on the command line.
String mrAppMasterAdminOptions = jobConf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
vargs.add(mrAppMasterAdminOptions);
String mrAppMasterUserOptions = jobConf.get(MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
vargs.add(mrAppMasterUserOptions);
... ...
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
// LOG_DIR_EXPANSION_VAR is a placeholder, presumably expanded by the NodeManager at launch — confirm.
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR);
return vargs;
}
YARNRunner.java 中 resMgrDelegate.submitApplication 所调用的 submitApplication 方法(ResourceMgrDelegate 将其委托给 YarnClientImpl.java 中的实现):
// Quoted call site from YARNRunner.submitJob; the submitApplication implementation shown next
// is presumably what this call reaches through resMgrDelegate — confirm.
ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
/**
 * Submits an application to the ResourceManager and polls its report (excerpt; the polling
 * body is elided).
 *
 * <p>The request is sent once. The loop then keeps fetching the application report. If the
 * RM answers ApplicationNotFoundException, the same request is re-sent. The elided body is
 * expected to break out of the loop; otherwise the trailing return would be unreachable.
 *
 * @param appContext submission context; must carry an application id
 * @return the submitted application's id
 * @throws ApplicationIdNotProvidedException if {@code appContext} has no application id
 * @throws YarnException as declared, from the RM calls
 * @throws IOException   as declared, from the RM calls
 */
public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException("ApplicationId is not provided in ApplicationSubmissionContext");
  }
  SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  rmClient.submitApplication(request);
  // pollCount, startTime and the two state sets are consumed by the elided polling logic.
  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  EnumSet<YarnApplicationState> waitingStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED);
  EnumSet<YarnApplicationState> failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED);
  while (true) {
    try {
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      ... ...
    } catch (ApplicationNotFoundException ex) {
      // The RM does not know the application (presumably the submission was lost, e.g.
      // across an RM restart — confirm), so resend the identical request.
      LOG.info("Re-submit application " + applicationId + " with the same ApplicationSubmissionContext");
      rmClient.submitApplication(request);
    }
  }
  return applicationId;
}
ClientRMService.java 的 submitApplication 方法:
/**
 * ResourceManager RPC handler for application submission (excerpt).
 *
 * <p>Hands the submission context to rmAppManager, using the current time as the submit time.
 * On success it logs and writes a success audit record. On YarnException it logs, writes a
 * failure audit record and rethrows.
 * NOTE(review): {@code user} is declared in the elided part of the method.
 *
 * @param request submission request carrying the ApplicationSubmissionContext
 * @return a new, empty SubmitApplicationResponse record
 * @throws YarnException rethrown from rmAppManager.submitApplication after auditing
 */
public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException, IOException {
ApplicationSubmissionContext submissionContext = request.getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId();
// Caller context of the incoming call, recorded in both audit-log branches below.
CallerContext callerContext = CallerContext.getCurrent();
... ...
try {
rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user);
// NOTE(review): logs applicationId.getId() (presumably only the numeric sequence part), whereas
// the failure branch logs the full applicationId.
LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user);
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId, callerContext, submissionContext.getQueue());
} catch (YarnException e) {
LOG.info("Exception in submitting " + applicationId, e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, e.getMessage(), "ClientRMService", "Exception in submitting application", applicationId, callerContext, submissionContext.getQueue());
throw e;
}
return recordFactory.newRecordInstance(SubmitApplicationResponse.class);
}
首先,在项目的 pom.xml 文件中添加以下依赖项(MRAppMaster 等类位于该模块中):
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>3.1.3</version>
</dependency>
接下来,在 MRAppMaster 类中找到 main 方法:
/**
 * Entry point of the MapReduce ApplicationMaster process (excerpt).
 *
 * <p>Steps:
 * <ol>
 *   <li>Parse the container id from args[0] and derive the application attempt id.</li>
 *   <li>Tag the current CallerContext with "mr_appmaster_" plus the attempt id.</li>
 *   <li>Construct an MRAppMaster and hand it to initAndStartAppMaster.</li>
 * </ol>
 * Any Throwable is logged and the process is terminated with ExitUtil.terminate(1, t).
 *
 * <p>Argument layout as used here:
 * <ul>
 *   <li>args[0]: container id;</li>
 *   <li>args[1]: application submit time;</li>
 *   <li>args[2] (string) and args[3]/args[4] (ints): passed straight to the MRAppMaster
 *       constructor, presumably NodeManager host, port and HTTP port — TODO confirm.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code conf} and {@code jobUserName} are not declared anywhere in this
 * excerpt, so it does not compile as shown. They must come from code the excerpt omits.
 */
public static void main(String[] args) {
try {
ContainerId containerId = ContainerId.fromString(args[0]);
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
// Label this thread's caller context with the attempt id (presumably for tracing/auditing).
if (applicationAttemptId != null) {
CallerContext.setCurrent(new CallerContext.Builder(
"mr_appmaster_" + applicationAttemptId.toString()).build());
}
long appSubmitTime = Long.parseLong(args[1]);
MRAppMaster appMaster = new MRAppMaster(
applicationAttemptId, containerId, args[2],
Integer.parseInt(args[3]),
Integer.parseInt(args[4]), appSubmitTime);
initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) {
// Any startup failure (including bad arguments) is fatal for the AM process.
LOG.error("Error starting MRAppMaster", t);
ExitUtil.terminate(1, t);
}
}
/**
 * Initializes and starts the ApplicationMaster as {@code appMasterUgi} (excerpt).
 *
 * <p>{@code credentials} and {@code appMasterUgi} are created in the elided part. The
 * credentials are merged into the job configuration's credentials. Then appMaster.init(conf)
 * and appMaster.start() run inside appMasterUgi.doAs.
 *
 * @param appMaster   the ApplicationMaster service to initialize and start
 * @param conf        job configuration passed to appMaster.init
 * @param jobUserName the job's user name; not referenced in the visible lines
 * @throws IOException as declared; among others, "Was asked to shut down." when the master
 *                     sets errorHappenedShutDown during init/start (how doAs surfaces it — confirm)
 * @throws InterruptedException as declared, presumably from doAs
 */
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
final JobConf conf, String jobUserName) throws IOException,
InterruptedException {
... ...
conf.getCredentials().addAll(credentials);
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
appMaster.init(conf);
appMaster.start();
// Convert a shutdown requested during init/start into an exception for the caller.
if (appMaster.errorHappenedShutDown) {
throw new IOException("Was asked to shut down.");
}
return null;
}
});
}
init
/**
 * Initializes the service with {@code conf} (excerpt; leading checks elided).
 *
 * <p>Under {@code stateChangeLock}, the body runs only when enterState(STATE.INITED) does not
 * return INITED. Presumably enterState returns the previous state, so an already-initialized
 * service is not re-initialized — confirm. The body stores the configuration with setConfig
 * and calls serviceInit with the stored {@code config} field. Listeners are notified only if
 * the service is still INITED afterwards.
 *
 * <p>On any exception from serviceInit, the failure is recorded, the service is stopped
 * quietly, and the exception is rethrown after ServiceStateException.convert(e).
 *
 * @param conf configuration to initialize the service with
 */
public void init(Configuration conf) {
  ... ...
  synchronized (stateChangeLock) {
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);
      try {
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}
serviceInit
)protected void serviceInit(final Configuration conf) throws Exception {
... ...
clientService = createClientService(context);
clientService.init(conf);
containerAllocator = createContainerAllocator(clientService, context);
... ...
}
start
)public void start() {
if (isInState(STATE.STARTED)) {
return;
}
synchronized (stateChangeLock) {
if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
try {
startTime = System.currentTimeMillis();
serviceStart();
if (isInState(STATE.STARTED)) {
LOG.debug("Service {} is started", getName());
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
serviceStart
)protected void serviceStart() throws Exception {
... ...
if (initFailed) {
JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
jobEventDispatcher.handle(initFailedEvent);
} else {
startJobs();
}
}
startJobs
)1protected void startJobs() {
2 JobEvent startJobEvent = new JobStartEvent(job.getID(),
3 recoveredJobStartTime);
4 dispatcher.getEventHandler().handle(startJobEvent);
5}
handle
/**
 * Event handler whose handle(...) places events on {@code eventQueue} (excerpt; the checks
 * before the enqueue and the interrupt handling are elided).
 */
class GenericEventHandler implements EventHandler<Event> {
  /**
   * Enqueues {@code event} via eventQueue.put(event). How an interrupted put is handled lives
   * in the elided catch body and is not visible here.
   *
   * @param event the event to enqueue
   */
  public void handle(Event event) {
    ... ...
    try {
      eventQueue.put(event);
    } catch (InterruptedException e) {
      ... ...
    }
  }
}
在 YarnChild 类中找到 main 方法:
/**
 * Entry point of a YARN child JVM that runs one map or reduce task (excerpt).
 *
 * <p>Installs a default uncaught-exception handler and fetches the task from {@code myTask}.
 * It records the task id in the static YarnChild.taskid. It then runs the task as
 * {@code childUGI}, after optional spill-key setup and setting the FileSystem working
 * directory to the job's.
 *
 * @param args command-line arguments, consumed in the elided part
 * @throws Throwable anything not handled by the elided code
 */
public static void main(String[] args) throws Throwable {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
LOG.debug("Child starting");
... ...
task = myTask.getTask();
YarnChild.taskid = task.getTaskID();
... ...
// Copy into a final local so the anonymous class below can capture it.
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
setEncryptedSpillKeyIfRequired(taskFinal);
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
taskFinal.run(job, umbilical); // run the task (MapTask or ReduceTask)
return null;
}
});
... ...
}
/**
 * Runs a map task attempt (MapTask.run excerpt).
 *
 * <p>For map tasks, registers progress phases. With no reduce tasks, the "map" phase carries
 * the whole weight (1.0). Otherwise progress is split between "map" (0.667) and "sort"
 * (0.333). Then it runs the mapper through the new or old API and reports completion via
 * done(...).
 * NOTE(review): {@code reporter}, {@code useNewApi} and {@code splitMetaInfo} are not declared
 * in this excerpt; the code that declares them was omitted without a marker — confirm. The
 * reduce-count check reads the field {@code conf}, not the {@code job} parameter.
 *
 * @param job       job configuration passed to the mapper runner
 * @param umbilical protocol to the parent AM, stored in this.umbilical
 */
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
if (isMapTask()) {
if (conf.getNumReduceTasks() == 0) {
// Map-only job: no sort phase, so the map phase is the entire attempt's progress.
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
/**
 * Runs the mapper through the new (org.apache.hadoop.mapreduce) API (excerpt; setup elided).
 *
 * <p>Initializes the input with the split, runs the mapper, and marks the map phase complete.
 * It then switches the task phase to SORT, sends a status update, and closes input and output.
 * The finally block closes input and output quietly again, so they are released even if an
 * earlier step throws; presumably a second close is harmless — confirm.
 * {@code input}, {@code output}, {@code split}, {@code mapper} and {@code mapperContext} are
 * created in the elided part.
 *
 * @param job        job configuration
 * @param splitIndex index of this task's input split
 * @param umbilical  protocol used for the status update
 * @param reporter   task reporter
 */
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
... ...
try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
output.close(mapperContext);
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
/**
 * Drives one map task.
 *
 * <p>Calls setup once, then passes every key/value pair from the context to map(...). cleanup
 * always runs, even if setup-free iteration or map throws.
 *
 * @param context task context supplying input records and receiving output
 */
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    for (boolean hasRecord = context.nextKeyValue(); hasRecord; hasRecord = context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}
接下来查看 YarnChild 中调用的 run 方法在 ReduceTask.java 中的实现:
/**
 * Runs a reduce task attempt (ReduceTask.run excerpt; shuffle/merge setup elided).
 *
 * <p>Records in the job configuration whether records are being skipped. It then runs the
 * reducer through the new or old API, closes the shuffle consumer plugin, and reports
 * completion via done(...). {@code reporter}, {@code useNewApi}, {@code rIter},
 * {@code comparator}, {@code keyClass} and {@code valueClass} come from the elided part.
 * NOTE(review): shuffleConsumerPlugin.close() is not in a finally block, so it is skipped if
 * the reducer throws — confirm this is intended.
 *
 * @param job       job configuration; SKIP_RECORDS is set on it
 * @param umbilical protocol to the parent AM
 */
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
... ...
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
/**
 * Runs the reducer through the new (org.apache.hadoop.mapreduce) API (excerpt; setup elided).
 *
 * <p>Runs the reducer with {@code reducerContext} and always closes the tracked record writer
 * afterwards. {@code reducer}, {@code reducerContext} and {@code trackedRW} are created in the
 * elided part.
 * NOTE(review): INKEY/INVALUE are not declared in this excerpt; presumably they are method
 * type parameters dropped by the excerpt — confirm.
 *
 * @param job        job configuration
 * @param umbilical  protocol to the parent AM
 * @param reporter   task reporter
 * @param rIter      iterator over the merged, sorted reduce input
 * @param comparator comparator for input keys
 * @param keyClass   input key class
 * @param valueClass input value class
 */
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
... ...
try {
reducer.run(reducerContext);
} finally {
// Close the writer even if the reducer throws.
trackedRW.close(reducerContext);
}
}
/**
 * Drives one reduce task.
 *
 * <p>Calls setup once and then, for each distinct key, calls reduce(...) with that key and
 * its values. After each reduce, if the values iterator is a ReduceContext.ValueIterator, its
 * backup store is reset. cleanup always runs, even if reduce throws.
 *
 * @param context task context supplying grouped input and receiving output
 */
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    boolean hasKey = context.nextKey();
    while (hasKey) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      Iterator<VALUEIN> valueIterator = context.getValues().iterator();
      if (valueIterator instanceof ReduceContext.ValueIterator) {
        ReduceContext.ValueIterator<VALUEIN> backedIterator =
            (ReduceContext.ValueIterator<VALUEIN>) valueIterator;
        backedIterator.resetBackupStore();
      }
      hasKey = context.nextKey();
    }
  } finally {
    cleanup(context);
  }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。