- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 1201字
- 2019-12-12 17:29:54
3.6 Spark RDD容錯原理及其四大核心要點解析
本節講解RDD不同的依賴關系(寬依賴、窄依賴)的Spark RDD容錯處理;對Spark框架層面容錯機制的三大層面(調度層、RDD血統層、Checkpoint層)及Spark RDD容錯四大核心要點進行深入解析。
3.6.1 Spark RDD容錯原理
RDD不同的依賴關系導致Spark對不同的依賴關系有不同的處理方式。
對于寬依賴而言,由于寬依賴實質是指父RDD的一個分區會對應一個子RDD的多個分區,在此情況下出現部分計算結果丟失,單一計算丟失的數據無法達到效果,便采用重新計算該步驟中的所有數據,從而會導致計算數據重復;對于窄依賴而言,由于窄依賴實質是指父RDD的分區最多被一個子RDD使用,在此情況下出現部分計算的錯誤,由于計算結果的數據只與依賴的父RDD的相關數據有關,所以不需要重新計算所有數據,只重新計算出錯部分的數據即可。
3.6.2 RDD容錯的四大核心要點
Spark框架層面的容錯機制,主要分為三大層面(調度層、RDD血統層、Checkpoint層),在這三大層面中包括Spark RDD容錯四大核心要點。
Stage輸出失敗,上層調度器DAGScheduler重試。
Spark計算中,Task內部任務失敗,底層調度器重試。
RDD Lineage血統中窄依賴、寬依賴計算。
Checkpoint緩存。
1.調度層(包含DAG生成和Task重算兩大核心)
從調度層面講,錯誤主要出現在兩個方面,分別是在Stage輸出時出錯和在計算時出錯。
1)DAG生成層
Stage輸出失敗,上層調度器DAGScheduler會進行重試,如下列源碼所示。
DAGScheduler.scala的resubmitFailedStages的源碼如下。
1. private[scheduler] def resubmitFailedStages() { 2. //判斷是否存在失敗的Stages 3. if (failedStages.size > 0) { 4. //失敗的階段可以通過作業取消刪除,如果ResubmitFailedStages事件已調度,失 //敗將是空值 5. 6. logInfo("Resubmitting failed stages") 7. clearCacheLocs() 8. //獲取所有失敗Stage的列表 9. val failedStagesCopy = failedStages.toArray 10. //清空failedStages 11. failedStages.clear() 12. //對之前獲取的所有失敗的Stage,根據jobId排序后逐一重試 13. for (stage <- failedStagesCopy.sortBy(_.firstJobId)) { 14. submitStage(stage) 15. } 16. } 17. }
2)Task計算層
Spark計算過程中,計算內部某個Task任務出現失敗,底層調度器會對此Task進行若干次重試(默認4次)。
TaskSetManager.scala的handleFailedTask的源碼如下。
1. def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) { 2. ...... 3. if (!isZombie && reason.countTowardsTaskFailures) { 4. taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask( 5. info.host, info.executorId, index)) 6. assert (null != failureReason) 7. //對失敗的Task的numFailures進行計數加一 8. numFailures(index) += 1 9. //判斷失敗的Task計數是否大于設定的最大失敗次數,如果大于,則輸出日志,并不再重試 10. if (numFailures(index) >= maxTaskFailures) { 11. logError("Task %d in stage %s failed %d times; aborting job".format( 12. index, taskSet.id, maxTaskFailures)) 13. abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:" 14. .format(index, taskSet.id, maxTaskFailures, failureReason), failureException) 15. return 16. } 17. } 18. //如果運行的Task為0時,則完成Task步驟 19. maybeFinishTaskSet() 20. } 21. .......
2.RDD Lineage血統層容錯
Spark中RDD采用高度受限的分布式共享內存,且新的RDD的產生只能夠通過其他RDD上的批量操作來創建,依賴于以RDD的Lineage為核心的容錯處理,在迭代計算方面比Hadoop快20多倍,同時還可以在5~7s內交互式地查詢TB級別的數據集。
Spark RDD實現基于Lineage的容錯機制,基于RDD的各項transformation構成了compute chain,在部分計算結果丟失的時候可以根據Lineage重新恢復計算。
在窄依賴中,在子RDD的分區丟失,要重算父RDD分區時,父RDD相應分區的所有數據都是子RDD分區的數據,并不存在冗余計算。
在寬依賴情況下,丟失一個子RDD分區,重算的每個父RDD的每個分區的所有數據并不是都給丟失的子RDD分區用的,會有一部分數據相當于對應的是未丟失的子RDD分區中需要的數據,這樣就會產生冗余計算開銷和巨大的性能浪費。
3.checkpoint層容錯
Spark checkpoint通過將RDD寫入Disk作檢查點,是Spark lineage容錯的輔助,lineage過長會造成容錯成本過高,這時在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷。
checkpoint主要適用于以下兩種情況:
DAG中的Lineage過長,如果重算,開銷太大,如PageRank、ALS等。
尤其適合于在寬依賴上作checkpoint,這個時候就可以避免為Lineage重新計算而帶來的冗余計算。