Spark Scheduling Series ----- 5. Skipped Tasks and Stages in Spark (tasks and stages shown as skipped in the UI)

The Spark web UI often shows tasks and stages as skipped, as in the screenshot below:

[Screenshot: Spark UI jobs page showing skipped stages and skipped tasks]
This article explains under what circumstances a stage or task is shown as skipped, and whether a Spark application runs into any problem when stages and tasks are shown as skipped.

After the last task of a Spark job's ResultStage completes successfully, DAGScheduler.handleTaskCompletion posts a SparkListenerJobEnd event. The source is as follows:

private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val stageId = task.stageId
  val taskType = Utils.getFormattedClassName(task)
  outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
    event.taskInfo.attempt, event.reason)
  // The success case is dealt with separately below, since we need to compute accumulator
  // updates before posting.
  if (event.reason != Success) {
    val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
    listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
      event.taskInfo, event.taskMetrics))
  }
  if (!stageIdToStage.contains(task.stageId)) {
    // Skip all the actions if the stage has been cancelled.
    return
  }
  val stage = stageIdToStage(task.stageId)
  event.reason match {
    case Success =>
      listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
        event.reason, event.taskInfo, event.taskMetrics))
      stage.pendingTasks -= task
      task match {
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.resultOfJob match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                updateAccumulators(event)
                job.finished(rt.outputId) = true
                job.numFinished += 1
                // If the whole job has finished, remove it
                // (all tasks of the ResultStage are done: post the SparkListenerJobEnd event)
                if (job.numFinished == job.numPartitions) {
                  markStageAsFinished(resultStage)
                  cleanupStateForJobAndIndependentStages(job)
                  listenerBus.post(
                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                }
                // taskSucceeded runs some user code that might throw an exception. Make sure
                // we are resilient against that.
                try {
                  job.listener.taskSucceeded(rt.outputId, event.result)
                } catch {
                  case e: Exception =>
                    // TODO: Perhaps we want to mark the resultStage as failed?
                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                }
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }
          // ...


JobProgressListener.onJobEnd handles the SparkListenerJobEnd event. Its code is as follows:

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
  val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
    logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
    new JobUIData(jobId = jobEnd.jobId)
  }
  jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
  jobData.stageIds.foreach(pendingStages.remove)
  jobEnd.jobResult match {
    case JobSucceeded =>
      completedJobs += jobData
      trimJobsIfNecessary(completedJobs)
      jobData.status = JobExecutionStatus.SUCCEEDED
      numCompletedJobs += 1
    case JobFailed(exception) =>
      failedJobs += jobData
      trimJobsIfNecessary(failedJobs)
      jobData.status = JobExecutionStatus.FAILED
      numFailedJobs += 1
  }
  for (stageId <- jobData.stageIds) {
    stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
      jobsUsingStage.remove(jobEnd.jobId)
      if (jobsUsingStage.isEmpty) {
        stageIdToActiveJobIds.remove(stageId)
      }
      stageIdToInfo.get(stageId).foreach { stageInfo =>
        // A stage of this job that was never submitted for execution is counted here,
        // together with its tasks, as a skipped stage / skipped tasks
        if (stageInfo.submissionTime.isEmpty) {
          // if this stage is pending, it won't complete, so mark it as "skipped":
          skippedStages += stageInfo
          trimStagesIfNecessary(skippedStages)
          jobData.numSkippedStages += 1
          jobData.numSkippedTasks += stageInfo.numTasks
        }
      }
    }
  }
}

StageInfo.submissionTime is only set in submitMissingTasks, when the stage is actually decomposed into a TaskSet and the TaskSet is handed to the TaskScheduler. The source is as follows:

private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute.
  // partitionsToCompute lists the indexes of all partitions this stage still needs to compute
  val partitionsToCompute: Seq[Int] = {
    stage match {
      case stage: ShuffleMapStage =>
        (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }

  val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull

  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
  outputCommitCoordinator.stageStart(stage.id)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
    }

    // Wrap the task binary in a broadcast variable so it reaches every executor
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString)
      runningStages -= stage

      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
      runningStages -= stage
      return
  }

  // tasks describes every task of this stage: the stage id, the partition the task processes,
  // and the preferred hosts/executors for that partition
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          // Get the preferred locations of the task: tasks are preferably launched
          // on the nodes that already hold the data
          val locs = getPreferredLocs(stage.rdd, id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, taskBinary, part, locs) // taskBinary is the broadcast variable
        }

      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        partitionsToCompute.map { id =>
          val p: Int = job.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = getPreferredLocs(stage.rdd, p)
          new ResultTask(stage.id, taskBinary, part, locs, id)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
    // Set StageInfo.submissionTime: this TaskSet has been submitted for execution
    // and will therefore not be counted as skipped
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else
    // ...

If one of a job's stages is never decomposed into a TaskSet and submitted for execution, that stage and its tasks are counted and displayed as skipped stages and skipped tasks.

So which stages are never decomposed into a TaskSet and executed?

When a job is submitted, Spark posts a JobSubmitted event. After DAGScheduler.doOnReceive receives it, it calls DAGScheduler.handleJobSubmitted to process the job submission.

DAGScheduler.handleJobSubmitted first calls DAGScheduler.newResultStage to create the final stage. Through the chain of calls listed below, newResultStage eventually reaches DAGScheduler.registerShuffleDependencies, which adds all of the RDD's ancestor stages to the DAGScheduler.jobIdToStageIds HashMap. handleJobSubmitted then collects the StageInfo of every stage of the job into a Seq and posts a SparkListenerJobStart event.

DAGScheduler.newResultStage->

DAGScheduler.getParentStagesAndId->

DAGScheduler.getParentStagesAndId->getParentStages

DAGScheduler.getParentStagesAndId->getShuffleMapStage

DAGScheduler.registerShuffleDependencies




DAGScheduler.registerShuffleDependencies first calls DAGScheduler.getAncestorShuffleDependencies to find all ancestor shuffle dependencies of the current RDD (its parents, grandparents, and so on up the lineage), and then calls DAGScheduler.newOrUsedShuffleStage to create a ShuffleMapStage for each of those ancestor dependencies:

private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
  // Find all ancestor shuffle dependencies of this RDD: parents, grandparents, and so on
  val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
  while (parentsWithNoMapStage.nonEmpty) {
    val currentShufDep = parentsWithNoMapStage.pop()
    // Create a stage from the ShuffleDependency and the job id. Because dependencies are popped
    // off a stack, the root stage is created first, and so on: stages created earlier have
    // smaller shuffle ids
    val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
    shuffleToMapStage(currentShufDep.shuffleId) = stage
  }
}


private def newOrUsedShuffleStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.size
  // Create the stage
  val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    for (i <- 0 until locs.size) {
      stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
    }
    stage.numAvailableOutputs = locs.count(_ != null)
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
  }
  stage
}
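As an aside, the mapOutputTracker.containsShuffle branch above points at another common source of skipped stages that is consistent with this code: when the map output of a shuffle is still registered with the MapOutputTracker (for example because an earlier job already computed it), the ShuffleMapStage has nothing left to compute, is not resubmitted, and also shows up as skipped. The following is a minimal sketch of my own (not from the original article), assuming an existing SparkContext named sc:

val counts = sc.parallelize(1 to 1000000)
  .map(i => (i % 100, 1))
  .reduceByKey(_ + _)          // introduces a ShuffleMapStage

counts.collect()               // Job 0: the ShuffleMapStage and the ResultStage both run
counts.collect()               // Job 1: the map output of the shuffle is still available, so the
                               // ShuffleMapStage is not resubmitted and the UI shows it as skipped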

Back to stage creation: DAGScheduler.newOrUsedShuffleStage calls DAGScheduler.newShuffleMapStage to create the stage.

After creating the stage, DAGScheduler.newShuffleMapStage calls DAGScheduler.updateJobIdStageIdMaps to add the newly created stage.id to DAGScheduler.jobIdToStageIds. The source is as follows:

private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
  def updateJobIdStageIdMapsList(stages: List[Stage]) {
    if (stages.nonEmpty) {
      val s = stages.head
      s.jobIds += jobId
      // Add the stage id to jobIdToStageIds
      jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
      val parents: List[Stage] = getParentStages(s.rdd, jobId)
      val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
      updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
    }
  }
  updateJobIdStageIdMapsList(List(stage))
}

The source of DAGScheduler.handleJobSubmitted:

 
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Create the ResultStage; inside this method every stage the job may go through
    // is registered in jobIdToStageIds
    finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    if (shouldRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      runLocally(job)
    } else {
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      // Get all stage ids of this job; every stage of the job was created while newResultStage
      // ran, so the lookup succeeds here
      val stageIds = jobIdToStageIds(jobId).toArray
      // Get the StageInfo of each stage
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      // Post the SparkListenerJobStart event
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      submitStage(finalStage)
    }
  }
  submitWaitingStages()
}

JobProgressListener.onJobStart handles the SparkListenerJobStart event. It puts all of the StageInfo objects created by DAGScheduler.handleJobSubmitted into the JobProgressListener.stageIdToInfo HashMap.

We can now conclude: the stageIdToInfo entries processed in JobProgressListener.onJobEnd were produced when DAGScheduler.handleJobSubmitted ran, i.e. before any of the job's stages were decomposed into tasks.
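For reference, the relevant bookkeeping in JobProgressListener.onJobStart looks roughly like the following. This is a paraphrased sketch rather than verbatim Spark source; the point is only that every StageInfo carried by the SparkListenerJobStart event is recorded, whether or not the stage is ever submitted:

override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
  // Record every stage of the job, including stages that will never be submitted
  for (stageInfo <- jobStart.stageInfos) {
    stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
  }
  // Remember which jobs use each stage; this is consumed later by onJobEnd
  jobStart.stageIds.foreach { stageId =>
    stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[Int]).add(jobStart.jobId)
  }
  // ... bookkeeping of JobUIData, activeJobs, pendingStages, etc. elided
}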


From my earlier article, "Spark storage series ------ 3. How cached data blocks affect the scheduling of subsequent jobs and the execution of their tasks", we know that when stages are decomposed into TaskSets, an RDD that has already been cached in the BlockManager causes none of its ancestor stages to be decomposed into TaskSets and executed. For those ancestor stages, StageInfo.submissionTime.isEmpty therefore returns true, and the stages and their tasks are displayed as skipped in the Spark UI.
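The following minimal sketch (mine, not from the referenced article) reproduces this in a spark-shell; the input path is hypothetical and sc is the usual SparkContext:

val wordCounts = sc.textFile("hdfs:///tmp/words.txt")   // hypothetical input path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .cache()

wordCounts.count()   // Job 0: every stage runs; the result of reduceByKey is cached in the BlockManager
wordCounts.count()   // Job 1: handleJobSubmitted still registers the ancestor ShuffleMapStage in
                     // jobIdToStageIds, but the cached RDD satisfies the job, so that stage is never
                     // decomposed into a TaskSet; its submissionTime stays empty and the UI shows
                     // the stage and its tasks as skipped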

After a stage finishes executing, JobProgressListener.onStageCompleted saves the stage's information into JobProgressListener.stageIdToInfo. Source:

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
  val stage = stageCompleted.stageInfo
  // Save the stage's info so it can be tracked and displayed
  stageIdToInfo(stage.stageId) = stage
  val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
    logWarning("Stage completed for unknown stage " + stage.stageId)
    new StageUIData
  })
  // ...

Once all tasks in a stage's TaskSet have executed successfully, the stage's StageInfo is written back to JobProgressListener.stageIdToInfo, so that stage and its tasks are not displayed as skipped.


