官术网_书友最值得收藏!

7.2 Shuffle的框架

本節講解Shuffle的框架、Shuffle的框架內核、Shuffle數據讀寫的源碼解析。Spark Shuffle從基于Hash的Shuffle,引入了Shuffle Consolidate機制(即文件合并機制),演進到基于Sort的Shuffle實現方式。隨著Tungsten計劃的引入與優化,引入了基于Tungsten-Sort的Shuffle實現方式。

7.2.1 Shuffle的框架演進

Spark的Shuffle框架演進歷史可以從框架本身的演進、Shuffle具體實現機制的演進兩部分進行解析。

框架本身的演進可以從面向接口編程的原則出發,結合Build設計模式進行理解。整個Spark的Shuffle框架從Spark 1.1版本開始,提供便于測試、擴展的可插拔式框架。

而對應Shuffle的具體實現機制的演進部分,可以跟蹤Shuffle實現細節在各個版本中的變更。具體體現在Shuffle數據的寫入或讀取,以及讀寫相關的數據塊解析方式。下面簡單描述一下整個演進過程。

在Spark 1.1之前,Spark中只實現了一種Shuffle方式,即基于Hash的Shuffle。在基于Hash的Shuffle的實現方式中,每個Mapper階段的Task都會為每個Reduce階段的Task生成一個文件,通常會產生大量的文件(即對應為M×R個中間文件,其中,M表示Mapper階段的Task個數,R表示Reduce階段的Task個數)。伴隨大量的隨機磁盤I/O操作與大量的內存開銷。

為了緩解上述問題,在Spark 0.8.1版本中為基于Hash的Shuffle的實現引入了Shuffle Consolidate機制(即文件合并機制),將Mapper端生成的中間文件進行合并的處理機制。通過將配置屬性spark.shuffle.consolidateFiles設置為true,減少中間生成的文件數量。通過文件合并,可以將中間文件的生成方式修改為每個執行單位(類似于Hadoop的Slot)為每個Reduce階段的Task生成一個文件。其中,執行單位對應為:每個Mapper階段的Cores數/每個Task分配的Cores數(默認為1)。最終可以將文件個數從M×R修改為E×C/T×R,其中,E表示Executors個數,C表示可用Cores個數,T表示Task分配的Cores個數。

基于Hash的Shuffle的實現方式中,生成的中間結果文件的個數都會依賴于Reduce階段的Task個數,即Reduce端的并行度,因此文件數仍然不可控,無法真正解決問題。為了更好地解決問題,在Spark 1.1版本引入了基于Sort的Shuffle實現方式,并且在Spark 1.2版本之后,默認的實現方式也從基于Hash的Shuffle,修改為基于Sort的Shuffle實現方式,即使用的ShuffleManager從默認的hash修改為sort。首先,每個Mapper階段的Task不會為每個Reduce階段的Task生成一個單獨的文件;而是全部寫到一個數據(Data)文件中,同時生成一個索引(Index)文件,Reduce階段的各個Task可以通過該索引文件獲取相關的數據。避免產生大量文件的直接收益就是降低隨機磁盤I/O與內存的開銷。最終生成的文件個數減少到2M,其中M表示Mapper階段的Task個數,每個Mapper階段的Task分別生成兩個文件(1個數據文件、1個索引文件),最終的文件個數為M個數據文件與M個索引文件。因此,最終文件個數是2×M個。

隨著Tungsten計劃的引入與優化,從Spark 1.4版本開始(Tungsten計劃目前在Spark 1.5與Spark 1.6兩個版本中分別實現了第一與第二兩個階段),在Shuffle過程中也引入了基于Tungsten-Sort的Shuffle實現方式,通過Tungsten項目所做的優化,可以極大提高Spark在數據處理上的性能。

為了更合理、更高效地使用內存,在Spark的Shuffle實現方式演進過程中,引進了外部排序等處理機制(針對基于Sort的Shuffle機制。基于Hash的Shuffle機制從最原始的全部放入內存改為記錄級寫入)。同時,為了保存Shuffle結果提高性能以及支持資源動態分配等特性,也引進了外部Shuffle服務等機制。

