官术网_书友最值得收藏!

7.5 Tungsten Sorted Based Shuffle

本節講解Tungsten Sorted Based Shuffle,包括Tungsten Sorted Based Shuffle概述、Tungsten Sorted Based內核、Tungsten Sorted Based數據讀寫的源碼解析等內容。

7.5.1 概述

基于Tungsten Sort的Shuffle實現機制主要是借助Tungsten項目所做的優化來高效處理Shuffle。

Spark提供了配置屬性,用于選擇具體的Shuffle實現機制,但需要說明的是,雖然默認情況下Spark默認開啟的是基于Sort的Shuffle實現機制(對應spark.shuffle.manager的默認值),但實際上,參考Shuffle的框架內核部分可知基于Sort的Shuffle實現機制與基于Tungsten Sort的Shuffle實現機制都是使用SortShuffleManager,而內部使用的具體的實現機制,是通過提供的兩個方法進行判斷的。對應非基于Tungsten Sort時,通過SortShuffleWriter. shouldBypassMergeSort方法判斷是否需要回退到Hash風格的Shuffle實現機制,當該方法返回的條件不滿足時,則通過SortShuffleManager.canUseSerializedShuffle方法判斷是否需要采用基于Tungsten Sort的Shuffle實現機制,而當這兩個方法返回都為false,即都不滿足對應的條件時,會自動采用常規意義上的基于Sort的Shuffle實現機制。

因此,當設置了spark.shuffle.manager=tungsten-sort時,也不能保證就一定采用基于Tungsten Sort的Shuffle實現機制。有興趣的讀者可以參考Spark 1.5及之前的注冊方法的實現,該實現中SortShuffleManager的注冊方法僅構建了BaseShuffleHandle實例,同時對應的getWriter中也只對應構建了BaseShuffleHandle實例。

7.5.2 Tungsten Sorted Based Shuffle內核

基于Tungsten Sort的Shuffle實現機制的入口點仍然是SortShuffleManager類,與同樣在SortShuffleManager類控制下的其他兩種實現機制不同的是,基于Tungsten Sort的Shuffle實現機制使用的ShuffleHandle與ShuffleWriter分別為SerializedShuffleHandle與UnsafeShuffleWriter。因此,對應的具體實現機制可以用圖7-12來表示,對應如下。

在Sorted Based Shuffle中,SortShuffleManager根據內部采用的不同實現細節,分別給出兩種排序模式,而基于TungstenSort的Shuffle實現機制對應的就是序列化排序模式。

從圖7-12中可以看到基于Sort的Shuffle實現機制,具體的寫入器的選擇與注冊得到的ShuffleHandle類型有關,參考SortShuffleManager類的registerShuffle方法。

registerShuffle方法中會判斷是否滿足序列化模式的條件,如果滿足,則使用基于TungstenSort的Shuffle實現機制,對應在代碼中,表現為使用類型為SerializedShuffleHandle的ShuffleHandle。上述代碼進一步說明了在spark.shuffle.manager設置為sort時,內部會自動選擇具體的實現機制。對應代碼的先后順序,就是選擇的先后順序。

對應的序列化排序(Serialized sorting)模式需要滿足的條件如下所示。

圖7-12 基于TungstenSort的Shuffle實現機制的框架類圖

(1)Shuffle依賴中不帶聚合操作或沒有對輸出進行排序的要求。

(2)Shuffle的序列化器支持序列化值的重定位(當前僅支持KryoSerializer以及Spark SQL子框架自定義的序列化器)。

(3)Shuffle過程中的輸出分區個數少于16 777 216個。

實際上,使用過程中還有其他一些限制,如引入那個Page形式的內存管理模型后,內部單條記錄的長度不能超過128 MB(具體內存模型可以參考PackedRecordPointer類)。另外,分區個數的限制也是該內存模型導致的(同樣參考PackedRecordPointer類)。

所以,目前使用基于TungstenSort的Shuffle實現機制條件還是比較苛刻的。

7.5.3 Tungsten Sorted Based Shuffle數據讀寫的源碼解析

對應這種SerializedShuffleHandle及其相關的Shuffle數據寫入器類型的相關代碼,可以參考SortShuffleManager類的getWriter方法,關鍵代碼如下所示。

SortShuffleManager.scala的源碼如下。

