官术网_书友最值得收藏!

3.2 RDD彈性特性七個方面解析

RDD作為彈性分布式數據集,它的彈性具體體現在以下七個方面。

1.自動進行內存和磁盤數據存儲的切換

Spark會優先把數據放到內存中,如果內存實在放不下,會放到磁盤里面,不但能計算內存放下的數據,也能計算內存放不下的數據。如果實際數據大于內存,則要考慮數據放置策略和優化算法。當應用程序內存不足時,Spark應用程序將數據自動從內存存儲切換到磁盤存儲,以保障其高效運行。

2.基于Lineage(血統)的高效容錯機制

Lineage是基于Spark RDD的依賴關系來完成的(依賴分為窄依賴和寬依賴兩種形態),每個操作只關聯其父操作,各個分片的數據之間互不影響,出現錯誤時只要恢復單個Split的特定部分即可。常規容錯有兩種方式:一個是數據檢查點;另一個是記錄數據的更新。數據檢查點的基本工作方式,就是通過數據中心的網絡鏈接不同的機器,然后每次操作的時候都要復制數據集,就相當于每次都有一個復制,復制是要通過網絡傳輸的,網絡帶寬就是分布式的瓶頸,對存儲資源也是很大的消耗。記錄數據更新就是每次數據變化了就記錄一下,這種方式不需要重新復制一份數據,但是比較復雜,消耗性能。Spark的RDD通過記錄數據更新的方式為何很高效?因為① RDD是不可變的且Lazy;② RDD的寫操作是粗粒度的。但是,RDD讀操作既可以是粗粒度的,也可以是細粒度的。

3.Task如果失敗,會自動進行特定次數的重試

默認重試次數為4次。TaskSchedulerImpl的源碼如下所示。

Spark 2.1.1版本的TaskSchedulerImpl.scala的源碼如下。