7.2.2 Shuffle的框架內核

Shuffle框架的設計可以從兩方面理解:一方面,為了Shuffle模塊更加內聚并與其他模塊解耦;另一方面,為了更方便替換、測試、擴展Shuffle的不同實現方式。從Spark 1.1版本開始,引進了可插拔式的Shuffle框架(通過將Shuffle相關的實現封裝到一個統一的對外接口,提供一種具體實現可插拔的框架)。Spark框架中,通過ShuffleManager來管理各種不同實現機制的Shuffle過程,由ShuffleManager統一構建、管理具體實現子類來實現Shuffle框架的可插拔的Shuffle機制。

在詳細描述Shuffle框架實現細節之前,先給出可插拔式Shuffle的整體架構的類圖,如圖7-2所示。

圖7-2 可插拔式Shuffle的整體架構的類圖

在DAG的調度過程中,Stage階段的劃分是根據是否有Shuffle過程,也就是當存在ShuffleDependency的寬依賴時,需要進行Shuffle,這時會將作業(Job)劃分成多個Stage。對應地,在源碼實現中,通過在劃分Stage的關鍵點——構建ShuffleDependency時——進行Shuffle注冊,獲取后續數據讀寫所需的ShuffleHandle。

Stage階段劃分后,最終每個作業(Job)提交后都會對應生成一個ResultStage與若干個ShuffleMapStage,其中ResultStage表示生成作業的最終結果所在的Stage。ResultStage與ShuffleMapStage中的Task分別對應了ResultTask與ShuffleMapTask。一個作業,除了最終的ResultStage,其他若干ShuffleMapStage中的各個ShuffleMapTask都需要將最終的數據根據相應的分區器(Partitioner)對數據進行分組(即將數據重組到新的各個分區中),然后持久化分組后的數據。對應地,每個RDD本身記錄了它的數據來源,在計算(compute)時會讀取所需數據,對于帶有寬依賴的RDD,讀取時會獲取在ShuffleMapTask中持久化的數據。

從圖7-2中可以看到,外部寬依賴相關的RDD與ShuffleManager之間的注冊交互,通過該注冊,每個RDD自帶的寬依賴(ShuffleDependency)內部會維護Shuffle的唯一標識信息ShuffleId以及與Shuffle過程具體讀寫相關的句柄ShuffleHandle,后續在ShuffleMapTask中啟動任務(Task)的運行時,可以通過該句柄獲取相關的Shuffle寫入器實例,實現具體的數據磁盤寫操作。

而在帶有寬依賴(ShuffleDependency)的RDD中,執行compute時會去讀取上一Stage為其輸出的Shuffle數據,此時同樣會通過該句柄獲取相關的Shuffle讀取器實例,實現具體數據的讀取操作。需要注意的是,當前Shuffle的讀寫過程中,與BlockManager的交互,是通過MapOutputTracker來跟蹤Shuffle過程中各個任務的輸出數據的。在任務完成等場景中,會將對應的MapStatus信息注冊到MapOutputTracker中,而在compute數據讀取過程中,也會通過該跟蹤器來獲取上一Stage的輸出數據在BlockManager中的位置,然后通過getReader得到的數據讀取器,從這些位置中讀取數據。

目前對Shuffle的輸出進行跟蹤的MapOutputTracker并沒有和Shuffle數據讀寫類一樣,也封裝到Shuffle的框架中。如果從代碼聚合與解耦等角度出發,也可以將MapOutputTracker合并到整個Shuffle框架中,然后在Shuffle寫入器輸出數據之后立即進行注冊,在數據讀取器讀取數據前獲取位置等(但對應的DAG等調度部分,也需要進行修改)。

ShuffleManager封裝了各種Shuffle機制的具體實現細節,包含的接口與屬性如下所示。

(1)registerShuffle:每個RDD在構建它的父依賴(這里特指ShuffleDependency)時,都會先注冊到ShuffleManager,獲取ShuffleHandler,用于后續數據塊的讀寫等。

