- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 4091字
- 2019-12-12 17:30:01
7.3 Hash Based Shuffle
本節講解Hash Based Shuffle,包括Hash Based Shuffle概述、Hash Based Shuffle內核、Hash Based Shuffle的數據讀寫的源碼解析等內容。
7.3.1 概述
在Spark 1.1之前,Spark中只實現了一種Shuffle方式,即基于Hash的Shuffle。在Spark 1.1版本中引入了基于Sort的Shuffle實現方式,并且在Spark 1.2版本之后,默認的實現方式從基于Hash的Shuffle,修改為基于Sort的Shuffle實現方式,即使用的ShuffleManager從默認的hash修改為sort。說明在Spark 2.0版本中,Hash的Shuffle方式已經不再使用。
Spark之所以一開始就提供基于Hash的Shuffle實現機制,其主要目的之一就是為了避免不需要的排序(這也是Hadoop Map Reduce被人詬病的地方,將Sort作為固定步驟,導致許多不必要的開銷)。但基于Hash的Shuffle實現機制在處理超大規模數據集的時候,由于過程中會產生大量的文件,導致過度的磁盤I/O開銷和內存開銷,會極大地影響性能。
但在一些特定的應用場景下,采用基于Hash的實現Shuffle機制的性能會超過基于Sort的Shuffle實現機制。關于基于Hash與基于Sort的Shuffle實現機制的性能測試方面,可以參考Spark創始人之一的ReynoldXin給的測試:“sort-basedshuffle has lower memory usage and seems to outperformhash-based in almost allof our testing”。
相關數據可以參考https://issues.apache.org/jira/browse/SPARK-3280。
因此,在Spark 1.2版本中修改為默認基于Sort的Shuffle實現機制時,同時也給出了特定應用場景下回退的機制。
7.3.2 Hash Based Shuffle內核
1.基于Hash的Shuffle實現機制的內核框架
基于Hash的Shuffle實現,ShuffleManager的具體實現子類為HashShuffleManager,對應的具體實現機制如圖7-3所示。

