官术网_书友最值得收藏!

7.3 Hash Based Shuffle

本節講解Hash Based Shuffle,包括Hash Based Shuffle概述、Hash Based Shuffle內核、Hash Based Shuffle的數據讀寫的源碼解析等內容。

7.3.1 概述

在Spark 1.1之前,Spark中只實現了一種Shuffle方式,即基于Hash的Shuffle。在Spark 1.1版本中引入了基于Sort的Shuffle實現方式,并且在Spark 1.2版本之后,默認的實現方式從基于Hash的Shuffle,修改為基于Sort的Shuffle實現方式,即使用的ShuffleManager從默認的hash修改為sort。說明在Spark 2.0版本中,Hash的Shuffle方式已經不再使用。

Spark之所以一開始就提供基于Hash的Shuffle實現機制,其主要目的之一就是為了避免不需要的排序(這也是Hadoop Map Reduce被人詬病的地方,將Sort作為固定步驟,導致許多不必要的開銷)。但基于Hash的Shuffle實現機制在處理超大規模數據集的時候,由于過程中會產生大量的文件,導致過度的磁盤I/O開銷和內存開銷,會極大地影響性能。

但在一些特定的應用場景下,采用基于Hash的實現Shuffle機制的性能會超過基于Sort的Shuffle實現機制。關于基于Hash與基于Sort的Shuffle實現機制的性能測試方面,可以參考Spark創始人之一的ReynoldXin給的測試:“sort-basedshuffle has lower memory usage and seems to outperformhash-based in almost allof our testing”。

相關數據可以參考https://issues.apache.org/jira/browse/SPARK-3280。

因此,在Spark 1.2版本中修改為默認基于Sort的Shuffle實現機制時,同時也給出了特定應用場景下回退的機制。

7.3.2 Hash Based Shuffle內核

1.基于Hash的Shuffle實現機制的內核框架

基于Hash的Shuffle實現,ShuffleManager的具體實現子類為HashShuffleManager,對應的具體實現機制如圖7-3所示。

圖7-3 基于哈希算法的Shuffle實現機制的內核框架

其中,HashShuffleManager是ShuffleManager的基于哈希算法實現方式的具體實現子類。數據塊的讀寫分別由BlockStoreShuffleReader與HashShuffleWriter實現;數據塊的文件解析器則由具體子類FileShuffleBlockResolver實現;BaseShuffleHandle是ShuffleHandle接口的基本實現,保存Shuffle注冊的信息。

HashShuffleManager繼承自ShuffleManager,對應實現了各個抽象接口。基于Hash的Shuffle,內部使用的各組件的具體子類如下所示。

(1)BaseShuffleHandle:攜帶了Shuffle最基本的元數據信息,包括shuffleId、numMaps和dependency。

(2)BlockStoreShuffleReader:負責寫入的Shuffle數據塊的讀操作。

(3)FileShuffleBlockResolver:負責管理,為Shuffle任務分配基于磁盤的塊數據的Writer。每個ShuffleShuffle任務為每個Reduce分配一個文件。

(4)HashShuffleWriter:負責Shuffle數據塊的寫操作。

在此與解析整個Shuffle過程一樣,以HashShuffleManager類作為入口進行解析。

首先看一下HashShuffleManager具體子類的注釋,如下所示。

Spark 1.6.0版本的HashShuffleManager.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。

