書名: Spark大數(shù)據(jù)商業(yè)實戰(zhàn)三部曲:內(nèi)核解密|商業(yè)案例|性能調(diào)優(yōu)作者名: 王家林本章字?jǐn)?shù): 2459字更新時間: 2019-12-12 17:30:05
10.1 Spark中Broadcast原理和源碼詳解
本節(jié)講解Spark中Broadcast原理及Spark中Broadcast源碼。
10.1.1 Spark中Broadcast原理詳解
Broadcast在機器學(xué)習(xí)、圖計算、構(gòu)建日常的各種算法中到處可見。 Broadcast將數(shù)據(jù)從一個節(jié)點發(fā)送到其他節(jié)點上;例如,Driver上有一張表,而Executor中的每個并行執(zhí)行的Task (100萬個Task)都要查詢這張表,那我們通過Broadcast的方式只需要往每個Executor發(fā)送一次這張表就行了,Executor中的每個運行的Task查詢這張唯一的表,而不是每次執(zhí)行的時候都從Driver獲得這張表。
Java中的Servlet里有一個ServletContext,是JSP或Java代碼運行時的上下文,通過上下文可以獲取各種資源。Broadcast類似于ServletContext中的資源、變量或數(shù)據(jù),Broadcast廣播出去是基于Executor的,里面的每個任務(wù)可以用上下文,Task的上下文就是Executor,可以抓取數(shù)據(jù)。這就好像ServletContext的具體作用,只是Broadcast是分布式的共享數(shù)據(jù),默認(rèn)情況下,只要程序在運行,Broadcast變量就會存在,因為Broadcast在底層是通過BlockManager管理的。但是,你可以手動指定或者配置具體周期來銷毀Broadcast變量??梢灾付˙roadcast的unpersist銷毀Broadcast變量,因為Spark應(yīng)用程序中可能運行很多Job,可能一個Job需要很多Broadcast變量,但下一個Job不需要這些變量,但是應(yīng)用程序還存在,因此需手工銷毀Broadcast變量。
Broadcast一般用于處理共享配置文件、通用的Dataset、常用的數(shù)據(jù)結(jié)構(gòu)等;但是在Broadcast中不適合存放太大的數(shù)據(jù),Broadcast不會內(nèi)存溢出,因為其數(shù)據(jù)的保存的StorageLevel是MEMORY_AND_DISK的方式;雖然如此,我們也不可以放入太大的數(shù)據(jù)在Broadcast中,因為網(wǎng)絡(luò)I/O和可能的單點壓力會非常大?。⊿park 1.6版本Broadcast有兩種方式:HttpBroadcast、TorrentBroadcast。HttpBroadcast可能有單點壓力; TorrentBroadcast下載沒有單點壓力,但可能有網(wǎng)絡(luò)壓力)Spark 2.0版本中已經(jīng)去掉HTTPBroadcast (SPARK-12588)了,Spark 2.0版本的TorrentBroadcast是Broadcast唯一的廣播實現(xiàn)方式。
廣播Broadcast變量是只讀變量,如果Broadcast不是只讀變量而可以更新,那帶來的問題是:①一個節(jié)點上Broadcast可以更新,其他的節(jié)點Broadcast也要更新;②如果多個節(jié)點Broadcast同時更新,如何確定更新的順序,以及容錯等內(nèi)容。因此,廣播Broadcast變量是只讀變量,最為輕松保持了數(shù)據(jù)的一致性!
Broadcast廣播變量是只讀變量,緩存在每個節(jié)點上,而不是每個Task去獲取它的一份復(fù)制副本。例如,以高效的方式給每個節(jié)點發(fā)送一個dataset的副本。Spark嘗試在分布式發(fā)送廣播變量時使用高效的廣播算法減少通信的成本。
廣播變量是由一個變量V通過調(diào)用[org.apache.spark.SparkContext#broadcast]創(chuàng)建的。廣播變量是一個圍繞V的包裝器,它的值可以通過調(diào)用value方法來獲取。例如:
1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) 2. broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0) 3. 4. scala> broadcastVar.value 5. res0: Array[Int] = Array(1, 2, 3)
如果要更新廣播變量,只有再廣播一次,那就是一個新的廣播變量,使用一個新的廣播變量ID。
廣播變量創(chuàng)建后,在群集上運行時,V變量不是在任何函數(shù)都使用,以便V傳送到節(jié)點時不止一次。此外,對象V不應(yīng)該被修改,是為了確保廣播所有節(jié)點得到相同的廣播變量值(例如,如果變量被發(fā)送到后來的一個新節(jié)點)。
Broadcast的源碼如下。
1. @param id 廣播變量的唯一標(biāo)識符。 2. @tparam T 廣播變量的數(shù)據(jù)類型。 3. abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging { 4. 5. @volatile private var _isValid = true 6. 7. private var _destroySite = "" 8. 9. /**獲得廣播值.*/ 10. def value: T = { 11. assertValid() 12. getValue() 13. } 14. ......
Spark 1.6版本的HttpBroadcast方式的Broadcast,最開始的時候數(shù)據(jù)放在Driver的本地文件系統(tǒng)中,Driver在本地會創(chuàng)建一個文件夾來存放Broadcast中的data,然后啟動HttpServer訪問文件夾中的數(shù)據(jù),同時寫入到BlockManager(StorageLevel是MEMORY_AND_DISK)中獲得BlockId(BroadcastBlockId),當(dāng)?shù)谝淮蜤xecutor中的Task要訪問Broadcast變量的時候,會向Driver通過HttpServer來訪問數(shù)據(jù),然后會在Executor中的BlockManager中注冊該Broadcast中的數(shù)據(jù)BlockManager,Task訪問Broadcast變量時,首先查詢BlockManager,如果BlockManager中已有此數(shù)據(jù),Task就可直接使用BlockManager中的數(shù)據(jù)(說明SPARK-12588,HTTPBroadcast方式在Spark 2.0版本中已經(jīng)去掉)。
10.1.2 Spark中Broadcast源碼詳解
BroadcastManager是用來管理Broadcast的,該實例對象是在SparkContext創(chuàng)建SparkEnv的時候創(chuàng)建的。
SparkEnv.scala的源碼如下。
1. val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) 2. 3. val mapOutputTracker = if (isDriver) { 4. new MapOutputTrackerMaster(conf, broadcastManager, isLocal) 5. } else { 6. new MapOutputTrackerWorker(conf) 7. }
BroadcastManager.scala中BroadcastManager實例化的時候會調(diào)用initialize方法,initialize方法就創(chuàng)建TorrentBroadcastFactory的方式。
BroadcastManager的源碼如下。
1. 2. private[spark] class BroadcastManager( 3. val isDriver: Boolean, 4. conf: SparkConf, 5. securityManager: SecurityManager) 6. extends Logging { 7. 8. private var initialized = false 9. private var broadcastFactory: BroadcastFactory = null 10. 11. initialize() 12. 13. //使用廣播前,被SparkContext或Executor調(diào)用 14. private def initialize() { 15. synchronized { 16. if (!initialized) { 17. broadcastFactory = new TorrentBroadcastFactory 18. broadcastFactory.initialize(isDriver, conf, securityManager) 19. initialized = true 20. } 21. } 22. }
Spark 2.0版本中的TorrentBroadcast方式:數(shù)據(jù)開始在Driver中,A節(jié)點如果使用了數(shù)據(jù),A就成為供應(yīng)源,這時Driver節(jié)點、A節(jié)點兩個節(jié)點成為供應(yīng)源,如第三個節(jié)點B訪問的時候,第三個節(jié)點B也成為了供應(yīng)源,同樣地,第四個節(jié)點、第五個節(jié)點……等都成為了供應(yīng)源,這些都被BlockManager管理,這樣不會導(dǎo)致一個節(jié)點壓力太大,從理論上講,數(shù)據(jù)使用的節(jié)點越多,網(wǎng)絡(luò)速度就越快。
TorrentBroadcast按照BLOCK_SIZE(默認(rèn)是4MB)將Broadcast中的數(shù)據(jù)劃分成為不同的Block,然后將分塊信息(也就是Meta信息)存放到Driver的BlockManager中,同時會告訴BlockManagerMaster,說明Meta信息存放完畢。
SparkContext.scala的broadcast方法的源碼如下。
1. def broadcast[T: ClassTag](value: T): Broadcast[T] = { 2. assertNotStopped() 3. require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass), 4. "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") 5. val bc = env.broadcastManager.newBroadcast[T](value, isLocal) 6. val callSite = getCallSite 7. logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) 8. cleaner.foreach(_.registerBroadcastForCleanup(bc)) 9. bc 10. }
SparkContext.scala的broadcast方法中調(diào)用env.broadcastManager.newBroadcast。BroadcastManager.scala的newBroadcast方法如下。
1. def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = { 2. broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId. getAndIncrement()) 3. }
newBroadcast方法調(diào)用new()函數(shù)創(chuàng)建一個Broadcast,第一個參數(shù)是Value,第三個參數(shù)是BroadcastId。這里,BroadcastFactory是一個trait,沒有具體的實現(xiàn)。
1. private[spark] trait BroadcastFactory { 2. ...... 3. def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T] 4. ...
TorrentBroadcastFactory是BroadcastFactory的具體實現(xiàn)。
1. private[spark] class TorrentBroadcastFactory extends BroadcastFactory { 2. ...... 3. override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = { 4. new TorrentBroadcast[T](value_, id) 5. }
BroadcastFactory的newBroadcast方法創(chuàng)建TorrentBroadcast實例。
Spark 2.1.1版本的TorrentBroadcast.scala的源碼如下。
1. private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) 2. extends Broadcast[T](id) with Logging with Serializable { 3. ...... 4. private def readBlocks(): Array[ChunkedByteBuffer] = { 5. //獲取數(shù)據(jù)塊。注意,所有這些塊存儲在BlockManager 且向driver匯報,其他 //Executors 也可以從這個Executors 中提取這些塊 6. val blocks = new Array[ChunkedByteBuffer](numBlocks) 7. val bm = SparkEnv.get.blockManager 8. 9. for (pid <- Random.shuffle(Seq.range(0, numBlocks))) { 10. val pieceId = BroadcastBlockId(id, "piece" + pid) 11. logDebug(s"Reading piece $pieceId of $broadcastId") 12. //第一次嘗試getLocalBytes從本地讀?。阂驗橐郧霸噲D獲取廣播塊時已經(jīng)獲取了一些 //塊,在這種情況下,一些塊將在本地(在Executor上) 13. bm.getLocalBytes(pieceId) match { 14. case Some(block) => 15. blocks(pid) = block 16. releaseLock(pieceId) 17. case None => 18. bm.getRemoteBytes(pieceId) match { 19. case Some(b) => 20. if (checksumEnabled) { 21. val sum = calcChecksum(b.chunks(0)) 22. if (sum != checksums(pid)) { 23. throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" + 24. s" $sum != ${checksums(pid)}") 25. } 26. } 27. //從遠程Executors/driver的BlockManager查找塊,所以把塊在 //Executor節(jié)點BlockManager 28. if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_ SER, tellMaster = true)) { 29. throw new SparkException( 30. s"Failed to store $pieceId of $broadcastId in local BlockManager") 31. } 32. blocks(pid) = b 33. case None => 34. throw new SparkException(s"Failed to get $pieceId of $broadcastId") 35. } 36. } 37. } 38. blocks 39. }
Spark 2.2.0版本的TorrentBroadcast.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第32行blocks(pid) = b調(diào)整為blocks(pid) = new ByteBufferBlockData(b, true) 。
1. ...... 2. blocks(pid) = new ByteBufferBlockData(b, true) 3. .......
TorrentBroadcast.scala的readBlocks方法中Random.shuffle(Seq.range(0, numBlocks)進行隨機洗牌,是因為數(shù)據(jù)有很多來源DataServer,為了保持負(fù)載均衡,因此使用shuffle。
TorrentBroadcast將元數(shù)據(jù)信息存放到BlockManager,然后匯報給BlockManagerMaster。數(shù)據(jù)存放到BlockManagerMaster中就變成了全局?jǐn)?shù)據(jù),BlockManagerMaster具有所有的信息,Driver、Executor就可以訪問這些內(nèi)容。Executor運行具體的TASK的時候,通過TorrentBroadcast的方式readBlocks,如果本地有數(shù)據(jù),就從本地讀取,如果本地沒有數(shù)據(jù),就從遠程讀取數(shù)據(jù)。Executor讀取信息以后,通過TorrentBroadcast的機制通知BlockManagerMaster數(shù)據(jù)多了一份副本,下一個Task讀取數(shù)據(jù)的時候,就有兩個選擇,分享的節(jié)點越多,下載的供應(yīng)源就越多,最終變成點到點的方式。
Broadcast可以廣播RDD,Join操作性能優(yōu)化之一也是采用Broadcast。
- 精通MATLAB神經(jīng)網(wǎng)絡(luò)
- Introduction to DevOps with Kubernetes
- 網(wǎng)上沖浪
- 圖形圖像處理(Photoshop)
- Mobile DevOps
- 機器學(xué)習(xí)與大數(shù)據(jù)技術(shù)
- 工業(yè)機器人入門實用教程(KUKA機器人)
- 城市道路交通主動控制技術(shù)
- TensorFlow Reinforcement Learning Quick Start Guide
- 嵌入式操作系統(tǒng)原理及應(yīng)用
- 精通數(shù)據(jù)科學(xué):從線性回歸到深度學(xué)習(xí)
- 人工智能:語言智能處理
- 啊哈C!思考快你一步
- Ansible 2 Cloud Automation Cookbook
- ADuC系列ARM器件應(yīng)用技術(shù)