官术网_书友最值得收藏!

10.1 Spark中Broadcast原理和源碼詳解

本節(jié)講解Spark中Broadcast原理及Spark中Broadcast源碼。

10.1.1 Spark中Broadcast原理詳解

Broadcast在機器學(xué)習(xí)、圖計算、構(gòu)建日常的各種算法中到處可見。 Broadcast將數(shù)據(jù)從一個節(jié)點發(fā)送到其他節(jié)點上;例如,Driver上有一張表,而Executor中的每個并行執(zhí)行的Task (100萬個Task)都要查詢這張表,那我們通過Broadcast的方式只需要往每個Executor發(fā)送一次這張表就行了,Executor中的每個運行的Task查詢這張唯一的表,而不是每次執(zhí)行的時候都從Driver獲得這張表。

Java中的Servlet里有一個ServletContext,是JSP或Java代碼運行時的上下文,通過上下文可以獲取各種資源。Broadcast類似于ServletContext中的資源、變量或數(shù)據(jù),Broadcast廣播出去是基于Executor的,里面的每個任務(wù)可以用上下文,Task的上下文就是Executor,可以抓取數(shù)據(jù)。這就好像ServletContext的具體作用,只是Broadcast是分布式的共享數(shù)據(jù),默認(rèn)情況下,只要程序在運行,Broadcast變量就會存在,因為Broadcast在底層是通過BlockManager管理的。但是,你可以手動指定或者配置具體周期來銷毀Broadcast變量??梢灾付˙roadcast的unpersist銷毀Broadcast變量,因為Spark應(yīng)用程序中可能運行很多Job,可能一個Job需要很多Broadcast變量,但下一個Job不需要這些變量,但是應(yīng)用程序還存在,因此需手工銷毀Broadcast變量。

Broadcast一般用于處理共享配置文件、通用的Dataset、常用的數(shù)據(jù)結(jié)構(gòu)等;但是在Broadcast中不適合存放太大的數(shù)據(jù),Broadcast不會內(nèi)存溢出,因為其數(shù)據(jù)的保存的StorageLevel是MEMORY_AND_DISK的方式;雖然如此,我們也不可以放入太大的數(shù)據(jù)在Broadcast中,因為網(wǎng)絡(luò)I/O和可能的單點壓力會非常大?。⊿park 1.6版本Broadcast有兩種方式:HttpBroadcast、TorrentBroadcast。HttpBroadcast可能有單點壓力; TorrentBroadcast下載沒有單點壓力,但可能有網(wǎng)絡(luò)壓力)Spark 2.0版本中已經(jīng)去掉HTTPBroadcast (SPARK-12588)了,Spark 2.0版本的TorrentBroadcast是Broadcast唯一的廣播實現(xiàn)方式。

廣播Broadcast變量是只讀變量,如果Broadcast不是只讀變量而可以更新,那帶來的問題是:①一個節(jié)點上Broadcast可以更新,其他的節(jié)點Broadcast也要更新;②如果多個節(jié)點Broadcast同時更新,如何確定更新的順序,以及容錯等內(nèi)容。因此,廣播Broadcast變量是只讀變量,最為輕松保持了數(shù)據(jù)的一致性!

Broadcast廣播變量是只讀變量,緩存在每個節(jié)點上,而不是每個Task去獲取它的一份復(fù)制副本。例如,以高效的方式給每個節(jié)點發(fā)送一個dataset的副本。Spark嘗試在分布式發(fā)送廣播變量時使用高效的廣播算法減少通信的成本。

廣播變量是由一個變量V通過調(diào)用[org.apache.spark.SparkContext#broadcast]創(chuàng)建的。廣播變量是一個圍繞V的包裝器,它的值可以通過調(diào)用value方法來獲取。例如:

1.  scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
2.  broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0)
3.
4.  scala> broadcastVar.value
5.  res0: Array[Int] = Array(1, 2, 3)

如果要更新廣播變量,只有再廣播一次,那就是一個新的廣播變量,使用一個新的廣播變量ID。

廣播變量創(chuàng)建后,在群集上運行時,V變量不是在任何函數(shù)都使用,以便V傳送到節(jié)點時不止一次。此外,對象V不應(yīng)該被修改,是為了確保廣播所有節(jié)點得到相同的廣播變量值(例如,如果變量被發(fā)送到后來的一個新節(jié)點)。

Broadcast的源碼如下。

