官术网_书友最值得收藏!

8.4 ShuffleMapTask和ResultTask處理結果是如何被Driver管理的

Spark Job中,根據Task所處Stage的位置,我們將Task分為兩類:第一類叫shuffleMapTask,指Task所處的Stage不是最后一個Stage,也就是Stage的計算結果還沒有輸出,而是通過Shuffle交給下一個Stage使用;第二類叫resultTask,指Task所處Stage是DAG中最后的一個Stage,也就是Stage計算結果需要進行輸出等操作,計算到此為止已經結束。簡單地說,Spark Job中除了最后一個Stage的Task叫resultTask,其他所有Task都叫ShuffleMapTask。

8.4.1 ShuffleMapTask執行結果和Driver的交互原理及源碼詳解

Driver中的CoarseGrainedSchedulerBackend給CoarseGrainedExecutorBackend發送launchTasks消息,CoarseGrainedExecutorBackend收到launchTasks消息以后會調用executor.launchTask。通過launchTask執行Task,launchTask方法中根據傳入的參數:taskId、嘗試次數、任務名稱、序列化后的任務創建一個TaskRunner,在threadPool中執行TaskRunner。TaskRunner內部會先做一些準備工作,如反序列化Task的依賴,通過網絡獲取需要的文件、Jar等;然后調用反序列化后的Task.run方法來執行任務并獲得執行結果。

其中,Task的run方法調用的時候會導致Task的抽象方法runTask的調用,Task.scala的runTask方法是一個抽象方法。Task包括ResultTask、ShuffleMapTask兩種Task,抽象runTask方法具體的實現由子類的runTask實現。ShuffleMapTask的runTask實際運行的時候會調用RDD的iterator,然后針對Partition進行計算。

ShuffleMapTask.scala的源碼如下。

1.      override def runTask(context: TaskContext): MapStatus = {
2.   ......
3.      val ser = SparkEnv.get.closureSerializer.newInstance()
4.      val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _,
        _])](
5.      ......
6.        val manager = SparkEnv.get.shuffleManager
7.       writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId,
         context)
8.        writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator
          [_ <: Product2[Any, Any]]])
9.        writer.stop(success = true).get
10.    ......

ShuffleMapTask方法中調用ShuffleManager寫入器writer方法,在write時最終計算會調用RDD的compute方法。通過writer.stop(success = true).get,如果寫入成功,就返回MapStatus結果值。

SortShuffleWriter.scala的源碼如下。

1.  override def write(records: Iterator[Product2[K, V]]): Unit = {
2.  ......
3.        val blockId = ShuffleBlockId(dep.shuffleId, mapId,
          IndexShuffleBlockResolver.NOOP_REDUCE_ID)
4.        val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
5.       shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId,
         partitionLengths, tmp)
6.        mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
7.   ......
8.   override def stop(success: Boolean): Option[MapStatus] = {
9.        ......
10.       if (success) {
11.         return Option(mapStatus)
12.       } else {
13.         return None
14.       }
15. ......

回到TaskRunner的run方法,把task.run執行結果通過resultSer.serialize(value)序列化,生成一個directResult。然后根據大小判斷不同的結果賦值給serializedResult,傳回給Driver。

(1)如果任務執行結果特別大,超過1GB,日志就提示超出任務大小限制,返回元數據ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))。

Executor.scala的源碼如下。

1.  if (maxResultSize > 0 && resultSize > maxResultSize) {
2.            logWarning(s"Finished $taskName (TID $taskId). Result is larger
              than maxResultSize " + s"(${Utils.bytesToString(resultSize)} >
              ${Utils.bytesToString (maxResultSize)}), " + s"dropping it.")
3.
4.            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId
              (taskId), resultSize))

(2)如果任務執行結果小于1GB,大于maxDirectResultSize(128MB),就放入blockManager,返回元數據ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))。

Executor.scala的源碼如下。

1.  .......
2.    } else if (resultSize > maxDirectResultSize) {
3.              val blockId = TaskResultBlockId(taskId)
4.              env.blockManager.putBytes(
5.                blockId,
6.                new ChunkedByteBuffer(serializedDirectResult.duplicate()),
7.                StorageLevel.MEMORY_AND_DISK_SER)
8.              logInfo(
9.                s"Finished $taskName (TID $taskId). $resultSize bytes result
                  sent via BlockManager)")
10.             ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))

