- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 3749字
- 2019-12-12 17:29:59
6.2 Spark Application是如何向集群申請資源的
本節講解Application申請資源的兩種類型:第一種是盡可能在集群的所有Worker上分配Executor;第二種是運行在盡可能少的Worker上。本節講解Application申請資源的源碼內容,將徹底解密Spark Application是如何向集群申請資源的。
6.2.1 Application申請資源的兩種類型詳解
Master負責資源管理和調度。資源調度的方法schedule位于Master.scala類中,當注冊程序或者資源發生改變時,都會導致schedule的調用。Schedule調用的時機:每次有新的應用程序提交或者集群資源狀況發生改變時(包括Executor增加或者減少、Worker增加或者減少等)。
Spark默認為應用程序啟動Executor的方式是FIFO的方式,也就是所有提交的應用程序都放在調度的等待隊列中,先進先出,只有在滿足了前面應用程序的資源分配的基礎上,才能夠滿足下一個應用程序資源的分配;在FIFO的情況下,默認是spreadOutApps來讓應用程序盡可能多地運行在所有的Node上。為應用程序分配Executors有兩種方式:第一種方式是盡可能在集群的所有Worker上分配Executor,這種方式往往會帶來潛在的、更好的數據本地性;第二種方式是嘗試運行在盡可能少的Worker上。
為了更形象地描述Master的調度機制,下面通過圖6-1介紹抽象的資源調度框架。