1.  /**
      *使用Hash的ShuffleManager具體實現子類,針對每個Mapper都會為各個Reduce分
      *區構建一個輸出文件(也可能是多個任務復用文件)
2.    */
3.  private[spark]      class    HashShuffleManager(conf:         SparkConf)   extends
    ShuffleManager with Logging {
4.  ......
2.基于Hash的Shuffle實現方式一

為了避免Hadoop中基于Sort方式的Shuffle所帶來的不必要的排序開銷,Spark在開始時采用了基于Hash的Shuffle方式。但這種方式存在不少缺陷,這些缺陷大部分是由于在基于Hash的Shuffle實現過程中創建了太多的文件所造成的。在這種方式下,每個Mapper端的Task運行時都會為每個Reduce端的Task生成一個文件,具體如圖7-4所示。

圖7-4 基于Hash的Shuffle實現方式——文件的輸出細節圖

Executor-Mapper表示執行Mapper端的Tasks的工作點,可以分布到集群中的多臺機器節點上,并且可以以不同的形式出現,如以Spark Standalone部署模式中的Executor出現,也可以以Spark On Yarn部署模式中的容器形式出現,關鍵是它代表了實際執行Mapper端的Tasks的工作點的抽象概念。其中,M表示Mapper端的Task的個數,R表示Reduce端的Task的個數。

對應在右側的本地文件系統是在該工作點上所生成的文件,其中R表示Reduce端的分區個數。生成的文件名格式為:shuffle_shuffleId_mapId_reduceId,其中的shuffle_shuffleId_1_1表示mapId為1,同時reduceId也為1。

在Mapper端,每個分區對應啟動一個Task,而每個Task會為每個Reducer端的Task生成一個文件,因此最終生成的文件個數為M×R。

由于這種實現方式下,對應生成文件個數僅與Mapper端和Reducer端各自的分區數有關,因此圖中將Mapper端的全部M個Task抽象到一個Executor-Mapper中,實際場景中通常是分布到集群中的各個工作點中。

生成的各個文件位于本地文件系統的指定目錄中,該目錄地址由配置屬性spark.local.dir設置。說明:分區數與Task數,一個是靜態的數據分塊個數,一個是數據分塊對應執行的動態任務個數,因此,在特定的、描述個數的場景下,兩者是一樣的。

3.基于Hash的Shuffle實現方式二

為了減少Hash所生成的文件個數,對基于Hash的Shuffle實現方式進行了優化,引入文件合并的機制,該機制設置的開關為配置屬性spark.shuffle.consolidateFiles。在引入文件合并的機制后,當設置配置屬性為true,即啟動文件合并時,在Mapper端的輸出文件會進行合并,在一定程度上可以大量減少文件的生成,降低不必要的開銷。文件合并的實現方式可以參考圖7-5。

圖7-5 基于Hash的Shuffle的合并文件機制的輸出細節圖

Executor-Mapper表示集群中分配的某個工作點,其中,C表示在該工作點上所分配到的內核(Core)個數,T表示在該工作點上為每個Task分配的內核個數。C/T表示在該工作點上調度時最大的Task并行個數。

右側的本地文件系統是在該工作點上所生成的文件,其中R表示Reduce端的分區個數。生成的文件名格式為:merged_shuffle_shuffleId_bucketId_fileId,其中的merged_shuffle_ shuffleId_1_1表示bucketId為1,同時fileId也為1。

在Mapper端,Task會復用文件組,由于最大并行個數為C/T,因此文件組最多分配C/T個,當某個Task運行結束后,會釋放該文件組,之后調度的Task則復用前一個Task所釋放的文件組,因此會復用同一個文件。最終在該工作點上生成的文件總數為C/T*R,如果設工作點個數為E,則總的文件數為E*C/T*R。

4.基于Hash的Shuffle機制的優缺點

1)優點

 可以省略不必要的排序開銷。

 避免了排序所需的內存開銷。

2)缺點

 生成的文件過多,會對文件系統造成壓力。

 大量小文件的隨機讀寫會帶來一定的磁盤開銷。

 數據塊寫入時所需的緩存空間也會隨之增加,會對內存造成壓力。

7.3.3 Hash Based Shuffle數據讀寫的源碼解析

1.基于Hash的Shuffle實現方式一的源碼解析

下面針對Spark 1.6版本中的基于Hash的Shuffle實現在數據寫方面進行源碼解析(Spark2.0版本中已無Hash的Shuffle實現方式)。在基于Hash的Shuffle實現機制中,采用HashShuffleWriter作為數據寫入器。在HashShuffleWriter中控制Shuffle寫數據的關鍵代碼如下所示。

Spark 1.6.0版本的HashShuffleWriter.scala的源碼(Spark 2.2版本已無HashShuffle-Manager方式)如下。

