- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 2624字
- 2019-12-12 17:29:57
5.3 ExecutorBackend啟動原理和源碼詳解
ExecutorBackend是Executor向集群發送更新消息的一個可插拔的接口。ExecutorBackend擁有不同的實現。Standalone模式下ExecutorBackend的默認實現是CoarseGrainedExecutorBackend;在Local模式下,ExecutorBackend的默認實現是LocalBackend。在Mesos調度模式下,ExecutorBackend的默認實現是MesosExecutorBackend。本節主要探索Standalone模式下的ExecutorBackend,通過源碼深入理解ExecutorBackend接口設計的精髓。
5.3.1 ExecutorBackend接口與Executor的關系
本節將詳細分析Standalone模式下ExecutorBackend和Executor的關系。在StandaloneSchedulerBackend中會實例化一個StandaloneAppClient。StandaloneAppClient中攜帶了command信息,command信息中指定了要啟動的ExecutorBackend的實現類,Standalone模式下,該ExecutorBackend的實現類是org.apache.spark.executor.CoarseGrainedExecutorBackend類。
StandaloneSchedulerBackend.scala的start方法中構建了一個Command對象,該對象的第一個參數是mainClass,即進程的主類。該類在Standalone模式下為org.apache.spark.executor. CoarseGrainedExecutorBackend。分別將sparkJavaopts、javaOpts、command、appUiAddress、coresPerExecutor、appDes傳入StandaloneAppClient構造函數。StandaloneAppClient將會向Master發送RegisterApplication注冊請求,Master受理后通過launchExecutor方法在Worker節點啟動一個ExecutorRunner對象,該對象用于管理一個Executor進程。在ExecutorRunner中將通過CommandUtil構建一個ProcessBuilder,調用ProcessBuilder的start方法將會以進程的方式啟動org.apache.spark.executor.CoarseGrainedExecutorBackend。在CoarseGrainedExecotorBackend的onStart方法中,將會向Driver端發送RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls)消息請求注冊,完成注冊后將立即返回一個RegisteredExecutor(executorAddress. host)消息,CoarseGraiendExecutorBackend收到該消息,馬上實例化出一個Executor。源碼如下所示。
CoarseGrainedExecutorBackend.scala的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredExecutor => 3. logInfo("Successfully registered with driver") 4. try { 5. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) 6. } catch { 7. case NonFatal(e) => 8. exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) 9. }
從這里可以看出,CoarseGrainedExecutorBackend比Executor先實例化。CoarseGrained-ExecutorBackend負責與集群通信,而Executor則專注于任務的處理,它們是一對一的關系,在集群中各司其職。
每個Worker節點上可以啟動多個CoarseGrainedExecutorBackend進程,每個進程對應一個Executor。
5.3.2 ExecutorBackend的不同實現
ExecutorBackend是與集群交互的接口,該接口在不同的調度模式下有不同的實現。圖5-3是ExecutorBackend及其實現的關系類圖。