圖7-3 基于哈希算法的Shuffle實現機制的內核框架
其中,HashShuffleManager是ShuffleManager的基于哈希算法實現方式的具體實現子類。數據塊的讀寫分別由BlockStoreShuffleReader與HashShuffleWriter實現;數據塊的文件解析器則由具體子類FileShuffleBlockResolver實現;BaseShuffleHandle是ShuffleHandle接口的基本實現,保存Shuffle注冊的信息。
HashShuffleManager繼承自ShuffleManager,對應實現了各個抽象接口。基于Hash的Shuffle,內部使用的各組件的具體子類如下所示。
(1)BaseShuffleHandle:攜帶了Shuffle最基本的元數據信息,包括shuffleId、numMaps和dependency。
(2)BlockStoreShuffleReader:負責寫入的Shuffle數據塊的讀操作。
(3)FileShuffleBlockResolver:負責管理,為Shuffle任務分配基于磁盤的塊數據的Writer。每個ShuffleShuffle任務為每個Reduce分配一個文件。
(4)HashShuffleWriter:負責Shuffle數據塊的寫操作。
在此與解析整個Shuffle過程一樣,以HashShuffleManager類作為入口進行解析。
首先看一下HashShuffleManager具體子類的注釋,如下所示。
Spark 1.6.0版本的HashShuffleManager.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。
1. /** *使用Hash的ShuffleManager具體實現子類,針對每個Mapper都會為各個Reduce分 *區構建一個輸出文件(也可能是多個任務復用文件) 2. */ 3. private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { 4. ......
2.基于Hash的Shuffle實現方式一
為了避免Hadoop中基于Sort方式的Shuffle所帶來的不必要的排序開銷,Spark在開始時采用了基于Hash的Shuffle方式。但這種方式存在不少缺陷,這些缺陷大部分是由于在基于Hash的Shuffle實現過程中創建了太多的文件所造成的。在這種方式下,每個Mapper端的Task運行時都會為每個Reduce端的Task生成一個文件,具體如圖7-4所示。

圖7-4 基于Hash的Shuffle實現方式——文件的輸出細節圖
Executor-Mapper表示執行Mapper端的Tasks的工作點,可以分布到集群中的多臺機器節點上,并且可以以不同的形式出現,如以Spark Standalone部署模式中的Executor出現,也可以以Spark On Yarn部署模式中的容器形式出現,關鍵是它代表了實際執行Mapper端的Tasks的工作點的抽象概念。其中,M表示Mapper端的Task的個數,R表示Reduce端的Task的個數。
對應在右側的本地文件系統是在該工作點上所生成的文件,其中R表示Reduce端的分區個數。生成的文件名格式為:shuffle_shuffleId_mapId_reduceId,其中的shuffle_shuffleId_1_1表示mapId為1,同時reduceId也為1。
在Mapper端,每個分區對應啟動一個Task,而每個Task會為每個Reducer端的Task生成一個文件,因此最終生成的文件個數為M×R。
由于這種實現方式下,對應生成文件個數僅與Mapper端和Reducer端各自的分區數有關,因此圖中將Mapper端的全部M個Task抽象到一個Executor-Mapper中,實際場景中通常是分布到集群中的各個工作點中。
生成的各個文件位于本地文件系統的指定目錄中,該目錄地址由配置屬性spark.local.dir設置。說明:分區數與Task數,一個是靜態的數據分塊個數,一個是數據分塊對應執行的動態任務個數,因此,在特定的、描述個數的場景下,兩者是一樣的。
3.基于Hash的Shuffle實現方式二
為了減少Hash所生成的文件個數,對基于Hash的Shuffle實現方式進行了優化,引入文件合并的機制,該機制設置的開關為配置屬性spark.shuffle.consolidateFiles。在引入文件合并的機制后,當設置配置屬性為true,即啟動文件合并時,在Mapper端的輸出文件會進行合并,在一定程度上可以大量減少文件的生成,降低不必要的開銷。文件合并的實現方式可以參考圖7-5。

圖7-5 基于Hash的Shuffle的合并文件機制的輸出細節圖
Executor-Mapper表示集群中分配的某個工作點,其中,C表示在該工作點上所分配到的內核(Core)個數,T表示在該工作點上為每個Task分配的內核個數。C/T表示在該工作點上調度時最大的Task并行個數。
右側的本地文件系統是在該工作點上所生成的文件,其中R表示Reduce端的分區個數。生成的文件名格式為:merged_shuffle_shuffleId_bucketId_fileId,其中的merged_shuffle_ shuffleId_1_1表示bucketId為1,同時fileId也為1。
在Mapper端,Task會復用文件組,由于最大并行個數為C/T,因此文件組最多分配C/T個,當某個Task運行結束后,會釋放該文件組,之后調度的Task則復用前一個Task所釋放的文件組,因此會復用同一個文件。最終在該工作點上生成的文件總數為C/T*R,如果設工作點個數為E,則總的文件數為E*C/T*R。
4.基于Hash的Shuffle機制的優缺點
1)優點
可以省略不必要的排序開銷。
避免了排序所需的內存開銷。
2)缺點
生成的文件過多,會對文件系統造成壓力。
大量小文件的隨機讀寫會帶來一定的磁盤開銷。
數據塊寫入時所需的緩存空間也會隨之增加,會對內存造成壓力。
7.3.3 Hash Based Shuffle數據讀寫的源碼解析
1.基于Hash的Shuffle實現方式一的源碼解析
下面針對Spark 1.6版本中的基于Hash的Shuffle實現在數據寫方面進行源碼解析(Spark2.0版本中已無Hash的Shuffle實現方式)。在基于Hash的Shuffle實現機制中,采用HashShuffleWriter作為數據寫入器。在HashShuffleWriter中控制Shuffle寫數據的關鍵代碼如下所示。
Spark 1.6.0版本的HashShuffleWriter.scala的源碼(Spark 2.2版本已無HashShuffle-Manager方式)如下。
1. private[spark] class HashShuffleWriter[K, V]( 2. shuffleBlockResolver: FileShuffleBlockResolver, 3. handle: BaseShuffleHandle[K, V, _], 4. mapId: Int, 5. context: TaskContext) 6. extends ShuffleWriter[K, V] with Logging { 7. 8. //控制每個Writer輸出時的切片個數,對應分區個數 9. private val dep = handle.dependency 10. private val numOutputSplits = dep.partitioner.numPartitions 11. 12. ...... 13. //獲取數據讀寫的塊管理器 14. private val blockManager = SparkEnv.get.blockManager 15. private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null)) 16. 17. //從 FileShuffleBlockResolver 的 forMapTask方法中獲取指定的 shuffleId 對應 //的mapId 18. //對應分區個數構建的數據塊寫的ShuffleWriterGroup實例 19. private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, writeMetrics) 20. 21. 22. /** Task輸出時一組記錄的寫入 */ 23. 24. override def write(records: Iterator[Product2[K, V]]): Unit = { 25. //判斷在寫時是否需要先聚合,即定義了Map端Combine時,先對數據進行聚合再寫入,否則 //直接返回需要寫入的一批記錄 26. 27. val iter = if (dep.aggregator.isDefined) { 28. if (dep.mapSideCombine) { 29. dep.aggregator.get.combineValuesByKey(records, context) 30. } else { 31. records 32. } 33. } else { 34. require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") 35. records 36. } 37. 38. //根據分區器,獲取每條記錄對應的bucketId(即所在Reduce序號),根據bucketId //從FileShuffleBlockResolver構建的ShuffleWriterGroup中,獲取DiskBlock- //ObjectWriter實例,對應磁盤數據塊的數據寫入器 39. for (elem<- iter) { 40. val bucketId = dep.partitioner.getPartition(elem._1) 41. shuffle.writers(bucketId).write(elem._1, elem._2) 42. } 43. } 44. ...... 45. }
當需要在Map端進行聚合時,使用的是聚合器(Aggregator)的combineValuesByKey方法,在該方法中使用ExternalAppendOnlyMap類對記錄集進行處理,處理時如果內存不足,會引發Spill操作。早期的實現會直接緩存到內存,在數據量比較大時容易引發內存泄漏。
在HashShuffleManager中,ShuffleBlockResolver特質使用的具體子類為FileShuffleBlock-Resolver,即指定了具體如何從一個邏輯Shuffle塊標識信息來獲取一個塊數據,對應為下面第7行調用的forMapTask方法,具體代碼如下所示:
Spark 1.6.0版本的FileShuffleBlockResolver.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。
1. /** 2. *針對給定的 Map Task,指定一個ShuffleWriterGroup實例,在數據塊寫入器成功 3. *關閉時,會注冊為完成狀態 4. */ 5. 6. 7. def forMapTask(shuffleId: Int, mapId: Int, numReduces: Int, serializer: Serializer, 8. writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { 9. new ShuffleWriterGroup { 10. //在FileShuffleBlockResolver中維護著當前Map Task對應shuffleId標識的 //Shuffle中,指定numReduces個數的Reduce的各個狀態 11. 12. shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReduces)) 13. private val shuffleState = shuffleStates(shuffleId) 14. 15. ...... 16. //根據Reduce端的任務個數,構建元素類型為DiskBlockObjectWriter的數組, //DiskBlockObjectWriter負責具體數據的磁盤寫入 17. //原則上,Shuffle的輸出可以存放在各種提供存儲機制的系統上,但為了容錯性等方面的 //考慮,目前的Shuffle實行機制都會寫入到磁盤中 18. 19. val writers: Array[DiskBlockObjectWriter] = { 20. //這里的邏輯Bucket的Id值即對應的Reduce的任務序號,或者說分區ID 21. Array.tabulate[DiskBlockObjectWriter](numReduces) { bucketId => 22. //針對每個Map端分區的Id與Bucket的Id構建數據塊的邏輯標識 23. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) 24. val blockFile = blockManager.diskBlockManager.getFile(blockId) 25. val tmp = Utils.tempFileWith(blockFile) 26.blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics) 27. } 28. } 29. ...... 30. //任務完成時回調的釋放寫入器方法 31. override def releaseWriters(success: Boolean) { 32. shuffleState.completedMapTasks.add(mapId) 33. } 34. } 35. }
其中,ShuffleBlockId實例構建的源碼如下所示。
1. case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { 2. override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId 3. }
從name方法的重載上可以看出,后續構建的文件與代碼中的mapId、reduceId的關系。當然,所有同一個Shuffle的輸出數據塊,都會帶上shuffleId這個唯一標識的,因此全局角度上,邏輯數據塊name不會重復(針對一些推測機制或失敗重試機制之類的場景而已,邏輯name沒有帶上時間信息,因此缺少多次執行的輸出區別,但在管理這些信息時會維護一個時間作為有效性判斷)。
2.基于Hash的Shuffle實現方式二的源碼解析
下面通過詳細解析FileShuffleBlockResolver源碼來加深對文件合并機制的理解。
由于在Spark 1.6中,文件合并機制已經刪除,因此下面基于Spark 1.5版本的代碼對文件合并機制的具體實現細節進行解析。以下代碼位于FileShuffleBlockResolver類中。
合并機制的關鍵控制代碼如下所示。
Spark 1.5.0版本的FileShuffleBlockResolver.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。
1. /** 2. *獲取一個針對特定Map Task的ShuffleWriterGroup 3. */ 4. 5. 6. def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer, 7. writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { 8. new ShuffleWriterGroup { 9. ...... 10. val writers: Array[DiskBlockObjectWriter] = if (consolidateShuffleFiles) { 11. //獲取未使用的文件組 12. fileGroup = getUnusedFileGroup() 13. Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId => 14. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) 15. //注意獲取磁盤寫入器時,傳入的第二個參數與未使用文件合并機制時的差異 16. //fileGroup(bucketId):構造器方式調用,對應apply的方法調用 17.blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize, 18. writeMetrics) 19. } 20. } else { 21. Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId => 22. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) 23. //根據ShuffleBlockId信息獲取文件名 24. val blockFile = blockManager.diskBlockManager.getFile(blockId) 25. val tmp = Utils.tempFileWith(blockFile) 26.blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics) 27. } 28. } 29. ...... 30. writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) 31. override def releaseWriters(success: Boolean) { 32. //帶文件合并機制時,寫入器在釋放后的處理 33. //3個關鍵信息mapId、offsets、lengths 34. if (consolidateShuffleFiles) { 35. if (success) { 36. val offsets = writers.map(_.fileSegment().offset) 37. val lengths = writers.map(_.fileSegment().length) 38. fileGroup.recordMapOutput(mapId, offsets, lengths) 39. } 40. //回收文件組,便于后續復用 41. recycleFileGroup(fileGroup) 42. } else { 43. shuffleState.completedMapTasks.add(mapId) 44. } 45. }
其中,第10行中的consolidateShuffleFiles變量,是判斷是否設置了文件合并機制,當設置consolidateShuffleFiles為true后,會繼續調用getUnusedFileGroup方法,在該方法中會獲取未使用的文件組,即重新分配或已經釋放可以復用的文件組。
獲取未使用的文件組(ShuffleFileGroup)的相關代碼getUnusedFileGroup如下所示。
Spark 1.5.0版本的FileShuffleBlockResolver.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。
1. private def getUnusedFileGroup(): ShuffleFileGroup = { 2. //獲取已經構建但未使用的文件組,如果獲取失敗,則重新構建一個文件組 3. val fileGroup = shuffleState.unusedFileGroups.poll() 4. if (fileGroup != null) fileGroup else newFileGroup() 5. } 6. //重新構建一個文件組的源碼 7. private def newFileGroup(): ShuffleFileGroup = { 8. //構建后會對文件編號進行遞增,該文件編號最終用在生成的文件名中 9. val fileId = shuffleState.nextFileId.getAndIncrement() 10. val files = Array.tabulate[File](numBuckets) { bucketId => 11. //最終的文件名,可以通過文件名的組成及取值細節,加深對實現細節在文件個數上的差異的理解 12. 13. val filename = physicalFileName(shuffleId, bucketId, fileId) 14. blockManager.diskBlockManager.getFile(filename) 15. } 16. //構建并添加到shuffleState中,便于后續復用 17. val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files) 18. shuffleState.allFileGroups.add(fileGroup) 19. fileGroup 20. }
其中,第13行代碼對應生成的文件名,即物理文件名,相關代碼如下所示。
Spark 1.5.0版本的FileShuffleBlockResolver.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。
1. private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { 2. "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId) 3. }
可以看到,與未使用文件合并時的基于Hash的Shuffle實現方式不同的是,在生成的文件名中沒有對應的mapId,取而代之的是與文件組相關的fileId,而fileId則是多個Mapper端的Task所共用的,在此僅從生成的物理文件名中也可以看出文件合并的某些實現細節。
另外,對應生成的文件組既然是復用的,當一個Mapper端的Task執行結束后,便會釋放該文件組(ShuffleFileGroup),之后繼續調度時便會復用該文件組。對應地,調度到某個Executor工作點上同時運行的Task最大個數,就對應了最多分配的文件組個數。
而在TaskSchedulerImpl調度Task時,各個Executor工作點上Task調度控制的源碼說明了在各個Executor工作點上調度并行的Task數,具體代碼如下所示。
1. private def resourceOfferSingleTaskSet( 2. taskSet: TaskSetManager, 3. maxLocality: TaskLocality, 4. shuffledOffers: Seq[WorkerOffer], 5. availableCpus: Array[Int], 6. tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = { 7. var launchedTask = false 8. for (i <- 0 until shuffledOffers.size) { 9. val execId = shuffledOffers(i).executorId 10. val host = shuffledOffers(i).host 11. //判斷當前Executor工作點上可用的內核個數是否滿足Task所需的內核個數 12. //CPUS_PER_TASK:表示設置的每個Task所需的內核個數 13. if (availableCpus(i) >= CPUS_PER_TASK) { 14. try { 15. for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { 16. ...... 17. launchedTask = true 18. } 19. } catch { 20. ...... 21. } 22. } 23. } 24. return launchedTask 25. }
其中,設置每個Task所需的內核個數的配置屬性如下所示:
1. //每個任務請求的CPU個數 2. val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
對于這些會影響Executor中并行執行的任務數的配置信息,設置時需要多方面考慮,包括內核個數與任務個數的合適比例,在內存模型中,為任務分配內存的具體策略等。任務分配內存的具體策略可以參考Spark官方給出的具體設計文檔,以及文檔中各種設計方式的權衡等內容。