官术网_书友最值得收藏!

6.2 Spark Application是如何向集群申請資源的

本節講解Application申請資源的兩種類型:第一種是盡可能在集群的所有Worker上分配Executor;第二種是運行在盡可能少的Worker上。本節講解Application申請資源的源碼內容,將徹底解密Spark Application是如何向集群申請資源的。

6.2.1 Application申請資源的兩種類型詳解

Master負責資源管理和調度。資源調度的方法schedule位于Master.scala類中,當注冊程序或者資源發生改變時,都會導致schedule的調用。Schedule調用的時機:每次有新的應用程序提交或者集群資源狀況發生改變時(包括Executor增加或者減少、Worker增加或者減少等)。

Spark默認為應用程序啟動Executor的方式是FIFO的方式,也就是所有提交的應用程序都放在調度的等待隊列中,先進先出,只有在滿足了前面應用程序的資源分配的基礎上,才能夠滿足下一個應用程序資源的分配;在FIFO的情況下,默認是spreadOutApps來讓應用程序盡可能多地運行在所有的Node上。為應用程序分配Executors有兩種方式:第一種方式是盡可能在集群的所有Worker上分配Executor,這種方式往往會帶來潛在的、更好的數據本地性;第二種方式是嘗試運行在盡可能少的Worker上。

為了更形象地描述Master的調度機制,下面通過圖6-1介紹抽象的資源調度框架。

圖6-1 Master中抽象的資源調度框架

其中,Worker1到WorkerN是集群中全部的Workers節點,調度時,會根據應用程序請求的資源信息,從全部Workers節點中過濾出資源足夠的節點,假設可以得到Worker1到WorkerM的節點。當前過濾的需求是內核數和內存大小足夠啟動一個Executor,因為Executor是集群執行應用程序的單位組件(注意:和任務(Task)不是同一個概念,對應的任務是在Executor中執行的)。

選出可用Workers之后,會根據內核大小進行排序,這可以理解成是一種基于可用內核排序的、簡單的負載均衡策略。然后根據設置的spreadOutApps參數,對應指定兩種資源分配策略。

(1)當spreadOutApps=true:使用輪流均攤的策略,也就是采用圓桌(round-robin)算法,圖中的虛線表示第一次輪流攤派的資源不足以滿足申請的需求,因此開始第二輪攤派,依次輪流均攤,直到符合資源需求。

(2)當spreadOutApps=false:使用依次全占策略,依次從可用Workers上獲取該Worker上可用的全部資源,直到符合資源需求。

對應圖中Worker內部的小方塊,在此表示分配的資源的抽象單位。對應資源的條件,理解的關鍵點在于資源是分配給Executor的,因此最終啟動Executor時,占用的資源必須滿足啟動所需的條件。

前面描述了Workers上的資源是如何分配給應用程序的,之后正式開始為Executor分配資源,并向Worker發送啟動Executor的命令了。根據申請時是否明確指定需要為每個Executor分配確定的內核個數,有:

(1)明確指定每個Executor需要分配的內核個數時:每次分配的是一個Executor所需的內核數和內存數,對應在某個Worker分配到的總的內核數可能是Executor的內核數的倍數,此時,該Worker節點上會啟動多個Executor,每個Executor需要指定的內核數和內存數(注意該Worker節點上分配到的總的內存大小)。

(2)未明確指定每個Executor需要分配的內核個數時:每次分配一個內核,最后所有在某Worker節點上分配到的內核都會放到一個Executor內(未明確指定內核個數,因此可以一起放入一個Executor)。因此,最終該應用程序在一個Worker上只有一個Executor(這里指的是針對一個應用程序,當該Worker節點上存在多個應用程序時,仍然會為每個應用程序分別啟動相應的Executor)。

在此強調、補充一下調度機制中使用的三個重要的配置屬性。

a.指定為所有Executors分配的總內核個數:在spark-submit腳本提交參數時進行配置。所有Executors分配的總內核個數的控制屬性在類SparkSubmitArguments的方法printUsageAndExit中。