(2)getWriter:可以通過ShuffleHandler獲取數據塊寫入器,寫數據時通過Shuffle的塊解析器shuffleBlockResolver,獲取寫入位置(通常將寫入位置抽象為Bucket,位置的選擇則由洗牌的規則,即Shuffle的分區器決定),然后將數據寫入到相應位置(理論上,位置可以位于任何能存儲數據的地方,包括磁盤、內存或其他存儲框架等,目前在可插拔框架的幾種實現中,Spark與Hadoop一樣都采用磁盤的方式進行存儲,主要目的是為了節約內存,同時提高容錯性)。

(3)getReader:可以通過ShuffleHandler獲取數據塊讀取器,然后通過Shuffle的塊解析器shuffleBlockResolver,獲取指定數據塊。

(4)unregisterShuffle:與注冊對應,用于刪除元數據等后續清理操作。

(5)shuffleBlockResolver:Shuffle的塊解析器,通過該解析器,為數據塊的讀寫提供支撐層,便于抽象具體的實現細節。

7.2.3 Shuffle框架的源碼解析

用戶可以通過自定義ShuffleManager接口,并通過指定的配置屬性進行設置,也可以通過該配置屬性指定Spark已經支持的ShuffleManager具體實現子類。

在SparkEnv源碼中可以看到設置的配置屬性,以及當前在Spark的ShuffleManager可插拔框架中已經提供的ShuffleManager具體實現。Spark 2.0版本中支持sort、tungsten-sort兩種方式。

Spark 2.1.1版本的SparkEnv.scala的源碼如下。

1.   //用戶可以通過短格式的命名來指定所使用的ShuffleManager
2.  val shortShuffleMgrNames = Map(
3.       "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager]
         .getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.
          SortShuffleManager].getName)
4.
5.  //指定ShuffleManager的配置屬性:"spark.shuffle.manager"
6.    //默認情況下使用"sort",即SortShuffleManager的實現
7.      val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
8.      val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.
        toLowerCase, shuffleMgrName)
9.      val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

Spark 2.2.0版本的SparkEnv.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第8行調用toLowerCase小寫轉換方法,設置Locale.ROOT區域表示。root locale是一個區域設置,其語言、地區、變量都設置為空("")字符串。

1.   ......
2.      val shuffleMgrClass =
3.        shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase
          (Locale.ROOT), shuffleMgrName)
4.  .......

從代碼中可以看出,ShuffleManager是Spark Shuffle系統提供的一個可插拔式接口,可以通過spark.shuffle.manager配置屬性來設置自定義的ShuffleManager。

在Driver和每個Executor的SparkEnv實例化過程中,都會創建一個ShuffleManager,用于管理塊數據,提供集群塊數據的讀寫,包括數據的本地讀寫和讀取遠程節點的塊數據。

Shuffle系統的框架可以以ShuffleManager為入口進行解析。在ShuffleManager中指定了整個Shuffle框架使用的各個組件,包括如何注冊到ShuffleManager,以獲取一個用于數據讀寫的處理句柄ShuffleHandle,通過ShuffleHandle獲取特定的數據讀寫接口:ShuffleWriter與ShuffleReader,以及如何獲取塊數據信息的解析接口ShuffleBlockResolver。下面通過源碼分別對這幾個比較重要的組件進行解析。

1.ShuffleManager的源碼解析

由于ShuffleManager是Spark Shuffle系統提供的一個可插拔式接口,提供具體實現子類或自定義具體實現子類時,都需要重寫ShuffleManager類的抽象接口。下面首先分析ShuffleManager的源碼。

ShuffleManager.scala的源碼如下。

1.
2.  //Shuffle系統的可插拔接口。在Driver和每個Executor的SparkEnv實例中創建
3.  private[spark] trait ShuffleManager {
4.
5.    /**
6.      *在Driver端向ShuffleManager注冊一個Shuffle,獲取一個Handle
7.      *在具體Tasks中會通過該Handle來讀寫數據
8.      */
9.    def registerShuffle[K, V, C](
10.       shuffleId: Int,
11.       numMaps: Int,
12.       dependency: ShuffleDependency[K, V, C]): ShuffleHandle
13.
14.   /**
        *獲取對應給定的分區使用的ShuffleWriter,該方法在Executors上執行各個Map
        *任務時調用
15.     */
16.   def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context:
      TaskContext): ShuffleWriter[K, V]
