官术网_书友最值得收藏!

7.6 Shuffle與Storage模塊間的交互

在Spark中,存儲模塊被抽象成Storage。顧名思義,Storage是存儲的意思,代表著Spark中的數據存儲系統,負責管理和實現數據塊(Block)的存放。其中存取數據的最小單元是Block,數據由不同的Block組成,所有操作都是以Block為單位進行的。從本質上講,RDD中的Partition和Storage中的Block是等價的,只是所處的模塊不同,看待的角度不一樣而已。

Storage抽象模塊的實現分為兩個層次,如圖7-13所示。

(1)通信層:通信層是典型的Master-Slave結構,Master和Slave之間傳輸控制和狀態信息。通信層主要由BlockManager、BlockManagerMaster、BlockManagerMasterEndpoint、BlockManagerSlaveEndpoint等類實現。

(2)存儲層:負責把數據存儲到內存、磁盤或者堆外內存中,有時還需要為數據在遠程節點上生成副本,這些都由存儲層提供的接口實現。Spark 2.2.0具體的存儲層的實現類有DiskStore和MemoryStore。

圖7-13 Storage存儲模塊

Shuffle模塊若要和Storage模塊進行交互,需要通過調用統一的操作類BlockManager來完成。如果把整個存儲模塊看成一個黑盒,BlockManager就是黑盒上留出的一個供外部調用的接口。

7.6.1 Shuffle注冊的交互

Spark中BlockManager在Driver端的創建,在SparkContext創建的時候會根據具體的配置創建SparkEnv對象,源碼如下所示。

SparkContext.scala的源碼如下。

1.   _env = createSparkEnv(_conf, isLocal, listenerBus)
2.      SparkEnv.set(_env)
3.  .......
4.  private[spark] def createSparkEnv(
5.        conf: SparkConf,
6.        isLocal: Boolean,
7.        listenerBus: LiveListenerBus): SparkEnv = {
8.  //創建Driver端的運行環境
9.      SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext
        .numDriverCores(master))
10.   }

createSparkEnv方法中,傳入SparkConf配置對象、isLocal標志,以及LiveListenerBus,方法中使用SparkEnv對象的createDriverEnv方法創建SparkEnv并返回。在SparkEnv的createDriverEvn方法中,將會創建BlockManager、BlockManagerMaster等對象,完成Storage在Driver端的部署。

SparkEnv中創建BlockManager、BlockManagerMaster關鍵源碼如下所示。

SparkEnv.scala的源碼如下。

1.     val blockTransferService =
2.        new NettyBlockTransferService(conf, securityManager, bindAddress,
          advertiseAddress, blockManagerPort, numUsableCores)
3.
4.  //創建BlockManagerMasterEndpoint
5.      val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
6.        BlockManagerMaster.DRIVER_ENDPOINT_NAME,
7.    //創建BlockManagerMasterEndpoint
8.        new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
9.        conf, isDriver)
10.  //創建BlockManager
11.     //注:blockManager無效,直到initialize()被調用
12.     val blockManager = new BlockManager(executorId, rpcEnv,
        blockManagerMaster,
13.       serializerManager, conf, memoryManager, mapOutputTracker,
          shuffleManager,
14.       blockTransferService, securityManager, numUsableCores)

使用new關鍵字實例化出BlockManagerMaster,傳入BlockManager的構造函數,實例化出BlockManager對象。這里的BlockManagerMaster和BlockManager屬于聚合關系。BlockManager主要對外提供統一的訪問接口,BlockManagerMaster主要對內提供各節點之間的指令通信服務。

構建BlockManager時,傳入shuffleManager參數,shuffleManager是在SparkEnv中創建的,將shuffleManager傳入到BlockManager中,BlockManager就擁有shuffleManager的成員變量,從而可以調用shuffleManager的相關方法。

BlockManagerMaster在Driver端和Executors中的創建稍有差別。首先來看在Driver端創建的情形。創建BlockManagerMaster傳入的isDriver參數,isDriver為true,表示在Driver端創建,否則視為在Slave節點上創建。

當SparkContext中執行_env.blockManager.initialize(_applicationId)代碼時,會調用Driver端BlockManager的initialize方法。Initialize方法的源碼如下所示。

SparkContext.scala的源碼如下。

1.  _env.blockManager.initialize(_applicationId)

Spark 2.1.1版本的BlockManager.scala的源碼如下。

1.   def initialize(appId: String): Unit = {
2.   //調用blockTransferService的init方法,blockTransferService用于在不同節點
     //fetch數據、傳送數據
3.     blockTransferService.init(this)
4.   //shuffleClient用于讀取其他Executor上的shuffle files
5.      shuffleClient.init(appId)
6.
7.      blockReplicationPolicy = {
8.        val priorityClass = conf.get(
9.          "spark.storage.replication.policy", classOf
            [RandomBlockReplicationPolicy].getName)
10.       val clazz = Utils.classForName(priorityClass)
11.       val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
12.       logInfo(s"Using $priorityClass for block replication policy")
13.       ret
14.     }
15.
16.     val id =
17.       BlockManagerId(executorId, blockTransferService.hostName,
          blockTransferService.port, None)
18.
19.   //向blockManagerMaster注冊BlockManager。在registerBlockManager方法中傳
      //入了  slaveEndpoint,slaveEndpoint      為  BlockManager   中的 RPC  對象,用于和
      //blockManagerMaster通信
20.     val idFromMaster = master.registerBlockManager(
21.       id,
22.       maxMemory,
23.       slaveEndpoint)
24.   //得到blockManagerId
25.     blockManagerId = if (idFromMaster != null) idFromMaster else id
26.
27. //得到shuffleServerId
28.     shuffleServerId = if (externalShuffleServiceEnabled) {
29.       logInfo(s"external shuffle service port = $externalShuffleServicePort")
30.       BlockManagerId(executorId, blockTransferService.hostName,
          externalShuffleServicePort)
31.     } else {
32.       blockManagerId
33.     }
34.  //注冊shuffleServer
35.     //如果存在,將注冊Executors配置與本地shuffle服務
36.     if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
37.       registerWithExternalShuffleServer()
38.     }
39.
40.     logInfo(s"Initialized BlockManager: $blockManagerId")
41.   }

Spark 2.2.0版本的BlockManager.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第22行刪除maxMemory。

 上段代碼中第22行之后新增參數maxOnHeapMemory:最大的堆內存大小。

 上段代碼中第22行之后新增參數maxOffHeapMemory:最大的堆外存大小。

1.  .....
2.        maxOnHeapMemory,
3.        maxOffHeapMemory,
4.  ...

如上面的源碼所示,initialize方法使用appId初始化BlockManager,主要完成以下工作。

(1)初始化BlockTransferService。

(2)初始化ShuffleClient。

(3)創建BlockManagerId。

(4)將BlockManager注冊到BlockManagerMaster上。

(5)若ShuffleService可用,則注冊ShuffleService。

在BlockManager的initialize方法上右擊Find Usages,可以看到initialize方法在兩個地方得到調用:一個是SparkContext;另一個是Executor。啟動Executor時,會調用BlockManager的initialize方法。Executor中調用initialize方法的源碼如下所示。

Executor.scala的源碼如下。

1.  //CoarseGrainedExecutorBackend中實例化Executor,isLocal設置成false,即
     //Executor中isLocal始終為false
2.
3.   if (!isLocal) {
4.  //向度量系統注冊
5.    //env.metricsSystem.registerSource(executorSource)
6.   //調用BlockManager的initialize方法,initialize方法將向BlockManagerMaster
     //注冊,完成Executor中的BlockManager向Driver中的BlockManager注冊
7.      env.blockManager.initialize(conf.getAppId)
8.    }

上面代碼中調用了env.blockManager.initialize方法。在initialize方法中,完成BlockManger向Master端BlockManagerMaster的注冊。使用方法master.registerBlockManager (id,maxMemory,slaveEndpoint)完成注冊,registerBlockManager方法中傳入Id、maxMemory、salveEndPoint引用,分別表示Executor中的BlockManager、最大內存、BlockManager中的BlockMangarSlaveEndpoint。BlockManagerSlaveEndpoint是一個RPC端點,使用它完成同BlockManagerMaster的通信。BlockManager收到注冊請求后將Executor中注冊的BlockManagerInfo存入哈希表中,以便通過BlockManagerSlaveEndpoint向Executor發送控制命令。

ShuffleManager是一個用于shuffle系統的可插拔接口。在Driver端SparkEnv中創建ShuffleManager,在每個Executor上也會創建?;趕park.shuffle.manager進行設置。Driver使用ShuffleManager注冊到shuffles系統,Executors(或Driver在本地運行的任務)可以請求讀取和寫入數據。這將被SparkEnv的SparkConf和isDriver布爾值作為參數。

ShuffleManager.scala的源碼如下。

1.   private[spark] trait ShuffleManager {
2.
3.    /**
4.      *注冊一個shuffle管理器,獲取一個句柄傳遞給任務
5.      */
6.    def registerShuffle[K, V, C](
7.        shuffleId: Int,
8.        numMaps: Int,
9.        dependency: ShuffleDependency[K, V, C]): ShuffleHandle
10.
11.   /**為給定分區獲取一個寫入器。Executors節點通過Map任務調用*/
12.   def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context:
      TaskContext): ShuffleWriter[K, V]