1.  //指定為所有Executors分配的總內核個數
2.  | Spark standalone and Mesos only:
3.  |  --total-executor-cores NUM  Total cores for all executors.

b.指定需要為每個Executor分配的內核個數:在spark-submit腳本提交參數時進行配置。每個Executor分配的內核個數的控制屬性在類SparkSubmitArguments的方法printUsageAndExit中。

SparkSubmitArguments.scala的源碼如下。

1.  // 指定需要為每個Executor分配的內核個數
2.  || Spark standalone and YARN only:
3.  |  --executor-cores NUM        Number of cores per executor. (Default: 1
    in YARN mode,
4.    or all available cores on the worker in standalone mode)

c.資源分配策略:數據本地性(數據密集)與計算密集的控制屬性,對應的配置屬性在Master類中,代碼如下。

1.  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut",
    true)

6.2.2 Application申請資源的源碼詳解

1.任務調度與資源調度的區別

 任務調度是通過DAGScheduler、TaskScheduler、SchedulerBackend等進行的作業調度。

 資源調度是指應用程序如何獲得資源。

 任務調度是在資源調度的基礎上進行的,如果沒有資源調度,任務調度就成為無源之水,無本之木。

2.資源調度內幕

(1)因為Master負責資源管理和調度,所以資源調度的方法shedule位于Master.scala類中,注冊程序或者資源發生改變時都會導致schedule的調用,如注冊程序時:

1.  case RegisterApplication(description, driver) =>
2.  //待辦事項:防止重復注冊Driver
3.  if (state == RecoveryState.STANDBY) {
4.    //忽略,不要發送響應
5.  } else {
6.     logInfo("Registering app " + description.name)
7.     val app = createApplication(description, driver)
8.     registerApplication(app)
9.     logInfo("Registered app " + description.name + " with ID " + app.id)
10.    persistenceEngine.addApplication(app)
11.    driver.send(RegisteredApplication(app.id, self))
12.    schedule()
13.  }

(2)Schedule調用的時機:每次有新的應用程序提交或者集群資源狀況發生改變的時候(包括Executor增加或者減少、Worker增加或者減少等)。

進入schedule(),schedule為當前等待的應用程序分配可用的資源。每當一個新的應用程序進來時,schedule都會被調用。或者資源發生變化時(如Executor掛掉,Worker掛掉,或者新增加機器),schedule都會被調用。

(3)當前Master必須以ALIVE的方式進行資源調度,如果不是ALIVE的狀態,就會直接返回,也就是Standby Master不會進行Application的資源調用。

1.     if (state != RecoveryState.ALIVE) {
2.    return
3.  }

(4)接下來通過workers.toSeq.filter(_.state == WorkerState.ALIVE)過濾判斷所有Worker中哪些是ALIVE級別的Worker,ALIVE才能夠參與資源的分配工作。

1.  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state
    == WorkerState.ALIVE))

(5)使用Random.shuffle把Master中保留的集群中所有ALIVE級別的Worker的信息隨機打亂;Master的schedule()方法中:workers是一個數據結構,打亂workers有利于負載均衡。例如,不是以固定的順序啟動launchDriver。WorkerInfo是Worker注冊時將信息注冊過來。

1.  val workers = new HashSet[WorkerInfo]
2.  .......
3.   val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state
     == WorkerState.ALIVE))

WorkerInfo.scala的源碼如下。