17.   /**
        * 獲取在Reduce階段讀取分區的ShuffleReader,對應讀取的分區由[startPartition
        * to endPartition-1]區間指定。該方法在Executors上執行,在各個Reduce任務時調用
        *
18      */
19.   def getReader[K, C](
20
21.       handle: ShuffleHandle,
22.       startPartition: Int,
23.       endPartition: Int,
24.       context: TaskContext): ShuffleReader[K, C]
25.
26.   /**
27.     *該接口和registerShuffle分別負責元數據的取消注冊與注冊
28.     *調用unregisterShuffle接口時,會移除ShuffleManager中對應的元數據信息
29.     */
30.   def unregisterShuffle(shuffleId: Int): Boolean
31.
32.   /**
        *返回一個可以基于塊坐標來獲取Shuffle 塊數據的ShuffleBlockResolver
33.     */
34.   def shuffleBlockResolver: ShuffleBlockResolver
35.
36.   /**終止ShuffleManager */
37.   def stop(): Unit
38. }
2.ShuffleHandle的源碼解析
1.  abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}

ShuffleHandle比較簡單,用于記錄Task與Shuffle相關的一些元數據,同時也可以作為不同具體Shuffle實現機制的一種標志信息,控制不同具體實現子類的選擇等。

3.ShuffleWriter的源碼解析

ShuffleWriter.scala的源碼如下。

1.   private[spark] abstract class ShuffleWriter[K, V] {
2.    /** Write a sequence of records to this task's output */
3.    @throws[IOException]
4.    def write(records: Iterator[Product2[K, V]]): Unit
5.
6.    /** Close this writer, passing along whether the map completed */
7.    def stop(success: Boolean): Option[MapStatus]
8.  }

繼承ShuffleWriter的每個具體子類會實現write接口,給出任務在輸出時的寫記錄的具體方法。

4.ShuffleReader的源碼解析

ShuffleReader.scala的源碼如下。