13.
14.   /**
        *獲取讀取器匯聚一定范圍的分區(從   startPartition     到 endPartition-1)。在
        *Executors節點,通過reduce 任務調用
15.     */
16.
17.   def getReader[K, C](
18.       handle: ShuffleHandle,
19.       startPartition: Int,
20.       endPartition: Int,
21.       context: TaskContext): ShuffleReader[K, C]
22.
23.   /**
24.     *從ShuffleManager移除一個shuffle的元數據
25.     * @return如果元數據成功刪除,則返回true,否則返回false
26.     */
27.   def unregisterShuffle(shuffleId: Int): Boolean
28.
29.   /**
        * 返回一個能夠根據塊坐標來檢索shuffle 塊數據的解析器
30.     */
31.   def shuffleBlockResolver: ShuffleBlockResolver
32.
33.   /** 關閉ShuffleManager */
34.   def stop(): Unit
35. }

Spark Shuffle Pluggable框架ShuffleBlockManager在Spark 1.6.0之后改成了ShuffleBlockResolver。ShuffleBlockResolver具體讀取shuffle數據,是一個trait。在ShuffleBlockResolver中已無getBytes方法。getBlockData(blockId: ShuffleBlockId)方法返回的是ManagedBuffer,這是核心。

ShuffleBlockResolver的源碼如下。

1.     trait ShuffleBlockResolver {
2.    type ShuffleId = Int
3.
4.    /**
        *為指定的塊檢索數據。如果塊數據不可用,則拋出一個未指明的異常
5.      */
6.    def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
7.
8.    def stop(): Unit
9.  }

Spark 2.0版本中通過IndexShuffleBlockResolver來具體實現ShuffleBlockResolver (SortBasedShuffle方式),已無FileShuffleBlockManager(Hashshuffle方式)。IndexShuffle-BlockResolver創建和維護邏輯塊和物理文件位置之間的shuffle blocks映射關系。來自于相同map task任務的shuffle blocks數據存儲在單個合并數據文件中;數據文件中的數據塊的偏移量存儲在單獨的索引文件中。將shuffleBlockId + reduce ID set to 0 + ".后綴"作為數據shuffle data的shuffleBlockId名字。其中,文件名后綴為".data"的是數據文件;文件名后綴為".index"的是索引文件。

7.6.2 Shuffle寫數據的交互

基于Sort的Shuffle實現的ShuffleHandle包含BypassMergeSortShuffleHandle與BaseShuffleHandle。兩種ShuffleHandle寫數據的方法可以參考SortShuffleManager類的getWriter方法,關鍵代碼如下所示。

SortShuffleManager的getWriter的源碼如下。

1.     override def getWriter[K, V](
2.  .......
3.      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K
        @unchecked, V @unchecked] =>
4.        new BypassMergeSortShuffleWriter(
5.           env.blockManager,
6.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
7.         ........
8.       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
9.         new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
10.    }
11.  }

在對應構建的兩種數據寫入器類BypassMergeSortShuffleWriter與SortShuffleWriter中,都是通過變量shuffleBlockResolver對邏輯數據塊與物理數據塊的映射進行解析。BypassMergeSortShuffleWriter寫數據的具體實現位于實現的write方法中,其中調用的createTempShuffleBlock方法描述了各個分區所生成的中間臨時文件的格式與對應的BlockId。SortShuffleWriter寫數據的具體實現位于實現的write方法中。

7.6.3 Shuffle讀數據的交互

SparkEnv.get.shuffleManager.getReader是SortShuffleManager的getReader,是獲取數據的閱讀器,getReader方法中創建了一個BlockStoreShuffleReader實例。SortShuffleManager. scala的read()方法的源碼如下。

1.  override def getReader[K, C](
2.      handle: ShuffleHandle,
3.      startPartition: Int,
4.      endPartition: Int,
5.      context: TaskContext): ShuffleReader[K, C] = {
6.    new BlockStoreShuffleReader(
7.      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition,
        endPartition, context)
8.  }

BlockStoreShuffleReader實例的read方法,首先實例化new ShuffleBlockFetcherIterator。ShuffleBlockFetcherIterator是一個閱讀器,里面有一個成員blockManager。blockManager是內存和磁盤上數據讀寫的統一管理器;ShuffleBlockFetcherIterator.scala的initialize方法中splitLocalRemoteBlocks()劃分本地和遠程的blocks,Utils.randomize(remoteRequests)把遠程請求通過隨機的方式添加到隊列中,fetchUpToMaxBytes()發送遠程請求獲取我們的blocks,fetchLocalBlocks()獲取本地的blocks。

7.6.4 BlockManager架構原理、運行流程圖和源碼解密

BlockManager是管理整個Spark運行時數據的讀寫,包含數據存儲本身,在數據存儲的基礎上進行數據讀寫。由于Spark是分布式的,所以BlockManager也是分布式的,BlockManager本身相對而言是一個比較大的模塊,Spark中有非常多的模塊:調度模塊、資源管理模塊等。BlockManager是另外一個非常重要的模塊。BlockManager本身的源碼量非常大。本節從BlockManager原理流程對BlockManager做深刻地講解。在Shuffle讀寫數據的時候,我們需要讀寫BlockManager。因此,BlockManager是至關重要的內容。

編寫一個業務代碼WordCount.scala,通過觀察WordCount運行時BlockManager的日志來理解BlockManager的運行。

WordCount.scala的代碼如下。

1.  package com.dt.spark.sparksql
2.
3.  import org.apache.log4j.{Level, Logger}
4.  import org.apache.spark.SparkConf
5.  import org.apache.spark.SparkContext
6.  import org.apache.spark.internal.config
7.  import org.apache.spark.rdd.RDD
8.
9.  /**
10.   * 使用Scala開發本地測試的Spark WordCount程序
11.   *
12.   * @author DT大數據夢工廠
13.   *         新浪微博:http://weibo.com/ilovepains/
14.   */
15. object WordCount {
16.   def main(args: Array[String]) {
17.     Logger.getLogger("org").setLevel(Level.ALL)
18.
19.     /**
20.       *第1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,
21.       *例如,通過setMaster設置程序要鏈接的Spark集群的Master的URL,如果設置
22.       *為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(如只有
23.       *1GB的內存)的初學者       *
24.       */
25.     val conf = new SparkConf() //創建SparkConf對象
26.     conf.setAppName("Wow,My First Spark App!") //設置應用程序的名稱,在程序
                                                            //運行的監控界面可以看到名稱
27.     conf.setMaster("local") //此時,程序在本地運行,不需要安裝Spark集群
28.     /**
29.       * 第2步:創建SparkContext對象
30.       * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、
          * R等都必須有一個SparkContext
31.       * SparkContext    核心作用:初始化  Spark  應用程序運行所需要的核心組件,包括
          * DAGScheduler、TaskScheduler、SchedulerBackend
32.       * 同時還會負責Spark程序往Master注冊程序等
33.       * SparkContext是整個Spark應用程序中最重要的一個對象
34.       */
35.     val sc = new SparkContext(conf)
                                 //創建SparkContext對象,通過傳入SparkConf
                                 //實例來定制Spark運行的具體參數和配置信息
36.     /**
37.       * 第 3  步:根據具體的數據來源(如  HDFS、HBase、Local FS、DB、S3      等)通過
          * SparkContext創建RDD
38.       * RDD的創建基本有三種方式:根據外部的數據來源(如HDFS)、根據Scala集合、由
          * 其他的RDD操作
39.       * 數據會被RDD劃分為一系列的Partitions,分配到每個Partition的數據屬于一
          * 個Task的處理范疇
40.       */
41.     //val lines: RDD[String] = sc.textFile("D://Big_Data_Software spark-
        1.6.0-bin-hadoop2.6README.md", 1) //讀取本地文件并設置為一個Partition
42.     // val lines = sc.textFile("D://Big_Data_Software spark-1.6.0-bin-
        hadoop2.6//README.md", 1) //讀取本地文件并設置為一個Partition
43.
44.     val lines = sc.textFile("data/wordcount/helloSpark.txt", 1)
                                          //讀取本地文件并設置為一個Partition
45.     /**
46.       * 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等
          * 高階函數等的編程,進行具體的數據計算
47.       * 第4.1步:將每一行的字符串拆分成單個單詞
48.       */
49.
50.     val words = lines.flatMap { line => line.split(" ") }
    //對每一行的字符串進行單詞拆分并把所有行的拆分結果通過flat合并成為一個大的單詞集合
51.
52.     /**
53.       * 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等
          * 高階函數等的編程,進行具體的數據計算
54.       * 第4.2步:在單詞拆分的基礎上,對每個單詞實例計數為1,也就是word => (word, 1)
55.       */
56.     val pairs = words.map { word => (word, 1) }
57.
58.     /**
59.       * 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等
          * 高階函數等的編程,進行具體的數據計算
60.       * 第4.3步:在每個單詞實例計數為1基礎上,統計每個單詞在文件中出現的總次數
61.       */
62.     val wordCountsOdered = pairs.reduceByKey(_ + _).map(pair => (pair._2,
        pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
         //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)
63.     wordCountsOdered.collect.foreach(wordNumberPair => println
        (wordNumberPair._1 + " : " + wordNumberPair._2))
64.     while (true) {
65.
66.     }
67.     sc.stop()
68.
69.   }
70. }

在IDEA中運行一個業務程序WordCount.scala,日志中顯示:

 SparkEnv:Registering MapOutputTracker,其中MapOutputTracker中數據的讀寫都和BlockManager關聯。

 SparkEnv:Registering BlockManagerMaste,其中Registering BlockManagerMaster由BlockManagerMaster進行注冊。

 DiskBlockManager:Created local directory C:\Users\dell\AppData\Local\Temp\blockmgr-...其中DiskBlockManager是管理磁盤存儲的,里面有我們的數據。可以訪問Temp目錄下以blockmgr-開頭的文件的內容。

WordCount運行結果如下。

1.   Using Spark's default log4j profile: org/apache/spark/log4j-defaults.
     properties
2.  17/06/06 05:37:57 INFO SparkContext: Running Spark version 2.1.0
3.  ......
4.  17/06/06 05:38:01 INFO SparkEnv: Registering MapOutputTracker
5.  17/06/06 05:38:01 DEBUG MapOutputTrackerMasterEndpoint: init
6.  17/06/06 05:38:01 INFO SparkEnv: Registering BlockManagerMaster
7.  17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: Using org.apache
    .spark.storage.DefaultTopologyMapper for getting topology information
8.  17/06/06 05:38:01 INFO BlockManagerMasterEndpoint:
    BlockManagerMasterEndpoint up
9.  17/06/06 05:38:01 INFO DiskBlockManager: Created local directory at
    C:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828
    872c98804
10. 17/06/06 05:38:01 DEBUG DiskBlockManager: Adding shutdown hook
11. 17/06/06 05:38:01 DEBUG ShutdownHookManager: Adding shutdown hook
12. 17/06/06 05:38:01 INFO MemoryStore: MemoryStore started with capacity
    637.2 MB
13. 17/06/06 05:38:02 INFO SparkEnv: Registering OutputCommitCoordinator
14. 17/06/06 05:38:02 DEBUG OutputCommitCoordinator$OutputCommitCoordinator-
    Endpoint: init
15. ........

從Application啟動的角度觀察BlockManager:

(1)Application啟動時會在SparkEnv中注冊BlockManagerMaster以及MapOutputTracker,其中,

a)BlockManagerMaster:對整個集群的Block數據進行管理。

b)MapOutputTrackerMaster:跟蹤所有的Mapper的輸出。