1.  private[spark] class HashShuffleWriter[K, V](
2.      shuffleBlockResolver: FileShuffleBlockResolver,
3.      handle: BaseShuffleHandle[K, V, _],
4.  mapId: Int,
5.      context: TaskContext)
6.    extends ShuffleWriter[K, V] with Logging {
7.
8.    //控制每個Writer輸出時的切片個數,對應分區個數
9.    private val dep = handle.dependency
10.   private val numOutputSplits = dep.partitioner.numPartitions
11.
12.   ......
13.   //獲取數據讀寫的塊管理器
14.   private val blockManager = SparkEnv.get.blockManager
15.   private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
16.
17. //從  FileShuffleBlockResolver      的 forMapTask方法中獲取指定的    shuffleId  對應
    //的mapId
18. //對應分區個數構建的數據塊寫的ShuffleWriterGroup實例
19.   private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId,
      mapId, numOutputSplits, ser, writeMetrics)
20.
21.
22.   /** Task輸出時一組記錄的寫入 */
23.
24.   override def write(records: Iterator[Product2[K, V]]): Unit = {
25. //判斷在寫時是否需要先聚合,即定義了Map端Combine時,先對數據進行聚合再寫入,否則
    //直接返回需要寫入的一批記錄
26.
27.   val iter = if (dep.aggregator.isDefined) {
28.       if (dep.mapSideCombine) {
29.         dep.aggregator.get.combineValuesByKey(records, context)
30.       } else {
31.         records
32.       }
33.     } else {
34.       require(!dep.mapSideCombine, "Map-side combine without Aggregator
          specified!")
35.       records
36.     }
37.
38.     //根據分區器,獲取每條記錄對應的bucketId(即所在Reduce序號),根據bucketId
        //從FileShuffleBlockResolver構建的ShuffleWriterGroup中,獲取DiskBlock-
        //ObjectWriter實例,對應磁盤數據塊的數據寫入器
39.     for (elem<- iter) {
40.       val bucketId = dep.partitioner.getPartition(elem._1)
41.       shuffle.writers(bucketId).write(elem._1, elem._2)
42.     }
43.   }
44.      ......
45. }

當需要在Map端進行聚合時,使用的是聚合器(Aggregator)的combineValuesByKey方法,在該方法中使用ExternalAppendOnlyMap類對記錄集進行處理,處理時如果內存不足,會引發Spill操作。早期的實現會直接緩存到內存,在數據量比較大時容易引發內存泄漏。

在HashShuffleManager中,ShuffleBlockResolver特質使用的具體子類為FileShuffleBlock-Resolver,即指定了具體如何從一個邏輯Shuffle塊標識信息來獲取一個塊數據,對應為下面第7行調用的forMapTask方法,具體代碼如下所示:

Spark 1.6.0版本的FileShuffleBlockResolver.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。

1.  /**
2.    *針對給定的 Map Task,指定一個ShuffleWriterGroup實例,在數據塊寫入器成功
3.    *關閉時,會注冊為完成狀態
4.    */
5.
6.
7.   def forMapTask(shuffleId: Int, mapId: Int, numReduces: Int, serializer:
     Serializer,
8.  writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
9.      new ShuffleWriterGroup {
10.       //在FileShuffleBlockResolver中維護著當前Map Task對應shuffleId標識的
          //Shuffle中,指定numReduces個數的Reduce的各個狀態
11.
12. shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReduces))
13.       private val shuffleState = shuffleStates(shuffleId)
14.
15. ......
16.           //根據Reduce端的任務個數,構建元素類型為DiskBlockObjectWriter的數組,
              //DiskBlockObjectWriter負責具體數據的磁盤寫入
17.    //原則上,Shuffle的輸出可以存放在各種提供存儲機制的系統上,但為了容錯性等方面的
       //考慮,目前的Shuffle實行機制都會寫入到磁盤中