1.  private[spark] class TaskSchedulerImpl(
2.     val sc: SparkContext,
3.      val maxTaskFailures: Int,
4.      isLocal: Boolean = false)
5.    extends TaskScheduler with Logging
6.  {
7.  def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
8.
9.  config\package.scala
10. ......
11.   private[spark] val MAX_TASK_FAILURES =
12.     ConfigBuilder("spark.task.maxFailures")
13.       .intConf
14.       .createWithDefault(4)

Spark 2.2.0版本的TaskSchedulerImpl.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第1行增加了類TaskSchedulerImpl的訪問權限限制,限于在[scheduler]包內訪問。

 上段代碼中第3行之后增加了黑名單列表跟蹤變量,用于跟蹤問題executors和nodes節點。

 上段代碼中第5行之后新增了導入TaskSchedulerImpl._的所有內容。

 上段代碼中第7行this構造函數中新增了maybeCreateBlacklistTracker參數。

 新增了一個帶sc、maxTaskFailures、isLocal參數的this構造函數。

1.   private[spark] class TaskSchedulerImpl private[scheduler](
2.     .......
3.      private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
4.      isLocal: Boolean = false)
5.    extends TaskScheduler with Logging {
6.
7.    import TaskSchedulerImpl._
8.
9.    def this(sc: SparkContext) = {
10.     this(
11.       ......
12.       TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
13.   }
14.
15.   def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
16.     this(
17.       sc,
18.       maxTaskFailures,
19.       TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
20.       isLocal = isLocal)
21.   }
22. .....

TaskSchedulerImpl是底層的任務調度接口TaskScheduler的實現,這些Schedulers從每一個Stage中的DAGScheduler中獲取TaskSet,運行它們,嘗試是否有故障。DAGScheduler是高層調度,它計算每個Job的Stage的DAG,然后提交Stage,用TaskSets的形式啟動底層TaskScheduler調度在集群中運行。

4.Stage如果失敗,會自動進行特定次數的重試

這樣,Stage對象可以跟蹤多個StageInfo(存儲SparkListeners監聽到的Stage的信息,將Stage信息傳遞給Listeners或web UI)。默認重試次數為4次,且可以直接運行計算失敗的階段,只計算失敗的數據分片,Stage的源碼如下所示。

Spark 2.1.1版本的Stage.scala的源碼如下。

1.  private[scheduler] abstract class Stage(
2.      val id: Int,
3.      val rdd: RDD[_],
4.      val numTasks: Int,
5.      val parents: List[Stage],
6.      val firstJobId: Int,
7.      val callSite: CallSite)
8.    extends Logging {
9.   //partition的個數
10.   val numPartitions = rdd.partitions.length
11.
12.   /** 屬于這個工作集的Stage */
13.   val jobIds = new HashSet[Int]
14.
15.   val pendingPartitions = new HashSet[Int]
16.
17.   /** 用于此Stage的下一個新attempt 的標識ID */
18.   private var nextAttemptId: Int = 0
19.
20.   val name: String = callSite.shortForm
21.   val details: String = callSite.longForm
22.
23.   /**
        *最新的[StageInfo] object指針,需要被初始化,
        *任何attempts都是被創造出來的,因為DAGScheduler使用 StageInfo
        *告訴SparkListeners工作何時開始(即發生前的任何階段已經創建)
24.     */
25.   private var _latestInfo: StageInfo = StageInfo.fromStage(this,
      nextAttemptId)
26.
27.   /**
        *設置stage attempt IDs 當失敗時可以讀取失敗信息,
        *跟蹤這些失敗,為了避免無休止地重復失敗
        *跟蹤每一次 attempt,以便避免記錄重復故障
        *如果從同一stage創建多任務失敗(spark-5945)
28.     */
29.   private val fetchFailedAttemptIds = new HashSet[Int]
30.
31.   private[scheduler] def clearFailures() : Unit = {
32.     fetchFailedAttemptIds.clear()
33.   }
34.
35.   /**
       * 檢查是否應該中止由于連續多次讀取失敗的stage
       * 如果失敗的次數超過允許的次數,此方法更新失敗stage attempts 和返回的運行集
36.    */
37.   private[scheduler]       def   failedOnFetchAndShouldAbort(stageAttemptId:
      Int): Boolean = {
38.     fetchFailedAttemptIds.add(stageAttemptId)
39.     fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
40.   }
41.
42.   /** 在stage 中創建一個新的 attempt  */
43.   def makeNewStageAttempt(
44.      numPartitionsToCompute: Int,
45.      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
46.     val metrics = new TaskMetrics
47.     metrics.register(rdd.sparkContext)
48.     _latestInfo = StageInfo.fromStage(
49.       this, nextAttemptId, Some(numPartitionsToCompute), metrics,
          taskLocalityPreferences)
50.     nextAttemptId += 1
51.   }
52.
53.   /** 返回當前stage中最新的 StageInfo */
54.   def latestInfo: StageInfo = _latestInfo
55.
56.   override final def hashCode(): Int = id
57.
58.   override final def equals(other: Any): Boolean = other match {
59.     case stage: Stage => stage != null && stage.id == id
60.     case _ => false
61.   }
62.
63.   /**返回需要重新計算的分區標識的序列*/
64.   def findMissingPartitions(): Seq[Int]
65. }
66.
67. private[scheduler] object Stage {
68.   //允許一個stage中止的連續故障數
69.   val MAX_CONSECUTIVE_FETCH_FAILURES = 4
70. }

Spark 2.2.0版本的Stage.scala的源碼與Spark 2.1.1版本的Stage.scala的源碼相比具有如下特點。

 上段代碼中第15行刪除pendingPartitions變量。

 上段代碼中第37~40行刪除failedOnFetchAndShouldAbort方法。

 上段代碼中第67~70行刪除Stage的object Stage對象,去掉了val MAX_CONSECUTIVE_FETCH_FAILURES = 4的變量。

在Stage終止之前允許的Stage連續嘗試的次數為4次,重試次數參數從Spark 2.1.1版本的Stage.scala的源碼移到了Spark 2.2.0版本的DAGScheduler.scala的源碼object DAGScheduler中進行定義。

1.   /**
       *在終止之前允許的連續嘗試的次數
2.     */
3.
4.    private[scheduler] val maxConsecutiveStageAttempts =
5.      sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
6.        DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
7.  ......
8.
9.  private[spark] object DAGScheduler {
10.   //在毫秒級別,等待讀取失敗事件后就停止(在下一個檢測到來之前);這是一個避免重新提
      //交任務的簡單方法,非讀取數據的map中更多失敗事件的到來
11.   val RESUBMIT_TIMEOUT = 200
12.
13.   //在終止之前允許連續嘗試的次數
14.   val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
15. }

Stage是Spark Job運行時具有相同邏輯功能和并行計算任務的一個基本單元。Stage中所有的任務都依賴同樣的Shuffle,每個DAG任務通過DAGScheduler在Stage的邊界處發生Shuffle形成Stage,然后DAGScheduler運行這些階段的拓撲順序。每個Stage都可能是ShuffleMapStage,如果是ShuffleMapStage,則跟蹤每個輸出節點(nodes)上的輸出文件分區,它的任務結果是輸入其他的Stage(s),或者輸入一個ResultStage,若輸入一個ResultStage,這個ResultStage的任務直接在這個RDD上運行計算這個Spark Action的函數(如count()、 save()等),并生成shuffleDep等字段描述Stage和生成變量,如outputLocs和numAvailableOutputs,為跟蹤map輸出做準備。每個Stage會有firstjobid,確定第一個提交Stage的Job,使用FIFO調度時,會使得其前面的Job先行計算或快速恢復(失敗時)。

ShuffleMapStage是DAG產生數據進行Shuffle的中間階段,它發生在每次Shuffle操作之前,可能包含多個Pipelined操作,ResultStage階段捕獲函數在RDD的分區上運行Action算子計算結果,有些Stage不是運行在RDD的所有的分區上,例如,first()、lookup()等。SparkListener是Spark調度器的事件監聽接口。注意,這個接口隨著Spark版本的不同會發生變化。

5.checkpoint和persist(檢查點和持久化),可主動或被動觸發

checkpoint是對RDD進行的標記,會產生一系列的文件,且所有父依賴都會被刪除,是整個依賴(Lineage)的終點。checkpoint也是Lazy級別的。persist后RDD工作時每個工作節點都會把計算的分片結果保存在內存或磁盤中,下一次如果對相同的RDD進行其他的Action計算,就可以重用。

因為用戶只與Driver Program交互,因此只能用RDD中的cache()方法去cache用戶能看到的RDD。所謂能看到,是指經過Transformation算子處理后生成的RDD,而某些在Transformation算子中Spark自己生成的RDD是不能被用戶直接cache的。例如,reduceByKey()中會生成的ShuffleRDD、MapPartitionsRDD是不能被用戶直接cache的。在Driver Program中設定RDD.cache()后,系統怎樣進行cache?首先,在計算RDD的Partition之前就去判斷Partition要不要被cache,如果要被cache,先將Partition計算出來,然后cache到內存。cache可使用memory,如果寫到HDFS磁盤的話,就要檢查checkpoint。調用RDD.cache()后,RDD就變成persistRDD了,其StorageLevel為MEMORY_ONLY,persistRDD會告知Driver說自己是需要被persist的。此時會調用RDD.iterator()。 RDD.scala的iterator()的源碼如下。

1.  /**
      * RDD的內部方法,將從合適的緩存中讀取,否則計算它
      * 這不應該被用戶直接使用,但可用于實現自定義的子RDD
2.    */
3.
4.
5.   final def iterator(split: Partition, context: TaskContext): Iterator[T]
     = {
6.     if (storageLevel != StorageLevel.NONE) {
7.       getOrCompute(split, context)
8.     } else {
9.       computeOrReadCheckpoint(split, context)
10.    }
11.  }

當RDD.iterator()被調用的時候,也就是要計算該RDD中某個Partition的時候,會先去cacheManager那里獲取一個blockId,然后去BlockManager里匹配該Partition是否被checkpoint了,如果是,那就不用計算該Partition了,直接從checkpoint中讀取該Partition的所有records放入ArrayBuffer里面。如果沒有被checkpoint過,先將Partition計算出來,然后將其所有records放到cache中。總體來說,當RDD會被重復使用(不能太大)時,RDD需要cache。Spark自動監控每個節點緩存的使用情況,利用最近最少使用原則刪除老舊的數據。如果想手動刪除RDD,可以使用RDD.unpersist()方法。

此外,可以利用不同的存儲級別存儲每一個被持久化的RDD。例如,它允許持久化集合到磁盤上,將集合作為序列化的Java對象持久化到內存中、在節點間復制集合或者存儲集合到Alluxio中。可以通過傳遞一個StorageLevel對象給persist()方法設置這些存儲級別。cache()方法使用默認的存儲級別-StorageLevel.MEMORY_ONLY。RDD根據useDisk、useMemory、 useOffHeap、deserialized、replication 5個參數的組合提供了常用的12種基本存儲,完整的存儲級別介紹如下。Spark 1.6.0版本的StorageLevel.scala的源碼如下。

1.   val NONE = new StorageLevel(false, false, false, false)
2.   val DISK_ONLY = new StorageLevel(true, false, false, false)
3.   val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
4.   val MEMORY_ONLY = new StorageLevel(false, true, false, true)
5.   val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
6.   val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
7.   val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
8.   val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
9.   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
10.  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
11.  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
12.  //堆外存儲
13.  val OFF_HEAP = new StorageLevel(false, false, true, false)

Spark 2.2.0版本的Stage.scala的源碼與Spark 1.6.0版本相比具有如下特點。

 上段代碼中第13行堆外存儲OFF_HEAP顯式指定副本的參數值為1。

 OFF_HEAP = new StorageLevel(true, true, true, false, 1)

1.  ......
2.  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

StorageLevel是控制存儲RDD的標志,每個StorageLevel記錄RDD是否使用memory,或使用ExternalBlockStore存儲,如果RDD脫離了memory或ExternalBlockStore,是否扔掉RDD,是否保留數據在內存中的序列化格式,以及是否復制多個節點的RDD分區。另外,org.apache.spark.storage.StorageLevel是單實例(singleton)對象,包含了一些靜態常量和常用的存儲級別,且可用singleton對象工廠方法StorageLevel(...)創建定制化的存儲級別。

Spark的多個存儲級別意味著在內存利用率和CPU利用率間的不同權衡。推薦通過下面的過程選擇一個合適的存儲級別:①如果RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。因為這是CPU利用率最高的選項,會使RDD上的操作盡可能地快。②如果不適合用默認級別,就選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提高對象的空間使用率,但是仍能夠相當快地訪問。③除非算子計算RDD花費較大或者需要過濾大量的數據,不要將RDD存儲到磁盤上,否則重復計算一個分區,就會和從磁盤上讀取數據一樣慢。④如果希望更快地恢復錯誤,可以利用replicated存儲機制,所有的存儲級別都可以通過replicated計算丟失的數據來支持完整的容錯。另外,replicated的數據能在RDD上繼續運行任務,而不需要重復計算丟失的數據。在擁有大量內存的環境中或者多應用程序的環境中,Off_Heap(將對象從堆中脫離出來序列化,然后存儲在一大塊內存中,這就像它存儲到磁盤上一樣,但它仍然在RAM內存中。Off_Heap對象在這種狀態下不能直接使用,須進行序列化及反序列化。序列化和反序列化可能會影響性能,Off_Heap堆外內存不需要進行GC)。Off_Heap具有如下優勢:Off_Heap運行多個執行者共享的Alluxio中相同的內存池,顯著地減少GC。如果單個的Executor崩潰,緩存的數據也不會丟失。

6.數據調度彈性,DAGScheduler、TASKScheduler和資源管理無關

Spark將執行模型抽象為通用的有向無環圖計劃(DAG),這可以將多Stage的任務串聯或并行執行,從而不需要將Stage中間結果輸出到HDFS中,當發生節點運行故障時,可有其他可用節點代替該故障節點運行。

7.數據分片的高度彈性(coalesce)

Spark進行數據分片時,默認將數據放在內存中,如果內存放不下,一部分會放在磁盤上進行保存。

RDD.scala的coalesce算子代碼如下:

1.   def coalesce(numPartitions: Int, shuffle: Boolean = false,
2.                partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
3.               (implicit ord: Ordering[T] = null)
4.       : RDD[T] = withScope {
5.     require(numPartitions > 0, s"Number of partitions ($numPartitions)
       must be positive.")
6.     if (shuffle) {
7.       /**從隨機分區開始,將元素均勻分布在輸出分區上*/
8.       val distributePartition = (index: Int, items: Iterator[T]) => {
9.         var position = (new Random(index)).nextInt(numPartitions)
10.        items.map { t =>
11.          //注:Key的哈希碼是Key本身,HashPartitioner分區器將它與總分區數進行
             //取模運算
12.
13.          position = position + 1
14.          (position, t)
15.        }
16.      } : Iterator[(Int, T)]
17.
18.      //包括一個shuffle 步驟,使我們的上游任務仍然是分布式的
19.      new CoalescedRDD(
20.        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex
           (distributePartition),
21.        new HashPartitioner(numPartitions)),
22.        numPartitions,
23.        partitionCoalescer).values
24.    } else {
25.      new CoalescedRDD(this, numPartitions, partitionCoalescer)
26.    }
27.  }

例如,在計算的過程中,會產生很多的數據碎片,這時產生一個Partition可能會非常小,如果一個Partition非常小,每次都會消耗一個線程去處理,這時可能會降低它的處理效率,需要考慮把許多小的Partition合并成一個較大的Partition去處理,這樣會提高效率。另外,有可能內存不是那么多,而每個Partition的數據Block比較大,這時需要考慮把Partition變成更小的數據分片,這樣讓Spark處理更多的批次,但是不會出現OOM。

主站蜘蛛池模板: 临颍县| 思茅市| 定南县| 芦溪县| 普定县| 台山市| 永登县| 大邑县| 皋兰县| 莱州市| 磐安县| 东兴市| 措美县| 浠水县| 宜黄县| 汝州市| 临江市| 新蔡县| 泗阳县| 南华县| 雅江县| 丹东市| 九龙城区| 清远市| 双辽市| 桃园市| 合阳县| 卓资县| 荣成市| 桑植县| 宣城市| 南宁市| 洪洞县| 岚皋县| 陈巴尔虎旗| 察隅县| 芜湖县| 古田县| 贵州省| 呼图壁县| 紫金县|