BlockManagerMaster中有一個引用driverEndpoint,isDriver判斷是否運行在Driver上。

BlockManagerMaster的源碼如下。

1.   private[spark]
2.  class BlockManagerMaster(
3.      var driverEndpoint: RpcEndpointRef,
4.      conf: SparkConf,
5.      isDriver: Boolean)
6.    extends Logging {

BlockManagerMaster注冊給SparkEnv,SparkEnv在SparkContext中。

SparkContext.scala的源碼如下。

1.     ......
2.    private var _env: SparkEnv = _
3.  ......
4.    _env = createSparkEnv(_conf, isLocal, listenerBus)
5.      SparkEnv.set(_env)

進入createSparkEnv方法:

1.  private[spark] def createSparkEnv(
2.      conf: SparkConf,
3.      isLocal: Boolean,
4.      listenerBus: LiveListenerBus): SparkEnv = {
5.    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.
      numDriverCores(master))
6.  }

進入SparkEnv.scala的createDriverEnv方法:

1.   private[spark] def createDriverEnv(
2.  ......
3.      create(
4.        conf,
5.        SparkContext.DRIVER_IDENTIFIER,
6.        bindAddress,
7.        advertiseAddress,
8.        port,
9.        isLocal,
10.       numCores,
11.       ioEncryptionKey,
12.       listenerBus = listenerBus,
13.       mockOutputCommitCoordinator = mockOutputCommitCoordinator
14.     )
15.   }
16. ......

SparkEnv.scala的createDriverEnv中調用了create方法,判斷是否是Driver。create方法的源碼如下。

1.    private def create(
2.        conf: SparkConf,
3.        executorId: String,
4.        bindAddress: String,
5.        advertiseAddress: String,
6.        port: Int,
7.        isLocal: Boolean,
8.        numUsableCores: Int,
9.        ioEncryptionKey: Option[Array[Byte]],
10.       listenerBus: LiveListenerBus = null,
11.       mockOutputCommitCoordinator:         Option[OutputCommitCoordinator]  =
          None): SparkEnv = {
12.
13.     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
14.      ......
15.     if (isDriver) {
16.       conf.set("spark.driver.port", rpcEnv.address.port.toString)
17.     } else if (rpcEnv.address != null) {
18.       conf.set("spark.executor.port", rpcEnv.address.port.toString)
19.       logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port
          .toString}")
20.     }
21.  ......
22.    val mapOutputTracker = if (isDriver) {
23.       new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
24.     } else {
25.       new MapOutputTrackerWorker(conf)
26.     }
27. ......
28. SparkContext.scala
29. private[spark] val DRIVER_IDENTIFIER = "driver"
30. ......

在SparkEnv.scala的createDriverEnv中調用new()函數創建一個MapOutputTrackerMaster。MapOutputTrackerMaster的源碼如下。

