- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 1711字
- 2019-12-12 17:30:04
8.4 ShuffleMapTask和ResultTask處理結果是如何被Driver管理的
Spark Job中,根據Task所處Stage的位置,我們將Task分為兩類:第一類叫shuffleMapTask,指Task所處的Stage不是最后一個Stage,也就是Stage的計算結果還沒有輸出,而是通過Shuffle交給下一個Stage使用;第二類叫resultTask,指Task所處Stage是DAG中最后的一個Stage,也就是Stage計算結果需要進行輸出等操作,計算到此為止已經結束。簡單地說,Spark Job中除了最后一個Stage的Task叫resultTask,其他所有Task都叫ShuffleMapTask。
8.4.1 ShuffleMapTask執行結果和Driver的交互原理及源碼詳解
Driver中的CoarseGrainedSchedulerBackend給CoarseGrainedExecutorBackend發送launchTasks消息,CoarseGrainedExecutorBackend收到launchTasks消息以后會調用executor.launchTask。通過launchTask執行Task,launchTask方法中根據傳入的參數:taskId、嘗試次數、任務名稱、序列化后的任務創建一個TaskRunner,在threadPool中執行TaskRunner。TaskRunner內部會先做一些準備工作,如反序列化Task的依賴,通過網絡獲取需要的文件、Jar等;然后調用反序列化后的Task.run方法來執行任務并獲得執行結果。
其中,Task的run方法調用的時候會導致Task的抽象方法runTask的調用,Task.scala的runTask方法是一個抽象方法。Task包括ResultTask、ShuffleMapTask兩種Task,抽象runTask方法具體的實現由子類的runTask實現。ShuffleMapTask的runTask實際運行的時候會調用RDD的iterator,然后針對Partition進行計算。
ShuffleMapTask.scala的源碼如下。
1. override def runTask(context: TaskContext): MapStatus = { 2. ...... 3. val ser = SparkEnv.get.closureSerializer.newInstance() 4. val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( 5. ...... 6. val manager = SparkEnv.get.shuffleManager 7. writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 8. writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator [_ <: Product2[Any, Any]]]) 9. writer.stop(success = true).get 10. ......
ShuffleMapTask方法中調用ShuffleManager寫入器writer方法,在write時最終計算會調用RDD的compute方法。通過writer.stop(success = true).get,如果寫入成功,就返回MapStatus結果值。
SortShuffleWriter.scala的源碼如下。
1. override def write(records: Iterator[Product2[K, V]]): Unit = { 2. ...... 3. val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) 4. val partitionLengths = sorter.writePartitionedFile(blockId, tmp) 5. shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) 6. mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) 7. ...... 8. override def stop(success: Boolean): Option[MapStatus] = { 9. ...... 10. if (success) { 11. return Option(mapStatus) 12. } else { 13. return None 14. } 15. ......
回到TaskRunner的run方法,把task.run執行結果通過resultSer.serialize(value)序列化,生成一個directResult。然后根據大小判斷不同的結果賦值給serializedResult,傳回給Driver。
(1)如果任務執行結果特別大,超過1GB,日志就提示超出任務大小限制,返回元數據ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))。
Executor.scala的源碼如下。
1. if (maxResultSize > 0 && resultSize > maxResultSize) { 2. logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString (maxResultSize)}), " + s"dropping it.") 3. 4. ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId (taskId), resultSize))
(2)如果任務執行結果小于1GB,大于maxDirectResultSize(128MB),就放入blockManager,返回元數據ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))。
Executor.scala的源碼如下。
1. ....... 2. } else if (resultSize > maxDirectResultSize) { 3. val blockId = TaskResultBlockId(taskId) 4. env.blockManager.putBytes( 5. blockId, 6. new ChunkedByteBuffer(serializedDirectResult.duplicate()), 7. StorageLevel.MEMORY_AND_DISK_SER) 8. logInfo( 9. s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") 10. ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
(3)如果任務執行結果小于128MB,就直接返回serializedDirectResult。
Executor.scala的源碼如下。
1. ....... 2. } else { 3. logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") 4. serializedDirectResult 5. ......
接下來,TaskRunner的run方法中調用execBackend.statusUpdate(taskId,TaskState.FINISHED,serializedResult)給Driver發送一個消息,消息中將taskId、TaskState.FINISHED、serializedResult傳進去。這里,execBackend是CoarseGrainedExecutorBackend。
Executor.scala的源碼如下。
1. override def run(): Unit = { 2. ..... 3. execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) 4. ......
CoarseGrainedExecutorBackend的statusUpdate方法的源碼如下。
1. override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { 2. val msg = StatusUpdate(executorId, taskId, state, data) 3. driver match { 4. case Some(driverRef) => driverRef.send(msg) 5. case None => logWarning(s"Drop $msg because has not yet connected to driver") 6. } 7. }
CoarseGrainedExecutorBackend給DriverEndpoint發送StatusUpdate來傳輸執行結果。DriverEndpoint是一個ThreadSafeRpcEndpoint消息循環體,模式匹配收到StatusUpdate消息,調用scheduler.statusUpdate(taskId, state, data.value)方法執行。這里的scheduler是TaskSchedulerImpl。
CoarseGrainedSchedulerBackend.scala的DriverEndpoint的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case StatusUpdate(executorId, taskId, state, data) => 3. scheduler.statusUpdate(taskId, state, data.value)
DriverEndpoint會把執行結果傳遞給TaskSchedulerImpl處理,交給TaskResultGetter內部,通過線程去分別處理Task執行成功和失敗時的不同情況,然后告訴DAGScheduler任務處理結束的狀況。
TaskSchedulerImpl.scala的statusUpdate的源碼如下。
1. def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { 2. ........ 3. if (TaskState.isFinished(state)) { 4. cleanupTaskState(tid) 5. taskSet.removeRunningTask(tid) 6. if (state == TaskState.FINISHED) { 7. taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) 8. } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { 9. taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) 10. } 11. }
TaskResultGetter.scala的enqueueSuccessfulTask方法中,開辟一條新線程處理成功任務,對結果進行相應的處理后調用scheduler.handleSuccessfulTask。
TaskSchedulerImpl的handleSuccessfulTask的源碼如下。
1. def handleSuccessfulTask( 2. taskSetManager: TaskSetManager, 3. tid: Long, 4. taskResult: DirectTaskResult[_]): Unit = synchronized { 5. taskSetManager.handleSuccessfulTask(tid, taskResult) 6. }
TaskSchedulerImpl的handleSuccessfulTask交給TaskSetManager調用handleSuccessfulTask。
TaskSetManager的handleSuccessfulTask的源碼如下。
1. def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = { 2. ...... 3. sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) 4. ...... 5.
handleSuccessfulTask方法中調用sched.dagScheduler.taskEnded,taskEnded由TaskSetManager調用,匯報任務完成或者失敗。將任務完成的事件CompletionEvent放入eventProcessLoop事件處理循環中。
DAGScheduler.scala的源碼如下。
1. def taskEnded( 2. task: Task[_], 3. reason: TaskEndReason, 4. result: Any, 5. accumUpdates: Seq[AccumulatorV2[_, _]], 6. taskInfo: TaskInfo): Unit = { 7. eventProcessLoop.post( 8. CompletionEvent(task, reason, result, accumUpdates, taskInfo)) 9. }
由事件循環線程讀取消息,并調用DAGSchedulerEventProcessLoop.onReceive方法進行消息處理。
DAGScheduler.scala的源碼如下。
1. override def onReceive(event: DAGSchedulerEvent): Unit = { 2. val timerContext = timer.time() 3. try { 4. doOnReceive(event) 5. } finally { 6. timerContext.stop() 7. } 8. }
onReceive中調用doOnReceive(event)方法,模式匹配到CompletionEvent,調用dagScheduler.handleTaskCompletion方法。
DAGScheduler.scala的源碼如下。
1. private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { 2. case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => 3. dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) 4. ...... 5. case completion: CompletionEvent => 6. dagScheduler.handleTaskCompletion(completion) 7. .....
DAGScheduler.handleTaskCompletion中task執行成功的情況,根據ShuffleMapTask和ResultTask兩種情況分別處理。其中,ShuffleMapTask將MapStatus匯報給MapOutTracker。
Spark 2.1.1版本的DAGScheduler的handleTaskCompletion的源碼如下。
1. private[scheduler] def handleTaskCompletion(event: CompletionEvent) { 2. ...... 3. val stage = stageIdToStage(task.stageId) 4. event.reason match { 5. case Success => 6. stage.pendingPartitions -= task.partitionId 7. task match { 8. ...... 9. case smt: ShuffleMapTask => 10. val shuffleStage = stage.asInstanceOf[ShuffleMapStage] 11. updateAccumulators(event) 12. val status = event.result.asInstanceOf[MapStatus] 13. val execId = status.location.executorId 14. logDebug("ShuffleMapTask finished on " + execId) 15. if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch (execId)) { 16. logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") 17. } else { 18. shuffleStage.addOutputLoc(smt.partitionId, status) 19. } 20. 21. if (runningStages.contains(shuffleStage) && shuffleStage. pendingPartitions.isEmpty) { 22. markStageAsFinished(shuffleStage) 23. logInfo("looking for newly runnable stages") 24. logInfo("running: " + runningStages) 25. logInfo("waiting: " + waitingStages) 26. logInfo("failed: " + failedStages) 27. /** * 我們設置為true,來遞增紀元編號,以防止map輸出重新計算。在這種情 * 況下,一些節點可能已經緩存了損壞的位置(從我們檢測到的錯誤),將需 * 要紀元編號遞增來重取它們。待辦事項:如果這不是第一次,那么只增加紀 * 元編號,我們注冊了map輸出 28. */ 29. mapOutputTracker.registerMapOutputs( 30. shuffleStage.shuffleDep.shuffleId, 31. shuffleStage.outputLocInMapOutputTrackerFormat(), 32. changeEpoch = true) 33. clearCacheLocs() 34. if (!shuffleStage.isAvailable) { 35. //有些任務已經失敗了,重新提交shuffleStage 36. //待辦事項:低級調度器也應該處理這個問題 37. logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + 38. ") because some of its tasks had failed: " + 39. shuffleStage.findMissingPartitions().mkString(", ")) 40. submitStage(shuffleStage) 41. } else { 42. //標識任何map階段的作業都在這個階段等待完成 43. if (shuffleStage.mapStageJobs.nonEmpty) { 44. val stats = mapOutputTracker.getStatistics(shuffleStage. shuffleDep) 45. for (job <- shuffleStage.mapStageJobs) { 46. markMapStageJobAsFinished(job, stats) 47. } 48. } 49. submitWaitingChildStages(shuffleStage) 50. } 51. } 52. }
Spark 2.2.0版本的DAGScheduler的handleTaskCompletion的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第6行刪掉stage.pendingPartitions -= task.partitionId。
上段代碼中第14行之后新增if的邏輯判斷,如果stageIdToStage(task.stageId). latestInfo.attemptId等于task.stageAttemptId,則執行shuffleStage.pendingPartitions -= task.partitionId。
1. case smt: ShuffleMapTask => 2. ...... 3. if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) { 4. /** * 任務Task是當前stage正在嘗試做的。因為任務在TaskSetManager下順利完成,其 * 標記為不再等待TaskSetManager把任務完成,當任務的epoch較小時,需忽略輸出。 * 在這種情況下,掛起的分區為空時,仍然會丟失輸出位置,這將導致DAGScheduler 重新 * 提交下面的stage 5. */ 6. shuffleStage.pendingPartitions -= task.partitionId 7. } 8. ......
8.4.2 ResultTask執行結果與Driver的交互原理及源碼詳解
Task的run方法調用的時候會導致Task的抽象方法runTask的調用,Task.scala的runTask方法是一個抽象方法。Task包括ResultTask、ShuffleMapTask兩種Task,抽象runTask方法具體的實現由子類的runTask實現。ResultTask的runTask具體實現的源碼如下。
ResultTask.scala的runTask的源碼如下。
1. override def runTask(context: TaskContext): U = { 2. ...... 3. //反序列RDD和func處理函數 4. val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( 5. ...... 6. func(context, rdd.iterator(partition, context)) 7. }
而ResultTask的runTask方法中反序列化生成func函數,最后通過func函數計算出最終的結果。
ResultTask執行結果與Driver的交互過程同ShuffleMapTask類似,最終,DAGScheduler.handleTaskCompletion中Task執行結果,根據ShuffleMapTask和ResultTask兩種情況分別處理。其中,ResultTask的處理結果如下。
DAGScheduler的handleTaskCompletion的源碼如下。
1. case rt: ResultTask[_, _] => 2. //因為是ResultTask的一部分,所以對應為ResultStage 3. //待辦事宜:這一功能進行重構,接受ResultStage 4. val resultStage = stage.asInstanceOf[ResultStage] 5. resultStage.activeJob match { 6. case Some(job) => 7. if (!job.finished(rt.outputId)) { 8. updateAccumulators(event) 9. job.finished(rt.outputId) = true 10. job.numFinished += 1 11. //如果整個作業完成,就刪除 12. if (job.numFinished == job.numPartitions) { 13. markStageAsFinished(resultStage) 14. cleanupStateForJobAndIndependentStages(job) 15. listenerBus.post( 16. SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) 17. } 18. 19. //taskSucceeded 運行用戶代碼可能會拋出一個異常 20. try { 21. job.listener.taskSucceeded(rt.outputId, event.result) 22. } catch { 23. case e: Exception => 24. //待辦事項:可能我們要標記resultStage 失敗? 25. job.listener.jobFailed(new SparkDriverExecution- Exception(e)) 26. } 27. } 28. case None => 29. logInfo("Ignoring result from " + rt + " because its job has finished") 30. }
Driver端的DAGScheduler的MapOutputTracker把shuffleMapTask執行的結果交給ResultTask,ResultTask根據前面Stage的執行結果進行shuffle后產生整個Job最后的結果。
- PPT,要你好看
- Splunk 7 Essentials(Third Edition)
- 21天學通C++
- 人工智能與人工生命
- Ceph:Designing and Implementing Scalable Storage Systems
- Docker High Performance(Second Edition)
- Docker on Amazon Web Services
- 網絡安全技術及應用
- 分析力!專業Excel的制作與分析實用法則
- Godot Engine Game Development Projects
- 基于ARM9的小型機器人制作
- Puppet 3 Beginner’s Guide
- 計算機組裝與維修實訓
- 基于元胞自動機的人群疏散系統建模與分析
- Learning Couchbase