1.  private[spark] trait ShuffleReader[K, C] {
2.   /** Read the combined key-values for this reduce task */
3.   def read(): Iterator[Product2[K, C]]

繼承ShuffleReader的每個具體子類會實現read接口,計算時負責從上一階段Stage的輸出數據中讀取記錄。

5.ShuffleBlockResolver的源碼解析

ShuffleBlockResolver的源碼如下。

1.  /**
      *該特質的具體實現子類知道如何通過一個邏輯Shuffle塊標識信息來獲取一個塊數據。具體
      *實現可以使用文件或文件段來封裝Shuffle的數據。這是獲取Shuffle塊數據時使用的抽
      *象接口,在BlockStore中使用
2.     */
3.
4.
5.  trait ShuffleBlockResolver {
6.    type ShuffleId = Int
7.
8.    /**
       *獲取指定塊的數據。如果指定塊的數據無法獲取,則拋出異常
9.     */
10.   def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
11.
12.   def stop(): Unit
13. }

繼承ShuffleBlockResolver的每個具體子類會實現getBlockData接口,給出具體的獲取塊數據的方法。

目前在ShuffleBlockResolver的各個具體子類中,除給出獲取數據的接口外,通常會提供如何解析塊數據信息的接口,即提供了寫數據塊時的物理塊與邏輯塊之間映射關系的解析方法。

7.2.4 Shuffle數據讀寫的源碼解析

1.Shuffle寫數據的源碼解析

從Spark Shuffle的整體框架中可以看到,ShuffleManager提供了Shuffle相關數據塊的寫入與讀取,即對應的接口getWriter與getReader。

在解析Shuffle框架數據讀取過程中,可以構建一個具有ShuffleDependency的RDD,查看執行過程中,Shuffle框架中的數據讀寫接口getWriter與getReader如何使用,通過這種具體案例的方式來加深對源碼的理解。

Spark中Shuffle具體的執行機制可以參考本書的其他章節,在此僅分析與Shuffle直接相關的內容。通過DAG調度機制的解析,可以知道Spark中一個作業可以根據寬依賴切分Stages,而在Stages中,相應的Tasks也包含兩種,即ResultTask與ShuffleMapTask。其中,一個ShuffleMapTask會基于ShuffleDependency中指定的分區器,將一個RDD的元素拆分到多個buckets中,此時通過ShuffleManager的getWriter接口獲取數據與buckets的映射關系。而ResultTask對應的是一個將輸出返回給應用程序Driver端的Task,在該Task執行過程中,最終都會調用RDD的compute對內部數據進行計算,而在帶有ShuffleDependency的RDD中,在compute計算時,會通過ShuffleManager的getReader接口,獲取上一個Stage的Shuffle輸出結果作為本次Task的輸入數據。

首先來看ShuffleMapTask中的寫數據流程,具體代碼如下所示。

ShuffleMapTask.scala的源碼如下。

1.  override def runTask(context: TaskContext): MapStatus = {
2.     ......
3.  //首先從SparkEnv獲取ShuffleManager
4.  //然后從ShuffleDependency中獲取注冊到ShuffleManager時得到的shuffleHandle
5.  //根據shuffleHandle和當前Task對應的分區ID,獲取ShuffleWriter
6.  //最后根據獲取的ShuffleWriter,調用其write接口,寫入當前分區的數據
7.  var writer: ShuffleWriter[Any, Any] = null
8.      try {
9.        val manager = SparkEnv.get.shuffleManager
10.      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId,
         context)
11.       writer.write(rdd.iterator(partition, context).asInstanceOf
          [Iterator[_ <: Product2[Any, Any]]])
12.       writer.stop(success = true).get
13.     } catch {
14.       ......
15.     }
16.   }
2.Shuffle讀數據的源碼解析

對應的數據讀取器,從RDD的5個抽象接口可知,RDD的數據流最終會經過算子操作,即RDD中的compute方法。下面以包含寬依賴的RDD、CoGroupedRDD為例,查看如何獲取Shuffle的數據。具體代碼如下所示。

Spark 1.6.0版本的CoGroupedRDD.scala的源碼如下。

1.  //對指定分區進行計算的抽象接口,以下為CoGroupedRDD具體子類中該方法的實現
2.  override def compute(s: Partition, context: TaskContext): Iterator[(K,
    Array[Iterable[_]])] = {
3.      val split = s.asInstanceOf[CoGroupPartition]
4.      val numRdds = dependencies.length
5.
6.      //A list of (rdditerator, dependency number) pairs
7.      val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
8.      for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
9.        case    oneToOneDependency:       OneToOneDependency[Product2[K,     Any]]
          @unchecked =>
10.         val dependencyPartition = split.narrowDeps(depNum).get.split
11.         //Read them from the parent
12.         val it = oneToOneDependency.rdd.iterator(dependencyPartition,
            context)
13. rddIterators += ((it, depNum))
14.
15.  case shuffleDependency: ShuffleDependency[_, _, _] =>
16. //首先從SparkEnv獲取ShuffleManager
17. //然后從ShuffleDependency中獲取注冊到ShuffleManager時得到的shuffleHandle
18. //根據shuffleHandle和當前Task對應的分區ID,獲取ShuffleWriter
19. //最后根據獲取的ShuffleReader,調用其read接口,讀取Shuffle的Map輸出
20.
21. val it = SparkEnv.get.shuffleManager
22.           .getReader(shuffleDependency.shuffleHandle, split.index,
              split.index + 1, context)
23.           .read()
24. rddIterators += ((it, depNum))
25.     }
26.
27.     val map = createExternalMap(numRdds)
28.     for ((it, depNum) <- rddIterators) {
29.       map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2,
          depNum))))
30.    }
31.    context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
32.    context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
33.    context.internalMetricsToAccumulators(
34.      InternalAccumulator.PEAK_EXECUTION_MEMORY).add
         (map.peakMemoryUsedBytes)