(3)如果任務執行結果小于128MB,就直接返回serializedDirectResult。

Executor.scala的源碼如下。

1.  .......
2.  } else {
3.            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes
              result sent to driver")
4.            serializedDirectResult
5.  ......

接下來,TaskRunner的run方法中調用execBackend.statusUpdate(taskId,TaskState.FINISHED,serializedResult)給Driver發送一個消息,消息中將taskId、TaskState.FINISHED、serializedResult傳進去。這里,execBackend是CoarseGrainedExecutorBackend。

Executor.scala的源碼如下。

1.      override def run(): Unit = {
2.   .....
3.          execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
4.  ......

CoarseGrainedExecutorBackend的statusUpdate方法的源碼如下。

1.  override def statusUpdate(taskId: Long, state: TaskState, data:
    ByteBuffer) {
2.    val msg = StatusUpdate(executorId, taskId, state, data)
3.    driver match {
4.      case Some(driverRef) => driverRef.send(msg)
5.      case None => logWarning(s"Drop $msg because has not yet connected
        to driver")
6.    }
7.  }

CoarseGrainedExecutorBackend給DriverEndpoint發送StatusUpdate來傳輸執行結果。DriverEndpoint是一個ThreadSafeRpcEndpoint消息循環體,模式匹配收到StatusUpdate消息,調用scheduler.statusUpdate(taskId, state, data.value)方法執行。這里的scheduler是TaskSchedulerImpl。

CoarseGrainedSchedulerBackend.scala的DriverEndpoint的源碼如下。

1.    override def receive: PartialFunction[Any, Unit] = {
2.  case StatusUpdate(executorId, taskId, state, data) =>
3.    scheduler.statusUpdate(taskId, state, data.value)

DriverEndpoint會把執行結果傳遞給TaskSchedulerImpl處理,交給TaskResultGetter內部,通過線程去分別處理Task執行成功和失敗時的不同情況,然后告訴DAGScheduler任務處理結束的狀況。

TaskSchedulerImpl.scala的statusUpdate的源碼如下。

1.  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
2.  ........
3.    if (TaskState.isFinished(state)) {
4.                cleanupTaskState(tid)
5.                taskSet.removeRunningTask(tid)
6.                if (state == TaskState.FINISHED) {
7.                  taskResultGetter.enqueueSuccessfulTask(taskSet, tid,
                    serializedData)
8.                } else if (Set(TaskState.FAILED, TaskState.KILLED,
                  TaskState.LOST).contains(state)) {
9.                  taskResultGetter.enqueueFailedTask(taskSet, tid, state,
                    serializedData)
10.               }
11.             }

TaskResultGetter.scala的enqueueSuccessfulTask方法中,開辟一條新線程處理成功任務,對結果進行相應的處理后調用scheduler.handleSuccessfulTask。

TaskSchedulerImpl的handleSuccessfulTask的源碼如下。

1.   def handleSuccessfulTask(
2.      taskSetManager: TaskSetManager,
3.      tid: Long,
4.      taskResult: DirectTaskResult[_]): Unit = synchronized {
5.    taskSetManager.handleSuccessfulTask(tid, taskResult)
6.  }

TaskSchedulerImpl的handleSuccessfulTask交給TaskSetManager調用handleSuccessfulTask。

TaskSetManager的handleSuccessfulTask的源碼如下。

1.   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
2.     ......
3.    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(),
      result.accumUpdates, info)
4.  ......
5.

handleSuccessfulTask方法中調用sched.dagScheduler.taskEnded,taskEnded由TaskSetManager調用,匯報任務完成或者失敗。將任務完成的事件CompletionEvent放入eventProcessLoop事件處理循環中。

DAGScheduler.scala的源碼如下。

1.  def taskEnded(
2.       task: Task[_],
3.       reason: TaskEndReason,
4.       result: Any,
5.       accumUpdates: Seq[AccumulatorV2[_, _]],
6.       taskInfo: TaskInfo): Unit = {
7.     eventProcessLoop.post(
8.       CompletionEvent(task, reason, result, accumUpdates, taskInfo))
9.   }

由事件循環線程讀取消息,并調用DAGSchedulerEventProcessLoop.onReceive方法進行消息處理。

DAGScheduler.scala的源碼如下。

1.    override def onReceive(event: DAGSchedulerEvent): Unit = {
2.    val timerContext = timer.time()
3.    try {
4.      doOnReceive(event)
5.    } finally {
6.      timerContext.stop()
7.    }
8.  }

onReceive中調用doOnReceive(event)方法,模式匹配到CompletionEvent,調用dagScheduler.handleTaskCompletion方法。

DAGScheduler.scala的源碼如下。

1.    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
2.      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener,
        properties) =>
3.        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions,
          callSite, listener, properties)
4.  ......
5.    case completion: CompletionEvent =>
6.        dagScheduler.handleTaskCompletion(completion)
7.  .....

DAGScheduler.handleTaskCompletion中task執行成功的情況,根據ShuffleMapTask和ResultTask兩種情況分別處理。其中,ShuffleMapTask將MapStatus匯報給MapOutTracker。

Spark 2.1.1版本的DAGScheduler的handleTaskCompletion的源碼如下。

1.       private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
2.  ......
3.    val stage = stageIdToStage(task.stageId)
4.      event.reason match {
5.        case Success =>
6.          stage.pendingPartitions -= task.partitionId
7.          task match {
8.  ......
9.    case smt: ShuffleMapTask =>
10.             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
11.             updateAccumulators(event)
12.             val status = event.result.asInstanceOf[MapStatus]
13.             val execId = status.location.executorId
14.             logDebug("ShuffleMapTask finished on " + execId)
15.             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch
                (execId)) {
16.               logInfo(s"Ignoring      possibly    bogus    $smt   completion    from
                  executor $execId")
17.             } else {
18.               shuffleStage.addOutputLoc(smt.partitionId, status)
19.             }
20.
21.             if   (runningStages.contains(shuffleStage)           &&  shuffleStage.
                pendingPartitions.isEmpty) {
22.               markStageAsFinished(shuffleStage)
23.               logInfo("looking for newly runnable stages")
24.               logInfo("running: " + runningStages)
25.       logInfo("waiting: " + waitingStages)
26.       logInfo("failed: " + failedStages)
27.       /**
            * 我們設置為true,來遞增紀元編號,以防止map輸出重新計算。在這種情
            * 況下,一些節點可能已經緩存了損壞的位置(從我們檢測到的錯誤),將需
            * 要紀元編號遞增來重取它們。待辦事項:如果這不是第一次,那么只增加紀
            * 元編號,我們注冊了map輸出
28.         */
29.       mapOutputTracker.registerMapOutputs(
30.         shuffleStage.shuffleDep.shuffleId,
31.         shuffleStage.outputLocInMapOutputTrackerFormat(),
32.         changeEpoch = true)
33.       clearCacheLocs()
34.       if (!shuffleStage.isAvailable) {
35.         //有些任務已經失敗了,重新提交shuffleStage
36.         //待辦事項:低級調度器也應該處理這個問題
37.         logInfo("Resubmitting " + shuffleStage + " (" +
            shuffleStage.name +
38.           ") because some of its tasks had failed: " +
39.           shuffleStage.findMissingPartitions().mkString(", "))
40.         submitStage(shuffleStage)
41.       } else {
42.         //標識任何map階段的作業都在這個階段等待完成
43.         if (shuffleStage.mapStageJobs.nonEmpty) {
44.           val stats = mapOutputTracker.getStatistics(shuffleStage.
              shuffleDep)
45.           for (job <- shuffleStage.mapStageJobs) {
46.             markMapStageJobAsFinished(job, stats)
47.           }
48.         }
49.         submitWaitingChildStages(shuffleStage)
50.       }
51.     }
52.  }

Spark 2.2.0版本的DAGScheduler的handleTaskCompletion的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第6行刪掉stage.pendingPartitions -= task.partitionId。

 上段代碼中第14行之后新增if的邏輯判斷,如果stageIdToStage(task.stageId). latestInfo.attemptId等于task.stageAttemptId,則執行shuffleStage.pendingPartitions -= task.partitionId。

1.         case smt: ShuffleMapTask =>
2.            ......
3.              if    (stageIdToStage(task.stageId).latestInfo.attemptId               ==
                task.stageAttemptId) {
4.   /**
       * 任務Task是當前stage正在嘗試做的。因為任務在TaskSetManager下順利完成,其
       * 標記為不再等待TaskSetManager把任務完成,當任務的epoch較小時,需忽略輸出。
       * 在這種情況下,掛起的分區為空時,仍然會丟失輸出位置,這將導致DAGScheduler 重新
       * 提交下面的stage
5.     */
6.                shuffleStage.pendingPartitions -= task.partitionId
7.              }
8.  ......

8.4.2 ResultTask執行結果與Driver的交互原理及源碼詳解

Task的run方法調用的時候會導致Task的抽象方法runTask的調用,Task.scala的runTask方法是一個抽象方法。Task包括ResultTask、ShuffleMapTask兩種Task,抽象runTask方法具體的實現由子類的runTask實現。ResultTask的runTask具體實現的源碼如下。

ResultTask.scala的runTask的源碼如下。

1.    override def runTask(context: TaskContext): U = {
2.      ......
3.  //反序列RDD和func處理函數
4.      val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T])
        => U)](
5.      ......
6.      func(context, rdd.iterator(partition, context))
7.    }

