官术网_书友最值得收藏!

3.6 Spark RDD容錯原理及其四大核心要點解析

本節講解RDD不同的依賴關系(寬依賴、窄依賴)的Spark RDD容錯處理;對Spark框架層面容錯機制的三大層面(調度層、RDD血統層、Checkpoint層)及Spark RDD容錯四大核心要點進行深入解析。

3.6.1 Spark RDD容錯原理

RDD不同的依賴關系導致Spark對不同的依賴關系有不同的處理方式。

對于寬依賴而言,由于寬依賴實質是指父RDD的一個分區會對應一個子RDD的多個分區,在此情況下出現部分計算結果丟失,單一計算丟失的數據無法達到效果,便采用重新計算該步驟中的所有數據,從而會導致計算數據重復;對于窄依賴而言,由于窄依賴實質是指父RDD的分區最多被一個子RDD使用,在此情況下出現部分計算的錯誤,由于計算結果的數據只與依賴的父RDD的相關數據有關,所以不需要重新計算所有數據,只重新計算出錯部分的數據即可。

3.6.2 RDD容錯的四大核心要點

Spark框架層面的容錯機制,主要分為三大層面(調度層、RDD血統層、Checkpoint層),在這三大層面中包括Spark RDD容錯四大核心要點。

 Stage輸出失敗,上層調度器DAGScheduler重試。

 Spark計算中,Task內部任務失敗,底層調度器重試。

 RDD Lineage血統中窄依賴、寬依賴計算。

 Checkpoint緩存。

1.調度層(包含DAG生成和Task重算兩大核心)

從調度層面講,錯誤主要出現在兩個方面,分別是在Stage輸出時出錯和在計算時出錯。

1)DAG生成層

Stage輸出失敗,上層調度器DAGScheduler會進行重試,如下列源碼所示。

DAGScheduler.scala的resubmitFailedStages的源碼如下。

1.      private[scheduler] def resubmitFailedStages() {
2.   //判斷是否存在失敗的Stages
3.      if (failedStages.size > 0) {
4.        //失敗的階段可以通過作業取消刪除,如果ResubmitFailedStages事件已調度,失
          //敗將是空值
5.
6.        logInfo("Resubmitting failed stages")
7.        clearCacheLocs()
8.      //獲取所有失敗Stage的列表
9.        val failedStagesCopy = failedStages.toArray
10.     //清空failedStages
11.       failedStages.clear()
12.     //對之前獲取的所有失敗的Stage,根據jobId排序后逐一重試
13.       for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
14.         submitStage(stage)
15.       }
16.     }
17.   }

2)Task計算層

Spark計算過程中,計算內部某個Task任務出現失敗,底層調度器會對此Task進行若干次重試(默認4次)。

TaskSetManager.scala的handleFailedTask的源碼如下。

1.  def  handleFailedTask(tid: Long, state: TaskState, reason:
    TaskFailedReason) {
2.      ......
3.    if (!isZombie && reason.countTowardsTaskFailures) {
4.        taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
5.          info.host, info.executorId, index))
6.        assert (null != failureReason)
7.  //對失敗的Task的numFailures進行計數加一
8.        numFailures(index) += 1
9.  //判斷失敗的Task計數是否大于設定的最大失敗次數,如果大于,則輸出日志,并不再重試
10.       if (numFailures(index) >= maxTaskFailures) {
11.         logError("Task %d in stage %s failed %d times; aborting job".format(
12.           index, taskSet.id, maxTaskFailures))
13.         abort("Task %d in stage %s failed %d times, most recent failure:
            %s\nDriver stacktrace:"
14.           .format(index,      taskSet.id,    maxTaskFailures,      failureReason),
              failureException)
15.         return
16.       }
17.     }
18. //如果運行的Task為0時,則完成Task步驟
19.     maybeFinishTaskSet()
20.   }
21. .......
2.RDD Lineage血統層容錯

Spark中RDD采用高度受限的分布式共享內存,且新的RDD的產生只能夠通過其他RDD上的批量操作來創建,依賴于以RDD的Lineage為核心的容錯處理,在迭代計算方面比Hadoop快20多倍,同時還可以在5~7s內交互式地查詢TB級別的數據集。

Spark RDD實現基于Lineage的容錯機制,基于RDD的各項transformation構成了compute chain,在部分計算結果丟失的時候可以根據Lineage重新恢復計算。

 在窄依賴中,在子RDD的分區丟失,要重算父RDD分區時,父RDD相應分區的所有數據都是子RDD分區的數據,并不存在冗余計算。

 在寬依賴情況下,丟失一個子RDD分區,重算的每個父RDD的每個分區的所有數據并不是都給丟失的子RDD分區用的,會有一部分數據相當于對應的是未丟失的子RDD分區中需要的數據,這樣就會產生冗余計算開銷和巨大的性能浪費。

3.checkpoint層容錯

Spark checkpoint通過將RDD寫入Disk作檢查點,是Spark lineage容錯的輔助,lineage過長會造成容錯成本過高,這時在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷。

checkpoint主要適用于以下兩種情況:

 DAG中的Lineage過長,如果重算,開銷太大,如PageRank、ALS等。

 尤其適合于在寬依賴上作checkpoint,這個時候就可以避免為Lineage重新計算而帶來的冗余計算。

主站蜘蛛池模板: 疏勒县| 铜陵市| 马龙县| 伊金霍洛旗| 芦溪县| 错那县| 黄大仙区| 长垣县| 于田县| 闻喜县| 淮安市| 樟树市| 贺州市| 仪陇县| 县级市| 石阡县| 峨眉山市| 青海省| 清丰县| 津市市| 海宁市| 扬中市| 望都县| 平果县| 北安市| 托克逊县| 黔江区| 任丘市| 邮箱| 安顺市| 昌图县| 察隅县| 金山区| 哈密市| 东安县| 安塞县| 余江县| 绵竹市| 饶阳县| 南陵县| 项城市|