- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 7225字
- 2019-12-12 17:30:01
7.2 Shuffle的框架
本節講解Shuffle的框架、Shuffle的框架內核、Shuffle數據讀寫的源碼解析。Spark Shuffle從基于Hash的Shuffle,引入了Shuffle Consolidate機制(即文件合并機制),演進到基于Sort的Shuffle實現方式。隨著Tungsten計劃的引入與優化,引入了基于Tungsten-Sort的Shuffle實現方式。
7.2.1 Shuffle的框架演進
Spark的Shuffle框架演進歷史可以從框架本身的演進、Shuffle具體實現機制的演進兩部分進行解析。
框架本身的演進可以從面向接口編程的原則出發,結合Build設計模式進行理解。整個Spark的Shuffle框架從Spark 1.1版本開始,提供便于測試、擴展的可插拔式框架。
而對應Shuffle的具體實現機制的演進部分,可以跟蹤Shuffle實現細節在各個版本中的變更。具體體現在Shuffle數據的寫入或讀取,以及讀寫相關的數據塊解析方式。下面簡單描述一下整個演進過程。
在Spark 1.1之前,Spark中只實現了一種Shuffle方式,即基于Hash的Shuffle。在基于Hash的Shuffle的實現方式中,每個Mapper階段的Task都會為每個Reduce階段的Task生成一個文件,通常會產生大量的文件(即對應為M×R個中間文件,其中,M表示Mapper階段的Task個數,R表示Reduce階段的Task個數)。伴隨大量的隨機磁盤I/O操作與大量的內存開銷。
為了緩解上述問題,在Spark 0.8.1版本中為基于Hash的Shuffle的實現引入了Shuffle Consolidate機制(即文件合并機制),將Mapper端生成的中間文件進行合并的處理機制。通過將配置屬性spark.shuffle.consolidateFiles設置為true,減少中間生成的文件數量。通過文件合并,可以將中間文件的生成方式修改為每個執行單位(類似于Hadoop的Slot)為每個Reduce階段的Task生成一個文件。其中,執行單位對應為:每個Mapper階段的Cores數/每個Task分配的Cores數(默認為1)。最終可以將文件個數從M×R修改為E×C/T×R,其中,E表示Executors個數,C表示可用Cores個數,T表示Task分配的Cores個數。
基于Hash的Shuffle的實現方式中,生成的中間結果文件的個數都會依賴于Reduce階段的Task個數,即Reduce端的并行度,因此文件數仍然不可控,無法真正解決問題。為了更好地解決問題,在Spark 1.1版本引入了基于Sort的Shuffle實現方式,并且在Spark 1.2版本之后,默認的實現方式也從基于Hash的Shuffle,修改為基于Sort的Shuffle實現方式,即使用的ShuffleManager從默認的hash修改為sort。首先,每個Mapper階段的Task不會為每個Reduce階段的Task生成一個單獨的文件;而是全部寫到一個數據(Data)文件中,同時生成一個索引(Index)文件,Reduce階段的各個Task可以通過該索引文件獲取相關的數據。避免產生大量文件的直接收益就是降低隨機磁盤I/O與內存的開銷。最終生成的文件個數減少到2M,其中M表示Mapper階段的Task個數,每個Mapper階段的Task分別生成兩個文件(1個數據文件、1個索引文件),最終的文件個數為M個數據文件與M個索引文件。因此,最終文件個數是2×M個。
隨著Tungsten計劃的引入與優化,從Spark 1.4版本開始(Tungsten計劃目前在Spark 1.5與Spark 1.6兩個版本中分別實現了第一與第二兩個階段),在Shuffle過程中也引入了基于Tungsten-Sort的Shuffle實現方式,通過Tungsten項目所做的優化,可以極大提高Spark在數據處理上的性能。
為了更合理、更高效地使用內存,在Spark的Shuffle實現方式演進過程中,引進了外部排序等處理機制(針對基于Sort的Shuffle機制。基于Hash的Shuffle機制從最原始的全部放入內存改為記錄級寫入)。同時,為了保存Shuffle結果提高性能以及支持資源動態分配等特性,也引進了外部Shuffle服務等機制。
7.2.2 Shuffle的框架內核
Shuffle框架的設計可以從兩方面理解:一方面,為了Shuffle模塊更加內聚并與其他模塊解耦;另一方面,為了更方便替換、測試、擴展Shuffle的不同實現方式。從Spark 1.1版本開始,引進了可插拔式的Shuffle框架(通過將Shuffle相關的實現封裝到一個統一的對外接口,提供一種具體實現可插拔的框架)。Spark框架中,通過ShuffleManager來管理各種不同實現機制的Shuffle過程,由ShuffleManager統一構建、管理具體實現子類來實現Shuffle框架的可插拔的Shuffle機制。
在詳細描述Shuffle框架實現細節之前,先給出可插拔式Shuffle的整體架構的類圖,如圖7-2所示。