1.   private[spark] class MapOutputTrackerMaster(conf: SparkConf,
2.      broadcastManager: BroadcastManager, isLocal: Boolean)
3.    extends MapOutputTracker(conf) {
4.  ......

然后看一下blockManagerMaster。在SparkEnv.scala中調用new()函數創建一個blockManagerMaster。

1.  val blockManagerMaster = new BlockManagerMaster
    (registerOrLookupEndpoint(
2.   BlockManagerMaster.DRIVER_ENDPOINT_NAME,
3.   new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
4.   conf, isDriver)

BlockManagerMaster對整個集群的Block數據進行管理,Block是Spark數據管理的單位,與數據存儲沒有關系,數據可能存在磁盤上,也可能存儲在內存中,還可能存儲在offline,如Alluxio上。源碼如下。

1.  private[spark]
2.  class BlockManagerMaster(
3.      var driverEndpoint: RpcEndpointRef,
4.      conf: SparkConf,
5.      isDriver: Boolean)
6.    extends Logging {
7.  .....

構建BlockManagerMaster的時候調用new()函數創建一個BlockManagerMasterEndpoint,這是循環消息體。

1.      private[spark]
2.  class BlockManagerMasterEndpoint(
3.      override val rpcEnv: RpcEnv,
4.      val isLocal: Boolean,
5.      conf: SparkConf,
6.      listenerBus: LiveListenerBus)
7.    extends ThreadSafeRpcEndpoint with Logging {

(2)BlockManagerMasterEndpoint本身是一個消息體,會負責通過遠程消息通信的方式去管理所有節點的BlockManager。

查看WordCount在IDEA中的運行日志,日志中顯示BlockManagerMasterEndpoint: Registering block manager,向block manager進行注冊。

1.  ......
2.   17/06/06 05:38:02 INFO BlockManager: Using org.apache.spark.storage.
     RandomBlockReplicationPolicy for block replication policy
3.  17/06/06 05:38:02 INFO BlockManagerMaster: Registering BlockManager
    BlockManagerId(driver, 192.168.93.1, 63572, None)
4.  17/06/06 05:38:02 DEBUG DefaultTopologyMapper: Got a request for 192.168.
    93.1
5.  17/06/06 05:38:02 INFO BlockManagerMasterEndpoint: Registering block
    manager 192.168.93.1:63572 with 637.2 MB RAM, BlockManagerId(driver,
    192.168.93.1, 63572, None)
6.  17/06/06 05:38:02 INFO BlockManagerMaster: Registered BlockManager
    BlockManagerId(driver, 192.168.93.1, 63572, None)
7.  17/06/06     05:38:02     INFO    BlockManager:      Initialized     BlockManager:
    BlockManagerId(driver, 192.168.93.1, 63572, None)
8.  .......

(3)每啟動一個ExecutorBackend,都會實例化BlockManager,并通過遠程通信的方式注冊給BlockManagerMaster;實質上是Executor中的BlockManager在啟動的時候注冊給了Driver上的BlockManagerMasterEndpoint。

(4)MemoryStore是BlockManager中專門負責內存數據存儲和讀寫的類。

查看WordCount在IDEA中的運行日志,日志中顯示MemoryStore: Block broadcast_0 stored as values in memory,數據存儲在內存中。

1.   .......
2.  17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0 stored as values
    in memory (estimated size 208.5 KB, free 637.0 MB)
3.  17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0_piece0 stored as
    bytes in memory (estimated size 20.0 KB, free 637.0 MB)
4.  17/06/06 05:38:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in
    memory on 192.168.93.1:63572 (size: 20.0 KB, free: 637.2 MB)
5.  .......

Spark讀寫數據是以block為單位的,MemoryStore將block數據存儲在內存中。MemoryStore.scala的源碼如下。

1.   private[spark] class MemoryStore(
2.      conf: SparkConf,
3.      blockInfoManager: BlockInfoManager,
4.      serializerManager: SerializerManager,
5.      memoryManager: MemoryManager,
6.      blockEvictionHandler: BlockEvictionHandler)
7.    extends Logging {
8.  ......

(5)DiskStore是BlockManager中專門負責基于磁盤的數據存儲和讀寫的類。

Spark 2.1.1版本的DiskStore.scala的源碼如下。

1.   private[spark] class DiskStore(conf: SparkConf, diskManager:
     DiskBlockManager) extends Logging {
2.  .......

Spark 2.2.0版本的DiskStore.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第1行新增加了securityManager安全管理的成員變量。

1.  .......
2.     securityManager: SecurityManager) extends Logging {

(6)DiskBlockManager:管理Logical Block與Disk上的Physical Block之間的映射關系并負責磁盤文件的創建、讀寫等。

查看WordCount在IDEA中的運行日志,日志中顯示INFO DiskBlockManager: Created local directory。DiskBlockManager負責磁盤文件的管理。

1.   .....
2.  17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: Using org.apache.
    spark.storage.DefaultTopologyMapper for getting topology information
3.  17/06/06 05:38:01 INFO BlockManagerMasterEndpoint:
    BlockManagerMasterEndpoint up
4.  17/06/06 05:38:01 INFO DiskBlockManager: Created local directory at
    C:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828
    872c98804
5.  17/06/06 05:38:01 DEBUG DiskBlockManager: Adding shutdown hook
6.  .......

DiskBlockManager負責管理邏輯級別和物理級別的映射關系,根據BlockID映射一個文件。在目錄spark.local.dir或者SPARK_LOCAL_DIRS中,Block文件進行hash生成。通過createLocalDirs生成本地目錄。DiskBlockManager的源碼如下。

1.   private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop:
     Boolean) extends Logging {
2.  ......
3.  private def createLocalDirs(conf: SparkConf): Array[File] = {
4.      Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
5.        try {
6.          val localDir = Utils.createDirectory(rootDir, "blockmgr")
7.          logInfo(s"Created local directory at $localDir")
8.          Some(localDir)
9.        } catch {
10.         case e: IOException =>
11.           logError(s"Failed to create local dir in $rootDir. Ignoring this
              directory.", e)
12.           None
13.       }
14.     }
15.   }

從Job運行的角度來觀察BlockManager:

查看WordCount.scala的運行日志:日志中顯示INFO BlockManagerInfo: Added broadcast_0_piece0 in memory,將BlockManagerInfo的廣播變量加入到內存中。

1.  ......
2.  17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0_piece0 stored as
    bytes in memory (estimated size 20.0 KB, free 637.0 MB)
3.  17/06/06 05:38:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in
    memory on 192.168.93.1:63572 (size: 20.0 KB, free: 637.2 MB)
4.  ......

Driver使用BlockManagerInfo管理ExecutorBackend中BlockManager的元數據,BlockManagerInfo的成員變量包括blockManagerId、系統當前時間timeMs、最大堆內內存maxOnHeapMem、最大堆外內存maxOffHeapMem、slaveEndpoint。

Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源碼如下。

1.  private[spark] class BlockManagerInfo(
2.     val blockManagerId: BlockManagerId,
3.     timeMs: Long,
4.     val maxMem: Long,
5.     val slaveEndpoint: RpcEndpointRef)
6.   extends Logging {

Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第4行刪除maxMem。

 上段代碼中第4行之后新增maxOnHeapMem成員變量:最大的堆內內存大小。

 上段代碼中第4行之后新增maxOffHeapMem成員變量:最大的堆外內存大小。

1.  ......
2.      val maxOnHeapMem: Long,
3.      val maxOffHeapMem: Long,
4.   ......
5.  extends Logging {

集群中每啟動一個節點,就創建一個BlockManager,BlockManager是在每個節點(Driver及Executors)上運行的管理器,用于存放和檢索本地和遠程不同的存儲塊(內存、磁盤和堆外內存)。BlockManagerInfo中的BlockManagerId標明是哪個BlockManager,slaveEndpoint是消息循環體,用于消息通信。

(1)首先通過MemoryStore存儲廣播變量。

(2)在Driver中是通過BlockManagerInfo來管理集群中每個ExecutorBackend中的BlockManager中的元數據信息的。

(3)當改變了具體的ExecutorBackend上的Block信息后,就必須發消息給Driver中的BlockManagerMaster來更新相應的BlockManagerInfo。

(4)當執行第二個Stage的時候,第二個Stage會向Driver中的MapOutputTracker-MasterEndpoint發消息請求上一個Stage中相應的輸出,此時MapOutputTrackerMaster會把上一個Stage的輸出數據的元數據信息發送給當前請求的Stage。圖7-14是BlockManager工作原理和運行機制簡圖:

圖7-14 BlockManager工作原理和運行機制簡圖

BlockManagerMasterEndpoint.scala中BlockManagerInfo的getStatus方法如下。

1.  def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.
    get(blockId))

其中的BlockStatus是一個case class。

1.     case class BlockStatus(storageLevel: StorageLevel, memSize: Long,
       diskSize: Long) {
2.    def isCached: Boolean = memSize + diskSize > 0
3.  }

BlockTransferService.scala進行網絡連接操作,獲取遠程數據。

1.   private[spark]
2.  abstract class BlockTransferService extends ShuffleClient with Closeable
    with Logging {

7.6.5 BlockManager解密進階:BlockManager初始化和注冊解密、BlockManagerMaster工作解密、BlockTransferService解密、本地數據讀寫解密、遠程數據讀寫解密

BlockManager既可以運行在Driver上,也可以運行在Executor上。在Driver上的BlockManager管理集群中Executor的所有的BlockManager,BlockManager分成Master、Slave結構,一切的調度、一切的工作由Master觸發,Executor在啟動的時候一定會啟動BlockManager。BlockManager主要提供了讀和寫數據的接口,可以從本地讀寫數據,也可以從遠程讀寫數據。讀寫數據可以基于磁盤,也可以基于內存以及OffHeap。OffHeap就是堆外空間(如Alluxion是分布式內存管理系統,與基于內存計算的Spark系統形成天衣無縫的組合,在大數據領域中,Spark+Alluxion+Kafka是非常有用的組合)。

從整個程序運行的角度看,Driver也是Executor的一種,BlockManager可以運行在Driver上,也可以運行在Executor上。BlockManager.scala的源碼如下。

1.   private[spark] class BlockManager(
2.      executorId: String,
3.      rpcEnv: RpcEnv,
4.      val master: BlockManagerMaster,
5.      val serializerManager: SerializerManager,
6.      val conf: SparkConf,
7.      memoryManager: MemoryManager,
8.      mapOutputTracker: MapOutputTracker,
9.      shuffleManager: ShuffleManager,
10.     val blockTransferService: BlockTransferService,
11.     securityManager: SecurityManager,
12.     numUsableCores: Int)
13.   extends BlockDataManager with BlockEvictionHandler with Logging {
14. ......
15. val diskBlockManager = {
16.     //如果外部服務不為shuffle 文件提供服務執行清理文件
17.     val deleteFilesOnStop =
18.       !externalShuffleServiceEnabled || executorId == SparkContext.
          DRIVER_IDENTIFIER
19.     new DiskBlockManager(conf, deleteFilesOnStop)
20.   }
21. ......
22. private val futureExecutionContext = ExecutionContext.fromExecutorService(
23.     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
24. ......
25.   private[spark] val memoryStore =
26.    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager,
       this)
27.   private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
28.   memoryManager.setMemoryStore(memoryStore)
29. ......
30.   def initialize(appId: String): Unit = {
31. .......

BlockManager中的成員變量中:BlockManagerMaster對整個集群的BlockManagerMaster進行管理;serializerManager是默認的序列化器;MemoryManager是內存管理;MapOutputTracker是Shuffle輸出的時候,要記錄ShuffleMapTask輸出的位置,以供下一個Stage使用,因此需要進行記錄。BlockTransferService是進行網絡操作的,如果要連同另外一個BlockManager進行數據讀寫操作,就需要BlockTransferService。Block是Spark運行時數據的最小抽象單位,可能放入內存中,也可能放入磁盤中,還可能放在Alluxio上。

SecurityManager是安全管理;numUsableCores是可用的Cores。

BlockManager中DiskBlockManager管理磁盤的讀寫,創建并維護磁盤上邏輯塊和物理塊之間的邏輯映射位置。一個block被映射到根據BlockId生成的一個文件,塊文件哈希列在目錄spark.local.dir中(如果設置了SPARK LOCAL DIRS),或在目錄(SPARK LOCAL DIRS)中。

然后在BlockManager中創建一個緩存池:block-manager-future以及memoryStore 、diskStore。

Shuffle讀寫數據的時候是通過BlockManager進行管理的。

Spark 2.1.1版本的BlockManager.scala的源碼如下。

1.    var blockManagerId: BlockManagerId = _
2.
3.   //服務此Executor的shuffle文件的服務器的地址,這或者是外部的服務,或者只是我們
     //自己的Executor的BlockManager
4.   private[spark] var shuffleServerId: BlockManagerId = _
5.
6.   //客戶端讀取其他Executors的shuffle文件。這或者是一個外部服務,或者只是
     //標準BlockTransferService 直接連接到其他Executors
7.
8.   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
9.     val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle",
       numUsableCores)
10.    new ExternalShuffleClient(transConf, securityManager,
       securityManager.isAuthenticationEnabled(),
11.      securityManager.isSaslEncryptionEnabled())
12.  } else {
13.    blockTransferService
14.  }

Spark 2.2.0版本的BlockManager.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第11行ExternalShuffleClient類中去掉securityManager.isSaslEncryptionEnabled()成員變量。

1.   ......
2.      new ExternalShuffleClient(transConf, securityManager,
        securityManager.isAuthenticationEnabled())
3.  ......

BlockManager.scala中,BlockManager實例對象通過調用initialize方法才能正式工作,傳入參數是appId,基于應用程序的ID初始化BlockManager。initialize不是在構造器的時候被使用,因為BlockManager實例化的時候還不知道應用程序的ID,應用程序ID是應用程序啟動時,ExecutorBackend向Master注冊時候獲得的。

BlockManager.scala的initialize方法中的BlockTransferService進行網絡通信。ShuffleClient是BlockManagerWorker每次啟動時向BlockManagerMaster注冊。BlockManager.scala的initialize方法中調用了registerBlockManager,向Master進行注冊,告訴BlockManagerMaster把自己注冊進去。

Spark 2.1.1版本BlockManagerMaster.scala的registerBlockManager的源碼如下。

1.   def registerBlockManager(
2.       blockManagerId: BlockManagerId,
3.       maxMemSize: Long,
4.       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
5.     logInfo(s"Registering BlockManager $blockManagerId")
6.     val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
7.       RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
8.     logInfo(s"Registered BlockManager $updatedId")
9.     updatedId
10.  }

Spark 2.2.0版本的BlockManagerMaster.scala的registerBlockManager的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第3行maxMemSize刪除。

 上段代碼中第3行之后新增參數maxOnHeapMemSize:最大的堆內內存大小。

 上段代碼中第3行之后新增參數maxOffHeapMemSize:最大的堆外內存大小。

 上段代碼中第6行driverEndpoint.askWithRetry方法調整為driverEndpoint.askSync方法。

 上段代碼中第7行RegisterBlockManager新增maxOnHeapMemSize、maxOffHeapMemSize兩個參數。

1.   ......
2.     maxOnHeapMemSize: Long,
3.     maxOffHeapMemSize: Long,
4.     ......
5.   val updatedId = driverEndpoint.askSync[BlockManagerId](
6.     RegisterBlockManager(blockManagerId, maxOnHeapMemSize,
       maxOffHeapMemSize, slaveEndpoint))
7.  ......

registerBlockManager方法的RegisterBlockManager是一個case class。

Spark 2.1.1版本的BlockManagerMessages.scala的源碼如下。

1.   case class RegisterBlockManager(
2.    blockManagerId: BlockManagerId,
3.    maxMemSize: Long,
4.    sender: RpcEndpointRef)
5.  extends ToBlockManagerMaster

Spark 2.2.0版本的BlockManagerMessages.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第3行maxMemSize刪除。

 上段代碼中第3行之后新增成員變量maxOnHeapMemSize:最大堆內內存大小。

 上段代碼中第3行之后新增成員變量maxOffHeapMemSize:最大堆外內存大小。

1.    ......
2.      maxOnHeapMemSize: Long,
3.      maxOffHeapMemSize: Long,
4.  ......

在Executor實例化的時候,要初始化blockManager。blockManager在initialize中將應用程序ID傳進去。

Executor.scala的源碼如下。

1.    if (!isLocal) {
2.    env.metricsSystem.registerSource(executorSource)
3.    env.blockManager.initialize(conf.getAppId)
4.  }

Executor.scala中,Executor每隔10s向Master發送心跳消息,如收不到心跳消息,blockManager須重新注冊。

Spark 2.1.1版本的Executor.scala的源碼如下。

1.  .......
2.  val message = Heartbeat(executorId, accumUpdates.toArray,
    env.blockManager.blockManagerId)
3.      try {
4.        val response = heartbeatReceiverRef.askWithRetry
          [HeartbeatResponse](
5.            message, RpcTimeout(conf, "spark.executor.heartbeatInterval",
              "10s"))
6.        if (response.reregisterBlockManager) {
7.          logInfo("Told to re-register on heartbeat")
8.          env.blockManager.reregister()
9.        }
10.       heartbeatFailures = 0
11.     } catch {
12.       case NonFatal(e) =>
13.         logWarning("Issue communicating with driver in heartbeater", e)
14.         heartbeatFailures += 1
15.         if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
16.           logError(s"Exit as unable to send heartbeats to driver " +
17.             s"more than $HEARTBEAT_MAX_FAILURES times")
18.           System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
19.         }
20.     }
21.   }
22. .......

Spark 2.2.0版本的Executor.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第4行heartbeatReceiverRef.askWithRetry方法調整為heartbeatReceiverRef.askSync方法。

1.  .......
2.      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
3.            message, RpcTimeout(conf, "spark.executor.heartbeatInterval",
              "10s"))

回到BlockManagerMaster.scala的registerBlockManager:

registerBlockManager中RegisterBlockManager傳入的slaveEndpoint是:具體的Executor啟動時會啟動一個BlockManagerSlaveEndpoint,會接收BlockManagerMaster發過來的指令。在initialize方法中通過master.registerBlockManager傳入slaveEndpoint,而slaveEndpoint是在rpcEnv.setupEndpoint方法中調用new()函數創建的BlockManagerSlaveEndpoint。

總結一下:

(1)當Executor實例化的時候,會通過BlockManager.initialize來實例化Executor上的BlockManager,并且創建BlockManagerSlaveEndpoint這個消息循環體來接受Driver中BlockManagerMaster發過來的指令,如刪除Block等。

1.  env.blockManager.initialize(conf.getAppId)

BlockManagerSlaveEndpoint.scala的源碼如下。

1.  class BlockManagerSlaveEndpoint(
2.    override val rpcEnv: RpcEnv,
3.    blockManager: BlockManager,
4.    mapOutputTracker: MapOutputTracker)
5.  extends ThreadSafeRpcEndpoint with Logging {

(2)當BlockManagerSlaveEndpoint實例化后,Executor上的BlockManager需要向Driver上的BlockManagerMasterEndpoint注冊。

BlockManagerMaster的registerBlockManager方法,其中的driverEndpoint是構建BlockManagerMaster時傳進去的。

(3)BlockManagerMasterEndpoint接收到Executor上的注冊信息并進行處理。

Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源碼如下。

1.  class BlockManagerMasterEndpoint(
2.      override val rpcEnv: RpcEnv,
3.  ......
4.   override def receiveAndReply(context: RpcCallContext): PartialFunction
     [Any, Unit] = {
5.      case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
6.        context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
7.  ......

Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第5行RegisterBlockManager新增成員變量:maxOnHeapMemSize、maxOffHeapMemSize。

 上段代碼中第6行register新增成員變量:maxOnHeapMemSize、maxOffHeapMemSize。

1.  ......
2.  override def receiveAndReply(context: RpcCallContext): PartialFunction
    [Any, Unit] = {
3.      case RegisterBlockManager(blockManagerId, maxOnHeapMemSize,
        maxOffHeapMemSize, slaveEndpoint) =>
4.        context.reply(register(blockManagerId, maxOnHeapMemSize,
          maxOffHeapMemSize, slaveEndpoint))
5.  ......

BlockManagerMasterEndpoint的register注冊方法,為每個Executor的BlockManager生成對應的BlockManagerInfo。BlockManagerInfo是一個HashMap[BlockManagerId, BlockManagerInfo]。

register注冊方法源碼如下。

Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源碼如下。

1.  private val blockManagerInfo = new mutable.HashMap[BlockManagerId,
    BlockManagerInfo]
2.  ......
3.   private def register(
4.        idWithoutTopologyInfo: BlockManagerId,
5.        maxMemSize: Long,
6.        slaveEndpoint: RpcEndpointRef): BlockManagerId = {
7.      //dummy id不應包含拓撲信息
8.      //我們在這里得到信息和回應一個塊標識符
9.      val id = BlockManagerId(
10.       idWithoutTopologyInfo.executorId,
11.       idWithoutTopologyInfo.host,
12.       idWithoutTopologyInfo.port,
13.       topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
14.
15.     val time = System.currentTimeMillis()
16.     if (!blockManagerInfo.contains(id)) {
17.       blockManagerIdByExecutor.get(id.executorId) match {
18.         case Some(oldId) =>
19.           //同一個Executor 的塊管理器已經存在,所以刪除它(假定已掛掉)
20.           logError("Got two different block manager registrations on same
              executor - "
21.               + s" will replace old one $oldId with new one $id")
22.           removeExecutor(id.executorId)
23.         case None =>
24.       }
25.       logInfo("Registering block manager %s with %s RAM, %s".format(
26.         id.hostPort, Utils.bytesToString(maxMemSize), id))
27.
28.       blockManagerIdByExecutor(id.executorId) = id
29.
30.       blockManagerInfo(id) = new BlockManagerInfo(
31.         id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
32.     }
33.     listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
34.     id
35.   }......

Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第5行刪掉maxMemSize。

 上段代碼中第5行之后Register新增參數maxOnHeapMemSize:最大堆內內存大?。籱axOffHeapMemSize:最大堆外內存大小。

 上段代碼中第26行日志打印時新增最大堆內內存大小、最大堆外內存大小的信息。

 上段代碼中第31行構建BlockManagerInfo實例時傳入maxOnHeapMemSize、maxOffHeapMemSize。

 上段代碼中第33行listenerBus監控系統增加對最大堆內內存大小、最大堆外內存大小信息的監控。

1.  .......
2.       maxOnHeapMemSize: Long,
3.        maxOffHeapMemSize: Long,
4.  ......
5.          id.hostPort, Utils.bytesToString(maxOnHeapMemSize +
            maxOffHeapMemSize), id))
6.  .......
7.
8.        blockManagerInfo(id) = new BlockManagerInfo(
9.          id, System.currentTimeMillis(), maxOnHeapMemSize,
            maxOffHeapMemSize, slaveEndpoint)
10. .......
11. listenerBus.post(SparkListenerBlockManagerAdded(time, id,
    maxOnHeapMemSize + maxOffHeapMemSize,
12.         Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
13. .......

BlockManagerMasterEndpoint中,BlockManagerId是一個class,標明了BlockManager在哪個Executor中,以及host主機名、port端口等信息。

BlockManagerId.scala的源碼如下。

1.  class BlockManagerId private (
2.     private var executorId_ : String,
3.     private var host_ : String,
4.     private var port_ : Int,
5.     private var topologyInfo_ : Option[String])
6.   extends Externalizable {

BlockManagerMasterEndpoint中,BlockManagerInfo包含內存、slaveEndpoint等信息。

回到BlockManagerMasterEndpoint的register注冊方法:如果blockManagerInfo沒有包含BlockManagerId,根據BlockManagerId.executorId查詢BlockManagerId,如果匹配到舊的BlockManagerId,就進行清理。

BlockManagerMasterEndpoint的removeExecutor方法如下。

1.    private def removeExecutor(execId: String) {
2.    logInfo("Trying to remove executor " + execId + " from
      BlockManagerMaster.")
3.    blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
4.  }

進入removeBlockManager方法,從blockManagerIdByExecutor數據結構中清理掉block manager信息,從blockManagerInfo數據結構中清理掉所有的blocks信息。removeBlockManager源碼如下。

Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的removeBlockManager的源碼如下。

1.   private def removeBlockManager(blockManagerId: BlockManagerId) {
2.      val info = blockManagerInfo(blockManagerId)
3.
4.      //從blockManagerIdByExecutor刪除塊管理
5.      blockManagerIdByExecutor -= blockManagerId.executorId
6.
7.      //將它從blockManagerInfo 刪除所有的塊
8.      blockManagerInfo.remove(blockManagerId)
9.      val iterator = info.blocks.keySet.iterator
10.     while (iterator.hasNext) {
11.       val blockId = iterator.next
12.       val locations = blockLocations.get(blockId)
13.      locations -= blockManagerId
14.      if (locations.size == 0) {
15.        blockLocations.remove(blockId)
16.      }
17.    }
18.    listenerBus.post(SparkListenerBlockManagerRemoved(System.
       currentTimeMillis(), blockManagerId))
19.    logInfo(s"Removing block manager $blockManagerId")
20.  }

Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的removeBlockManager的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第16~20行整體替換為以下代碼:新增數據復制處理。

1.   .......
2.         //如果沒有塊管理器,就注銷這個塊。否則,如果主動復制啟用,塊block是一個RDD
           //或測試塊 block(后者用于單元測試),我們發送一條消息隨機選擇Executor的位
           //置來復制給定塊block。注意,我們忽略了其他塊block類型(如廣播broadcast/
           //shuffle blocks),因為復制在這種情況下沒有多大意義
3.     ......
4.          logWarning(s"No more replicas available for $blockId !")
5.        } else if (proactivelyReplicate && (blockId.isRDD || blockId.
          isInstanceOf[TestBlockId])) {
6.
7.          //假設Executor未能找出故障前存在的副本數量
8.          val maxReplicas = locations.size + 1
9.          val i = (new Random(blockId.hashCode)).nextInt(locations.size)
10.         val blockLocations = locations.toSeq
11.         val candidateBMId = blockLocations(i)
12.         blockManagerInfo.get(candidateBMId).foreach { bm =>
13.           val remainingLocations = locations.toSeq.filter(bm => bm !=
              candidateBMId)
14.           val replicateMsg = ReplicateBlock(blockId, remainingLocations,
              maxReplicas)
15.           bm.slaveEndpoint.ask[Boolean](replicateMsg)
16.         }
17.       }
18.     }
19. .........

removeBlockManager中的一行代碼blockLocations.remove的remove方法如下。

HashMap.java的源碼如下。

1.  public V remove(Object key) {
2.      Node<K,V> e;
3.      return (e = removeNode(hash(key), key, null, false, true)) == null ?
4.          null : e.value;
5.  }

回到BlockManagerMasterEndpoint的register注冊方法:然后在blockManagerIdByExecutor中加入BlockManagerId,將BlockManagerId加入BlockManagerInfo信息,在listenerBus中進行監聽,函數返回BlockManagerId,完成注冊。

回到BlockManager.scala,在initialize方法通過master.registerBlockManager注冊成功以后,將返回值賦值給idFromMaster。Initialize初始化之后,看一下BlockManager.scala中其他的方法。

reportAllBlocks方法:具體的Executor須向Driver不斷地匯報自己的狀態。

BlockManager.scala的reportAllBlocks方法的源碼如下。

1.       private def reportAllBlocks(): Unit = {
2.     logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
3.     for ((blockId, info) <- blockInfoManager.entries) {
4.       val status = getCurrentBlockStatus(blockId, info)
5.       if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
6.         logError(s"Failed to report $blockId to master; giving up.")
7.         return
8.       }
9.     }
10.  }

reportAllBlocks方法中調用了getCurrentBlockStatus,包括內存、磁盤等信息。

getCurrentBlockStatus的源碼如下。

1.   private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo):
     BlockStatus = {
2.      info.synchronized {
3.        info.level match {
4.          case null =>
5.            BlockStatus.empty
6.          case level =>
7.            val inMem = level.useMemory && memoryStore.contains(blockId)
8.            val onDisk = level.useDisk && diskStore.contains(blockId)
9.            val deserialized = if (inMem) level.deserialized else false
10.           val replication = if (inMem || onDisk) level.replication else 1
11.           val storageLevel = StorageLevel(
12.             useDisk = onDisk,
13.             useMemory = inMem,
14.             useOffHeap = level.useOffHeap,
15.             deserialized = deserialized,
16.             replication = replication)
17.           val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
18.           val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
19.           BlockStatus(storageLevel, memSize, diskSize)
20.       }
21.     }
22.   }

getCurrentBlockStatus方法中的BlockStatus,包含存儲級別StorageLevel、內存大小、磁盤大小等信息。

BlockManagerMasterEndpoint.scala的BlockStatus的源碼如下。

1.  case   class    BlockStatus(storageLevel:        StorageLevel,     memSize:  Long,
    diskSize: Long) {
2.    def isCached: Boolean = memSize + diskSize > 0
3.  }
4.  ......
5.    object BlockStatus {
6.    def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L,
      diskSize = 0L)
7.  }

回到BlockManager.scala,其中的getLocationBlockIds方法比較重要,根據BlockId獲取這個BlockId所在的BlockManager。

BlockManager.scala的getLocationBlockIds的源碼如下。

1.   private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq
     [BlockManagerId]] = {
2.    val startTimeMs = System.currentTimeMillis
3.    val locations = master.getLocations(blockIds).toArray
4.    logDebug("Got multiple block location in %s".format
      (Utils.getUsedTimeMs(startTimeMs)))
5.    locations
6.  }

getLocationBlockIds方法中根據BlockId通過master.getLocations向Master獲取位置信息,因為master管理所有的位置信息。getLocations方法里的driverEndpoint是BlockManagerMasterEndpoint,Executor向BlockManagerMasterEndpoint發送GetLocationsMultipleBlockIds消息。

Spark 2.1.1版本的BlockManagerMaster.scala的getLocations方法的源碼如下。

1.  def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq
    [BlockManagerId]] = {
2.    driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](
3.      GetLocationsMultipleBlockIds(blockIds))
4.  }

Spark 2.2.0版本的BlockManagerMaster.scala的getLocations方法的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第2行driverEndpoint.askWithRetry方法調整為driverEndpoint. askSync方法。

1.  ......
2.      driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
3.  .......

getLocations中的GetLocationsMultipleBlockIds是一個case class。

1.  case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId])
   extends ToBlockManagerMaster

在BlockManagerMasterEndpoint側接收GetLocationsMultipleBlockIds消息。

BlockManagerMasterEndpoint.scala的receiveAndReply方法如下。

1.   override def receiveAndReply(context: RpcCallContext): PartialFunction
     [Any, Unit] = {
2.  ......
3.   case GetLocationsMultipleBlockIds(blockIds) =>
4.        context.reply(getLocationsMultipleBlockIds(blockIds))

進入getLocationsMultipleBlockIds方法,進行map操作,開始查詢位置信息。

1.   private def getLocationsMultipleBlockIds(
2.      blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
3.    blockIds.map(blockId => getLocations(blockId))
4.  }

進入getLocations方法,首先判斷內存緩存結構blockLocations中是否包含blockId,如果已包含,就獲取位置信息,否則返回空的信息。

1.  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
2.    if (blockLocations.containsKey(blockId)) blockLocations.get
      (blockId).toSeq else Seq.empty
3.  }

其中,blockLocations是一個重要的數據結構,是一個JHashMap。Key是BlockId。Value是一個HashSet[BlockManagerId],使用HashSet。因為每個BlockId在磁盤上有副本,不同機器的位置不一樣,而且不同副本對應的BlockManagerId不一樣,位于不同的機器上,所以使用HashSet數據結構。

BlockManagerMasterEndpoint.scala的blockLocations的源碼如下。

1.  private val blockLocations = new JHashMap[BlockId, mutable.HashSet
    [BlockManagerId]]

回到BlockManager.scala,getLocalValues是一個重要的方法,從blockInfoManager中獲取本地數據。

 首先根據blockId從blockInfoManager中獲取BlockInfo信息。

 從BlockInfo信息獲取level級別,根據level.useMemory && memoryStore.contains (blockId)判斷是否在內存中,如果在內存中,就從memoryStore中獲取數據。

 根據level.useDisk && diskStore.contains(blockId)判斷是否在磁盤中,如果在磁盤中,就從diskStore中獲取數據。

Spark 2.1.1版本的BlockManager.scala的getLocalValues方法的源碼如下。

1.   def getLocalValues(blockId: BlockId): Option[BlockResult] = {
2.      logDebug(s"Getting local block $blockId")
3.      blockInfoManager.lockForReading(blockId) match {
4.        case None =>
5.          logDebug(s"Block $blockId was not found")
6.          None
7.        case Some(info) =>
8.          val level = info.level
9.          logDebug(s"Level for block $blockId is $level")
10.         if (level.useMemory && memoryStore.contains(blockId)) {
11.           val iter: Iterator[Any] = if (level.deserialized) {
12.             memoryStore.getValues(blockId).get
13.           } else {
14.             serializerManager.dataDeserializeStream(
15.              blockId, memoryStore.getBytes(blockId).get.toInputStream())
                 (info.classTag)
16.           }
17.           val ci = CompletionIterator[Any, Iterator[Any]](iter,
              releaseLock(blockId))
18.           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
19.         } else if (level.useDisk && diskStore.contains(blockId)) {
20.           val iterToReturn: Iterator[Any] = {
21.             val diskBytes = diskStore.getBytes(blockId)
22.             if (level.deserialized) {
23.               val diskValues = serializerManager.dataDeserializeStream(
24.                 blockId,
25.                 diskBytes.toInputStream(dispose = true))(info.classTag)
26.               maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
27.             } else {
28.              val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
                 diskBytes)
29.                 .map {_.toInputStream(dispose = false)}
30.                 .getOrElse { diskBytes.toInputStream(dispose = true) }
31.               serializerManager.dataDeserializeStream(blockId, stream)
                (info.classTag)
32.           }
33.         }
34.         val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn,
            releaseLock(blockId))
35.         Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
36.       } else {
37.         handleLocalReadFailure(blockId)
38.       }
39.    }
40.  }

Spark 2.2.0版本的BlockManager.scala的getLocalValues方法的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第9行之后新增taskAttemptId的創建。

 上段代碼中第17行releaseLock新增一個參數taskAttemptId。

 上段代碼中第21、25、28、30行diskBytes更新為diskData。

 上段代碼中第21行之后新增val iterToReturn: Iterator[Any]。

 上段代碼中第25行diskData.toInputStream方法刪掉dispose = true參數。

 上段代碼中第34行CompletionIterator的第二個參數調整為releaseLockAndDispose (blockId, diskData, taskAttemptId)。

1.   .......
2.         val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
3.       ......
4.           //在迭代器iterator完成觸發時,我們需要從一個沒有TaskContext上下文的
             //線程捕獲taskId,參閱spark-18406討論
5.           val ci = CompletionIterator[Any, Iterator[Any]](iter, {
6.             releaseLock(blockId, taskAttemptId)
7.        ........
8.           val diskData = diskStore.getBytes(blockId)
9.           val iterToReturn: Iterator[Any] = {
10.          ........
11.            diskData.toInputStream())(info.classTag)
12.         ........
13.             val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
                diskData)
14.           ........
15.              .getOrElse { diskData.toInputStream() }
16.          ........
17.            releaseLockAndDispose(blockId, diskData, taskAttemptId)
18.    ........

回到BlockManager.scala,getRemoteValues方法從遠程的BlockManager中獲取block數據,在JVM中不需要去獲取鎖。

BlockManager.scala的getRemoteValues方法的源碼如下。

1.  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option
    [BlockResult] = {
2.    val ct = implicitly[ClassTag[T]]
3.    getRemoteBytes(blockId).map { data =>
4.      val values =
5.        serializerManager.dataDeserializeStream(blockId, data.toInputStream
          (dispose = true))(ct)
6.      new BlockResult(values, DataReadMethod.Network, data.size)
7.    }
8.  }

getRemoteValues方法中調用getRemoteBytes,獲取遠程的數據,如果獲取的失敗次數超過最大的獲取次數(locations.size),就提示失敗,返回空值;如果獲取到遠程數據,就返回。

getRemoteBytes方法調用blockTransferService.fetchBlockSync方法實現遠程獲取數據。

BlockTransferService.scala的fetchBlockSync方法的源碼如下。

Spark 2.1.1版本的BlockTransferService.scala的fetchBlockSync方法的源碼如下。

1.   def fetchBlockSync(host: String, port: Int, execId: String, blockId:
     String): ManagedBuffer = {
2.      //線程等待的監視器
3.      val result = Promise[ManagedBuffer]()
4.      fetchBlocks(host, port, execId, Array(blockId),
5.        new BlockFetchingListener {
6.          override def onBlockFetchFailure(blockId:             String,  exception:
            Throwable): Unit = {
7.            result.failure(exception)
8.          }
9.          override def onBlockFetchSuccess(blockId: String, data:
            ManagedBuffer): Unit = {
10.           val ret = ByteBuffer.allocate(data.size.toInt)
11.           ret.put(data.nioByteBuffer())
12.           ret.flip()
13.           result.success(new NioManagedBuffer(ret))
14.         }
15.       })
16.     ThreadUtils.awaitResult(result.future, Duration.Inf)
17.   }

Spark 2.2.0版本的BlockTransferService.scala的fetchBlockSync方法的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第15行fetchBlocks方法新增了shuffleFiles = null參數。fetchBlocks方法用于異步從遠程節點獲取序列塊,僅在調用[init]之后可用。注意,這個API需要一個序列,可以實現批處理請求,而不是返回一個future,底層實現可以調用onBlockFetchSuccess盡快獲取塊的數據,而不是等待所有塊被取出來。

1.    ......
2.        }, shuffleFiles = null)
3.  .......