1.        /** 為指定的分區提供一個數據寫入器。該方法在Map端的Tasks中調用*/
2.  override def getWriter[K, V](
3.        handle: ShuffleHandle,
4.        mapId: Int,
5.        context: TaskContext): ShuffleWriter[K, V] = {
6.      numMapsForShuffle.putIfAbsent(
7.        handle.shuffleId,        handle.asInstanceOf[BaseShuffleHandle[_,  _,
          _]].numMaps)
8.      val env = SparkEnv.get
9.      handle match {
10.   //SerializedShuffleHandle對應的寫入器為UnsafeShuffleWriter
      //使用的數據塊邏輯與物理映射關系仍然為IndexShuffleBlockResolver,對應
      //SortShuffleManager中的變量,因此相同
11.      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V
         @unchecked] =>
12.        new UnsafeShuffleWriter(
13.          env.blockManager,
14.          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
15.          context.taskMemoryManager(),
16.          unsafeShuffleHandle,
17.          mapId,
18.          context,
19.          env.conf)
20.      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K
@         unchecked, V @unchecked] =>
21.        new BypassMergeSortShuffleWriter(
22.          env.blockManager,
23.          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
24.          bypassMergeSortHandle,
25.          mapId,
26.          context,
27.          env.conf)
28.      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
29.        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
30
31.    }
32.  }

數據寫入器類UnsafeShuffleWriter中使用SortShuffleManager實例中的變量shuffleBlockResolver來對邏輯數據塊與物理數據塊的映射進行解析,而該變量使用的是與基于Hash的Shuffle實現機制不同的解析類,即當前使用的IndexShuffleBlockResolver。

UnsafeShuffleWriter構建時傳入了一個與其他兩種基于Sorted的Shuffle實現機制不同的參數:context.taskMemoryManager(),在此構建了一個TaskMemoryManager實例并傳入UnsafeShuffleWriter。TaskMemoryManager與Task是一對一的關系,負責管理分配給Task的內存。

下面開始解析寫數據塊的UnsafeShuffleWriter類的源碼實現。首先來看其write的方法。

UnsafeShuffleWriter.scala的源碼如下。