圖7-2 可插拔式Shuffle的整體架構的類圖
在DAG的調度過程中,Stage階段的劃分是根據是否有Shuffle過程,也就是當存在ShuffleDependency的寬依賴時,需要進行Shuffle,這時會將作業(Job)劃分成多個Stage。對應地,在源碼實現中,通過在劃分Stage的關鍵點——構建ShuffleDependency時——進行Shuffle注冊,獲取后續數據讀寫所需的ShuffleHandle。
Stage階段劃分后,最終每個作業(Job)提交后都會對應生成一個ResultStage與若干個ShuffleMapStage,其中ResultStage表示生成作業的最終結果所在的Stage。ResultStage與ShuffleMapStage中的Task分別對應了ResultTask與ShuffleMapTask。一個作業,除了最終的ResultStage,其他若干ShuffleMapStage中的各個ShuffleMapTask都需要將最終的數據根據相應的分區器(Partitioner)對數據進行分組(即將數據重組到新的各個分區中),然后持久化分組后的數據。對應地,每個RDD本身記錄了它的數據來源,在計算(compute)時會讀取所需數據,對于帶有寬依賴的RDD,讀取時會獲取在ShuffleMapTask中持久化的數據。
從圖7-2中可以看到,外部寬依賴相關的RDD與ShuffleManager之間的注冊交互,通過該注冊,每個RDD自帶的寬依賴(ShuffleDependency)內部會維護Shuffle的唯一標識信息ShuffleId以及與Shuffle過程具體讀寫相關的句柄ShuffleHandle,后續在ShuffleMapTask中啟動任務(Task)的運行時,可以通過該句柄獲取相關的Shuffle寫入器實例,實現具體的數據磁盤寫操作。
而在帶有寬依賴(ShuffleDependency)的RDD中,執行compute時會去讀取上一Stage為其輸出的Shuffle數據,此時同樣會通過該句柄獲取相關的Shuffle讀取器實例,實現具體數據的讀取操作。需要注意的是,當前Shuffle的讀寫過程中,與BlockManager的交互,是通過MapOutputTracker來跟蹤Shuffle過程中各個任務的輸出數據的。在任務完成等場景中,會將對應的MapStatus信息注冊到MapOutputTracker中,而在compute數據讀取過程中,也會通過該跟蹤器來獲取上一Stage的輸出數據在BlockManager中的位置,然后通過getReader得到的數據讀取器,從這些位置中讀取數據。
目前對Shuffle的輸出進行跟蹤的MapOutputTracker并沒有和Shuffle數據讀寫類一樣,也封裝到Shuffle的框架中。如果從代碼聚合與解耦等角度出發,也可以將MapOutputTracker合并到整個Shuffle框架中,然后在Shuffle寫入器輸出數據之后立即進行注冊,在數據讀取器讀取數據前獲取位置等(但對應的DAG等調度部分,也需要進行修改)。
ShuffleManager封裝了各種Shuffle機制的具體實現細節,包含的接口與屬性如下所示。
(1)registerShuffle:每個RDD在構建它的父依賴(這里特指ShuffleDependency)時,都會先注冊到ShuffleManager,獲取ShuffleHandler,用于后續數據塊的讀寫等。
(2)getWriter:可以通過ShuffleHandler獲取數據塊寫入器,寫數據時通過Shuffle的塊解析器shuffleBlockResolver,獲取寫入位置(通常將寫入位置抽象為Bucket,位置的選擇則由洗牌的規則,即Shuffle的分區器決定),然后將數據寫入到相應位置(理論上,位置可以位于任何能存儲數據的地方,包括磁盤、內存或其他存儲框架等,目前在可插拔框架的幾種實現中,Spark與Hadoop一樣都采用磁盤的方式進行存儲,主要目的是為了節約內存,同時提高容錯性)。
(3)getReader:可以通過ShuffleHandler獲取數據塊讀取器,然后通過Shuffle的塊解析器shuffleBlockResolver,獲取指定數據塊。
(4)unregisterShuffle:與注冊對應,用于刪除元數據等后續清理操作。
(5)shuffleBlockResolver:Shuffle的塊解析器,通過該解析器,為數據塊的讀寫提供支撐層,便于抽象具體的實現細節。
7.2.3 Shuffle框架的源碼解析
用戶可以通過自定義ShuffleManager接口,并通過指定的配置屬性進行設置,也可以通過該配置屬性指定Spark已經支持的ShuffleManager具體實現子類。
在SparkEnv源碼中可以看到設置的配置屬性,以及當前在Spark的ShuffleManager可插拔框架中已經提供的ShuffleManager具體實現。Spark 2.0版本中支持sort、tungsten-sort兩種方式。
Spark 2.1.1版本的SparkEnv.scala的源碼如下。
1. //用戶可以通過短格式的命名來指定所使用的ShuffleManager 2. val shortShuffleMgrNames = Map( 3. "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager] .getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort. SortShuffleManager].getName) 4. 5. //指定ShuffleManager的配置屬性:"spark.shuffle.manager" 6. //默認情況下使用"sort",即SortShuffleManager的實現 7. val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") 8. val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName. toLowerCase, shuffleMgrName) 9. val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
Spark 2.2.0版本的SparkEnv.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第8行調用toLowerCase小寫轉換方法,設置Locale.ROOT區域表示。root locale是一個區域設置,其語言、地區、變量都設置為空("")字符串。
1. ...... 2. val shuffleMgrClass = 3. shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase (Locale.ROOT), shuffleMgrName) 4. .......
從代碼中可以看出,ShuffleManager是Spark Shuffle系統提供的一個可插拔式接口,可以通過spark.shuffle.manager配置屬性來設置自定義的ShuffleManager。
在Driver和每個Executor的SparkEnv實例化過程中,都會創建一個ShuffleManager,用于管理塊數據,提供集群塊數據的讀寫,包括數據的本地讀寫和讀取遠程節點的塊數據。
Shuffle系統的框架可以以ShuffleManager為入口進行解析。在ShuffleManager中指定了整個Shuffle框架使用的各個組件,包括如何注冊到ShuffleManager,以獲取一個用于數據讀寫的處理句柄ShuffleHandle,通過ShuffleHandle獲取特定的數據讀寫接口:ShuffleWriter與ShuffleReader,以及如何獲取塊數據信息的解析接口ShuffleBlockResolver。下面通過源碼分別對這幾個比較重要的組件進行解析。
1.ShuffleManager的源碼解析
由于ShuffleManager是Spark Shuffle系統提供的一個可插拔式接口,提供具體實現子類或自定義具體實現子類時,都需要重寫ShuffleManager類的抽象接口。下面首先分析ShuffleManager的源碼。
ShuffleManager.scala的源碼如下。
1. 2. //Shuffle系統的可插拔接口。在Driver和每個Executor的SparkEnv實例中創建 3. private[spark] trait ShuffleManager { 4. 5. /** 6. *在Driver端向ShuffleManager注冊一個Shuffle,獲取一個Handle 7. *在具體Tasks中會通過該Handle來讀寫數據 8. */ 9. def registerShuffle[K, V, C]( 10. shuffleId: Int, 11. numMaps: Int, 12. dependency: ShuffleDependency[K, V, C]): ShuffleHandle 13. 14. /** *獲取對應給定的分區使用的ShuffleWriter,該方法在Executors上執行各個Map *任務時調用 15. */ 16. def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] 17. /** * 獲取在Reduce階段讀取分區的ShuffleReader,對應讀取的分區由[startPartition * to endPartition-1]區間指定。該方法在Executors上執行,在各個Reduce任務時調用 * 18 */ 19. def getReader[K, C]( 20 21. handle: ShuffleHandle, 22. startPartition: Int, 23. endPartition: Int, 24. context: TaskContext): ShuffleReader[K, C] 25. 26. /** 27. *該接口和registerShuffle分別負責元數據的取消注冊與注冊 28. *調用unregisterShuffle接口時,會移除ShuffleManager中對應的元數據信息 29. */ 30. def unregisterShuffle(shuffleId: Int): Boolean 31. 32. /** *返回一個可以基于塊坐標來獲取Shuffle 塊數據的ShuffleBlockResolver 33. */ 34. def shuffleBlockResolver: ShuffleBlockResolver 35. 36. /**終止ShuffleManager */ 37. def stop(): Unit 38. }
2.ShuffleHandle的源碼解析
1. abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}
ShuffleHandle比較簡單,用于記錄Task與Shuffle相關的一些元數據,同時也可以作為不同具體Shuffle實現機制的一種標志信息,控制不同具體實現子類的選擇等。
3.ShuffleWriter的源碼解析
ShuffleWriter.scala的源碼如下。
1. private[spark] abstract class ShuffleWriter[K, V] { 2. /** Write a sequence of records to this task's output */ 3. @throws[IOException] 4. def write(records: Iterator[Product2[K, V]]): Unit 5. 6. /** Close this writer, passing along whether the map completed */ 7. def stop(success: Boolean): Option[MapStatus] 8. }
繼承ShuffleWriter的每個具體子類會實現write接口,給出任務在輸出時的寫記錄的具體方法。
4.ShuffleReader的源碼解析
ShuffleReader.scala的源碼如下。
1. private[spark] trait ShuffleReader[K, C] { 2. /** Read the combined key-values for this reduce task */ 3. def read(): Iterator[Product2[K, C]]
繼承ShuffleReader的每個具體子類會實現read接口,計算時負責從上一階段Stage的輸出數據中讀取記錄。
5.ShuffleBlockResolver的源碼解析
ShuffleBlockResolver的源碼如下。
1. /** *該特質的具體實現子類知道如何通過一個邏輯Shuffle塊標識信息來獲取一個塊數據。具體 *實現可以使用文件或文件段來封裝Shuffle的數據。這是獲取Shuffle塊數據時使用的抽 *象接口,在BlockStore中使用 2. */ 3. 4. 5. trait ShuffleBlockResolver { 6. type ShuffleId = Int 7. 8. /** *獲取指定塊的數據。如果指定塊的數據無法獲取,則拋出異常 9. */ 10. def getBlockData(blockId: ShuffleBlockId): ManagedBuffer 11. 12. def stop(): Unit 13. }
繼承ShuffleBlockResolver的每個具體子類會實現getBlockData接口,給出具體的獲取塊數據的方法。
目前在ShuffleBlockResolver的各個具體子類中,除給出獲取數據的接口外,通常會提供如何解析塊數據信息的接口,即提供了寫數據塊時的物理塊與邏輯塊之間映射關系的解析方法。
7.2.4 Shuffle數據讀寫的源碼解析
1.Shuffle寫數據的源碼解析
從Spark Shuffle的整體框架中可以看到,ShuffleManager提供了Shuffle相關數據塊的寫入與讀取,即對應的接口getWriter與getReader。
在解析Shuffle框架數據讀取過程中,可以構建一個具有ShuffleDependency的RDD,查看執行過程中,Shuffle框架中的數據讀寫接口getWriter與getReader如何使用,通過這種具體案例的方式來加深對源碼的理解。
Spark中Shuffle具體的執行機制可以參考本書的其他章節,在此僅分析與Shuffle直接相關的內容。通過DAG調度機制的解析,可以知道Spark中一個作業可以根據寬依賴切分Stages,而在Stages中,相應的Tasks也包含兩種,即ResultTask與ShuffleMapTask。其中,一個ShuffleMapTask會基于ShuffleDependency中指定的分區器,將一個RDD的元素拆分到多個buckets中,此時通過ShuffleManager的getWriter接口獲取數據與buckets的映射關系。而ResultTask對應的是一個將輸出返回給應用程序Driver端的Task,在該Task執行過程中,最終都會調用RDD的compute對內部數據進行計算,而在帶有ShuffleDependency的RDD中,在compute計算時,會通過ShuffleManager的getReader接口,獲取上一個Stage的Shuffle輸出結果作為本次Task的輸入數據。
首先來看ShuffleMapTask中的寫數據流程,具體代碼如下所示。
ShuffleMapTask.scala的源碼如下。
1. override def runTask(context: TaskContext): MapStatus = { 2. ...... 3. //首先從SparkEnv獲取ShuffleManager 4. //然后從ShuffleDependency中獲取注冊到ShuffleManager時得到的shuffleHandle 5. //根據shuffleHandle和當前Task對應的分區ID,獲取ShuffleWriter 6. //最后根據獲取的ShuffleWriter,調用其write接口,寫入當前分區的數據 7. var writer: ShuffleWriter[Any, Any] = null 8. try { 9. val manager = SparkEnv.get.shuffleManager 10. writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 11. writer.write(rdd.iterator(partition, context).asInstanceOf [Iterator[_ <: Product2[Any, Any]]]) 12. writer.stop(success = true).get 13. } catch { 14. ...... 15. } 16. }
2.Shuffle讀數據的源碼解析
對應的數據讀取器,從RDD的5個抽象接口可知,RDD的數據流最終會經過算子操作,即RDD中的compute方法。下面以包含寬依賴的RDD、CoGroupedRDD為例,查看如何獲取Shuffle的數據。具體代碼如下所示。
Spark 1.6.0版本的CoGroupedRDD.scala的源碼如下。
1. //對指定分區進行計算的抽象接口,以下為CoGroupedRDD具體子類中該方法的實現 2. override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = { 3. val split = s.asInstanceOf[CoGroupPartition] 4. val numRdds = dependencies.length 5. 6. //A list of (rdditerator, dependency number) pairs 7. val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)] 8. for ((dep, depNum) <- dependencies.zipWithIndex) dep match { 9. case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked => 10. val dependencyPartition = split.narrowDeps(depNum).get.split 11. //Read them from the parent 12. val it = oneToOneDependency.rdd.iterator(dependencyPartition, context) 13. rddIterators += ((it, depNum)) 14. 15. case shuffleDependency: ShuffleDependency[_, _, _] => 16. //首先從SparkEnv獲取ShuffleManager 17. //然后從ShuffleDependency中獲取注冊到ShuffleManager時得到的shuffleHandle 18. //根據shuffleHandle和當前Task對應的分區ID,獲取ShuffleWriter 19. //最后根據獲取的ShuffleReader,調用其read接口,讀取Shuffle的Map輸出 20. 21. val it = SparkEnv.get.shuffleManager 22. .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context) 23. .read() 24. rddIterators += ((it, depNum)) 25. } 26. 27. val map = createExternalMap(numRdds) 28. for ((it, depNum) <- rddIterators) { 29. map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) 30. } 31. context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled) 32. context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled) 33. context.internalMetricsToAccumulators( 34. InternalAccumulator.PEAK_EXECUTION_MEMORY).add (map.peakMemoryUsedBytes) 35. new InterruptibleIterator(context, 36. map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) 37. }
Spark 2.2.0版本的CoGroupedRDD.scala的源碼與Spark 1.6.0版本相比具有如下特點:上段代碼中第28~29行的context.internalMetricsToAccumulators方法調整為context.taskMetrics方法,用于任務的度量監控,監控內存的峰值使用情況。
1. ...... 2. context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes) 3. .......
從代碼中可以看到,帶寬依賴的RDD的compute操作中,最終是通過SparkEnv中的ShuffleManager實例的getReader方法,獲取數據讀取器的,然后再次調用讀取器的read讀取指定分區范圍的Shuffle數據。注意,是帶寬依賴的RDD,而非ShuffleRDD,除了ShuffleRDD外,還有其他RDD也可以帶上寬依賴的,如前面給出的CoGroupedRDD。
目前支持的幾種具體Shuffle實現機制在讀取數據的處理上都是一樣的。從源碼角度可以看到,當前繼承了ShuffleReader這一數據讀取器的接口的具體子類只有BlockStoreShuffleReader,因此,本章內容僅在此對各種Shuffle實現機制的數據讀取進行解析,后續各實現機制中不再重復描述。
源碼解析的第一步仍然是查看該類的描述信息,具體如下所示。
1. /** 2. *通過從其他節點上請求讀取 Shuffle 數據來接收并讀取指定范圍[起始分區, 結束分區) *——對應為左閉右開區間 3. *通過從其他節點上請求讀取Shuffle數據來接收并讀取指定范圍[起始分區,結束分區] 4. *——對應為左閉右開區間 5. */
從注釋上可以看出,讀取器負責上一Stage為下一Stage輸出數據塊的讀取。從前面對ShuffleReader接口的解析可知,繼承的具體子類需要實現真正的數據讀取操作,即實現read方法。因此,該方法便是需要重點關注的源碼。一些關鍵的代碼如下所示。
Spark 2.1.1版本的BlockStoreShuffleReader.scala的源碼如下。
1. //為該Reduce任務讀取并合并key-values 值 2. override def read(): Iterator[Product2[K, C]] = { 3. //真正的數據Iterator讀取是通過ShuffleBlockFetcherIterator來完成的 4. val blockFetcherItr = new ShuffleBlockFetcherIterator( 5. context, 6. blockManager.shuffleClient, 7. blockManager, 8. //可以看到,當ShuffleMapTask完成后注冊到mapOutputTracker的元數據信息 //同樣會通過mapOutputTracker來獲取,在此同時還指定了獲取的分區范圍 //通過該方法的返回值類型 9. 10. 11. 12. mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), 13. //默認讀取時的數據大小限制為48m,對應后續并行的讀取,都是一種數據讀取的控制策 //略,一方面可以避免目標機器占用過多帶寬,同時也可以啟動并行機制,加快讀取速度 14. 15. 16. 17. SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024, 18. SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)) 19. 20. //在此針對前面獲取的各個數據塊唯一標識ID信息及其對應的輸入流進行處理 21. val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) => 22. serializerManager.wrapStream(blockId, inputStream) 23. } 24. 25. val serializerInstance = dep.serializer.newInstance() 26. 27. //為每個流stream創建一個鍵/值迭代器 28. val recordIter = wrappedStreams.flatMap { wrappedStream => 29. //注意:askey Value Iterator在內部迭代器Next Iterator中包裹一個鍵/值對, //當Input Stream中的數據已讀取,Next Iterator確保Close()方法被調用 30. 31. 32. serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator 33. } 34. 35. //為每個記錄更新上下文任務度量 36. val readMetrics = context.taskMetrics.createTempShuffleReadMetrics() 37. val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( 38. recordIter.map { record => 39. readMetrics.incRecordsRead(1) 40. record 41. }, 42. context.taskMetrics().mergeShuffleReadMetrics()) 43. 44. //為了支持任務取消,這里必須使用可中斷迭代器 45. val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter) 46. //對讀取到的數據進行聚合處理 47. val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator .isDefined) { 48. //如果在Map端已經做了聚合的優化操作,則對讀取到的聚合結果進行聚合,注意此時的 //聚合操作與數據類型和Map端未做優化時是不同的 49. 50. 51. 52. if (dep.mapSideCombine) { 53. //對讀取到的數據進行聚合處理 54. val combinedKeyValuesIterator = interruptibleIter.asInstanceOf [Iterator[(K, C)]] 55. 56. //Map端各分區針對Key進行合并后的結果再次聚合,Map的合并可以大大減少網絡傳輸 //的數據量 57. 58. dep.aggregator.get.combineCombinersByKey (combinedKeyValuesIterator, context) 59. } else { 60. //我們無需關心值的類型,但應確保聚合是兼容的,其將把值的類型轉化成聚合以后的 //C類型 61. 62. 63. val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator [(K, Nothing)]] 64. dep.aggregator.get.combineValuesByKey(keyValuesIterator, context) 65. } 66. } else { 67. require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") 68. interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]] 69. } 70. //在基于Sort的Shuffle實現過程中,默認基于PartitionId進行排序,在分區的內 //部,數據是沒有排序的,因此添加了keyOrdering變量,提供是否需要針對分區內的 //數據進行排序的標識信息 71. //如果定義了排序,則對輸出結果進行排序 72. dep.keyOrdering match { 73. case Some(keyOrd: Ordering[K]) => 74. 75. //為了減少內存的壓力,避免GC開銷,引入了外部排序器對數據進行排序當內存不足 //以容納排序的數據量時,會根據配置的spark.shuffle.spill屬性來決定是否需要 //spill到磁盤中,默認情況下會打開spill開關,若不打開spill開關,數據量比 //較大時會引發內存溢出問題(Out of Memory,OOM) 76. val sorter = 77. new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer) 78. sorter.insertAll(aggregatedIter) 79. context.taskMetrics().incMemoryBytesSpilled(sorter. memoryBytesSpilled) 80. context.taskMetrics().incDiskBytesSpilled(sorter. diskBytesSpilled) 81. context.taskMetrics().incPeakExecutionMemory (sorter.peakMemoryUsedBytes) 82. CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]] (sorter.iterator, sorter.stop()) 83. case None => 84. //不需要排序分區內部數據時直接返回 85. aggregatedIter 86. } 87. } 88. }
Spark 2.2.0版本的BlockStoreShuffleReader.scala的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第4行blockFetcherItr名稱更改為wrappedStreams。
上段代碼中第17行之前新增代碼serializerManager.wrapStream。
上段代碼中第18行之后新增配置參數REDUCER_MAX_REQ_SIZE_SHUFFLE_ TO_MEM:shuffle時可請求內存的最大大小(以字節為單位)。
上段代碼中第18行之后新增配置參數spark.shuffle.detectCorrupt:檢測獲取塊blocks中是否有任何損壞。
上段代碼中第21~23行刪除。
上段代碼中第28行wrappedStream調整為case (blockId, wrappedStream)。
1. ....... 2. val wrappedStreams = new ShuffleBlockFetcherIterator( 3. ...... 4. serializerManager.wrapStream, 5. ...... SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM), 6. SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true)) 7. ...... 8. val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) => 9. .......
下面進一步解析數據讀取的部分細節。首先是數據塊獲取、讀取的ShuffleBlock-FetcherIterator類,在類的構造體中調用了initialize方法(構造體中的表達式會在構造實例時執行),該方法中會根據數據塊所在位置(本地節點或遠程節點)分別進行讀取,其中關鍵代碼如下所示。
ShuffleBlockFetcherIterator的源碼如下。
1. private[this] def initialize(): Unit = { 2. //任務完成進行回調清理(在成功案例和失敗案例中調用) 3. context.addTaskCompletionListener(_ => cleanup()) 4. //本地與遠程的數據讀取方式不同,因此先進行拆分,注意拆分時會考慮一次獲取的數據 //大小(拆分時會同時考慮并行數)封裝請求,最后會將剩余不足該大小的數據獲取也封裝 //為一個請求 5. 6. 7. 8. val remoteRequests = splitLocalRemoteBlocks() 9. //存入需要遠程讀取的數據塊請求信息 10. fetchRequests ++= Utils.randomize(remoteRequests) 11. assert ((0 == reqsInFlight) == (0 == bytesInFlight), 12. "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight + 13. ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight) 14. 15. //發送數據獲取請求 16. fetchUpToMaxBytes() 17. 18. val numFetches = remoteRequests.size - fetchRequests.size 19. logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime)) 20. 21. //除了遠程數據獲取外,下面是獲取本地數據塊的方法調用 22. fetchLocalBlocks() 23. logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime)) 24. }
與Hadoop一樣,Spark計算框架也基于數據本地性,即移動數據而非移動計算的原則,因此在獲取數據塊時,也會考慮數據本地性,盡量從本地讀取已有的數據塊,然后再遠程讀取。
另外,數據塊的本地性是通過ShuffleBlockFetcherIterator實例構建時所傳入的位置信息來判斷的,而該信息由MapOutputTracker實例的getMapSizesByExecutorId方法提供,可以參考該方法的返回值類型查看相關的位置信息,返回值類型為:Seq[(BlockManagerId, Seq[(BlockId, Long)])]。其中,BlockManagerId是BlockManager的唯一標識信息,BlockId是數據塊的唯一信息,對應的Seq[(BlockId, Long)]表示一組數據塊標識ID及其數據塊大小的元組信息。
最后簡單分析一下如何設置分區內部的排序標識,當需要對分區內的數據進行排序時,會設置RDD中的寬依賴(ShuffleDependency)實例的keyOrdering變量。下面以基于排序的OrderedRDDFunctions提供的sortByKey方法給出解析,具體代碼如下所示。
OrderedRDDFunctions的源碼如下。
1. def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) 2. : RDD[(K, V)] = self.withScope 3. { 4. //注意,這里設置了該方法構建的RDD使用的分區器 //根據Range而非Hash進行分區,對應的Range信息需要計算并將結果 //反饋到Driver端,因此對應調用RDD中的Action,即會觸發一個Job的執行 5. val part = new RangePartitioner(numPartitions, self, ascending) 6. //在構建RDD實例后,設置Key的排序算法,即Ordering實例 7. new ShuffledRDD[K, V, V](self, part) 8. .setKeyOrdering(if (ascending) ordering else ordering.reverse) 9. }
當需要對分區內部的數據進行排序時,構建RDD的同時會設置Key值的排序算法,結合前面的read代碼,當指定Key值的排序算法時,就會使用外部排序器對分區內的數據進行排序。