而ResultTask的runTask方法中反序列化生成func函數,最后通過func函數計算出最終的結果。

ResultTask執行結果與Driver的交互過程同ShuffleMapTask類似,最終,DAGScheduler.handleTaskCompletion中Task執行結果,根據ShuffleMapTask和ResultTask兩種情況分別處理。其中,ResultTask的處理結果如下。

DAGScheduler的handleTaskCompletion的源碼如下。

1.   case rt: ResultTask[_, _] =>
2.          //因為是ResultTask的一部分,所以對應為ResultStage
3.          //待辦事宜:這一功能進行重構,接受ResultStage
4.          val resultStage = stage.asInstanceOf[ResultStage]
5.          resultStage.activeJob match {
6.            case Some(job) =>
7.              if (!job.finished(rt.outputId)) {
8.                updateAccumulators(event)
9.                job.finished(rt.outputId) = true
10.               job.numFinished += 1
11.               //如果整個作業完成,就刪除
12.               if (job.numFinished == job.numPartitions) {
13.                 markStageAsFinished(resultStage)
14.                 cleanupStateForJobAndIndependentStages(job)
15.                 listenerBus.post(
16.                  SparkListenerJobEnd(job.jobId, clock.getTimeMillis(),
                     JobSucceeded))
17.               }
18.
19.   //taskSucceeded 運行用戶代碼可能會拋出一個異常
20.               try {
21.                 job.listener.taskSucceeded(rt.outputId, event.result)
22.               } catch {
23.                 case e: Exception =>
24.                   //待辦事項:可能我們要標記resultStage 失敗?
25.                   job.listener.jobFailed(new SparkDriverExecution-
                      Exception(e))
26.               }
27.             }
28.    case None =>
29.      logInfo("Ignoring result from " + rt + " because its job has
         finished")
30.  }

Driver端的DAGScheduler的MapOutputTracker把shuffleMapTask執行的結果交給ResultTask,ResultTask根據前面Stage的執行結果進行shuffle后產生整個Job最后的結果。

主站蜘蛛池模板: 安化县| 泸西县| 沾益县| 大足县| 古交市| 宜黄县| 天柱县| 正宁县| 侯马市| 商都县| 石城县| 黄平县| 宣威市| 元谋县| 平安县| 英德市| 丹巴县| 岑溪市| 康马县| 拉孜县| 宁明县| 巫溪县| 安丘市| 荣成市| 永顺县| 肥城市| 徐汇区| 襄垣县| 文昌市| 涟水县| 蚌埠市| 乌鲁木齐县| 阿拉善右旗| 庄浪县| 湛江市| 望都县| 平昌县| 东宁县| 宣恩县| 昭通市| 穆棱市|