圖6-1 Master中抽象的資源調度框架
其中,Worker1到WorkerN是集群中全部的Workers節點,調度時,會根據應用程序請求的資源信息,從全部Workers節點中過濾出資源足夠的節點,假設可以得到Worker1到WorkerM的節點。當前過濾的需求是內核數和內存大小足夠啟動一個Executor,因為Executor是集群執行應用程序的單位組件(注意:和任務(Task)不是同一個概念,對應的任務是在Executor中執行的)。
選出可用Workers之后,會根據內核大小進行排序,這可以理解成是一種基于可用內核排序的、簡單的負載均衡策略。然后根據設置的spreadOutApps參數,對應指定兩種資源分配策略。
(1)當spreadOutApps=true:使用輪流均攤的策略,也就是采用圓桌(round-robin)算法,圖中的虛線表示第一次輪流攤派的資源不足以滿足申請的需求,因此開始第二輪攤派,依次輪流均攤,直到符合資源需求。
(2)當spreadOutApps=false:使用依次全占策略,依次從可用Workers上獲取該Worker上可用的全部資源,直到符合資源需求。
對應圖中Worker內部的小方塊,在此表示分配的資源的抽象單位。對應資源的條件,理解的關鍵點在于資源是分配給Executor的,因此最終啟動Executor時,占用的資源必須滿足啟動所需的條件。
前面描述了Workers上的資源是如何分配給應用程序的,之后正式開始為Executor分配資源,并向Worker發送啟動Executor的命令了。根據申請時是否明確指定需要為每個Executor分配確定的內核個數,有:
(1)明確指定每個Executor需要分配的內核個數時:每次分配的是一個Executor所需的內核數和內存數,對應在某個Worker分配到的總的內核數可能是Executor的內核數的倍數,此時,該Worker節點上會啟動多個Executor,每個Executor需要指定的內核數和內存數(注意該Worker節點上分配到的總的內存大小)。
(2)未明確指定每個Executor需要分配的內核個數時:每次分配一個內核,最后所有在某Worker節點上分配到的內核都會放到一個Executor內(未明確指定內核個數,因此可以一起放入一個Executor)。因此,最終該應用程序在一個Worker上只有一個Executor(這里指的是針對一個應用程序,當該Worker節點上存在多個應用程序時,仍然會為每個應用程序分別啟動相應的Executor)。
在此強調、補充一下調度機制中使用的三個重要的配置屬性。
a.指定為所有Executors分配的總內核個數:在spark-submit腳本提交參數時進行配置。所有Executors分配的總內核個數的控制屬性在類SparkSubmitArguments的方法printUsageAndExit中。
1. //指定為所有Executors分配的總內核個數 2. | Spark standalone and Mesos only: 3. | --total-executor-cores NUM Total cores for all executors.
b.指定需要為每個Executor分配的內核個數:在spark-submit腳本提交參數時進行配置。每個Executor分配的內核個數的控制屬性在類SparkSubmitArguments的方法printUsageAndExit中。
SparkSubmitArguments.scala的源碼如下。
1. // 指定需要為每個Executor分配的內核個數 2. || Spark standalone and YARN only: 3. | --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, 4. or all available cores on the worker in standalone mode)
c.資源分配策略:數據本地性(數據密集)與計算密集的控制屬性,對應的配置屬性在Master類中,代碼如下。
1. private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
6.2.2 Application申請資源的源碼詳解
1.任務調度與資源調度的區別
任務調度是通過DAGScheduler、TaskScheduler、SchedulerBackend等進行的作業調度。
資源調度是指應用程序如何獲得資源。
任務調度是在資源調度的基礎上進行的,如果沒有資源調度,任務調度就成為無源之水,無本之木。
2.資源調度內幕
(1)因為Master負責資源管理和調度,所以資源調度的方法shedule位于Master.scala類中,注冊程序或者資源發生改變時都會導致schedule的調用,如注冊程序時:
1. case RegisterApplication(description, driver) => 2. //待辦事項:防止重復注冊Driver 3. if (state == RecoveryState.STANDBY) { 4. //忽略,不要發送響應 5. } else { 6. logInfo("Registering app " + description.name) 7. val app = createApplication(description, driver) 8. registerApplication(app) 9. logInfo("Registered app " + description.name + " with ID " + app.id) 10. persistenceEngine.addApplication(app) 11. driver.send(RegisteredApplication(app.id, self)) 12. schedule() 13. }
(2)Schedule調用的時機:每次有新的應用程序提交或者集群資源狀況發生改變的時候(包括Executor增加或者減少、Worker增加或者減少等)。
進入schedule(),schedule為當前等待的應用程序分配可用的資源。每當一個新的應用程序進來時,schedule都會被調用。或者資源發生變化時(如Executor掛掉,Worker掛掉,或者新增加機器),schedule都會被調用。
(3)當前Master必須以ALIVE的方式進行資源調度,如果不是ALIVE的狀態,就會直接返回,也就是Standby Master不會進行Application的資源調用。
1. if (state != RecoveryState.ALIVE) { 2. return 3. }
(4)接下來通過workers.toSeq.filter(_.state == WorkerState.ALIVE)過濾判斷所有Worker中哪些是ALIVE級別的Worker,ALIVE才能夠參與資源的分配工作。
1. val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
(5)使用Random.shuffle把Master中保留的集群中所有ALIVE級別的Worker的信息隨機打亂;Master的schedule()方法中:workers是一個數據結構,打亂workers有利于負載均衡。例如,不是以固定的順序啟動launchDriver。WorkerInfo是Worker注冊時將信息注冊過來。
1. val workers = new HashSet[WorkerInfo] 2. ....... 3. val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
WorkerInfo.scala的源碼如下。
1. private[spark] class WorkerInfo( 2. val id: String, 3. val host: String, 4. val port: Int, 5. val cores: Int, 6. val memory: Int, 7. val endpoint: RpcEndpointRef, 8. val webUiAddress: String) 9. extends Serializable {
隨機打亂的算法:將Worker的信息傳進來,先調用new()函數創建一個ArrayBuffer,將所有的信息放進去。然后將兩個索引位置的內容進行交換。例如,如果有4個Worker,依次分別為第一個Worker至第四個Worker,第一個位置是第1個Worker,第2個位置是第2個Worker,第3個位置是第3個Worker,第4個位置是第4個Worker;通過Shuffle以后,現在第一個位置可能是第3個Worker,第2個位置可能是第1個Worker,第3個位置可能是第4個Worker,第4個位置可能是第2個Worker,位置信息打亂。
Random.scala中的shuffle方法,其算法內部是循環隨機交換所有Worker在Master緩存數據結構中的位置。
1. def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = { 2. val buf = new ArrayBuffer[T] ++= xs 3. 4. def swap(i1: Int, i2: Int) { 5. val tmp = buf(i1) 6. buf(i1) = buf(i2) 7. buf(i2) = tmp 8. } 9. 10. for (n <- buf.length to 2 by -1) { 11. val k = nextInt(n) 12. swap(n - 1, k) 13. } 14. 15. (bf(xs) ++= buf).result 16. }
(6)Master的schedule()方法中:循環遍歷等待啟動的Driver,如果是Client模式,就不需要waitingDrivers等待;如果是Cluster模式,此時Driver會加入waitingDrivers等待列表。
當SparkSubmit指定Driver在Cluster模式的情況下,此時Driver會加入waitingDrivers等待列表中,在每個DriverInfo的DriverDescription中有要啟動Driver時對Worker的內存及Cores的要求等內容。
1. private val waitingDrivers = new ArrayBuffer[DriverInfo] 2. ......
DriverInfo包括啟動時間、ID、描述信息、提交時間等內容。
DriverInfo.scala的源碼如下。
1. private[deploy] class DriverInfo( 2. val startTime: Long, 3. val id: String, 4. val desc: DriverDescription, 5. val submitDate: Date) 6. extends Serializable {
其中,DriverInfo的DriverDescription描述信息中包括jarUrl、內存、Cores、supervise、command等內容。如果在Cluster模式中指定supervise為True,那么Driver掛掉時就會自動重啟。
DriverDescription.scala的源碼如下。
1. private[deploy] case class DriverDescription( 2. jarUrl: String, 3. mem: Int, 4. cores: Int, 5. supervise: Boolean, 6. command: Command) {
在符合資源要求的情況下,采用隨機打亂后的一個Worker來啟動Driver,worker是Master中對Worker的一個描述。
Master.scala的launchDriver方法如下。
1. private def launchDriver(worker: WorkerInfo, driver: DriverInfo) { 2. logInfo("Launching driver " + driver.id + " on worker " + worker.id) 3. worker.addDriver(driver) 4. driver.worker = Some(worker) 5. worker.endpoint.send(LaunchDriver(driver.id, driver.desc)) 6. driver.state = DriverState.RUNNING 7. }
Master通過worker.endpoint.send(LaunchDriver)發指令給Worker,讓遠程的Worker啟動Driver,Driver啟動以后,Driver的狀態就變成DriverState.RUNNING。
(7)先啟動Driver,才會發生后續的一切資源調度的模式。
(8)Spark默認為應用程序啟動Executor的方式是FIFO方式,也就是所有提交的應用程序都是放在調度的等待隊列中的,先進先出,只有滿足了前面應用程序的資源分配的基礎,才能夠滿足下一個應用程序資源的分配。
Master的schedule()方法中,調用startExecutorsOnWorkers()為當前的程序調度和啟動Worker的Executor,默認情況下排隊的方式是FIFO。
startExecutorsOnWorkers的源碼如下。
1. private def startExecutorsOnWorkers(): Unit = { 2. //這是一個非常簡單的FIFO調度。我們嘗試在隊列中推入第一個應用程序,然后推入第二 //個應用程序等 3. for (app <- waitingApps if app.coresLeft > 0) { 4. val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor 5. //篩選出workers,其沒有足夠資源來啟動Executor 6. val usableWorkers = workers.toArray.filter(_.state == WorkerState .ALIVE) 7. .filter(worker => worker.memoryFree >= app.desc .memoryPerExecutorMB && 8. worker.coresFree >= coresPerExecutor.getOrElse(1)) 9. .sortBy(_.coresFree).reverse 10. val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) 11. 12. //現在我們決定每個worker分配多少cores,進行分配 13. for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { 14. allocateWorkerResourceToExecutors( 15. app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) 16. } 17. } 18. }
(9)為應用程序具體分配Executor前要判斷應用程序是否還需要分配Core,如果不需要,則不會為應用程序分配Executor。
startExecutorsOnWorkers中的coresLeft是請求的requestedCores和可用的coresGranted的相減值。例如,如果整個程序要求1000個Cores,但是目前集群可用的只有100個Cores,如果coresLeft不為0,就放入等待隊列中;如果coresLeft是0,那么就不需要調度。
1. private[master] def coresLeft: Int = requestedCores - coresGranted
(10)Master.scala的startExecutorsOnWorkers中,具體分配Executor之前,要求Worker必須是ALIVE的狀態且必須滿足Application對每個Executor的內存和Cores的要求,并且在此基礎上進行排序,產生計算資源由大到小的usableWorkers數據結構。
1. val usableWorkers = workers.toArray.filter(_.state == WorkerState .ALIVE) 2. .filter(worker => worker.memoryFree >= app.desc .memoryPerExecutorMB && 3. worker.coresFree >= coresPerExecutor.getOrElse(1)) 4. .sortBy(_.coresFree).reverse 5. val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
然后調用scheduleExecutorsOnWorkers,在FIFO的情況下,默認spreadOutApps讓應用程序盡可能多地運行在所有的Node上。
1. private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
scheduleExecutorsOnWorker中,minCoresPerExecutor表示每個Executor最小分配的core個數。scheduleExecutorsOnWorker的源碼如下。
1. private def scheduleExecutorsOnWorkers( 2. app: ApplicationInfo, 3. usableWorkers: Array[WorkerInfo], 4. spreadOutApps: Boolean): Array[Int] = { 5. val coresPerExecutor = app.desc.coresPerExecutor 6. val minCoresPerExecutor = coresPerExecutor.getOrElse(1) 7. val oneExecutorPerWorker = coresPerExecutor.isEmpty 8. val memoryPerExecutor = app.desc.memoryPerExecutorMB 9. val numUsable = usableWorkers.length 10. val assignedCores = new Array[Int](numUsable) 11. val assignedExecutors = new Array[Int](numUsable) 12. var coresToAssign = math.min(app.coresLeft, usableWorkers.map (_.coresFree).sum) 13. ......
(11)為應用程序分配Executors有兩種方式:第一種方式是盡可能在集群的所有Worker上分配Executor,這種方式往往會帶來潛在的、更好的數據本地性;第二種方式是嘗試運行在盡可能少的Worker上。
(12)具體在集群上分配Cores時會盡可能地滿足我們的要求。math.min用于計算最小值。coresToAssig用于計算app.coresLeft與可用的Worker中可用的Cores的和的最小值。例如,應用程序要求1000個Cores,但整個集群中只有100個Cores,所以只能先分配100個Cores。
scheduleExecutorsOnWorkers方法如下。
1. var coresToAssign = math.min(app.coresLeft, usableWorkers.map (_.coresFree).sum) 2. ......
(13)如果每個Worker下面只能為當前的應用程序分配一個Executor,那么每次只分配一個Core。scheduleExecutorsOnWorkers方法如下。
1. if (oneExecutorPerWorker) { 2. assignedExecutors(pos) = 1 3. } else { 4. assignedExecutors(pos) += 1 5. }
總結為兩種情況:一種情況是盡可能在一臺機器上運行程序的所有功能;另一種情況是盡可能在所有節點上運行程序的所有功能。無論是哪種情況,每次給Executor增加Cores,是增加一個,如果是spreadOutApps的方式,循環一輪再下一輪。例如,有4個Worker,第一次為每個Executor啟動一個線程,第二次循環分配一個線程,第三次循環再分配一個線程……
scheduleExecutorsOnWorkers方法如下。
1. while (freeWorkers.nonEmpty) { 2. freeWorkers.foreach { pos => 3. var keepScheduling = true 4. while (keepScheduling && canLaunchExecutor(pos)) { 5. coresToAssign -= minCoresPerExecutor 6. assignedCores(pos) += minCoresPerExecutor 7. 8. //如果每個worker上啟動一個Executor,那么每次迭代在Executor 上分配一 //個核,否則,每次迭代都將把內核分配給一個新的Executor 9. if (oneExecutorPerWorker) { 10. assignedExecutors(pos) = 1 11. } else { 12. assignedExecutors(pos) += 1 13. } 14. 15. //展開應用程序意味著將Executors展開到盡可能多的workers節點。如果不展 //開,將對這個workers的Executors進行調度,直到使用它的全部資源。否則, //只是移動到下一個worker節點 16. if (spreadOutApps) { 17. keepScheduling = false 18. } 19. } 20. }
回到Master.scala的startExecutorsOnWorkers,現在已經決定為每個worker分配多少個cores,然后進行資源分配。
1. for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { 2. allocateWorkerResourceToExecutors( 3. app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) 4. }
allocateWorkerResourceToExecutors的源碼如下。
1. private def allocateWorkerResourceToExecutors( 2. app: ApplicationInfo, 3. assignedCores: Int, 4. coresPerExecutor: Option[Int], 5. worker: WorkerInfo): Unit = { 6. //如果指定了每個Executor的內核數,我們就將分配的內核無剩余地均分給worker節點的 //Executors。否則,我們啟動一個單一的 Executor,抓住這個 worker 節點所有的 //assignedCores 7. val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) 8. val coresToAssign = coresPerExecutor.getOrElse(assignedCores) 9. for (i <- 1 to numExecutors) { 10. val exec = app.addExecutor(worker, coresToAssign) 11. launchExecutor(worker, exec) 12. app.state = ApplicationState.RUNNING 13. } 14. }
allocateWorkerResourceToExecutors中的app.addExecutor增加一個Executor,記錄Executor的相關信息。
1. private[master] def addExecutor( 2. worker: WorkerInfo, 3. cores: Int, 4. useID: Option[Int] = None): ExecutorDesc = { 5. val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB) 6. executors(exec.id) = exec 7. coresGranted += cores 8. exec 9. }
回到allocateWorkerResourceToExecutors方法中,launchExecutor(worker, exec)啟動Executor。
1. launchExecutor(worker, exec)
(14)準備具體要為當前應用程序分配的Executor信息后,Master要通過遠程通信發指令給Worker來具體啟動ExecutorBackend進程。
launchExecutor方法如下。
1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { 2. logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) 3. worker.addExecutor(exec) 4. worker.endpoint.send(LaunchExecutor(masterUrl, 5. exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)) 6. ......
(15)緊接著給應用程序的Driver發送一個ExecutorAdded的信息。
launchExecutor方法如下。
1. exec.application.driver.send( 2. ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) 3. }
- Mastering Proxmox(Third Edition)
- Hadoop 2.x Administration Cookbook
- 輕松學PHP
- 計算機原理
- 條碼技術及應用
- VMware Performance and Capacity Management(Second Edition)
- ROS機器人編程與SLAM算法解析指南
- 21天學通Visual Basic
- Excel 2007技巧大全
- Linux常用命令簡明手冊
- 軟件需求十步走
- 這樣用Word!
- 服務器配置與應用(Windows Server 2008 R2)
- Modern Big Data Processing with Hadoop
- 局域網組建與使用完全自學手冊