1.  private[spark] class WorkerInfo(
2.     val id: String,
3.     val host: String,
4.     val port: Int,
5.     val cores: Int,
6.     val memory: Int,
7.     val endpoint: RpcEndpointRef,
8.     val webUiAddress: String)
9.   extends Serializable {

隨機打亂的算法:將Worker的信息傳進來,先調用new()函數創建一個ArrayBuffer,將所有的信息放進去。然后將兩個索引位置的內容進行交換。例如,如果有4個Worker,依次分別為第一個Worker至第四個Worker,第一個位置是第1個Worker,第2個位置是第2個Worker,第3個位置是第3個Worker,第4個位置是第4個Worker;通過Shuffle以后,現在第一個位置可能是第3個Worker,第2個位置可能是第1個Worker,第3個位置可能是第4個Worker,第4個位置可能是第2個Worker,位置信息打亂。

Random.scala中的shuffle方法,其算法內部是循環隨機交換所有Worker在Master緩存數據結構中的位置。

1.   def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf:
     CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
2.      val buf = new ArrayBuffer[T] ++= xs
3.
4.      def swap(i1: Int, i2: Int) {
5.        val tmp = buf(i1)
6.        buf(i1) = buf(i2)
7.        buf(i2) = tmp
8.      }
9.
10.     for (n <- buf.length to 2 by -1) {
11.       val k = nextInt(n)
12.       swap(n - 1, k)
13.     }
14.
15.     (bf(xs) ++= buf).result
16.   }

(6)Master的schedule()方法中:循環遍歷等待啟動的Driver,如果是Client模式,就不需要waitingDrivers等待;如果是Cluster模式,此時Driver會加入waitingDrivers等待列表。

當SparkSubmit指定Driver在Cluster模式的情況下,此時Driver會加入waitingDrivers等待列表中,在每個DriverInfo的DriverDescription中有要啟動Driver時對Worker的內存及Cores的要求等內容。

1.   private val waitingDrivers = new ArrayBuffer[DriverInfo]
2.  ......

DriverInfo包括啟動時間、ID、描述信息、提交時間等內容。

DriverInfo.scala的源碼如下。

1.  private[deploy] class DriverInfo(
2.     val startTime: Long,
3.     val id: String,
4.     val desc: DriverDescription,
5.     val submitDate: Date)
6.   extends Serializable {

其中,DriverInfo的DriverDescription描述信息中包括jarUrl、內存、Cores、supervise、command等內容。如果在Cluster模式中指定supervise為True,那么Driver掛掉時就會自動重啟。

DriverDescription.scala的源碼如下。

1.  private[deploy] case class DriverDescription(
2.   jarUrl: String,
3.   mem: Int,
4.   cores: Int,
5.   supervise: Boolean,
6.   command: Command) {

在符合資源要求的情況下,采用隨機打亂后的一個Worker來啟動Driver,worker是Master中對Worker的一個描述。

Master.scala的launchDriver方法如下。

1.      private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
2.    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
3.    worker.addDriver(driver)
4.    driver.worker = Some(worker)
5.    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
6.    driver.state = DriverState.RUNNING
7.  }

Master通過worker.endpoint.send(LaunchDriver)發指令給Worker,讓遠程的Worker啟動Driver,Driver啟動以后,Driver的狀態就變成DriverState.RUNNING。

(7)先啟動Driver,才會發生后續的一切資源調度的模式。

(8)Spark默認為應用程序啟動Executor的方式是FIFO方式,也就是所有提交的應用程序都是放在調度的等待隊列中的,先進先出,只有滿足了前面應用程序的資源分配的基礎,才能夠滿足下一個應用程序資源的分配。

Master的schedule()方法中,調用startExecutorsOnWorkers()為當前的程序調度和啟動Worker的Executor,默認情況下排隊的方式是FIFO。

startExecutorsOnWorkers的源碼如下。

1.   private def startExecutorsOnWorkers(): Unit = {
2.     //這是一個非常簡單的FIFO調度。我們嘗試在隊列中推入第一個應用程序,然后推入第二
       //個應用程序等
3.     for (app <- waitingApps if app.coresLeft > 0) {
4.       val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
5.       //篩選出workers,其沒有足夠資源來啟動Executor
6.       val usableWorkers = workers.toArray.filter(_.state == WorkerState
         .ALIVE)
7.         .filter(worker => worker.memoryFree >= app.desc
           .memoryPerExecutorMB &&
8.           worker.coresFree >= coresPerExecutor.getOrElse(1))
9.         .sortBy(_.coresFree).reverse
10.      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers,
         spreadOutApps)
11.
12.      //現在我們決定每個worker分配多少cores,進行分配
13.      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
14.        allocateWorkerResourceToExecutors(
15.          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
16.      }
17.    }
18.  }