35.    new InterruptibleIterator(context,
36.      map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
37.  }

Spark 2.2.0版本的CoGroupedRDD.scala的源碼與Spark 1.6.0版本相比具有如下特點:上段代碼中第28~29行的context.internalMetricsToAccumulators方法調整為context.taskMetrics方法,用于任務的度量監控,監控內存的峰值使用情況。

1.  ......
2.  context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
3.  .......

從代碼中可以看到,帶寬依賴的RDD的compute操作中,最終是通過SparkEnv中的ShuffleManager實例的getReader方法,獲取數據讀取器的,然后再次調用讀取器的read讀取指定分區范圍的Shuffle數據。注意,是帶寬依賴的RDD,而非ShuffleRDD,除了ShuffleRDD外,還有其他RDD也可以帶上寬依賴的,如前面給出的CoGroupedRDD。

目前支持的幾種具體Shuffle實現機制在讀取數據的處理上都是一樣的。從源碼角度可以看到,當前繼承了ShuffleReader這一數據讀取器的接口的具體子類只有BlockStoreShuffleReader,因此,本章內容僅在此對各種Shuffle實現機制的數據讀取進行解析,后續各實現機制中不再重復描述。

源碼解析的第一步仍然是查看該類的描述信息,具體如下所示。

1.  /**
2.   *通過從其他節點上請求讀取   Shuffle  數據來接收并讀取指定范圍[起始分區, 結束分區)
     *——對應為左閉右開區間
3.   *通過從其他節點上請求讀取Shuffle數據來接收并讀取指定范圍[起始分區,結束分區]
4.   *——對應為左閉右開區間
5.   */

從注釋上可以看出,讀取器負責上一Stage為下一Stage輸出數據塊的讀取。從前面對ShuffleReader接口的解析可知,繼承的具體子類需要實現真正的數據讀取操作,即實現read方法。因此,該方法便是需要重點關注的源碼。一些關鍵的代碼如下所示。

Spark 2.1.1版本的BlockStoreShuffleReader.scala的源碼如下。

1.   //為該Reduce任務讀取并合并key-values 值
2.  override def read(): Iterator[Product2[K, C]] = {
3.   //真正的數據Iterator讀取是通過ShuffleBlockFetcherIterator來完成的
4.      val blockFetcherItr = new ShuffleBlockFetcherIterator(
5.        context,
6.        blockManager.shuffleClient,
7.        blockManager,
8.        //可以看到,當ShuffleMapTask完成后注冊到mapOutputTracker的元數據信息
          //同樣會通過mapOutputTracker來獲取,在此同時還指定了獲取的分區范圍
          //通過該方法的返回值類型
9.
10.
11.
12.       mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId,
          startPartition, endPartition),
13.      //默認讀取時的數據大小限制為48m,對應后續并行的讀取,都是一種數據讀取的控制策
         //略,一方面可以避免目標機器占用過多帶寬,同時也可以啟動并行機制,加快讀取速度
14.
15.
16.
17.       SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight",
          "48m") * 1024 * 1024,
18.       SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight",
          Int.MaxValue))
19.
20.     //在此針對前面獲取的各個數據塊唯一標識ID信息及其對應的輸入流進行處理
21.     val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
22.       serializerManager.wrapStream(blockId, inputStream)
23.     }
24.
25.     val serializerInstance = dep.serializer.newInstance()
26.
27.     //為每個流stream創建一個鍵/值迭代器
28.     val recordIter = wrappedStreams.flatMap { wrappedStream =>
29.       //注意:askey Value Iterator在內部迭代器Next Iterator中包裹一個鍵/值對,
          //當Input Stream中的數據已讀取,Next Iterator確保Close()方法被調用
30.
31.
32.       serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
33.     }
34.
35.     //為每個記錄更新上下文任務度量
36.     val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
37.     val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
38.       recordIter.map { record =>
39.         readMetrics.incRecordsRead(1)
40.         record
41.       },
42.       context.taskMetrics().mergeShuffleReadMetrics())
43.
44.     //為了支持任務取消,這里必須使用可中斷迭代器
45.    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context,
       metricIter)