18.
19.  val writers: Array[DiskBlockObjectWriter] = {
20.         //這里的邏輯Bucket的Id值即對應的Reduce的任務序號,或者說分區ID
21.         Array.tabulate[DiskBlockObjectWriter](numReduces) { bucketId =>
22.         //針對每個Map端分區的Id與Bucket的Id構建數據塊的邏輯標識
23.           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
24.           val blockFile = blockManager.diskBlockManager.getFile(blockId)
25.           val tmp = Utils.tempFileWith(blockFile)
26.blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize,
    writeMetrics)
27.         }
28.       }
29. ......
30.       //任務完成時回調的釋放寫入器方法
31.       override def releaseWriters(success: Boolean) {
32. shuffleState.completedMapTasks.add(mapId)
33.       }
34.     }
35.   }

其中,ShuffleBlockId實例構建的源碼如下所示。

1.  case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
    extends BlockId {
2.    override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_"
      + reduceId
3.  }

從name方法的重載上可以看出,后續構建的文件與代碼中的mapId、reduceId的關系。當然,所有同一個Shuffle的輸出數據塊,都會帶上shuffleId這個唯一標識的,因此全局角度上,邏輯數據塊name不會重復(針對一些推測機制或失敗重試機制之類的場景而已,邏輯name沒有帶上時間信息,因此缺少多次執行的輸出區別,但在管理這些信息時會維護一個時間作為有效性判斷)。

2.基于Hash的Shuffle實現方式二的源碼解析

下面通過詳細解析FileShuffleBlockResolver源碼來加深對文件合并機制的理解。

由于在Spark 1.6中,文件合并機制已經刪除,因此下面基于Spark 1.5版本的代碼對文件合并機制的具體實現細節進行解析。以下代碼位于FileShuffleBlockResolver類中。

合并機制的關鍵控制代碼如下所示。

Spark 1.5.0版本的FileShuffleBlockResolver.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。

1.  /**
2.    *獲取一個針對特定Map Task的ShuffleWriterGroup
3.    */
4.
5.
6.    def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer:
      Serializer,
7.  writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
8.      new ShuffleWriterGroup {
9.  ......
10.       val writers: Array[DiskBlockObjectWriter] = if
         (consolidateShuffleFiles) {
11. //獲取未使用的文件組
12. fileGroup = getUnusedFileGroup()
13.         Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
14. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
15. //注意獲取磁盤寫入器時,傳入的第二個參數與未使用文件合并機制時的差異
16. //fileGroup(bucketId):構造器方式調用,對應apply的方法調用
17.blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance,
    bufferSize,
18. writeMetrics)
19.         }
20.       } else {
21.         Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
22. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
23. //根據ShuffleBlockId信息獲取文件名
24.           val blockFile = blockManager.diskBlockManager.getFile(blockId)
25.           val tmp = Utils.tempFileWith(blockFile)
26.blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize,
    writeMetrics)
27.         }
28.       }
29. ......
30. writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
31.       override def releaseWriters(success: Boolean) {
32. //帶文件合并機制時,寫入器在釋放后的處理
33. //3個關鍵信息mapId、offsets、lengths
34.         if (consolidateShuffleFiles) {
35.           if (success) {
36.             val offsets = writers.map(_.fileSegment().offset)
37.             val lengths = writers.map(_.fileSegment().length)
38. fileGroup.recordMapOutput(mapId, offsets, lengths)
39.           }
40. //回收文件組,便于后續復用
41. recycleFileGroup(fileGroup)
42.         } else {
43. shuffleState.completedMapTasks.add(mapId)
44.         }
45.       }

其中,第10行中的consolidateShuffleFiles變量,是判斷是否設置了文件合并機制,當設置consolidateShuffleFiles為true后,會繼續調用getUnusedFileGroup方法,在該方法中會獲取未使用的文件組,即重新分配或已經釋放可以復用的文件組。

獲取未使用的文件組(ShuffleFileGroup)的相關代碼getUnusedFileGroup如下所示。

Spark 1.5.0版本的FileShuffleBlockResolver.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。