圖5-3 ExecutorBackend及其實現的關系類圖
不同模式下,ExecutorRunner啟動的進程不一樣。在Standalone模式下啟動的是org.apache.spark.executor.CoarseGrainedExecutorBackend進程;在Local模式下,啟動的是org.apache.spark.executor.LocalExecutorBackend進程;在Mesos模式下,啟動的是org.apache. spark.executor.MesosExecutorBackend進程。
下面來看Standalone模式下CoarseGrainedExecutorBackend的啟動。在Standalone模式下,會啟動org.apache.spark.deploy.Client類,該類將向Master發送RequestSubmitDriver (driverDescription)消息,Master中匹配到RequestSubmitDriver(driverDescription)后,將會調用schedule方法。該調用的源碼如下所示。
Master.scala的receiveAndReply的源碼如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 2. ...... 3. case RequestSubmitDriver(description) => 4. //若state不為ALIVE,直接向Client返回SubmitDriverResponse(self,false, //None,msg)消息 5. if (state != RecoveryState.ALIVE) { 6. val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + 7. "Can only accept driver submissions in ALIVE state." 8. context.reply(SubmitDriverResponse(self, false, None, msg)) 9. } else { 10. logInfo("Driver submitted " + description.command.mainClass) 11. //使用description創建driver,該方法返回DriverDescription 12. val driver = createDriver(description) 13. persistenceEngine.addDriver(driver) 14. waitingDrivers += driver 15. //waitingDrivers等待在調度數組中加入該driver 16. drivers.add(driver) 17. //用schedule方法調度資源 18. schedule() 19. //向ClientEndpoint回復SubmitDriverResponse消息 20. 21. context.reply(SubmitDriverResponse(self, true, Some(driver.id), 22. s"Driver successfully submitted as ${driver.id}")) 23. }
Master的receiveAndReply收到RequestSubmitDriver消息后,調用schedule方法。
Master的schedule的源碼如下。
1. private def schedule(): Unit = { 2. if (state != RecoveryState.ALIVE) { 3. return 4. } 5. //Drivers 優先于executors 6. val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter (_.state == WorkerState.ALIVE)) 7. val numWorkersAlive = shuffledAliveWorkers.size 8. var curPos = 0 9. for (driver <- waitingDrivers.toList) { //遍歷waitingDrivers 10. //以循環的方式給每個等候的driver分配Worker。對于每個driver,我們從分配 //給driver的最后一個Worker開始,繼續前進,直到所有活躍的Worker節點 11. 12. var launched = false 13. var numWorkersVisited = 0 14. while (numWorkersVisited < numWorkersAlive && !launched) { 15. val worker = shuffledAliveWorkers(curPos) 16. numWorkersVisited += 1 17. if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { 18. launchDriver(worker, driver) 19. waitingDrivers -= driver 20. launched = true 21. } 22. curPos = (curPos + 1) % numWorkersAlive 23. } 24. } 25. startExecutorsOnWorkers() 26. }
上面代碼中,RecoveryState若不為ALIVE,則直接返回,否則使用Random.shuffle將Workers集合打亂,過濾出ALIVE的Worker,生成新的集合shuffledAliveWorkers,盡量考慮到選擇Driver的負載均衡。在for語句中遍歷waitingDrivers隊列,判斷Worker剩余內存和剩余物理核是否滿足Driver需求,如滿足,則調用launchDriver(worker,driver)方法在選中的Worker上啟動Driver進程。
實例化SparkContext時,在SparkContext中將實例化出DAGScheduler、StandaloneSchedulerBackend。Driver在Worker節點上啟動之后,在StandaloneSchedulerBackend中將會調用new()函數創建一個StandaloneAppClient。StandaloneAppClient中有一個ClientEndpoint,在其onStart方法中將向Master發送RegisterApplication請求注冊application,注冊好application后,Master又會調用schedule方法,在滿足條件的Worker上為application啟動Executor,首先會啟動ExecutorRunner,在ExecutorRunner中啟動CoarseGrainedExecutor-Backend,啟動后將會實例化出Executor。為什么在Standalone模式下會啟動CoarseGrained-ExecutorBackend呢?在什么地方設置要啟動的CoarseGrainedExecutorBackend進程呢?其實,在實例化StandaloneAppClient的時候就已經傳入了。
StandaloneSchedulerBackend.scala的start方法代碼中設置了Command對象。Command對象的第一個參數是啟動進程的mainClass。因此,ExecutorRunner中啟動進程時,啟動的是org.apache.spark.executor.CoarseGrainedExecutorBackend。
5.3.3 ExecutorBackend中的通信
ExecutorBackend是一個被Executor使用的可插拔的與集群通信的接口。在ExecutorBackend中有statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)方法,通過這個方法向集群發送Task執行的各種信息,如果任務執行失敗,則返回失敗的信息;如果執行成功,則返回任務執行的結果。本節重點講解在Standalone模式下CoarseGrainedExecutor-Backend中的通信。CoarseGrainedExecutorBackend在整個集群中的通信如圖5-4所示。
在圖5-4中,Executor與CoarseGrainedExecutorBackend協作,將任務計算的結果通過CoarseGrainedExecutorBackend的statusUpdate方法將taskId、TaskState以及結果數據發送給Driver。Driver收到StatusUpdate(executorId,tasked,state,data)消息,通過判斷state的不同狀態,進行不同的處理。例如,當state的狀態為TaskState.LOST時,Driver端會移除Executor;當state的狀態為TaskState.FINISHED時,Driver端會調用enqueueSuccessfulTask進行處理。
這里主要看CoarseGrainedExecutorBackend與Driver之間的通信。當Worker節點中啟動ExecutorRunner時,ExecutorRunner中會啟動CoarseGrainedExecutorBackend進程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver發出RegisterExecutor注冊請求。源碼如下所示。