1.   @param id  廣播變量的唯一標(biāo)識符。
2.   @tparam T   廣播變量的數(shù)據(jù)類型。
3.   abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable
     with Logging {
4.
5.    @volatile private var _isValid = true
6.
7.    private var _destroySite = ""
8.
9.    /**獲得廣播值.*/
10.   def value: T = {
11.     assertValid()
12.     getValue()
13.   }
14. ......

Spark 1.6版本的HttpBroadcast方式的Broadcast,最開始的時候數(shù)據(jù)放在Driver的本地文件系統(tǒng)中,Driver在本地會創(chuàng)建一個文件夾來存放Broadcast中的data,然后啟動HttpServer訪問文件夾中的數(shù)據(jù),同時寫入到BlockManager(StorageLevel是MEMORY_AND_DISK)中獲得BlockId(BroadcastBlockId),當(dāng)?shù)谝淮蜤xecutor中的Task要訪問Broadcast變量的時候,會向Driver通過HttpServer來訪問數(shù)據(jù),然后會在Executor中的BlockManager中注冊該Broadcast中的數(shù)據(jù)BlockManager,Task訪問Broadcast變量時,首先查詢BlockManager,如果BlockManager中已有此數(shù)據(jù),Task就可直接使用BlockManager中的數(shù)據(jù)(說明SPARK-12588,HTTPBroadcast方式在Spark 2.0版本中已經(jīng)去掉)。

10.1.2 Spark中Broadcast源碼詳解

BroadcastManager是用來管理Broadcast的,該實例對象是在SparkContext創(chuàng)建SparkEnv的時候創(chuàng)建的。

SparkEnv.scala的源碼如下。

1.  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
2.
3.   val mapOutputTracker = if (isDriver) {
4.     new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
5.   } else {
6.     new MapOutputTrackerWorker(conf)
7.   }

BroadcastManager.scala中BroadcastManager實例化的時候會調(diào)用initialize方法,initialize方法就創(chuàng)建TorrentBroadcastFactory的方式。

BroadcastManager的源碼如下。

1.
2.  private[spark] class BroadcastManager(
3.      val isDriver: Boolean,
4.      conf: SparkConf,
5.      securityManager: SecurityManager)
6.    extends Logging {
7.
8.    private var initialized = false
9.    private var broadcastFactory: BroadcastFactory = null
10.
11.   initialize()
12.
13.   //使用廣播前,被SparkContext或Executor調(diào)用
14.   private def initialize() {
15.     synchronized {
16.       if (!initialized) {
17.         broadcastFactory = new TorrentBroadcastFactory
18.         broadcastFactory.initialize(isDriver, conf, securityManager)
19.         initialized = true
20.       }
21.     }
22.   }

Spark 2.0版本中的TorrentBroadcast方式:數(shù)據(jù)開始在Driver中,A節(jié)點如果使用了數(shù)據(jù),A就成為供應(yīng)源,這時Driver節(jié)點、A節(jié)點兩個節(jié)點成為供應(yīng)源,如第三個節(jié)點B訪問的時候,第三個節(jié)點B也成為了供應(yīng)源,同樣地,第四個節(jié)點、第五個節(jié)點……等都成為了供應(yīng)源,這些都被BlockManager管理,這樣不會導(dǎo)致一個節(jié)點壓力太大,從理論上講,數(shù)據(jù)使用的節(jié)點越多,網(wǎng)絡(luò)速度就越快。

TorrentBroadcast按照BLOCK_SIZE(默認(rèn)是4MB)將Broadcast中的數(shù)據(jù)劃分成為不同的Block,然后將分塊信息(也就是Meta信息)存放到Driver的BlockManager中,同時會告訴BlockManagerMaster,說明Meta信息存放完畢。

SparkContext.scala的broadcast方法的源碼如下。

1.        def broadcast[T: ClassTag](value: T): Broadcast[T] = {
2.     assertNotStopped()
3.     require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
4.       "Can not directly broadcast RDDs; instead, call collect() and
          broadcast the result.")
5.     val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
6.     val callSite = getCallSite
7.     logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
8.     cleaner.foreach(_.registerBroadcastForCleanup(bc))
9.     bc
10.  }

SparkContext.scala的broadcast方法中調(diào)用env.broadcastManager.newBroadcast。BroadcastManager.scala的newBroadcast方法如下。

1.  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T]
     = {
2.     broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.
       getAndIncrement())
3.   }

newBroadcast方法調(diào)用new()函數(shù)創(chuàng)建一個Broadcast,第一個參數(shù)是Value,第三個參數(shù)是BroadcastId。這里,BroadcastFactory是一個trait,沒有具體的實現(xiàn)。

1.    private[spark] trait BroadcastFactory {
2.  ......
3.  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long):
    Broadcast[T]