(9)為應用程序具體分配Executor前要判斷應用程序是否還需要分配Core,如果不需要,則不會為應用程序分配Executor。

startExecutorsOnWorkers中的coresLeft是請求的requestedCores和可用的coresGranted的相減值。例如,如果整個程序要求1000個Cores,但是目前集群可用的只有100個Cores,如果coresLeft不為0,就放入等待隊列中;如果coresLeft是0,那么就不需要調度。

1.  private[master] def coresLeft: Int = requestedCores - coresGranted

(10)Master.scala的startExecutorsOnWorkers中,具體分配Executor之前,要求Worker必須是ALIVE的狀態且必須滿足Application對每個Executor的內存和Cores的要求,并且在此基礎上進行排序,產生計算資源由大到小的usableWorkers數據結構。

1.        val usableWorkers = workers.toArray.filter(_.state == WorkerState
          .ALIVE)
2.          .filter(worker => worker.memoryFree >= app.desc
            .memoryPerExecutorMB &&
3.            worker.coresFree >= coresPerExecutor.getOrElse(1))
4.          .sortBy(_.coresFree).reverse
5.  val   assignedCores     =   scheduleExecutorsOnWorkers(app,         usableWorkers,
    spreadOutApps)

然后調用scheduleExecutorsOnWorkers,在FIFO的情況下,默認spreadOutApps讓應用程序盡可能多地運行在所有的Node上。

1.  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut",
    true)

scheduleExecutorsOnWorker中,minCoresPerExecutor表示每個Executor最小分配的core個數。scheduleExecutorsOnWorker的源碼如下。