fetchBlockSync中調用fetchBlocks方法,NettyBlockTransferService繼承自BlockTransferService,是BlockTransferService實現子類。

Spark 2.1.1版本的NettyBlockTransferService的fetchBlocks的源碼如下。

1.   override def fetchBlocks(
2.        host: String,
3.        port: Int,
4.        execId: String,
5.        blockIds: Array[String],
6.        listener: BlockFetchingListener): Unit = {
7.      logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
8.      try {
9.        val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
10.         override def createAndStart(blockIds: Array[String], listener:
            BlockFetchingListener) {
11.          val client = clientFactory.createClient(host, port)
12.          new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray,
             listener).start()
13.        }
14.      }
15.
16.      val maxRetries = transportConf.maxIORetries()
17.      if (maxRetries > 0) {
18.        //注意,Fetcher將正確處理maxRetries等于0的情況;避免它在代碼中產生Bug,
           //一旦確定了穩定性,就應該刪除if語句
19.        new RetryingBlockFetcher(transportConf, blockFetchStarter,
           blockIds, listener).start()
20.      } else {
21.        blockFetchStarter.createAndStart(blockIds, listener)
22.      }
23.    } catch {
24.      case e: Exception =>
25.        logError("Exception while beginning fetchBlocks", e)
26.        blockIds.foreach(listener.onBlockFetchFailure(_, e))
27.    }
28.  }

