- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 11925字
- 2019-12-12 17:30:02
7.6 Shuffle與Storage模塊間的交互
在Spark中,存儲模塊被抽象成Storage。顧名思義,Storage是存儲的意思,代表著Spark中的數據存儲系統,負責管理和實現數據塊(Block)的存放。其中存取數據的最小單元是Block,數據由不同的Block組成,所有操作都是以Block為單位進行的。從本質上講,RDD中的Partition和Storage中的Block是等價的,只是所處的模塊不同,看待的角度不一樣而已。
Storage抽象模塊的實現分為兩個層次,如圖7-13所示。
(1)通信層:通信層是典型的Master-Slave結構,Master和Slave之間傳輸控制和狀態信息。通信層主要由BlockManager、BlockManagerMaster、BlockManagerMasterEndpoint、BlockManagerSlaveEndpoint等類實現。
(2)存儲層:負責把數據存儲到內存、磁盤或者堆外內存中,有時還需要為數據在遠程節點上生成副本,這些都由存儲層提供的接口實現。Spark 2.2.0具體的存儲層的實現類有DiskStore和MemoryStore。

圖7-13 Storage存儲模塊
Shuffle模塊若要和Storage模塊進行交互,需要通過調用統一的操作類BlockManager來完成。如果把整個存儲模塊看成一個黑盒,BlockManager就是黑盒上留出的一個供外部調用的接口。
7.6.1 Shuffle注冊的交互
Spark中BlockManager在Driver端的創建,在SparkContext創建的時候會根據具體的配置創建SparkEnv對象,源碼如下所示。
SparkContext.scala的源碼如下。
1. _env = createSparkEnv(_conf, isLocal, listenerBus) 2. SparkEnv.set(_env) 3. ....... 4. private[spark] def createSparkEnv( 5. conf: SparkConf, 6. isLocal: Boolean, 7. listenerBus: LiveListenerBus): SparkEnv = { 8. //創建Driver端的運行環境 9. SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext .numDriverCores(master)) 10. }
createSparkEnv方法中,傳入SparkConf配置對象、isLocal標志,以及LiveListenerBus,方法中使用SparkEnv對象的createDriverEnv方法創建SparkEnv并返回。在SparkEnv的createDriverEvn方法中,將會創建BlockManager、BlockManagerMaster等對象,完成Storage在Driver端的部署。
SparkEnv中創建BlockManager、BlockManagerMaster關鍵源碼如下所示。
SparkEnv.scala的源碼如下。
1. val blockTransferService = 2. new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress, blockManagerPort, numUsableCores) 3. 4. //創建BlockManagerMasterEndpoint 5. val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( 6. BlockManagerMaster.DRIVER_ENDPOINT_NAME, 7. //創建BlockManagerMasterEndpoint 8. new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), 9. conf, isDriver) 10. //創建BlockManager 11. //注:blockManager無效,直到initialize()被調用 12. val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster, 13. serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, 14. blockTransferService, securityManager, numUsableCores)
使用new關鍵字實例化出BlockManagerMaster,傳入BlockManager的構造函數,實例化出BlockManager對象。這里的BlockManagerMaster和BlockManager屬于聚合關系。BlockManager主要對外提供統一的訪問接口,BlockManagerMaster主要對內提供各節點之間的指令通信服務。
構建BlockManager時,傳入shuffleManager參數,shuffleManager是在SparkEnv中創建的,將shuffleManager傳入到BlockManager中,BlockManager就擁有shuffleManager的成員變量,從而可以調用shuffleManager的相關方法。
BlockManagerMaster在Driver端和Executors中的創建稍有差別。首先來看在Driver端創建的情形。創建BlockManagerMaster傳入的isDriver參數,isDriver為true,表示在Driver端創建,否則視為在Slave節點上創建。
當SparkContext中執行_env.blockManager.initialize(_applicationId)代碼時,會調用Driver端BlockManager的initialize方法。Initialize方法的源碼如下所示。
SparkContext.scala的源碼如下。
1. _env.blockManager.initialize(_applicationId)
Spark 2.1.1版本的BlockManager.scala的源碼如下。
1. def initialize(appId: String): Unit = { 2. //調用blockTransferService的init方法,blockTransferService用于在不同節點 //fetch數據、傳送數據 3. blockTransferService.init(this) 4. //shuffleClient用于讀取其他Executor上的shuffle files 5. shuffleClient.init(appId) 6. 7. blockReplicationPolicy = { 8. val priorityClass = conf.get( 9. "spark.storage.replication.policy", classOf [RandomBlockReplicationPolicy].getName) 10. val clazz = Utils.classForName(priorityClass) 11. val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy] 12. logInfo(s"Using $priorityClass for block replication policy") 13. ret 14. } 15. 16. val id = 17. BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None) 18. 19. //向blockManagerMaster注冊BlockManager。在registerBlockManager方法中傳 //入了 slaveEndpoint,slaveEndpoint 為 BlockManager 中的 RPC 對象,用于和 //blockManagerMaster通信 20. val idFromMaster = master.registerBlockManager( 21. id, 22. maxMemory, 23. slaveEndpoint) 24. //得到blockManagerId 25. blockManagerId = if (idFromMaster != null) idFromMaster else id 26. 27. //得到shuffleServerId 28. shuffleServerId = if (externalShuffleServiceEnabled) { 29. logInfo(s"external shuffle service port = $externalShuffleServicePort") 30. BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) 31. } else { 32. blockManagerId 33. } 34. //注冊shuffleServer 35. //如果存在,將注冊Executors配置與本地shuffle服務 36. if (externalShuffleServiceEnabled && !blockManagerId.isDriver) { 37. registerWithExternalShuffleServer() 38. } 39. 40. logInfo(s"Initialized BlockManager: $blockManagerId") 41. }
Spark 2.2.0版本的BlockManager.scala的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第22行刪除maxMemory。
上段代碼中第22行之后新增參數maxOnHeapMemory:最大的堆內存大小。
上段代碼中第22行之后新增參數maxOffHeapMemory:最大的堆外存大小。
1. ..... 2. maxOnHeapMemory, 3. maxOffHeapMemory, 4. ...
如上面的源碼所示,initialize方法使用appId初始化BlockManager,主要完成以下工作。
(1)初始化BlockTransferService。
(2)初始化ShuffleClient。
(3)創建BlockManagerId。
(4)將BlockManager注冊到BlockManagerMaster上。
(5)若ShuffleService可用,則注冊ShuffleService。
在BlockManager的initialize方法上右擊Find Usages,可以看到initialize方法在兩個地方得到調用:一個是SparkContext;另一個是Executor。啟動Executor時,會調用BlockManager的initialize方法。Executor中調用initialize方法的源碼如下所示。
Executor.scala的源碼如下。
1. //CoarseGrainedExecutorBackend中實例化Executor,isLocal設置成false,即 //Executor中isLocal始終為false 2. 3. if (!isLocal) { 4. //向度量系統注冊 5. //env.metricsSystem.registerSource(executorSource) 6. //調用BlockManager的initialize方法,initialize方法將向BlockManagerMaster //注冊,完成Executor中的BlockManager向Driver中的BlockManager注冊 7. env.blockManager.initialize(conf.getAppId) 8. }
上面代碼中調用了env.blockManager.initialize方法。在initialize方法中,完成BlockManger向Master端BlockManagerMaster的注冊。使用方法master.registerBlockManager (id,maxMemory,slaveEndpoint)完成注冊,registerBlockManager方法中傳入Id、maxMemory、salveEndPoint引用,分別表示Executor中的BlockManager、最大內存、BlockManager中的BlockMangarSlaveEndpoint。BlockManagerSlaveEndpoint是一個RPC端點,使用它完成同BlockManagerMaster的通信。BlockManager收到注冊請求后將Executor中注冊的BlockManagerInfo存入哈希表中,以便通過BlockManagerSlaveEndpoint向Executor發送控制命令。
ShuffleManager是一個用于shuffle系統的可插拔接口。在Driver端SparkEnv中創建ShuffleManager,在每個Executor上也會創建?;趕park.shuffle.manager進行設置。Driver使用ShuffleManager注冊到shuffles系統,Executors(或Driver在本地運行的任務)可以請求讀取和寫入數據。這將被SparkEnv的SparkConf和isDriver布爾值作為參數。
ShuffleManager.scala的源碼如下。
1. private[spark] trait ShuffleManager { 2. 3. /** 4. *注冊一個shuffle管理器,獲取一個句柄傳遞給任務 5. */ 6. def registerShuffle[K, V, C]( 7. shuffleId: Int, 8. numMaps: Int, 9. dependency: ShuffleDependency[K, V, C]): ShuffleHandle 10. 11. /**為給定分區獲取一個寫入器。Executors節點通過Map任務調用*/ 12. def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] 13. 14. /** *獲取讀取器匯聚一定范圍的分區(從 startPartition 到 endPartition-1)。在 *Executors節點,通過reduce 任務調用 15. */ 16. 17. def getReader[K, C]( 18. handle: ShuffleHandle, 19. startPartition: Int, 20. endPartition: Int, 21. context: TaskContext): ShuffleReader[K, C] 22. 23. /** 24. *從ShuffleManager移除一個shuffle的元數據 25. * @return如果元數據成功刪除,則返回true,否則返回false 26. */ 27. def unregisterShuffle(shuffleId: Int): Boolean 28. 29. /** * 返回一個能夠根據塊坐標來檢索shuffle 塊數據的解析器 30. */ 31. def shuffleBlockResolver: ShuffleBlockResolver 32. 33. /** 關閉ShuffleManager */ 34. def stop(): Unit 35. }
Spark Shuffle Pluggable框架ShuffleBlockManager在Spark 1.6.0之后改成了ShuffleBlockResolver。ShuffleBlockResolver具體讀取shuffle數據,是一個trait。在ShuffleBlockResolver中已無getBytes方法。getBlockData(blockId: ShuffleBlockId)方法返回的是ManagedBuffer,這是核心。
ShuffleBlockResolver的源碼如下。
1. trait ShuffleBlockResolver { 2. type ShuffleId = Int 3. 4. /** *為指定的塊檢索數據。如果塊數據不可用,則拋出一個未指明的異常 5. */ 6. def getBlockData(blockId: ShuffleBlockId): ManagedBuffer 7. 8. def stop(): Unit 9. }
Spark 2.0版本中通過IndexShuffleBlockResolver來具體實現ShuffleBlockResolver (SortBasedShuffle方式),已無FileShuffleBlockManager(Hashshuffle方式)。IndexShuffle-BlockResolver創建和維護邏輯塊和物理文件位置之間的shuffle blocks映射關系。來自于相同map task任務的shuffle blocks數據存儲在單個合并數據文件中;數據文件中的數據塊的偏移量存儲在單獨的索引文件中。將shuffleBlockId + reduce ID set to 0 + ".后綴"作為數據shuffle data的shuffleBlockId名字。其中,文件名后綴為".data"的是數據文件;文件名后綴為".index"的是索引文件。
7.6.2 Shuffle寫數據的交互
基于Sort的Shuffle實現的ShuffleHandle包含BypassMergeSortShuffleHandle與BaseShuffleHandle。兩種ShuffleHandle寫數據的方法可以參考SortShuffleManager類的getWriter方法,關鍵代碼如下所示。
SortShuffleManager的getWriter的源碼如下。
1. override def getWriter[K, V]( 2. ....... 3. case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => 4. new BypassMergeSortShuffleWriter( 5. env.blockManager, 6. shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 7. ........ 8. case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => 9. new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) 10. } 11. }
在對應構建的兩種數據寫入器類BypassMergeSortShuffleWriter與SortShuffleWriter中,都是通過變量shuffleBlockResolver對邏輯數據塊與物理數據塊的映射進行解析。BypassMergeSortShuffleWriter寫數據的具體實現位于實現的write方法中,其中調用的createTempShuffleBlock方法描述了各個分區所生成的中間臨時文件的格式與對應的BlockId。SortShuffleWriter寫數據的具體實現位于實現的write方法中。
7.6.3 Shuffle讀數據的交互
SparkEnv.get.shuffleManager.getReader是SortShuffleManager的getReader,是獲取數據的閱讀器,getReader方法中創建了一個BlockStoreShuffleReader實例。SortShuffleManager. scala的read()方法的源碼如下。
1. override def getReader[K, C]( 2. handle: ShuffleHandle, 3. startPartition: Int, 4. endPartition: Int, 5. context: TaskContext): ShuffleReader[K, C] = { 6. new BlockStoreShuffleReader( 7. handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) 8. }
BlockStoreShuffleReader實例的read方法,首先實例化new ShuffleBlockFetcherIterator。ShuffleBlockFetcherIterator是一個閱讀器,里面有一個成員blockManager。blockManager是內存和磁盤上數據讀寫的統一管理器;ShuffleBlockFetcherIterator.scala的initialize方法中splitLocalRemoteBlocks()劃分本地和遠程的blocks,Utils.randomize(remoteRequests)把遠程請求通過隨機的方式添加到隊列中,fetchUpToMaxBytes()發送遠程請求獲取我們的blocks,fetchLocalBlocks()獲取本地的blocks。
7.6.4 BlockManager架構原理、運行流程圖和源碼解密
BlockManager是管理整個Spark運行時數據的讀寫,包含數據存儲本身,在數據存儲的基礎上進行數據讀寫。由于Spark是分布式的,所以BlockManager也是分布式的,BlockManager本身相對而言是一個比較大的模塊,Spark中有非常多的模塊:調度模塊、資源管理模塊等。BlockManager是另外一個非常重要的模塊。BlockManager本身的源碼量非常大。本節從BlockManager原理流程對BlockManager做深刻地講解。在Shuffle讀寫數據的時候,我們需要讀寫BlockManager。因此,BlockManager是至關重要的內容。
編寫一個業務代碼WordCount.scala,通過觀察WordCount運行時BlockManager的日志來理解BlockManager的運行。
WordCount.scala的代碼如下。
1. package com.dt.spark.sparksql 2. 3. import org.apache.log4j.{Level, Logger} 4. import org.apache.spark.SparkConf 5. import org.apache.spark.SparkContext 6. import org.apache.spark.internal.config 7. import org.apache.spark.rdd.RDD 8. 9. /** 10. * 使用Scala開發本地測試的Spark WordCount程序 11. * 12. * @author DT大數據夢工廠 13. * 新浪微博:http://weibo.com/ilovepains/ 14. */ 15. object WordCount { 16. def main(args: Array[String]) { 17. Logger.getLogger("org").setLevel(Level.ALL) 18. 19. /** 20. *第1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息, 21. *例如,通過setMaster設置程序要鏈接的Spark集群的Master的URL,如果設置 22. *為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(如只有 23. *1GB的內存)的初學者 * 24. */ 25. val conf = new SparkConf() //創建SparkConf對象 26. conf.setAppName("Wow,My First Spark App!") //設置應用程序的名稱,在程序 //運行的監控界面可以看到名稱 27. conf.setMaster("local") //此時,程序在本地運行,不需要安裝Spark集群 28. /** 29. * 第2步:創建SparkContext對象 30. * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、 * R等都必須有一個SparkContext 31. * SparkContext 核心作用:初始化 Spark 應用程序運行所需要的核心組件,包括 * DAGScheduler、TaskScheduler、SchedulerBackend 32. * 同時還會負責Spark程序往Master注冊程序等 33. * SparkContext是整個Spark應用程序中最重要的一個對象 34. */ 35. val sc = new SparkContext(conf) //創建SparkContext對象,通過傳入SparkConf //實例來定制Spark運行的具體參數和配置信息 36. /** 37. * 第 3 步:根據具體的數據來源(如 HDFS、HBase、Local FS、DB、S3 等)通過 * SparkContext創建RDD 38. * RDD的創建基本有三種方式:根據外部的數據來源(如HDFS)、根據Scala集合、由 * 其他的RDD操作 39. * 數據會被RDD劃分為一系列的Partitions,分配到每個Partition的數據屬于一 * 個Task的處理范疇 40. */ 41. //val lines: RDD[String] = sc.textFile("D://Big_Data_Software spark- 1.6.0-bin-hadoop2.6README.md", 1) //讀取本地文件并設置為一個Partition 42. // val lines = sc.textFile("D://Big_Data_Software spark-1.6.0-bin- hadoop2.6//README.md", 1) //讀取本地文件并設置為一個Partition 43. 44. val lines = sc.textFile("data/wordcount/helloSpark.txt", 1) //讀取本地文件并設置為一個Partition 45. /** 46. * 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等 * 高階函數等的編程,進行具體的數據計算 47. * 第4.1步:將每一行的字符串拆分成單個單詞 48. */ 49. 50. val words = lines.flatMap { line => line.split(" ") } //對每一行的字符串進行單詞拆分并把所有行的拆分結果通過flat合并成為一個大的單詞集合 51. 52. /** 53. * 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等 * 高階函數等的編程,進行具體的數據計算 54. * 第4.2步:在單詞拆分的基礎上,對每個單詞實例計數為1,也就是word => (word, 1) 55. */ 56. val pairs = words.map { word => (word, 1) } 57. 58. /** 59. * 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等 * 高階函數等的編程,進行具體的數據計算 60. * 第4.3步:在每個單詞實例計數為1基礎上,統計每個單詞在文件中出現的總次數 61. */ 62. val wordCountsOdered = pairs.reduceByKey(_ + _).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)) //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce) 63. wordCountsOdered.collect.foreach(wordNumberPair => println (wordNumberPair._1 + " : " + wordNumberPair._2)) 64. while (true) { 65. 66. } 67. sc.stop() 68. 69. } 70. }
在IDEA中運行一個業務程序WordCount.scala,日志中顯示:
SparkEnv:Registering MapOutputTracker,其中MapOutputTracker中數據的讀寫都和BlockManager關聯。
SparkEnv:Registering BlockManagerMaste,其中Registering BlockManagerMaster由BlockManagerMaster進行注冊。
DiskBlockManager:Created local directory C:\Users\dell\AppData\Local\Temp\blockmgr-...其中DiskBlockManager是管理磁盤存儲的,里面有我們的數據。可以訪問Temp目錄下以blockmgr-開頭的文件的內容。
WordCount運行結果如下。
1. Using Spark's default log4j profile: org/apache/spark/log4j-defaults. properties 2. 17/06/06 05:37:57 INFO SparkContext: Running Spark version 2.1.0 3. ...... 4. 17/06/06 05:38:01 INFO SparkEnv: Registering MapOutputTracker 5. 17/06/06 05:38:01 DEBUG MapOutputTrackerMasterEndpoint: init 6. 17/06/06 05:38:01 INFO SparkEnv: Registering BlockManagerMaster 7. 17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: Using org.apache .spark.storage.DefaultTopologyMapper for getting topology information 8. 17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 9. 17/06/06 05:38:01 INFO DiskBlockManager: Created local directory at C:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828 872c98804 10. 17/06/06 05:38:01 DEBUG DiskBlockManager: Adding shutdown hook 11. 17/06/06 05:38:01 DEBUG ShutdownHookManager: Adding shutdown hook 12. 17/06/06 05:38:01 INFO MemoryStore: MemoryStore started with capacity 637.2 MB 13. 17/06/06 05:38:02 INFO SparkEnv: Registering OutputCommitCoordinator 14. 17/06/06 05:38:02 DEBUG OutputCommitCoordinator$OutputCommitCoordinator- Endpoint: init 15. ........
從Application啟動的角度觀察BlockManager:
(1)Application啟動時會在SparkEnv中注冊BlockManagerMaster以及MapOutputTracker,其中,
a)BlockManagerMaster:對整個集群的Block數據進行管理。
b)MapOutputTrackerMaster:跟蹤所有的Mapper的輸出。
BlockManagerMaster中有一個引用driverEndpoint,isDriver判斷是否運行在Driver上。
BlockManagerMaster的源碼如下。
1. private[spark] 2. class BlockManagerMaster( 3. var driverEndpoint: RpcEndpointRef, 4. conf: SparkConf, 5. isDriver: Boolean) 6. extends Logging {
BlockManagerMaster注冊給SparkEnv,SparkEnv在SparkContext中。
SparkContext.scala的源碼如下。
1. ...... 2. private var _env: SparkEnv = _ 3. ...... 4. _env = createSparkEnv(_conf, isLocal, listenerBus) 5. SparkEnv.set(_env)
進入createSparkEnv方法:
1. private[spark] def createSparkEnv( 2. conf: SparkConf, 3. isLocal: Boolean, 4. listenerBus: LiveListenerBus): SparkEnv = { 5. SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext. numDriverCores(master)) 6. }
進入SparkEnv.scala的createDriverEnv方法:
1. private[spark] def createDriverEnv( 2. ...... 3. create( 4. conf, 5. SparkContext.DRIVER_IDENTIFIER, 6. bindAddress, 7. advertiseAddress, 8. port, 9. isLocal, 10. numCores, 11. ioEncryptionKey, 12. listenerBus = listenerBus, 13. mockOutputCommitCoordinator = mockOutputCommitCoordinator 14. ) 15. } 16. ......
SparkEnv.scala的createDriverEnv中調用了create方法,判斷是否是Driver。create方法的源碼如下。
1. private def create( 2. conf: SparkConf, 3. executorId: String, 4. bindAddress: String, 5. advertiseAddress: String, 6. port: Int, 7. isLocal: Boolean, 8. numUsableCores: Int, 9. ioEncryptionKey: Option[Array[Byte]], 10. listenerBus: LiveListenerBus = null, 11. mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { 12. 13. val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER 14. ...... 15. if (isDriver) { 16. conf.set("spark.driver.port", rpcEnv.address.port.toString) 17. } else if (rpcEnv.address != null) { 18. conf.set("spark.executor.port", rpcEnv.address.port.toString) 19. logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port .toString}") 20. } 21. ...... 22. val mapOutputTracker = if (isDriver) { 23. new MapOutputTrackerMaster(conf, broadcastManager, isLocal) 24. } else { 25. new MapOutputTrackerWorker(conf) 26. } 27. ...... 28. SparkContext.scala 29. private[spark] val DRIVER_IDENTIFIER = "driver" 30. ......
在SparkEnv.scala的createDriverEnv中調用new()函數創建一個MapOutputTrackerMaster。MapOutputTrackerMaster的源碼如下。
1. private[spark] class MapOutputTrackerMaster(conf: SparkConf, 2. broadcastManager: BroadcastManager, isLocal: Boolean) 3. extends MapOutputTracker(conf) { 4. ......
然后看一下blockManagerMaster。在SparkEnv.scala中調用new()函數創建一個blockManagerMaster。
1. val blockManagerMaster = new BlockManagerMaster (registerOrLookupEndpoint( 2. BlockManagerMaster.DRIVER_ENDPOINT_NAME, 3. new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), 4. conf, isDriver)
BlockManagerMaster對整個集群的Block數據進行管理,Block是Spark數據管理的單位,與數據存儲沒有關系,數據可能存在磁盤上,也可能存儲在內存中,還可能存儲在offline,如Alluxio上。源碼如下。
1. private[spark] 2. class BlockManagerMaster( 3. var driverEndpoint: RpcEndpointRef, 4. conf: SparkConf, 5. isDriver: Boolean) 6. extends Logging { 7. .....
構建BlockManagerMaster的時候調用new()函數創建一個BlockManagerMasterEndpoint,這是循環消息體。
1. private[spark] 2. class BlockManagerMasterEndpoint( 3. override val rpcEnv: RpcEnv, 4. val isLocal: Boolean, 5. conf: SparkConf, 6. listenerBus: LiveListenerBus) 7. extends ThreadSafeRpcEndpoint with Logging {
(2)BlockManagerMasterEndpoint本身是一個消息體,會負責通過遠程消息通信的方式去管理所有節點的BlockManager。
查看WordCount在IDEA中的運行日志,日志中顯示BlockManagerMasterEndpoint: Registering block manager,向block manager進行注冊。
1. ...... 2. 17/06/06 05:38:02 INFO BlockManager: Using org.apache.spark.storage. RandomBlockReplicationPolicy for block replication policy 3. 17/06/06 05:38:02 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.93.1, 63572, None) 4. 17/06/06 05:38:02 DEBUG DefaultTopologyMapper: Got a request for 192.168. 93.1 5. 17/06/06 05:38:02 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.93.1:63572 with 637.2 MB RAM, BlockManagerId(driver, 192.168.93.1, 63572, None) 6. 17/06/06 05:38:02 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.93.1, 63572, None) 7. 17/06/06 05:38:02 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.93.1, 63572, None) 8. .......
(3)每啟動一個ExecutorBackend,都會實例化BlockManager,并通過遠程通信的方式注冊給BlockManagerMaster;實質上是Executor中的BlockManager在啟動的時候注冊給了Driver上的BlockManagerMasterEndpoint。
(4)MemoryStore是BlockManager中專門負責內存數據存儲和讀寫的類。
查看WordCount在IDEA中的運行日志,日志中顯示MemoryStore: Block broadcast_0 stored as values in memory,數據存儲在內存中。
1. ....... 2. 17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 208.5 KB, free 637.0 MB) 3. 17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.0 KB, free 637.0 MB) 4. 17/06/06 05:38:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.93.1:63572 (size: 20.0 KB, free: 637.2 MB) 5. .......
Spark讀寫數據是以block為單位的,MemoryStore將block數據存儲在內存中。MemoryStore.scala的源碼如下。
1. private[spark] class MemoryStore( 2. conf: SparkConf, 3. blockInfoManager: BlockInfoManager, 4. serializerManager: SerializerManager, 5. memoryManager: MemoryManager, 6. blockEvictionHandler: BlockEvictionHandler) 7. extends Logging { 8. ......
(5)DiskStore是BlockManager中專門負責基于磁盤的數據存儲和讀寫的類。
Spark 2.1.1版本的DiskStore.scala的源碼如下。
1. private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging { 2. .......
Spark 2.2.0版本的DiskStore.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第1行新增加了securityManager安全管理的成員變量。
1. ....... 2. securityManager: SecurityManager) extends Logging {
(6)DiskBlockManager:管理Logical Block與Disk上的Physical Block之間的映射關系并負責磁盤文件的創建、讀寫等。
查看WordCount在IDEA中的運行日志,日志中顯示INFO DiskBlockManager: Created local directory。DiskBlockManager負責磁盤文件的管理。
1. ..... 2. 17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: Using org.apache. spark.storage.DefaultTopologyMapper for getting topology information 3. 17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 4. 17/06/06 05:38:01 INFO DiskBlockManager: Created local directory at C:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828 872c98804 5. 17/06/06 05:38:01 DEBUG DiskBlockManager: Adding shutdown hook 6. .......
DiskBlockManager負責管理邏輯級別和物理級別的映射關系,根據BlockID映射一個文件。在目錄spark.local.dir或者SPARK_LOCAL_DIRS中,Block文件進行hash生成。通過createLocalDirs生成本地目錄。DiskBlockManager的源碼如下。
1. private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging { 2. ...... 3. private def createLocalDirs(conf: SparkConf): Array[File] = { 4. Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => 5. try { 6. val localDir = Utils.createDirectory(rootDir, "blockmgr") 7. logInfo(s"Created local directory at $localDir") 8. Some(localDir) 9. } catch { 10. case e: IOException => 11. logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e) 12. None 13. } 14. } 15. }
從Job運行的角度來觀察BlockManager:
查看WordCount.scala的運行日志:日志中顯示INFO BlockManagerInfo: Added broadcast_0_piece0 in memory,將BlockManagerInfo的廣播變量加入到內存中。
1. ...... 2. 17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.0 KB, free 637.0 MB) 3. 17/06/06 05:38:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.93.1:63572 (size: 20.0 KB, free: 637.2 MB) 4. ......
Driver使用BlockManagerInfo管理ExecutorBackend中BlockManager的元數據,BlockManagerInfo的成員變量包括blockManagerId、系統當前時間timeMs、最大堆內內存maxOnHeapMem、最大堆外內存maxOffHeapMem、slaveEndpoint。
Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源碼如下。
1. private[spark] class BlockManagerInfo( 2. val blockManagerId: BlockManagerId, 3. timeMs: Long, 4. val maxMem: Long, 5. val slaveEndpoint: RpcEndpointRef) 6. extends Logging {
Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第4行刪除maxMem。
上段代碼中第4行之后新增maxOnHeapMem成員變量:最大的堆內內存大小。
上段代碼中第4行之后新增maxOffHeapMem成員變量:最大的堆外內存大小。
1. ...... 2. val maxOnHeapMem: Long, 3. val maxOffHeapMem: Long, 4. ...... 5. extends Logging {
集群中每啟動一個節點,就創建一個BlockManager,BlockManager是在每個節點(Driver及Executors)上運行的管理器,用于存放和檢索本地和遠程不同的存儲塊(內存、磁盤和堆外內存)。BlockManagerInfo中的BlockManagerId標明是哪個BlockManager,slaveEndpoint是消息循環體,用于消息通信。
(1)首先通過MemoryStore存儲廣播變量。
(2)在Driver中是通過BlockManagerInfo來管理集群中每個ExecutorBackend中的BlockManager中的元數據信息的。
(3)當改變了具體的ExecutorBackend上的Block信息后,就必須發消息給Driver中的BlockManagerMaster來更新相應的BlockManagerInfo。
(4)當執行第二個Stage的時候,第二個Stage會向Driver中的MapOutputTracker-MasterEndpoint發消息請求上一個Stage中相應的輸出,此時MapOutputTrackerMaster會把上一個Stage的輸出數據的元數據信息發送給當前請求的Stage。圖7-14是BlockManager工作原理和運行機制簡圖:

圖7-14 BlockManager工作原理和運行機制簡圖
BlockManagerMasterEndpoint.scala中BlockManagerInfo的getStatus方法如下。
1. def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks. get(blockId))
其中的BlockStatus是一個case class。
1. case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) { 2. def isCached: Boolean = memSize + diskSize > 0 3. }
BlockTransferService.scala進行網絡連接操作,獲取遠程數據。
1. private[spark] 2. abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
7.6.5 BlockManager解密進階:BlockManager初始化和注冊解密、BlockManagerMaster工作解密、BlockTransferService解密、本地數據讀寫解密、遠程數據讀寫解密
BlockManager既可以運行在Driver上,也可以運行在Executor上。在Driver上的BlockManager管理集群中Executor的所有的BlockManager,BlockManager分成Master、Slave結構,一切的調度、一切的工作由Master觸發,Executor在啟動的時候一定會啟動BlockManager。BlockManager主要提供了讀和寫數據的接口,可以從本地讀寫數據,也可以從遠程讀寫數據。讀寫數據可以基于磁盤,也可以基于內存以及OffHeap。OffHeap就是堆外空間(如Alluxion是分布式內存管理系統,與基于內存計算的Spark系統形成天衣無縫的組合,在大數據領域中,Spark+Alluxion+Kafka是非常有用的組合)。
從整個程序運行的角度看,Driver也是Executor的一種,BlockManager可以運行在Driver上,也可以運行在Executor上。BlockManager.scala的源碼如下。
1. private[spark] class BlockManager( 2. executorId: String, 3. rpcEnv: RpcEnv, 4. val master: BlockManagerMaster, 5. val serializerManager: SerializerManager, 6. val conf: SparkConf, 7. memoryManager: MemoryManager, 8. mapOutputTracker: MapOutputTracker, 9. shuffleManager: ShuffleManager, 10. val blockTransferService: BlockTransferService, 11. securityManager: SecurityManager, 12. numUsableCores: Int) 13. extends BlockDataManager with BlockEvictionHandler with Logging { 14. ...... 15. val diskBlockManager = { 16. //如果外部服務不為shuffle 文件提供服務執行清理文件 17. val deleteFilesOnStop = 18. !externalShuffleServiceEnabled || executorId == SparkContext. DRIVER_IDENTIFIER 19. new DiskBlockManager(conf, deleteFilesOnStop) 20. } 21. ...... 22. private val futureExecutionContext = ExecutionContext.fromExecutorService( 23. ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) 24. ...... 25. private[spark] val memoryStore = 26. new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this) 27. private[spark] val diskStore = new DiskStore(conf, diskBlockManager) 28. memoryManager.setMemoryStore(memoryStore) 29. ...... 30. def initialize(appId: String): Unit = { 31. .......
BlockManager中的成員變量中:BlockManagerMaster對整個集群的BlockManagerMaster進行管理;serializerManager是默認的序列化器;MemoryManager是內存管理;MapOutputTracker是Shuffle輸出的時候,要記錄ShuffleMapTask輸出的位置,以供下一個Stage使用,因此需要進行記錄。BlockTransferService是進行網絡操作的,如果要連同另外一個BlockManager進行數據讀寫操作,就需要BlockTransferService。Block是Spark運行時數據的最小抽象單位,可能放入內存中,也可能放入磁盤中,還可能放在Alluxio上。
SecurityManager是安全管理;numUsableCores是可用的Cores。
BlockManager中DiskBlockManager管理磁盤的讀寫,創建并維護磁盤上邏輯塊和物理塊之間的邏輯映射位置。一個block被映射到根據BlockId生成的一個文件,塊文件哈希列在目錄spark.local.dir中(如果設置了SPARK LOCAL DIRS),或在目錄(SPARK LOCAL DIRS)中。
然后在BlockManager中創建一個緩存池:block-manager-future以及memoryStore 、diskStore。
Shuffle讀寫數據的時候是通過BlockManager進行管理的。
Spark 2.1.1版本的BlockManager.scala的源碼如下。
1. var blockManagerId: BlockManagerId = _ 2. 3. //服務此Executor的shuffle文件的服務器的地址,這或者是外部的服務,或者只是我們 //自己的Executor的BlockManager 4. private[spark] var shuffleServerId: BlockManagerId = _ 5. 6. //客戶端讀取其他Executors的shuffle文件。這或者是一個外部服務,或者只是 //標準BlockTransferService 直接連接到其他Executors 7. 8. private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { 9. val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) 10. new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(), 11. securityManager.isSaslEncryptionEnabled()) 12. } else { 13. blockTransferService 14. }
Spark 2.2.0版本的BlockManager.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第11行ExternalShuffleClient類中去掉securityManager.isSaslEncryptionEnabled()成員變量。
1. ...... 2. new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled()) 3. ......
BlockManager.scala中,BlockManager實例對象通過調用initialize方法才能正式工作,傳入參數是appId,基于應用程序的ID初始化BlockManager。initialize不是在構造器的時候被使用,因為BlockManager實例化的時候還不知道應用程序的ID,應用程序ID是應用程序啟動時,ExecutorBackend向Master注冊時候獲得的。
BlockManager.scala的initialize方法中的BlockTransferService進行網絡通信。ShuffleClient是BlockManagerWorker每次啟動時向BlockManagerMaster注冊。BlockManager.scala的initialize方法中調用了registerBlockManager,向Master進行注冊,告訴BlockManagerMaster把自己注冊進去。
Spark 2.1.1版本BlockManagerMaster.scala的registerBlockManager的源碼如下。
1. def registerBlockManager( 2. blockManagerId: BlockManagerId, 3. maxMemSize: Long, 4. slaveEndpoint: RpcEndpointRef): BlockManagerId = { 5. logInfo(s"Registering BlockManager $blockManagerId") 6. val updatedId = driverEndpoint.askWithRetry[BlockManagerId]( 7. RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)) 8. logInfo(s"Registered BlockManager $updatedId") 9. updatedId 10. }
Spark 2.2.0版本的BlockManagerMaster.scala的registerBlockManager的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第3行maxMemSize刪除。
上段代碼中第3行之后新增參數maxOnHeapMemSize:最大的堆內內存大小。
上段代碼中第3行之后新增參數maxOffHeapMemSize:最大的堆外內存大小。
上段代碼中第6行driverEndpoint.askWithRetry方法調整為driverEndpoint.askSync方法。
上段代碼中第7行RegisterBlockManager新增maxOnHeapMemSize、maxOffHeapMemSize兩個參數。
1. ...... 2. maxOnHeapMemSize: Long, 3. maxOffHeapMemSize: Long, 4. ...... 5. val updatedId = driverEndpoint.askSync[BlockManagerId]( 6. RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) 7. ......
registerBlockManager方法的RegisterBlockManager是一個case class。
Spark 2.1.1版本的BlockManagerMessages.scala的源碼如下。
1. case class RegisterBlockManager( 2. blockManagerId: BlockManagerId, 3. maxMemSize: Long, 4. sender: RpcEndpointRef) 5. extends ToBlockManagerMaster
Spark 2.2.0版本的BlockManagerMessages.scala的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第3行maxMemSize刪除。
上段代碼中第3行之后新增成員變量maxOnHeapMemSize:最大堆內內存大小。
上段代碼中第3行之后新增成員變量maxOffHeapMemSize:最大堆外內存大小。
1. ...... 2. maxOnHeapMemSize: Long, 3. maxOffHeapMemSize: Long, 4. ......
在Executor實例化的時候,要初始化blockManager。blockManager在initialize中將應用程序ID傳進去。
Executor.scala的源碼如下。
1. if (!isLocal) { 2. env.metricsSystem.registerSource(executorSource) 3. env.blockManager.initialize(conf.getAppId) 4. }
Executor.scala中,Executor每隔10s向Master發送心跳消息,如收不到心跳消息,blockManager須重新注冊。
Spark 2.1.1版本的Executor.scala的源碼如下。
1. ....... 2. val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId) 3. try { 4. val response = heartbeatReceiverRef.askWithRetry [HeartbeatResponse]( 5. message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) 6. if (response.reregisterBlockManager) { 7. logInfo("Told to re-register on heartbeat") 8. env.blockManager.reregister() 9. } 10. heartbeatFailures = 0 11. } catch { 12. case NonFatal(e) => 13. logWarning("Issue communicating with driver in heartbeater", e) 14. heartbeatFailures += 1 15. if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) { 16. logError(s"Exit as unable to send heartbeats to driver " + 17. s"more than $HEARTBEAT_MAX_FAILURES times") 18. System.exit(ExecutorExitCode.HEARTBEAT_FAILURE) 19. } 20. } 21. } 22. .......
Spark 2.2.0版本的Executor.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第4行heartbeatReceiverRef.askWithRetry方法調整為heartbeatReceiverRef.askSync方法。
1. ....... 2. val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( 3. message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
回到BlockManagerMaster.scala的registerBlockManager:
registerBlockManager中RegisterBlockManager傳入的slaveEndpoint是:具體的Executor啟動時會啟動一個BlockManagerSlaveEndpoint,會接收BlockManagerMaster發過來的指令。在initialize方法中通過master.registerBlockManager傳入slaveEndpoint,而slaveEndpoint是在rpcEnv.setupEndpoint方法中調用new()函數創建的BlockManagerSlaveEndpoint。
總結一下:
(1)當Executor實例化的時候,會通過BlockManager.initialize來實例化Executor上的BlockManager,并且創建BlockManagerSlaveEndpoint這個消息循環體來接受Driver中BlockManagerMaster發過來的指令,如刪除Block等。
1. env.blockManager.initialize(conf.getAppId)
BlockManagerSlaveEndpoint.scala的源碼如下。
1. class BlockManagerSlaveEndpoint( 2. override val rpcEnv: RpcEnv, 3. blockManager: BlockManager, 4. mapOutputTracker: MapOutputTracker) 5. extends ThreadSafeRpcEndpoint with Logging {
(2)當BlockManagerSlaveEndpoint實例化后,Executor上的BlockManager需要向Driver上的BlockManagerMasterEndpoint注冊。
BlockManagerMaster的registerBlockManager方法,其中的driverEndpoint是構建BlockManagerMaster時傳進去的。
(3)BlockManagerMasterEndpoint接收到Executor上的注冊信息并進行處理。
Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源碼如下。
1. class BlockManagerMasterEndpoint( 2. override val rpcEnv: RpcEnv, 3. ...... 4. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 5. case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) => 6. context.reply(register(blockManagerId, maxMemSize, slaveEndpoint)) 7. ......
Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第5行RegisterBlockManager新增成員變量:maxOnHeapMemSize、maxOffHeapMemSize。
上段代碼中第6行register新增成員變量:maxOnHeapMemSize、maxOffHeapMemSize。
1. ...... 2. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 3. case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => 4. context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) 5. ......
BlockManagerMasterEndpoint的register注冊方法,為每個Executor的BlockManager生成對應的BlockManagerInfo。BlockManagerInfo是一個HashMap[BlockManagerId, BlockManagerInfo]。
register注冊方法源碼如下。
Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源碼如下。
1. private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] 2. ...... 3. private def register( 4. idWithoutTopologyInfo: BlockManagerId, 5. maxMemSize: Long, 6. slaveEndpoint: RpcEndpointRef): BlockManagerId = { 7. //dummy id不應包含拓撲信息 8. //我們在這里得到信息和回應一個塊標識符 9. val id = BlockManagerId( 10. idWithoutTopologyInfo.executorId, 11. idWithoutTopologyInfo.host, 12. idWithoutTopologyInfo.port, 13. topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host)) 14. 15. val time = System.currentTimeMillis() 16. if (!blockManagerInfo.contains(id)) { 17. blockManagerIdByExecutor.get(id.executorId) match { 18. case Some(oldId) => 19. //同一個Executor 的塊管理器已經存在,所以刪除它(假定已掛掉) 20. logError("Got two different block manager registrations on same executor - " 21. + s" will replace old one $oldId with new one $id") 22. removeExecutor(id.executorId) 23. case None => 24. } 25. logInfo("Registering block manager %s with %s RAM, %s".format( 26. id.hostPort, Utils.bytesToString(maxMemSize), id)) 27. 28. blockManagerIdByExecutor(id.executorId) = id 29. 30. blockManagerInfo(id) = new BlockManagerInfo( 31. id, System.currentTimeMillis(), maxMemSize, slaveEndpoint) 32. } 33. listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) 34. id 35. }......
Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第5行刪掉maxMemSize。
上段代碼中第5行之后Register新增參數maxOnHeapMemSize:最大堆內內存大?。籱axOffHeapMemSize:最大堆外內存大小。
上段代碼中第26行日志打印時新增最大堆內內存大小、最大堆外內存大小的信息。
上段代碼中第31行構建BlockManagerInfo實例時傳入maxOnHeapMemSize、maxOffHeapMemSize。
上段代碼中第33行listenerBus監控系統增加對最大堆內內存大小、最大堆外內存大小信息的監控。
1. ....... 2. maxOnHeapMemSize: Long, 3. maxOffHeapMemSize: Long, 4. ...... 5. id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id)) 6. ....... 7. 8. blockManagerInfo(id) = new BlockManagerInfo( 9. id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) 10. ....... 11. listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, 12. Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) 13. .......
BlockManagerMasterEndpoint中,BlockManagerId是一個class,標明了BlockManager在哪個Executor中,以及host主機名、port端口等信息。
BlockManagerId.scala的源碼如下。
1. class BlockManagerId private ( 2. private var executorId_ : String, 3. private var host_ : String, 4. private var port_ : Int, 5. private var topologyInfo_ : Option[String]) 6. extends Externalizable {
BlockManagerMasterEndpoint中,BlockManagerInfo包含內存、slaveEndpoint等信息。
回到BlockManagerMasterEndpoint的register注冊方法:如果blockManagerInfo沒有包含BlockManagerId,根據BlockManagerId.executorId查詢BlockManagerId,如果匹配到舊的BlockManagerId,就進行清理。
BlockManagerMasterEndpoint的removeExecutor方法如下。
1. private def removeExecutor(execId: String) { 2. logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") 3. blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) 4. }
進入removeBlockManager方法,從blockManagerIdByExecutor數據結構中清理掉block manager信息,從blockManagerInfo數據結構中清理掉所有的blocks信息。removeBlockManager源碼如下。
Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的removeBlockManager的源碼如下。
1. private def removeBlockManager(blockManagerId: BlockManagerId) { 2. val info = blockManagerInfo(blockManagerId) 3. 4. //從blockManagerIdByExecutor刪除塊管理 5. blockManagerIdByExecutor -= blockManagerId.executorId 6. 7. //將它從blockManagerInfo 刪除所有的塊 8. blockManagerInfo.remove(blockManagerId) 9. val iterator = info.blocks.keySet.iterator 10. while (iterator.hasNext) { 11. val blockId = iterator.next 12. val locations = blockLocations.get(blockId) 13. locations -= blockManagerId 14. if (locations.size == 0) { 15. blockLocations.remove(blockId) 16. } 17. } 18. listenerBus.post(SparkListenerBlockManagerRemoved(System. currentTimeMillis(), blockManagerId)) 19. logInfo(s"Removing block manager $blockManagerId") 20. }
Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的removeBlockManager的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第16~20行整體替換為以下代碼:新增數據復制處理。
1. ....... 2. //如果沒有塊管理器,就注銷這個塊。否則,如果主動復制啟用,塊block是一個RDD //或測試塊 block(后者用于單元測試),我們發送一條消息隨機選擇Executor的位 //置來復制給定塊block。注意,我們忽略了其他塊block類型(如廣播broadcast/ //shuffle blocks),因為復制在這種情況下沒有多大意義 3. ...... 4. logWarning(s"No more replicas available for $blockId !") 5. } else if (proactivelyReplicate && (blockId.isRDD || blockId. isInstanceOf[TestBlockId])) { 6. 7. //假設Executor未能找出故障前存在的副本數量 8. val maxReplicas = locations.size + 1 9. val i = (new Random(blockId.hashCode)).nextInt(locations.size) 10. val blockLocations = locations.toSeq 11. val candidateBMId = blockLocations(i) 12. blockManagerInfo.get(candidateBMId).foreach { bm => 13. val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId) 14. val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas) 15. bm.slaveEndpoint.ask[Boolean](replicateMsg) 16. } 17. } 18. } 19. .........
removeBlockManager中的一行代碼blockLocations.remove的remove方法如下。
HashMap.java的源碼如下。
1. public V remove(Object key) { 2. Node<K,V> e; 3. return (e = removeNode(hash(key), key, null, false, true)) == null ? 4. null : e.value; 5. }
回到BlockManagerMasterEndpoint的register注冊方法:然后在blockManagerIdByExecutor中加入BlockManagerId,將BlockManagerId加入BlockManagerInfo信息,在listenerBus中進行監聽,函數返回BlockManagerId,完成注冊。
回到BlockManager.scala,在initialize方法通過master.registerBlockManager注冊成功以后,將返回值賦值給idFromMaster。Initialize初始化之后,看一下BlockManager.scala中其他的方法。
reportAllBlocks方法:具體的Executor須向Driver不斷地匯報自己的狀態。
BlockManager.scala的reportAllBlocks方法的源碼如下。
1. private def reportAllBlocks(): Unit = { 2. logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.") 3. for ((blockId, info) <- blockInfoManager.entries) { 4. val status = getCurrentBlockStatus(blockId, info) 5. if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) { 6. logError(s"Failed to report $blockId to master; giving up.") 7. return 8. } 9. } 10. }
reportAllBlocks方法中調用了getCurrentBlockStatus,包括內存、磁盤等信息。
getCurrentBlockStatus的源碼如下。
1. private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = { 2. info.synchronized { 3. info.level match { 4. case null => 5. BlockStatus.empty 6. case level => 7. val inMem = level.useMemory && memoryStore.contains(blockId) 8. val onDisk = level.useDisk && diskStore.contains(blockId) 9. val deserialized = if (inMem) level.deserialized else false 10. val replication = if (inMem || onDisk) level.replication else 1 11. val storageLevel = StorageLevel( 12. useDisk = onDisk, 13. useMemory = inMem, 14. useOffHeap = level.useOffHeap, 15. deserialized = deserialized, 16. replication = replication) 17. val memSize = if (inMem) memoryStore.getSize(blockId) else 0L 18. val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L 19. BlockStatus(storageLevel, memSize, diskSize) 20. } 21. } 22. }
getCurrentBlockStatus方法中的BlockStatus,包含存儲級別StorageLevel、內存大小、磁盤大小等信息。
BlockManagerMasterEndpoint.scala的BlockStatus的源碼如下。
1. case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) { 2. def isCached: Boolean = memSize + diskSize > 0 3. } 4. ...... 5. object BlockStatus { 6. def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L) 7. }
回到BlockManager.scala,其中的getLocationBlockIds方法比較重要,根據BlockId獲取這個BlockId所在的BlockManager。
BlockManager.scala的getLocationBlockIds的源碼如下。
1. private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq [BlockManagerId]] = { 2. val startTimeMs = System.currentTimeMillis 3. val locations = master.getLocations(blockIds).toArray 4. logDebug("Got multiple block location in %s".format (Utils.getUsedTimeMs(startTimeMs))) 5. locations 6. }
getLocationBlockIds方法中根據BlockId通過master.getLocations向Master獲取位置信息,因為master管理所有的位置信息。getLocations方法里的driverEndpoint是BlockManagerMasterEndpoint,Executor向BlockManagerMasterEndpoint發送GetLocationsMultipleBlockIds消息。
Spark 2.1.1版本的BlockManagerMaster.scala的getLocations方法的源碼如下。
1. def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq [BlockManagerId]] = { 2. driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]]( 3. GetLocationsMultipleBlockIds(blockIds)) 4. }
Spark 2.2.0版本的BlockManagerMaster.scala的getLocations方法的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第2行driverEndpoint.askWithRetry方法調整為driverEndpoint. askSync方法。
1. ...... 2. driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]]( 3. .......
getLocations中的GetLocationsMultipleBlockIds是一個case class。
1. case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
在BlockManagerMasterEndpoint側接收GetLocationsMultipleBlockIds消息。
BlockManagerMasterEndpoint.scala的receiveAndReply方法如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 2. ...... 3. case GetLocationsMultipleBlockIds(blockIds) => 4. context.reply(getLocationsMultipleBlockIds(blockIds))
進入getLocationsMultipleBlockIds方法,進行map操作,開始查詢位置信息。
1. private def getLocationsMultipleBlockIds( 2. blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = { 3. blockIds.map(blockId => getLocations(blockId)) 4. }
進入getLocations方法,首先判斷內存緩存結構blockLocations中是否包含blockId,如果已包含,就獲取位置信息,否則返回空的信息。
1. private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { 2. if (blockLocations.containsKey(blockId)) blockLocations.get (blockId).toSeq else Seq.empty 3. }
其中,blockLocations是一個重要的數據結構,是一個JHashMap。Key是BlockId。Value是一個HashSet[BlockManagerId],使用HashSet。因為每個BlockId在磁盤上有副本,不同機器的位置不一樣,而且不同副本對應的BlockManagerId不一樣,位于不同的機器上,所以使用HashSet數據結構。
BlockManagerMasterEndpoint.scala的blockLocations的源碼如下。
1. private val blockLocations = new JHashMap[BlockId, mutable.HashSet [BlockManagerId]]
回到BlockManager.scala,getLocalValues是一個重要的方法,從blockInfoManager中獲取本地數據。
首先根據blockId從blockInfoManager中獲取BlockInfo信息。
從BlockInfo信息獲取level級別,根據level.useMemory && memoryStore.contains (blockId)判斷是否在內存中,如果在內存中,就從memoryStore中獲取數據。
根據level.useDisk && diskStore.contains(blockId)判斷是否在磁盤中,如果在磁盤中,就從diskStore中獲取數據。
Spark 2.1.1版本的BlockManager.scala的getLocalValues方法的源碼如下。
1. def getLocalValues(blockId: BlockId): Option[BlockResult] = { 2. logDebug(s"Getting local block $blockId") 3. blockInfoManager.lockForReading(blockId) match { 4. case None => 5. logDebug(s"Block $blockId was not found") 6. None 7. case Some(info) => 8. val level = info.level 9. logDebug(s"Level for block $blockId is $level") 10. if (level.useMemory && memoryStore.contains(blockId)) { 11. val iter: Iterator[Any] = if (level.deserialized) { 12. memoryStore.getValues(blockId).get 13. } else { 14. serializerManager.dataDeserializeStream( 15. blockId, memoryStore.getBytes(blockId).get.toInputStream()) (info.classTag) 16. } 17. val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) 18. Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) 19. } else if (level.useDisk && diskStore.contains(blockId)) { 20. val iterToReturn: Iterator[Any] = { 21. val diskBytes = diskStore.getBytes(blockId) 22. if (level.deserialized) { 23. val diskValues = serializerManager.dataDeserializeStream( 24. blockId, 25. diskBytes.toInputStream(dispose = true))(info.classTag) 26. maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) 27. } else { 28. val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes) 29. .map {_.toInputStream(dispose = false)} 30. .getOrElse { diskBytes.toInputStream(dispose = true) } 31. serializerManager.dataDeserializeStream(blockId, stream) (info.classTag) 32. } 33. } 34. val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId)) 35. Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) 36. } else { 37. handleLocalReadFailure(blockId) 38. } 39. } 40. }
Spark 2.2.0版本的BlockManager.scala的getLocalValues方法的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第9行之后新增taskAttemptId的創建。
上段代碼中第17行releaseLock新增一個參數taskAttemptId。
上段代碼中第21、25、28、30行diskBytes更新為diskData。
上段代碼中第21行之后新增val iterToReturn: Iterator[Any]。
上段代碼中第25行diskData.toInputStream方法刪掉dispose = true參數。
上段代碼中第34行CompletionIterator的第二個參數調整為releaseLockAndDispose (blockId, diskData, taskAttemptId)。
1. ....... 2. val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId()) 3. ...... 4. //在迭代器iterator完成觸發時,我們需要從一個沒有TaskContext上下文的 //線程捕獲taskId,參閱spark-18406討論 5. val ci = CompletionIterator[Any, Iterator[Any]](iter, { 6. releaseLock(blockId, taskAttemptId) 7. ........ 8. val diskData = diskStore.getBytes(blockId) 9. val iterToReturn: Iterator[Any] = { 10. ........ 11. diskData.toInputStream())(info.classTag) 12. ........ 13. val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData) 14. ........ 15. .getOrElse { diskData.toInputStream() } 16. ........ 17. releaseLockAndDispose(blockId, diskData, taskAttemptId) 18. ........
回到BlockManager.scala,getRemoteValues方法從遠程的BlockManager中獲取block數據,在JVM中不需要去獲取鎖。
BlockManager.scala的getRemoteValues方法的源碼如下。
1. private def getRemoteValues[T: ClassTag](blockId: BlockId): Option [BlockResult] = { 2. val ct = implicitly[ClassTag[T]] 3. getRemoteBytes(blockId).map { data => 4. val values = 5. serializerManager.dataDeserializeStream(blockId, data.toInputStream (dispose = true))(ct) 6. new BlockResult(values, DataReadMethod.Network, data.size) 7. } 8. }
getRemoteValues方法中調用getRemoteBytes,獲取遠程的數據,如果獲取的失敗次數超過最大的獲取次數(locations.size),就提示失敗,返回空值;如果獲取到遠程數據,就返回。
getRemoteBytes方法調用blockTransferService.fetchBlockSync方法實現遠程獲取數據。
BlockTransferService.scala的fetchBlockSync方法的源碼如下。
Spark 2.1.1版本的BlockTransferService.scala的fetchBlockSync方法的源碼如下。
1. def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { 2. //線程等待的監視器 3. val result = Promise[ManagedBuffer]() 4. fetchBlocks(host, port, execId, Array(blockId), 5. new BlockFetchingListener { 6. override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { 7. result.failure(exception) 8. } 9. override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { 10. val ret = ByteBuffer.allocate(data.size.toInt) 11. ret.put(data.nioByteBuffer()) 12. ret.flip() 13. result.success(new NioManagedBuffer(ret)) 14. } 15. }) 16. ThreadUtils.awaitResult(result.future, Duration.Inf) 17. }
Spark 2.2.0版本的BlockTransferService.scala的fetchBlockSync方法的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第15行fetchBlocks方法新增了shuffleFiles = null參數。fetchBlocks方法用于異步從遠程節點獲取序列塊,僅在調用[init]之后可用。注意,這個API需要一個序列,可以實現批處理請求,而不是返回一個future,底層實現可以調用onBlockFetchSuccess盡快獲取塊的數據,而不是等待所有塊被取出來。
1. ...... 2. }, shuffleFiles = null) 3. .......
fetchBlockSync中調用fetchBlocks方法,NettyBlockTransferService繼承自BlockTransferService,是BlockTransferService實現子類。
Spark 2.1.1版本的NettyBlockTransferService的fetchBlocks的源碼如下。
1. override def fetchBlocks( 2. host: String, 3. port: Int, 4. execId: String, 5. blockIds: Array[String], 6. listener: BlockFetchingListener): Unit = { 7. logTrace(s"Fetch blocks from $host:$port (executor id $execId)") 8. try { 9. val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { 10. override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) { 11. val client = clientFactory.createClient(host, port) 12. new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start() 13. } 14. } 15. 16. val maxRetries = transportConf.maxIORetries() 17. if (maxRetries > 0) { 18. //注意,Fetcher將正確處理maxRetries等于0的情況;避免它在代碼中產生Bug, //一旦確定了穩定性,就應該刪除if語句 19. new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start() 20. } else { 21. blockFetchStarter.createAndStart(blockIds, listener) 22. } 23. } catch { 24. case e: Exception => 25. logError("Exception while beginning fetchBlocks", e) 26. blockIds.foreach(listener.onBlockFetchFailure(_, e)) 27. } 28. }
Spark 2.2.0版本的NettyBlockTransferService的fetchBlocks的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第6行fetchBlocks方法新增了shuffleFiles參數。
1. ....... 2. shuffleFiles: Array[File]): Unit = { 3. .......
回到BlockManager.scala,無論是doPutBytes(),還是doPutIterator()方法中,都會使用doPut方法。
BlockManager.scala的doPut方法的源碼如下。
1. private def doPut[T]( 2. blockId: BlockId, 3. level: StorageLevel, 4. classTag: ClassTag[_], 5. tellMaster: Boolean, 6. keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = { 7. require(blockId != null, "BlockId is null") 8. require(level != null && level.isValid, "StorageLevel is null or invalid") 9. 10. val putBlockInfo = { 11. val newInfo = new BlockInfo(level, classTag, tellMaster) 12. if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) { 13. newInfo 14. } else { 15. logWarning(s"Block $blockId already exists on this machine; not re-adding it") 16. if (!keepReadLock) { 17. //在現有的塊上lockNewBlockForWriting 返回一個讀鎖,所以我們必須釋放它 18. releaseLock(blockId) 19. } 20. return None 21. } 22. } 23. 24. val startTimeMs = System.currentTimeMillis 25. var exceptionWasThrown: Boolean = true 26. val result: Option[T] = try { 27. val res = putBody(putBlockInfo) 28. exceptionWasThrown = false 29. ...... 30. result 31. }
doPut方法中,lockNewBlockForWriting寫入一個新的塊前先嘗試獲得適當的鎖,如果我們是第一個寫塊,獲得寫入鎖后繼續后續操作。否則,如果另一個線程已經寫入塊,須等待寫入完成,才能獲取讀取鎖,調用new()函數創建一個BlockInfo賦值給putBlockInfo,然后通過putBody(putBlockInfo)將數據存入。putBody是一個匿名函數,輸入BlockInfo,輸出的是一個泛型Option[T]。putBody函數體內容是doPutIterator方法(doPutBytes方法也類似調用doPut)調用doPut時傳入的。
BlockManager.scala的doPutIterator調用doput方法,在其putBody匿名函數體中進行判斷:
如果是level.useMemory,則在memoryStore中放入數據。
如果是level.useDisk,則在diskStore中放入數據。
如果level.replication大于1,則在其他節點中存入副本數據。
其中,BlockManager.scala的replicate方法的副本復制源碼如下。
Spark 2.1.1版本的BlockManager.scala的replicate方法的源碼如下。
1. private def replicate( 2. blockId: BlockId, 3. data: ChunkedByteBuffer, 4. level: StorageLevel, 5. classTag: ClassTag[_]): Unit = { 6. ...... 7. while(numFailures <= maxReplicationFailures && 8. !peersForReplication.isEmpty && 9. peersReplicatedTo.size != numPeersToReplicateTo) { 10. val peer = peersForReplication.head 11. try { 12. val onePeerStartTime = System.nanoTime 13. logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") 14. blockTransferService.uploadBlockSync( 15. peer.host, 16. peer.port, 17. peer.executorId, 18. blockId, 19. new NettyManagedBuffer(data.toNetty), 20. tLevel, 21. classTag) 22. ......
Spark 2.2.0版本的BlockManager.scala的replicate方法的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第5行replicate方法中新增了existingReplicas參數。
上段代碼中第19行uploadBlockSync方法的第5個參數由NettyManagedBuffer實例調整為BlockManagerManagedBuffer實例。
1. ....... 2. existingReplicas: Set[BlockManagerId] = Set.empty): Unit = { 3. ...... 4. new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false), 5. ......
replicate方法中調用了blockTransferService.uploadBlockSync方法。
BlockTransferService.scala的uploadBlockSync的源碼如下。
1. def uploadBlockSync( 2. hostname: String, 3. port: Int, 4. execId: String, 5. blockId: BlockId, 6. blockData: ManagedBuffer, 7. level: StorageLevel, 8. classTag: ClassTag[_]): Unit = { 9. val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag) 10. ThreadUtils.awaitResult(future, Duration.Inf) 11. } 12. }
uploadBlockSync中又調用uploadBlock方法,BlockTransferService.scala的uploadBlock方法無具體實現,NettyBlockTransferService是BlockTransferService的子類,具體實現uploadBlock方法。
NettyBlockTransferService的uploadBlock的源碼如下。
1. override def uploadBlock( 2. hostname: String, 3. port: Int, 4. execId: String, 5. blockId: BlockId, 6. blockData: ManagedBuffer, 7. level: StorageLevel, 8. classTag: ClassTag[_]): Future[Unit] = { 9. val result = Promise[Unit]() 10. val client = clientFactory.createClient(hostname, port) 11. 12. //使用JavaSerializer序列號器將StorageLevel和ClassTag序列化。其他一切都 //用我們的二進制協議編碼 13. val metadata = JavaUtils.bufferToArray(serializer.newInstance(). serialize((level, classTag))) 14. 15. //為了序列化,轉換或復制NIO緩沖到數組 16. val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) 17. 18. client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer, 19. new RpcResponseCallback { 20. override def onSuccess(response: ByteBuffer): Unit = { 21. logTrace(s"Successfully uploaded block $blockId") 22. result.success((): Unit) 23. } 24. override def onFailure(e: Throwable): Unit = { 25. logError(s"Error while uploading block $blockId", e) 26. result.failure(e) 27. } 28. }) 29. 30. result.future 31. }
回到BlockManager.scala,看一下dropFromMemory方法。如果存儲級別定位為MEMORY_AND_DISK,那么數據可能放在內存和磁盤中,內存夠的情況下不會放到磁盤上;如果內存不夠,就放到磁盤上,這時就會調用dropFromMemory。如果存儲級別不是定義為MEMORY_AND_DISK,而只是存儲在內存中,內存不夠時,緩存的數據此時就會丟棄。如果仍需要數據,那就要重新計算。
Spark 2.1.1版本的BlockManager.scala的dropFromMemory的源碼如下。
1. private[storage] override def dropFromMemory[T: ClassTag]( 2. blockId: BlockId, 3. data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { 4. logInfo(s"Dropping block $blockId from memory") 5. val info = blockInfoManager.assertBlockIsLockedForWriting(blockId) 6. var blockIsUpdated = false 7. val level = info.level 8. 9. //如果存儲級別要求,則保存到磁盤 10. if (level.useDisk && !diskStore.contains(blockId)) { 11. logInfo(s"Writing block $blockId to disk") 12. data() match { 13. case Left(elements) => 14. diskStore.put(blockId) { fileOutputStream => 15. serializerManager.dataSerializeStream( 16. blockId, 17. fileOutputStream, 18. elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]]) 19. } 20. case Right(bytes) => 21. diskStore.putBytes(blockId, bytes) 22. } 23. blockIsUpdated = true 24. } 25. 26. //實際由內存存儲 27. val droppedMemorySize = 28. if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L 29. val blockIsRemoved = memoryStore.remove(blockId) 30. if (blockIsRemoved) { 31. blockIsUpdated = true 32. } else { 33. logWarning(s"Block $blockId could not be dropped from memory as it does not exist") 34. } 35. 36. val status = getCurrentBlockStatus(blockId, info) 37. if (info.tellMaster) { 38. reportBlockStatus(blockId, status, droppedMemorySize) 39. } 40. if (blockIsUpdated) { 41. addUpdatedBlockStatusToTaskMetrics(blockId, status) 42. } 43. status.storageLevel 44. }
Spark 2.2.0版本的BlockManager.scala的dropFromMemory的源碼與Spark 2.1.1版本相比具有如下特點。
上段代碼中第14行fileOutputStream名稱調整為channel。
上段代碼中第14行之后新增代碼:val out = Channels.newOutputStream(channel)。
上段代碼中第17行fileOutputStream調整為out。
1. ........ 2. diskStore.put(blockId) { channel => 3. val out = Channels.newOutputStream(channel) 4. ........ 5. out, 6. ........
總結:dropFromMemory是指在內存不夠的時候,嘗試釋放一部分內存給要使用內存的應用,釋放的這部分內存數據需考慮是丟棄,還是放到磁盤上。如果丟棄,如5000個步驟作為一個Stage,前面4000個步驟進行了Cache,Cache時可能有100萬個partition分區單位,其中丟棄了100個,丟棄的100個數據就要重新計算;但是,如果設置了同時放到內存和磁盤,此時會放入磁盤中,下次如果需要,就可以從磁盤中讀取數據,而不是重新計算。
- Big Data Analytics with Hadoop 3
- Ansible Configuration Management
- 火格局的時空變異及其在電網防火中的應用
- Hands-On Machine Learning on Google Cloud Platform
- Drupal 7 Multilingual Sites
- 計算機應用基礎·基礎模塊
- 條碼技術及應用
- 水晶石精粹:3ds max & ZBrush三維數字靜幀藝術
- 四向穿梭式自動化密集倉儲系統的設計與控制
- Learning C for Arduino
- 面向對象程序設計綜合實踐
- 單片機C語言程序設計完全自學手冊
- Containers in OpenStack
- Learning Linux Shell Scripting
- 青少年VEX IQ機器人實訓課程(初級)