46.  //對讀取到的數據進行聚合處理
47.     val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator
        .isDefined) {
48.      //如果在Map端已經做了聚合的優化操作,則對讀取到的聚合結果進行聚合,注意此時的
         //聚合操作與數據類型和Map端未做優化時是不同的
49.
50.
51.
52.   if (dep.mapSideCombine) {
53.         //對讀取到的數據進行聚合處理
54.         val combinedKeyValuesIterator = interruptibleIter.asInstanceOf
            [Iterator[(K, C)]]
55.
56.      //Map端各分區針對Key進行合并后的結果再次聚合,Map的合并可以大大減少網絡傳輸
         //的數據量
57.
58.         dep.aggregator.get.combineCombinersByKey
            (combinedKeyValuesIterator, context)
59.       } else {
60.         //我們無需關心值的類型,但應確保聚合是兼容的,其將把值的類型轉化成聚合以后的
            //C類型
61.
62.
63.         val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator
            [(K, Nothing)]]
64.         dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
65.       }
66.     } else {
67.       require(!dep.mapSideCombine, "Map-side combine without Aggregator
          specified!")
68.       interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
69.     }
70.      //在基于Sort的Shuffle實現過程中,默認基于PartitionId進行排序,在分區的內
         //部,數據是沒有排序的,因此添加了keyOrdering變量,提供是否需要針對分區內的
         //數據進行排序的標識信息
71.     //如果定義了排序,則對輸出結果進行排序
72.     dep.keyOrdering match {
73.       case Some(keyOrd: Ordering[K]) =>
74.
75.           //為了減少內存的壓力,避免GC開銷,引入了外部排序器對數據進行排序當內存不足
              //以容納排序的數據量時,會根據配置的spark.shuffle.spill屬性來決定是否需要
              //spill到磁盤中,默認情況下會打開spill開關,若不打開spill開關,數據量比
              //較大時會引發內存溢出問題(Out of Memory,OOM)
76.         val sorter =
77.           new ExternalSorter[K, C, C](context, ordering = Some(keyOrd),
              serializer = dep.serializer)
78.         sorter.insertAll(aggregatedIter)
79.         context.taskMetrics().incMemoryBytesSpilled(sorter.
            memoryBytesSpilled)
80.         context.taskMetrics().incDiskBytesSpilled(sorter.
            diskBytesSpilled)
81.         context.taskMetrics().incPeakExecutionMemory
            (sorter.peakMemoryUsedBytes)
82.         CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]]
            (sorter.iterator, sorter.stop())
83.       case None =>
84.      //不需要排序分區內部數據時直接返回
85.         aggregatedIter
86.     }
87.   }
88. }

Spark 2.2.0版本的BlockStoreShuffleReader.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第4行blockFetcherItr名稱更改為wrappedStreams。

 上段代碼中第17行之前新增代碼serializerManager.wrapStream。

 上段代碼中第18行之后新增配置參數REDUCER_MAX_REQ_SIZE_SHUFFLE_ TO_MEM:shuffle時可請求內存的最大大小(以字節為單位)。

 上段代碼中第18行之后新增配置參數spark.shuffle.detectCorrupt:檢測獲取塊blocks中是否有任何損壞。

 上段代碼中第21~23行刪除。

 上段代碼中第28行wrappedStream調整為case (blockId, wrappedStream)。

1.   .......
2.      val wrappedStreams = new ShuffleBlockFetcherIterator(
3.       ......
4.        serializerManager.wrapStream,
5.      ......
          SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM),
6.        SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
7.  ......
8.     val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream)
       =>
9.  .......

下面進一步解析數據讀取的部分細節。首先是數據塊獲取、讀取的ShuffleBlock-FetcherIterator類,在類的構造體中調用了initialize方法(構造體中的表達式會在構造實例時執行),該方法中會根據數據塊所在位置(本地節點或遠程節點)分別進行讀取,其中關鍵代碼如下所示。