Spark 2.2.0版本的NettyBlockTransferService的fetchBlocks的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第6行fetchBlocks方法新增了shuffleFiles參數。

1.   .......
2.        shuffleFiles: Array[File]): Unit = {
3.  .......

回到BlockManager.scala,無論是doPutBytes(),還是doPutIterator()方法中,都會使用doPut方法。

BlockManager.scala的doPut方法的源碼如下。

1.   private def doPut[T](
2.       blockId: BlockId,
3.       level: StorageLevel,
4.       classTag: ClassTag[_],
5.       tellMaster: Boolean,
6.       keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T]
         = {
7.     require(blockId != null, "BlockId is null")
8.     require(level != null && level.isValid, "StorageLevel is null or
       invalid")
9.
10.    val putBlockInfo = {
11.      val newInfo = new BlockInfo(level, classTag, tellMaster)
12.      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
13.        newInfo
14.      } else {
15.        logWarning(s"Block $blockId already exists on this machine; not
           re-adding it")
16.        if (!keepReadLock) {
17.    //在現有的塊上lockNewBlockForWriting 返回一個讀鎖,所以我們必須釋放它
18.          releaseLock(blockId)
19.        }
20.        return None
21.      }
22.    }
23.
24.    val startTimeMs = System.currentTimeMillis
25.    var exceptionWasThrown: Boolean = true
26.    val result: Option[T] = try {
27.      val res = putBody(putBlockInfo)
28.      exceptionWasThrown = false
29.     ......
30.     result
31.  }

doPut方法中,lockNewBlockForWriting寫入一個新的塊前先嘗試獲得適當的鎖,如果我們是第一個寫塊,獲得寫入鎖后繼續后續操作。否則,如果另一個線程已經寫入塊,須等待寫入完成,才能獲取讀取鎖,調用new()函數創建一個BlockInfo賦值給putBlockInfo,然后通過putBody(putBlockInfo)將數據存入。putBody是一個匿名函數,輸入BlockInfo,輸出的是一個泛型Option[T]。putBody函數體內容是doPutIterator方法(doPutBytes方法也類似調用doPut)調用doPut時傳入的。

BlockManager.scala的doPutIterator調用doput方法,在其putBody匿名函數體中進行判斷:

如果是level.useMemory,則在memoryStore中放入數據。

如果是level.useDisk,則在diskStore中放入數據。

如果level.replication大于1,則在其他節點中存入副本數據。

其中,BlockManager.scala的replicate方法的副本復制源碼如下。

Spark 2.1.1版本的BlockManager.scala的replicate方法的源碼如下。

1.   private def replicate(
2.        blockId: BlockId,
3.        data: ChunkedByteBuffer,
4.        level: StorageLevel,
5.        classTag: ClassTag[_]): Unit = {
6.  ......
7.  while(numFailures <= maxReplicationFailures &&
8.          !peersForReplication.isEmpty &&
9.          peersReplicatedTo.size != numPeersToReplicateTo) {
10.       val peer = peersForReplication.head
11.       try {
12.         val onePeerStartTime = System.nanoTime
13.         logTrace(s"Trying to replicate $blockId of ${data.size} bytes to
            $peer")
14.         blockTransferService.uploadBlockSync(
15.           peer.host,
16.           peer.port,
17.           peer.executorId,
18.           blockId,
19.           new NettyManagedBuffer(data.toNetty),
20.           tLevel,
21.           classTag)
22. ......

Spark 2.2.0版本的BlockManager.scala的replicate方法的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第5行replicate方法中新增了existingReplicas參數。

 上段代碼中第19行uploadBlockSync方法的第5個參數由NettyManagedBuffer實例調整為BlockManagerManagedBuffer實例。

1.      .......
2.        existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
3.  ......
4.       new    BlockManagerManagedBuffer(blockInfoManager,            blockId,  data,
         false),
5.   ......

replicate方法中調用了blockTransferService.uploadBlockSync方法。

BlockTransferService.scala的uploadBlockSync的源碼如下。

1.   def uploadBlockSync(
2.        hostname: String,
3.        port: Int,
4.        execId: String,
5.        blockId: BlockId,
6.        blockData: ManagedBuffer,
7.        level: StorageLevel,
8.        classTag: ClassTag[_]): Unit = {
9.      val future = uploadBlock(hostname, port, execId, blockId, blockData,
        level, classTag)
10.     ThreadUtils.awaitResult(future, Duration.Inf)
11.   }
12. }

uploadBlockSync中又調用uploadBlock方法,BlockTransferService.scala的uploadBlock方法無具體實現,NettyBlockTransferService是BlockTransferService的子類,具體實現uploadBlock方法。

NettyBlockTransferService的uploadBlock的源碼如下。

1.   override def uploadBlock(
2.        hostname: String,
3.        port: Int,
4.        execId: String,
5.        blockId: BlockId,
6.        blockData: ManagedBuffer,
7.        level: StorageLevel,
8.        classTag: ClassTag[_]): Future[Unit] = {
9.      val result = Promise[Unit]()
10.     val client = clientFactory.createClient(hostname, port)
11.
12.     //使用JavaSerializer序列號器將StorageLevel和ClassTag序列化。其他一切都
        //用我們的二進制協議編碼
13.     val metadata = JavaUtils.bufferToArray(serializer.newInstance().
        serialize((level, classTag)))
14.
15.     //為了序列化,轉換或復制NIO緩沖到數組
16.     val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
17.
18.     client.sendRpc(new       UploadBlock(appId,      execId,     blockId.toString,
        metadata, array).toByteBuffer,
19.       new RpcResponseCallback {
20.         override def onSuccess(response: ByteBuffer): Unit = {
21.           logTrace(s"Successfully uploaded block $blockId")
22.           result.success((): Unit)
23.         }
24.        override def onFailure(e: Throwable): Unit = {
25.          logError(s"Error while uploading block $blockId", e)
26.          result.failure(e)
27.        }
28.      })
29.
30.    result.future
31.  }

回到BlockManager.scala,看一下dropFromMemory方法。如果存儲級別定位為MEMORY_AND_DISK,那么數據可能放在內存和磁盤中,內存夠的情況下不會放到磁盤上;如果內存不夠,就放到磁盤上,這時就會調用dropFromMemory。如果存儲級別不是定義為MEMORY_AND_DISK,而只是存儲在內存中,內存不夠時,緩存的數據此時就會丟棄。如果仍需要數據,那就要重新計算。

Spark 2.1.1版本的BlockManager.scala的dropFromMemory的源碼如下。

1.   private[storage] override def dropFromMemory[T: ClassTag](
2.        blockId: BlockId,
3.        data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
4.      logInfo(s"Dropping block $blockId from memory")
5.      val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
6.      var blockIsUpdated = false
7.      val level = info.level
8.
9.      //如果存儲級別要求,則保存到磁盤
10.     if (level.useDisk && !diskStore.contains(blockId)) {
11.       logInfo(s"Writing block $blockId to disk")
12.       data() match {
13.         case Left(elements) =>
14.           diskStore.put(blockId) { fileOutputStream =>
15.             serializerManager.dataSerializeStream(
16.               blockId,
17.               fileOutputStream,
18.               elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
19.           }
20.         case Right(bytes) =>
21.           diskStore.putBytes(blockId, bytes)
22.       }
23.       blockIsUpdated = true
24.     }
25.
26.     //實際由內存存儲
27.     val droppedMemorySize =
28.       if   (memoryStore.contains(blockId))         memoryStore.getSize(blockId)
          else 0L
29.     val blockIsRemoved = memoryStore.remove(blockId)
30.     if (blockIsRemoved) {
31.       blockIsUpdated = true
32.     } else {
33.       logWarning(s"Block $blockId could not be dropped from memory as it
          does not exist")
34.     }
35.
36.     val status = getCurrentBlockStatus(blockId, info)
37.     if (info.tellMaster) {
38.       reportBlockStatus(blockId, status, droppedMemorySize)
39.     }
40.    if (blockIsUpdated) {
41.      addUpdatedBlockStatusToTaskMetrics(blockId, status)
42.    }
43.    status.storageLevel
44.  }

Spark 2.2.0版本的BlockManager.scala的dropFromMemory的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第14行fileOutputStream名稱調整為channel。

 上段代碼中第14行之后新增代碼:val out = Channels.newOutputStream(channel)。

 上段代碼中第17行fileOutputStream調整為out。

1.    ........
2.   diskStore.put(blockId) { channel =>
3.             val out = Channels.newOutputStream(channel)
4.      ........
5.               out,
6.  ........

總結:dropFromMemory是指在內存不夠的時候,嘗試釋放一部分內存給要使用內存的應用,釋放的這部分內存數據需考慮是丟棄,還是放到磁盤上。如果丟棄,如5000個步驟作為一個Stage,前面4000個步驟進行了Cache,Cache時可能有100萬個partition分區單位,其中丟棄了100個,丟棄的100個數據就要重新計算;但是,如果設置了同時放到內存和磁盤,此時會放入磁盤中,下次如果需要,就可以從磁盤中讀取數據,而不是重新計算。

主站蜘蛛池模板: 岳西县| 遂昌县| 若羌县| 固原市| 肥乡县| 九寨沟县| 子长县| 镇雄县| 临漳县| 库车县| 平江县| 青州市| 武强县| 肃宁县| 九寨沟县| 沙坪坝区| 铜山县| 遵化市| 景德镇市| 宜宾市| 铜陵市| 页游| 安泽县| 金昌市| 长寿区| 嵊泗县| 越西县| 玉门市| 兰溪市| 大埔区| 石门县| 始兴县| 都昌县| 多伦县| 玉龙| 诸城市| 玉山县| 延安市| 阳山县| 百色市| 新兴县|