1.  private def getUnusedFileGroup(): ShuffleFileGroup = {
2.  //獲取已經構建但未使用的文件組,如果獲取失敗,則重新構建一個文件組
3.          val fileGroup = shuffleState.unusedFileGroups.poll()
4.          if (fileGroup != null) fileGroup else newFileGroup()
5.        }
6.  //重新構建一個文件組的源碼
7.        private def newFileGroup(): ShuffleFileGroup = {
8.  //構建后會對文件編號進行遞增,該文件編號最終用在生成的文件名中
9.          val fileId = shuffleState.nextFileId.getAndIncrement()
10.         val files = Array.tabulate[File](numBuckets) { bucketId =>
11. //最終的文件名,可以通過文件名的組成及取值細節,加深對實現細節在文件個數上的差異的理解
12.
13.           val filename = physicalFileName(shuffleId, bucketId, fileId)
14. blockManager.diskBlockManager.getFile(filename)
15.         }
16. //構建并添加到shuffleState中,便于后續復用
17.         val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
18. shuffleState.allFileGroups.add(fileGroup)
19. fileGroup
20.       }

其中,第13行代碼對應生成的文件名,即物理文件名,相關代碼如下所示。

Spark 1.5.0版本的FileShuffleBlockResolver.scala的源碼(Spark 2.2版本已無HashShuffleManager方式)如下。

1.  private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
2.      "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
3.    }

可以看到,與未使用文件合并時的基于Hash的Shuffle實現方式不同的是,在生成的文件名中沒有對應的mapId,取而代之的是與文件組相關的fileId,而fileId則是多個Mapper端的Task所共用的,在此僅從生成的物理文件名中也可以看出文件合并的某些實現細節。

另外,對應生成的文件組既然是復用的,當一個Mapper端的Task執行結束后,便會釋放該文件組(ShuffleFileGroup),之后繼續調度時便會復用該文件組。對應地,調度到某個Executor工作點上同時運行的Task最大個數,就對應了最多分配的文件組個數。

而在TaskSchedulerImpl調度Task時,各個Executor工作點上Task調度控制的源碼說明了在各個Executor工作點上調度并行的Task數,具體代碼如下所示。

1.  private def resourceOfferSingleTaskSet(
2.  taskSet: TaskSetManager,
3.  maxLocality: TaskLocality,
4.  shuffledOffers: Seq[WorkerOffer],
5.  availableCpus: Array[Int],
6.      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
7.  var launchedTask = false
8.    for (i <- 0 until shuffledOffers.size) {
9.  val execId = shuffledOffers(i).executorId
10. val host = shuffledOffers(i).host
11. //判斷當前Executor工作點上可用的內核個數是否滿足Task所需的內核個數
12. //CPUS_PER_TASK:表示設置的每個Task所需的內核個數
13. if (availableCpus(i) >= CPUS_PER_TASK) {
14. try {
15. for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
16. ......
17. launchedTask = true
18. }
19.      } catch {
20. ......
21.       }
22.     }
23.   }
24. return launchedTask
25. }

其中,設置每個Task所需的內核個數的配置屬性如下所示:

1.  //每個任務請求的CPU個數
2.  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

對于這些會影響Executor中并行執行的任務數的配置信息,設置時需要多方面考慮,包括內核個數與任務個數的合適比例,在內存模型中,為任務分配內存的具體策略等。任務分配內存的具體策略可以參考Spark官方給出的具體設計文檔,以及文檔中各種設計方式的權衡等內容。

主站蜘蛛池模板: 峨眉山市| 饶阳县| 遂宁市| 钟山县| 无为县| 治县。| 麦盖提县| 盘锦市| 罗甸县| 岚皋县| 乐东| 双辽市| 余干县| 伊吾县| 天水市| 扬中市| 泰来县| 星子县| 民乐县| 贞丰县| 普格县| 咸宁市| 东光县| 黑河市| 桦南县| 民和| 桑日县| 金阳县| 临清市| 台中县| 旅游| 天柱县| 利津县| 房山区| 繁峙县| 金堂县| 鄱阳县| 澳门| 井陉县| 台中市| 苏尼特左旗|