ShuffleBlockFetcherIterator的源碼如下。

1.     private[this] def initialize(): Unit = {
2.     //任務完成進行回調清理(在成功案例和失敗案例中調用)
3.     context.addTaskCompletionListener(_ => cleanup())
4.     //本地與遠程的數據讀取方式不同,因此先進行拆分,注意拆分時會考慮一次獲取的數據
       //大小(拆分時會同時考慮并行數)封裝請求,最后會將剩余不足該大小的數據獲取也封裝
       //為一個請求
5.
6.
7.
8.     val remoteRequests = splitLocalRemoteBlocks()
9.     //存入需要遠程讀取的數據塊請求信息
10.    fetchRequests ++= Utils.randomize(remoteRequests)
11.    assert ((0 == reqsInFlight) == (0 == bytesInFlight),
12.      "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
13.      ", expected bytesInFlight = 0 but found bytesInFlight = " +
         bytesInFlight)
14.
15.    //發送數據獲取請求
16.    fetchUpToMaxBytes()
17.
18.    val numFetches = remoteRequests.size - fetchRequests.size
19.    logInfo("Started " + numFetches + " remote fetches in" +
       Utils.getUsedTimeMs(startTime))
20.
21.     //除了遠程數據獲取外,下面是獲取本地數據塊的方法調用
22.    fetchLocalBlocks()
23.    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
24.  }

與Hadoop一樣,Spark計算框架也基于數據本地性,即移動數據而非移動計算的原則,因此在獲取數據塊時,也會考慮數據本地性,盡量從本地讀取已有的數據塊,然后再遠程讀取。

另外,數據塊的本地性是通過ShuffleBlockFetcherIterator實例構建時所傳入的位置信息來判斷的,而該信息由MapOutputTracker實例的getMapSizesByExecutorId方法提供,可以參考該方法的返回值類型查看相關的位置信息,返回值類型為:Seq[(BlockManagerId, Seq[(BlockId, Long)])]。其中,BlockManagerId是BlockManager的唯一標識信息,BlockId是數據塊的唯一信息,對應的Seq[(BlockId, Long)]表示一組數據塊標識ID及其數據塊大小的元組信息。

最后簡單分析一下如何設置分區內部的排序標識,當需要對分區內的數據進行排序時,會設置RDD中的寬依賴(ShuffleDependency)實例的keyOrdering變量。下面以基于排序的OrderedRDDFunctions提供的sortByKey方法給出解析,具體代碼如下所示。

OrderedRDDFunctions的源碼如下。

1.    def sortByKey(ascending: Boolean = true, numPartitions: Int =
      self.partitions.length)
2.        : RDD[(K, V)] = self.withScope
3.    {
4.     //注意,這里設置了該方法構建的RDD使用的分區器
       //根據Range而非Hash進行分區,對應的Range信息需要計算并將結果
       //反饋到Driver端,因此對應調用RDD中的Action,即會觸發一個Job的執行
5.  val part = new RangePartitioner(numPartitions, self, ascending)
6.      //在構建RDD實例后,設置Key的排序算法,即Ordering實例
7.      new ShuffledRDD[K, V, V](self, part)
8.        .setKeyOrdering(if (ascending) ordering else ordering.reverse)
9.    }

當需要對分區內部的數據進行排序時,構建RDD的同時會設置Key值的排序算法,結合前面的read代碼,當指定Key值的排序算法時,就會使用外部排序器對分區內的數據進行排序。

主站蜘蛛池模板: 安达市| 蕲春县| 廊坊市| 大新县| 天水市| 和龙市| 贵溪市| 张家界市| 武山县| 新闻| 响水县| 益阳市| 广元市| 孝昌县| 灵寿县| 海晏县| 北京市| 密云县| 吴忠市| 犍为县| 通城县| 齐齐哈尔市| 盐山县| 定陶县| 邮箱| 自治县| 金门县| 北辰区| 永安市| 含山县| 沾益县| 临泉县| 嘉禾县| 夹江县| 太保市| 梁山县| 施甸县| 台南县| 奈曼旗| 台前县| 甘洛县|