4.  ...

TorrentBroadcastFactory是BroadcastFactory的具體實現(xiàn)。

1.  private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
2.  ......
3.  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id:
    Long): Broadcast[T] = {
4.      new TorrentBroadcast[T](value_, id)
5.    }

BroadcastFactory的newBroadcast方法創(chuàng)建TorrentBroadcast實例。

Spark 2.1.1版本的TorrentBroadcast.scala的源碼如下。

1.   private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
2.    extends Broadcast[T](id) with Logging with Serializable {
3.  ......
4.  private def readBlocks(): Array[ChunkedByteBuffer] = {
5.      //獲取數(shù)據(jù)塊。注意,所有這些塊存儲在BlockManager 且向driver匯報,其他
        //Executors 也可以從這個Executors 中提取這些塊
6.      val blocks = new Array[ChunkedByteBuffer](numBlocks)
7.      val bm = SparkEnv.get.blockManager
8.
9.      for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
10.       val pieceId = BroadcastBlockId(id, "piece" + pid)
11.       logDebug(s"Reading piece $pieceId of $broadcastId")
12.       //第一次嘗試getLocalBytes從本地讀?。阂驗橐郧霸噲D獲取廣播塊時已經(jīng)獲取了一些
          //塊,在這種情況下,一些塊將在本地(在Executor上)
13.       bm.getLocalBytes(pieceId) match {
14.         case Some(block) =>
15.          blocks(pid) = block
16.          releaseLock(pieceId)
17.        case None =>
18.          bm.getRemoteBytes(pieceId) match {
19.            case Some(b) =>
20.              if (checksumEnabled) {
21.                val sum = calcChecksum(b.chunks(0))
22.                if (sum != checksums(pid)) {
23.                  throw new SparkException(s"corrupt remote block $pieceId
                     of $broadcastId:" +
24.                    s" $sum != ${checksums(pid)}")
25.                }
26.              }
27.              //從遠程Executors/driver的BlockManager查找塊,所以把塊在
                 //Executor節(jié)點BlockManager
28.              if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_
                  SER, tellMaster = true)) {
29.                throw new SparkException(
30.                  s"Failed to store $pieceId of $broadcastId in local
                     BlockManager")
31.              }
32.              blocks(pid) = b
33.            case None =>
34.              throw new SparkException(s"Failed to get $pieceId of
                 $broadcastId")
35.          }
36.      }
37.    }
38.    blocks
39.  }

Spark 2.2.0版本的TorrentBroadcast.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第32行blocks(pid) = b調(diào)整為blocks(pid) = new ByteBufferBlockData(b, true) 。

1.     ......
2.      blocks(pid) = new ByteBufferBlockData(b, true)
3.  .......

TorrentBroadcast.scala的readBlocks方法中Random.shuffle(Seq.range(0, numBlocks)進行隨機洗牌,是因為數(shù)據(jù)有很多來源DataServer,為了保持負(fù)載均衡,因此使用shuffle。

TorrentBroadcast將元數(shù)據(jù)信息存放到BlockManager,然后匯報給BlockManagerMaster。數(shù)據(jù)存放到BlockManagerMaster中就變成了全局?jǐn)?shù)據(jù),BlockManagerMaster具有所有的信息,Driver、Executor就可以訪問這些內(nèi)容。Executor運行具體的TASK的時候,通過TorrentBroadcast的方式readBlocks,如果本地有數(shù)據(jù),就從本地讀取,如果本地沒有數(shù)據(jù),就從遠程讀取數(shù)據(jù)。Executor讀取信息以后,通過TorrentBroadcast的機制通知BlockManagerMaster數(shù)據(jù)多了一份副本,下一個Task讀取數(shù)據(jù)的時候,就有兩個選擇,分享的節(jié)點越多,下載的供應(yīng)源就越多,最終變成點到點的方式。

Broadcast可以廣播RDD,Join操作性能優(yōu)化之一也是采用Broadcast。

主站蜘蛛池模板: 海城市| 巴中市| 汉中市| 澄城县| 新余市| 高邑县| 井陉县| 织金县| 新晃| 望江县| 静安区| 镇安县| 佛坪县| 淮南市| 崇左市| 丰镇市| 广昌县| 巩留县| 塔城市| 黄浦区| 陵水| 新郑市| 东乌珠穆沁旗| 临颍县| 刚察县| 新乡县| 普陀区| 田阳县| 阜新市| 南澳县| 长葛市| 商洛市| 阿拉善左旗| 武隆县| 会理县| 鸡东县| 微山县| 东台市| 开远市| 启东市| 牡丹江市|