1.     private def scheduleExecutorsOnWorkers(
2.        app: ApplicationInfo,
3.        usableWorkers: Array[WorkerInfo],
4.        spreadOutApps: Boolean): Array[Int] = {
5.      val coresPerExecutor = app.desc.coresPerExecutor
6.      val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
7.      val oneExecutorPerWorker = coresPerExecutor.isEmpty
8.      val memoryPerExecutor = app.desc.memoryPerExecutorMB
9.      val numUsable = usableWorkers.length
10.     val assignedCores = new Array[Int](numUsable)
11.     val assignedExecutors = new Array[Int](numUsable)
12.     var coresToAssign = math.min(app.coresLeft, usableWorkers.map
        (_.coresFree).sum)
13. ......

(11)為應用程序分配Executors有兩種方式:第一種方式是盡可能在集群的所有Worker上分配Executor,這種方式往往會帶來潛在的、更好的數據本地性;第二種方式是嘗試運行在盡可能少的Worker上。

(12)具體在集群上分配Cores時會盡可能地滿足我們的要求。math.min用于計算最小值。coresToAssig用于計算app.coresLeft與可用的Worker中可用的Cores的和的最小值。例如,應用程序要求1000個Cores,但整個集群中只有100個Cores,所以只能先分配100個Cores。

scheduleExecutorsOnWorkers方法如下。

1.       var coresToAssign = math.min(app.coresLeft, usableWorkers.map
         (_.coresFree).sum)
2.  ......

(13)如果每個Worker下面只能為當前的應用程序分配一個Executor,那么每次只分配一個Core。scheduleExecutorsOnWorkers方法如下。

1.  if (oneExecutorPerWorker) {
2.       assignedExecutors(pos) = 1
3.  } else {
4.    assignedExecutors(pos) += 1
5.  }

總結為兩種情況:一種情況是盡可能在一臺機器上運行程序的所有功能;另一種情況是盡可能在所有節點上運行程序的所有功能。無論是哪種情況,每次給Executor增加Cores,是增加一個,如果是spreadOutApps的方式,循環一輪再下一輪。例如,有4個Worker,第一次為每個Executor啟動一個線程,第二次循環分配一個線程,第三次循環再分配一個線程……

scheduleExecutorsOnWorkers方法如下。

1.          while (freeWorkers.nonEmpty) {
2.   freeWorkers.foreach { pos =>
3.     var keepScheduling = true
4.     while (keepScheduling && canLaunchExecutor(pos)) {
5.       coresToAssign -= minCoresPerExecutor
6.       assignedCores(pos) += minCoresPerExecutor
7.
8.       //如果每個worker上啟動一個Executor,那么每次迭代在Executor 上分配一
         //個核,否則,每次迭代都將把內核分配給一個新的Executor
9.       if (oneExecutorPerWorker) {
10.        assignedExecutors(pos) = 1
11.      } else {
12.        assignedExecutors(pos) += 1
13.      }
14.
15.      //展開應用程序意味著將Executors展開到盡可能多的workers節點。如果不展
         //開,將對這個workers的Executors進行調度,直到使用它的全部資源。否則,
         //只是移動到下一個worker節點
16.      if (spreadOutApps) {
17.        keepScheduling = false
18.      }
19.    }
20.  }

回到Master.scala的startExecutorsOnWorkers,現在已經決定為每個worker分配多少個cores,然后進行資源分配。

1.        for (pos <- 0 until usableWorkers.length if assignedCores(pos)
          > 0) {
2.    allocateWorkerResourceToExecutors(
3.      app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
4.  }

allocateWorkerResourceToExecutors的源碼如下。

1.     private def allocateWorkerResourceToExecutors(
2.      app: ApplicationInfo,
3.      assignedCores: Int,
4.      coresPerExecutor: Option[Int],
5.      worker: WorkerInfo): Unit = {
6.  //如果指定了每個Executor的內核數,我們就將分配的內核無剩余地均分給worker節點的
    //Executors。否則,我們啟動一個單一的       Executor,抓住這個    worker  節點所有的
    //assignedCores
7.    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
8.    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
9.    for (i <- 1 to numExecutors) {
10.      val exec = app.addExecutor(worker, coresToAssign)
11.      launchExecutor(worker, exec)
12.      app.state = ApplicationState.RUNNING
13.    }
14.  }

allocateWorkerResourceToExecutors中的app.addExecutor增加一個Executor,記錄Executor的相關信息。

1.     private[master] def addExecutor(
2.      worker: WorkerInfo,
3.      cores: Int,
4.      useID: Option[Int] = None): ExecutorDesc = {
5.    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores,
      desc.memoryPerExecutorMB)
6.    executors(exec.id) = exec
7.    coresGranted += cores
8.    exec
9.  }

回到allocateWorkerResourceToExecutors方法中,launchExecutor(worker, exec)啟動Executor。

1.  launchExecutor(worker, exec)

(14)準備具體要為當前應用程序分配的Executor信息后,Master要通過遠程通信發指令給Worker來具體啟動ExecutorBackend進程。

launchExecutor方法如下。

1.  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc):
    Unit = {
2.    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
3.    worker.addExecutor(exec)
4.    worker.endpoint.send(LaunchExecutor(masterUrl,
5.      exec.application.id, exec.id, exec.application.desc, exec.cores,
        exec.memory))
6.  ......

(15)緊接著給應用程序的Driver發送一個ExecutorAdded的信息。

launchExecutor方法如下。

1.    exec.application.driver.send(
2.      ExecutorAdded(exec.id,       worker.id,  worker.hostPort,  exec.cores,
        exec.memory))
3.  }
主站蜘蛛池模板: 隆化县| 莫力| 东山县| 凤翔县| 金堂县| 山东省| 微山县| 清涧县| 兴业县| 龙里县| 郯城县| 綦江县| 沂源县| 威信县| 闽清县| 红原县| 渭源县| 泰兴市| 武隆县| 温州市| 班玛县| 肥东县| 龙门县| 乐都县| 定结县| 蓝田县| 肥城市| 满洲里市| 时尚| 连南| 札达县| 中山市| 嘉祥县| 紫金县| 大埔区| 望谟县| 河南省| 杭锦旗| 昭觉县| 荥阳市| 绥江县|