1.      public void write(scala.collection.Iterator<Product2<K, V>> records)
        throws IOException {
2.   ......
3.     boolean success = false;
4.     try {
5.     //對輸入的記錄集 records,循環將每條記錄插入到外部排序器
6.       while (records.hasNext()) {
7.         insertRecordIntoSorter(records.next());
8.       }
9.       closeAndWriteOutput();
10.      //生成最終的兩個結果文件,和Sorted Based Shuffle的實現機制一樣,每個Map
         //端的任務對應生成一個數據(Data)文件和對應的索引(Index)文件
11.
12.
13.      success = true;
14.    } finally {
15.      if (sorter != null) {
16.     try {
17.       sorter.cleanupResources();
18.     } catch (Exception e) {
19.  .......

寫過程的關鍵步驟有三步。

(1)通過insertRecordIntoSorter(records.next())方法將每條記錄插入外部排序器。

(2)closeAndWriteOutput方法寫數據文件與索引文件,在寫的過程中,會先合并外部排序器在插入過程中生成的Spill中間文件。

(3)sorter.cleanupResources()最后釋放外部排序器的資源。

首先查看將每條記錄插入外部排序器(ShuffleExternalSorter)時所使用的insertRecordIntoSorter方法,其關鍵代碼如下所示。

UnsafeShuffleWriter.scala的源碼如下。

1.        void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
2.      assert(sorter != null);
3.    //對于多次訪問的Key值,使用局部變量,可以避免多次函數調用
4.      final K key = record._1();
5.      final int partitionId = partitioner.getPartition(key);
6.   //先復位存放每條記錄的緩沖區,內部使用ByteArrayOutputStream存放每條記錄,容量
     //為1MB
7.
8.
9.      serBuffer.reset();
10.  //進一步使用序列化器從serBuffer緩沖區構建序列化輸出流,將記錄寫入到緩沖區
11.     serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
12.     serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
13.     serOutputStream.flush();
14.
15.     final int serializedRecordSize = serBuffer.size();
16.     assert (serializedRecordSize > 0);
17.    //將記錄插入到外部排序器中,serBuffer是一個字節數組,內部數據存放的偏移量為
       //Platform.BYTE_ARRAY_OFFSET
18.
19.
20.     sorter.insertRecord(
21.       serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize,
          partitionId);
22.   }

下面繼續查看第二步寫數據文件與索引文件的closeAndWriteOutput方法,其關鍵代碼如下所示。

closeAndWriteOutput的源碼如下。

1.   void closeAndWriteOutput() throws IOException {
2.    assert(sorter != null);
3.    updatePeakMemoryUsed();
4.    //設為null,用于GC垃圾回收
5.    serBuffer = null;
6.    serOutputStream = null;
7.    //關閉外部排序器,并獲取全部Spill信息
8.    final SpillInfo[] spills = sorter.closeAndGetSpills();
9.    sorter = null;
10.   final long[] partitionLengths;
11.     //通過塊解析器獲取輸出文件名
12.     final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
13.     //在后續合并Spill文件時先使用臨時文件名,最終再重命名為真正的輸出文件名,
        //即在writeIndexFileAndCommit方法中會重復通過塊解析器獲取輸出文件名
14.     final File tmp = Utils.tempFileWith(output);
15.     try {
16.       try {
17.         partitionLengths = mergeSpills(spills, tmp);
18.       } finally {
19.         for (SpillInfo spill : spills) {
20.           if (spill.file.exists() && ! spill.file.delete()) {
21.             logger.error("Error while deleting spill file {}",
                spill.file.getPath());
22.           }
23.         }
24.       }
25.
26.  //將合并Spill后獲取的分區及其數據量信息寫入索引文件,并將臨時數據文件重命名為
     //真正的數據文件名
27.
28.       shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId,
          partitionLengths, tmp);
29.     } finally {
30.       if (tmp.exists() && !tmp.delete()) {
31.         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
32.       }
33.     }
34.    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(),
       partitionLengths);
35.   }

closeAndWriteOutput方法主要有以下三步。

(1)觸發外部排序器,獲取Spill信息。

(2)合并中間的Spill文件,生成數據文件,并返回各個分區對應的數據量信息。

(3)根據各個分區的數據量信息生成數據文件對應的索引文件。

writeIndexFileAndCommit方法和Sorted Based Shuffle機制的實現一樣,在此僅分析過程中不同的Spill文件合并步驟,即mergeSpills方法的具體實現。

UnsafeShuffleWriter.scala的mergeSpills方法的源碼如下。

1.   /**
       * 合并 0 個或多個Spill的中間文件,基于Spills的個數以及I/O壓縮碼選擇最
       * 快速的合并策略。返回包含合并文件中各個分區的數據長度的數組。
2.     * /
3.
4.
5.  private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws
    IOException {
6.
7.
8.  //獲取Shuffle的壓縮配置信息
9.      final    boolean   compressionEnabled       =  sparkConf.getBoolean("spark.
        shuffle.compress", true);
10.     final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.
        createCodec(sparkConf);
11.   //獲取是否啟動unsafe的快速合并
12.    final boolean fastMergeEnabled =
13.      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
14.
15.  //沒有壓縮或者當壓縮碼支持序列化流合并時,支持快速合并
16.    final boolean fastMergeIsSupported = !compressionEnabled ||
17.      CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams
         (compressionCodec);
18.    final boolean encryptionEnabled = blockManager.serializerManager().
       encryptionEnabled();
19.    try {
20.
21.        //沒有中間的Spills文件時,創建一個空文件,并返回包含分區數據長度的
           //空數組。后續讀取時會過濾掉空文件
22.
23.
24.      if (spills.length == 0) {
25.        new FileOutputStream(outputFile).close(); //Create an empty file
26.        return new long[partitioner.numPartitions()];
27.      } else if (spills.length == 1) {
28.
29.        //最后一個Spills文件已經更新 metrics 信息,因此不需要重復更新,直接
           //重命名Spills的中間臨時文件為目標輸出的數據文件,同時將該Spills中間
           //文件的各分區數據長度的數組返回即可
30.        Files.move(spills[0].file, outputFile);
31.        return spills[0].partitionLengths;
32.      } else {
33.        final long[] partitionLengths;
34.       //當存在多個Spill 中間文件時,根據不同的條件,采用不同的文件合并策略
35.
36.        if (fastMergeEnabled && fastMergeIsSupported) {
37.         //由spark.file.transferTo配置屬性控制,默認為true
38.          if (transferToEnabled && !encryptionEnabled) {
39.            logger.debug("Using transferTo-based fast merge");
40.
41.            //通過 NIO 的方式合并各個Spills的分區字節數據
42.            //僅在 I/O 壓縮碼和序列化器支持序列化流的合并時安全
43.
44.            partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
45.          } else {
46.            logger.debug("Using fileStream-based fast merge");
47.            //使用 Java FileStreams文件流的方式進行合并
48.            partitionLengths = mergeSpillsWithFileStream(spills, outputFile,
               null);
49.          }
50.        } else {
51.          logger.debug("Using slow merge");
52.         partitionLengths = mergeSpillsWithFileStream(spills, outputFile,
            compressionCodec);
53.        }
54.        //更新Shuffle寫數據的度量信息
55.        writeMetrics.decBytesWritten(spills[spills.length - 1].file.
           length());
56.        writeMetrics.incBytesWritten(outputFile.length());
57.        return partitionLengths;
58.      }
59.    } catch (IOException e) {
60.      if (outputFile.exists() && !outputFile.delete()) {
61.
62.        logger.error("Unable to delete output file {}", outputFile.
           getPath());
63.      }
64.      throw e;
65.    }
66.  }

各種合并策略在性能上具有一定差異,會根據具體的條件采用,主要有基于Java NIO(New I/O)和基于普通文件流合并文件的方式。下面簡單描述一下基于文件合并流的處理過程,代碼如下所示:

UnsafeShuffleWriter.scala的mergeSpillsWithFileStream方法的源碼如下。

1.    /** 使用 Java FileStreams文件流的方式合并*/
2.  private long[] mergeSpillsWithFileStream(
3.        SpillInfo[] spills,
4.        File outputFile,
5.        @Nullable CompressionCodec compressionCodec) throws IOException {
6.      assert (spills.length >= 2);
7.      final int numPartitions = partitioner.numPartitions();
8.      final long[] partitionLengths = new long[numPartitions];
9.
10. //對應打開的輸入流的個數為Spills的臨時文件個數
11.     final InputStream[] spillInputStreams = new FileInputStream
        [spills.length];
12.
13.    //使用計數輸出流避免關閉基礎文件并詢問文件系統在每個分區寫入文件大小
14.
15.     final CountingOutputStream mergedFileOutputStream = new
        CountingOutputStream(
16.       new FileOutputStream(outputFile));
17.
18.     boolean threwException = true;
19.     try {
20.       //為每個Spills中間文件打開文件輸入流
21.       for (int i = 0; i < spills.length; i++) {
22.         spillInputStreams[i] = new FileInputStream(spills[i].file);
23.       }
24.       //遍歷分區
25.       for (int partition = 0; partition < numPartitions; partition++) {
26.         final long initialFileLength = mergedFileOutputStream.getByteCount();
27.         //屏蔽底層輸出流的close()調用,以便能夠關閉高層流,以確保所有數據都真正刷
            //新并清除內部狀態
28.
29.         OutputStream partitionOutput = new CloseShieldOutputStream(
30.           new TimeTrackingOutputStream(writeMetrics, mergedFileOutputStream));
31.         partitionOutput = blockManager.serializerManager().wrapForEncryption
            (partitionOutput);
32.         if (compressionCodec != null) {
33.           partitionOutput = compressionCodec.compressedOutputStream
              (partitionOutput);
34.         }
35.
36.         //依次從各個 Spills 輸入流中讀取當前分區的數據長度指定個數的字節,到各個分
            //區對應的輸出文件流中
37.         for (int i = 0; i < spills.length; i++) {
38.           final long partitionLengthInSpill = spills[i].partitionLengths
              [partition];
39.           if (partitionLengthInSpill > 0) {
40.             InputStream partitionInputStream = new LimitedInputStream
                (spillInputStreams[i],
41.               partitionLengthInSpill, false);
42.             try {
43.               partitionInputStream = blockManager.serializerManager()
                  .wrapForEncryption(
44.                 partitionInputStream);
45.               if (compressionCodec != null) {
46.                 partitionInputStream = compressionCodec.compressedInputStream
                    (partitionInputStream);
47.               }
48.               ByteStreams.copy(partitionInputStream, partitionOutput);
49.             } finally {
50.               partitionInputStream.close();
51.             }
52.           }
53.         }
54.         partitionOutput.flush();
55.         partitionOutput.close();
56.  //將當前寫入的數據長度存入返回的數組中
57.         partitionLengths[partition] = (mergedFileOutputStream
            .getByteCount() - initialFileLength);
58.       }
59.       threwException = false;
60.     } finally {
61.       //為了避免屏蔽異常以后導致過早進入finally塊的異常處理,只能在清理過程中拋出
          //異常
62.
63.       for (InputStream stream : spillInputStreams) {
64.         Closeables.close(stream, threwException);
65.       }
66.       Closeables.close(mergedFileOutputStream, threwException);
67.     }
68.     return partitionLengths;
69.   }

基于NIO的文件合并流程基本類似,只是底層采用NIO的技術實現。

主站蜘蛛池模板: 台山市| 本溪| 东丰县| 婺源县| 峨山| 永年县| 黔东| 文登市| 遵义县| 巩留县| 固阳县| 郧西县| 绥阳县| 万山特区| 延安市| 新竹县| 高清| 方山县| 乌鲁木齐县| 建宁县| 海丰县| 博乐市| 乡城县| 腾冲县| 那曲县| 柳州市| 凤阳县| 新民市| 西吉县| 益阳市| 松江区| 时尚| 民权县| 中超| 樟树市| 措美县| 叶城县| 宜黄县| 偃师市| 长白| 昌乐县|