圖5-4 CoarseGrainedExecutorBackend在整個集群中的通信
CoarseGrainedExecutorBackend的onStart方法的源碼如下。
1. override def onStart() { 2. logInfo("Connecting to driver: " + driverUrl) 3. rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => 4. //這是一個非常快的行動,所以我們可以用"ThreadUtils.sameThread" 5. driver = Some(ref) 6. //向Driver發送ask請求,等待Driver回應 7. ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) 8. }(ThreadUtils.sameThread).onComplete { 9. //這是一個非常快的行動,所以我們可以用"ThreadUtils.sameThread" 10. case Success(msg) => 11. //經常收到true,可以忽略 12. case Failure(e) => 13. exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false) 14. }(ThreadUtils.sameThread) 15. }
上面的代碼中,Some(ref)得到Driver的引用,通過ask方法返回Future[Boolean],然后在Future對象上調用onComplete方法進行額外的處理。Driver端收到注冊請求,將會注冊Executor的請求,并向ListenerBus中發送SparkListenerExecutorAdded事件。
如果executorDataMap中已經存在該Executor的id,就返回RegisterExecutorFailed,如果不存在該Executor的id,則在executorDataMap中加入該Executor的id,并返回RegisteredExecutor消息且向listenerBus中添加SparkListenerExecutorAdded事件。CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,將會新建一個Executor執行器,并為此Executor充當信使,與Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源碼如下所示。
CoarseGrainedExecutorBackend.scala的receive的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredExecutor => 3. logInfo("Successfully registered with driver") 4. try { 5. //收到RegisteredExecutor消息,立即創建Executor 6. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) 7. } catch { 8. case NonFatal(e) => 9. exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) 10. }
從上面的代碼中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,將會新建一個Executor。由此可見,Executor在CoarseGrainedExecutorBackend后實例化,這與Executor和CoarseGrainedExecutorBackend的不同職責有關,Executor主要負責計算,而CoarseGrainedExecutorBackend主要負責通信,通信環境準備好了,架起同CoarseGrainedSchedulerBackend通信的橋梁,就可以接收CoarseGrainedSchedulerBackend中調用launchTask方法發送的LaunchTask消息了,因此通信在前,計算在后。
Executor中的計算結果是通過CoarseGrainedExecutorBackend的statusUpdate方法返回給CoarseGrainedExecutorBackend的。statusUpdate方法的代碼如下所示。
CoarseGrainedExecutorBackend.scala的源碼如下。
1. override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { 2. val msg = StatusUpdate(executorId, taskId, state, data) 3. driver match { 4. //向Driver發送StatusUpdate消息 5. case Some(driverRef) => driverRef.send(msg) 6. case None => logWarning(s"Drop $msg because has not yet connected to driver") 7. } 8. }
上面源碼中,通過參數taskId、state、data構建一個StatusUpdate對象,該對象將被當作消息發送到Driver端,Driver根據返回結果的需要,將會向CoarseGrainedExecutorBackend發送新的指令消息,如LaunchTask、KillTask、StopExecutors、Shutdown等。
5.3.4 ExecutorBackend的異常處理
若CoarseGrainedExecutorBackend在運行中出現異常,將調用exitExecutor方法進行處理,處理以后,系統退出。exitExecutor函數可以由其他子類重載來處理,Executor執行的退出方式不同。例如,當Executor掛掉了,后臺程序可能不會讓父進程也掛掉。如果須通知Driver,Driver將清理掛掉的Executor的數據。
CoarseGrainedExecutorBackend的exitExecutor方法的源碼如下。
1. protected def exitExecutor(code: Int, 2. reason: String, 3. throwable: Throwable = null, 4. notifyDriver: Boolean = true) = { 5. val message = "Executor self-exiting due to : " + reason 6. if (throwable != null) { 7. logError(message, throwable) 8. } else { 9. logError(message) 10. } 11. 12. if (notifyDriver && driver.nonEmpty) { 13. driver.get.ask[Boolean]( 14. RemoveExecutor(executorId, new ExecutorLossReason(reason)) 15. ).onFailure { case e => 16. logWarning(s"Unable to notify the driver due to " + e.getMessage, e) 17. }(ThreadUtils.sameThread) 18. } 19. 20. System.exit(code) 21. } 22. }
CoarseGrainedExecutorBackend在運行中一旦出現異常情況,將調用exitExecutor方法處理。
Executor向Driver注冊RegisterExecutor失敗。
Executor收到Driver的RegisteredExecutor注冊成功消息以后,創建Executor實例失敗。
Driver返回Executor注冊失敗消息RegisterExecutorFailed。
Executor收到Driver的LaunchTask啟動任務消息,但是Executor為null。
Executor收到Driver的KillTask消息,但是Executor為null。
Executor和Driver失去連接。
- Splunk 7 Essentials(Third Edition)
- 協作機器人技術及應用
- 數據運營之路:掘金數據化時代
- 機器人智能運動規劃技術
- 電腦上網直通車
- Visual Basic從初學到精通
- 大數據處理平臺
- 21天學通Visual C++
- Prometheus監控實戰
- 網絡安全管理實踐
- Machine Learning with Apache Spark Quick Start Guide
- LMMS:A Complete Guide to Dance Music Production Beginner's Guide
- Mastering GitLab 12
- Windows安全指南
- Oracle 11g Anti-hacker's Cookbook