- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 2970字
- 2019-12-12 17:29:59
6.4 從Application提交的角度重新審視Executor
本節從Application提交的角度重新審視Executor,徹底解密Executor到底是什么時候啟動的,以及Executor如何把結果交給Application。
6.4.1 Executor到底是什么時候啟動的
SparkContext啟動后,StandaloneSchedulerBackend中會調用new()函數創建一個StandaloneAppClient,StandaloneAppClient中有一個名叫ClientEndPoint的內部類,在創建ClientEndpoint時會傳入Command來指定具體為當前應用程序啟動的Executor進行的入口類的名稱為CoarseGrainedExecutorBackend。ClientEndPoint繼承自ThreadSafeRpcEndpoint,其通過RPC機制完成和Master的通信。在ClientEndPoint的start方法中,會通過registerWithMaster方法向Master發送RegisterApplication請求,Master收到該請求消息后,首先通過registerApplication方法完成信息登記,之后將會調用schedule方法,在Worker上啟動Executor。Master對RegisterApplication請求處理源碼如下所示。
Master.scala的源碼如下。
1. case RegisterApplication(description, driver) => 2. //待辦事項:防止driver程序重復注冊 3. //Master處于STANDBY(備用)狀態,不作處理 4. if (state == RecoveryState.STANDBY) { 5. //忽略,不發送響應 6. } else { 7. logInfo("Registering app " + description.name) 8. //由description描述,構建ApplicationInfo 9. val app = createApplication(description, driver) 10. registerApplication(app) 11. logInfo("Registered app " + description.name + " with ID " + app.id) 12. //在持久化引擎中加入application 13. persistenceEngine.addApplication(app) 14. driver.send(RegisteredApplication(app.id, self)) 15. //調用schedule方法,在worker節點上啟動Executor 16. schedule() 17. }
在上面的代碼中,Master匹配到RegisterApplication請求,先判斷Master的狀態是否為STANDBY(備用)狀態,如果不是,說明Master為ALIVE狀態,在這種狀態下調用createApplication(description,sender)方法創建ApplicationInfo,完成之后調用persistenceEngine. addApplication(app)方法,將新創建的ApplicationInfo持久化,以便錯誤恢復。完成這兩步操作后,通過driver.send(RegisteredApplication(app.id, self))向StandaloneAppClient返回注冊成功后ApplicationInfo的Id和master的url地址。
ApplicationInfo對象是對application的描述,下面先來看createApplication方法的源碼。
Master.scala的源碼如下。
1. private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef): 2. ApplicationInfo = { 3. //ApplicationInfo創建時間 4. val now = System.currentTimeMillis() 5. val date = new Date(now) 6. //由date生成application id 7. val appId = newApplicationId(date) 8. //創建ApplicationInfo 9. new ApplicationInfo(now, appId, desc, date, driver, defaultCores) 10. }
上面的代碼中,createApplication方法接收ApplicationDescription和ActorRef兩種類型的參數,并調用newApplicationId方法生成appId,關鍵代碼如下所示。
1. val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
由代碼所決定,appid的格式形如:app-20160429101010-0001。desc對象中包含一些基本的配置,包括從系統中傳入的一些配置信息,如appname、maxCores、memoryPerExecutorMB等。最后使用desc、date、driver、defaultCores等作為參數構造一個ApplicatiOnInfo對象并返回。函數返回之后,調用registerApplication方法,完成application的注冊,該方法是如何完成注冊的?方法代碼如下所示。
Master.scala的源碼如下。
1. private def registerApplication(app: ApplicationInfo): Unit = { 2. //Driver的地址,用于Master和Driver通信 3. val appAddress = app.driver.address 4. //如果addressToApp中已經有了該Driver地址,說明該Driver已經注冊過了,直接 //return 5. 6. if (addressToApp.contains(appAddress)) { 7. logInfo("Attempted to re-register application at same address: " + appAddress) 8. return 9. } 10. //向度量系統注冊 11. applicationMetricsSystem.registerSource(app.appSource) 12. //apps是一個HashSet,保存數據不能重復,向HashSet中加入app 13. apps += app 14. //idToApp是一個HashMap,該HashMap用于保存id和app的對應關系 15. idToApp(app.id) = app 16. //endpointToApp是一個HashMap,該HashMap用于保存driver和app的對應關系 17. endpointToApp(app.driver) = app 18. //addressToApp是一個HashMap,記錄app Driver的地址和app的對應關系 19. addressToApp(appAddress) = app 20. /waitingApps是一個數組,記錄等待調度的app記錄 21. waitingApps += app 22. if (reverseProxy) { 23. webUi.addProxyTargets(app.id, app.desc.appUiUrl) 24. } 25. }
上面的代碼中,首先通過app.driver.path.address得到Driver的地址,然后查看appAddress映射表中是否已經存在這個路徑,如果存在,表示該application已經注冊,直接返回;如果不存在,則在waitingApps數組中加入該application,同時在idToApp、endpointToApp、addressToApp映射表中加入映射關系。加入waitingApps數組中的application等待schedule方法的調度。
schedule方法有兩個作用:第一,完成Driver的調度,將waitingDrivers數組中的Driver發送到滿足條件的Worker上運行;第二,在滿足條件的Worker節點上為application啟動Executor。
Master.scala的schedule方法的源碼如下。
1. private def schedule(): Unit = { 2. ....... 3. launchDriver(worker, driver) 4. ....... 5. startExecutorsOnWorkers() 6. }
在Master中,schedule方法是一個很重要的方法,每一次新的Driver的注冊、application的注冊,或者可用資源發生變動,都將調用schedule方法。schedule方法用于為當前等待調度的application調度可用的資源,在滿足條件的Worker節點上啟動Executor。這個方法還有另外一個作用,就是當有Driver提交的時候,負責將Driver發送到一個可用資源滿足Driver需求的Worker節點上運行。launchDriver(worker,driver)方法負責完成這一任務。
application調度成功之后,Master將會為application在Worker節點上啟動Executors,調用startExecutorsOnWorkers方法完成此操作。
在scheduleExecutorsOnWorkers方法中,有兩種啟動Executor的策略:第一種是輪流均攤策略(round-robin),采用圓桌算法依次輪流均攤,直到滿足資源需求,輪流均攤策略通常會有更好的數據本地性,因此它是默認的選擇策略;第二種是依次全占,在usableWorkers中,依次獲取每個Worker上的全部資源,直到滿足資源需求。
scheduleExecutorsOnWorkers方法為application分配好邏輯意義上的資源后,還不能真正在Worker節點為application分配出資源,需要調用動作函數為application真正地分配資源。allocateWorkerResourceToExecutors方法的調用,將會在Worker節點上實際分配資源。下面是allocateWorkerResourceToExecutors的源碼。
Master.scala的源碼如下。
1. private def allocateWorkerResourceToExecutors( 2. ...... 3. launchExecutor(worker, exec) 4. .......
上面代碼調用了launchExecutor(worker,exec)方法,這個方法有兩個參數:第一個參數是滿足條件的WorkerInfo信息;第二個參數是描述Executor的ExecutorDesc對象。這個方法將會向Worker節點發送LaunchExecutor的請求,Worker節點收到該請求之后,將會負責啟動Executor。launchExecutor方法代碼清單如下所示。
Master.scala的源碼如下。
1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { 2. logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) 3. //向WorkerInfo中加入exec這個描述Executor的ExecutorDesc對象 4. worker.addExecutor(exec) 5. //向worker發送LaunchExecutor消息,加載Executor消息中攜帶了masterUrl地址、 //application id、Executor id、Executor描述desc、Executor核的個數、Executor //分配的內存大小 6. 7. worker.endpoint.send(LaunchExecutor(masterUrl, 8. exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)) 9. //向Driver發回ExecutorAdded消息,消息攜帶worker的id號、worker的host和 //port、分配的核的個數和內存大小 10. exec.application.driver.send( 11. ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) 12. }
launchExecutor有兩個參數,第一個參數是worker:WorkerInfo,代表Worker的基本信息;第二個參數是exec:ExecutorDesc,這個參數保存了Executor的基本配置信息,如memory、cores等。此方法中有worker.endpoint.send(LaunchExecutor(...)),向Worker發送LaunchExecutor請求,Worker收到該請求之后,將會調用方法啟動Executor。
向Worker發送LaunchExecutor消息的同時,通過exec.application.driver.send (ExecutorAdded(…))向Driver發送ExecutorAdded消息,該消息為Driver反饋Master都在哪些Worker上啟動了Executor,Executor的編號是多少,為每個Executor分配了多少個核,多大的內存以及Worker的聯系hostport等消息。
Worker收到LaunchExecutor消息會做相應的處理。首先判斷傳過來的masterUrl是否和activeMasterUrl相同,如果不相同,說明收到的不是處于ALIVE狀態的Master發送過來的請求,這種情況直接打印警告信息。如果相同,則說明該請求來自ALIVE Master,于是為Executor創建工作目錄,創建好工作目錄之后,使用appid、execid、appDes等參數創建ExecutorRunner。顧名思義,ExecutorRunner是Executor運行的地方,在ExecutorRunner中有一個工作線程,這個線程負責下載依賴的文件,并啟動CoarseGaindExecutorBackend進程,該進程單獨在一個JVM上運行。下面是ExecutorRunner中的線程啟動的源碼。
ExecutorRunner.scala的源碼如下。
1. private[worker] def start() { 2. //創建線程 3. workerThread = new Thread("ExecutorRunner for " + fullId) { 4. //線程run方法中調用fetchAndRunExecutor 5. override def run() { fetchAndRunExecutor() } 6. } 7. //啟動線程 8. workerThread.start() 9. 10. //終止回調函數,用于殺死進程 11. shutdownHook = ShutdownHookManager.addShutdownHook { () => 12. //這是可能的,調用fetchAndRunExecutor 之前,state 將是 ExecutorState. //RUNNING。在這種情況下,我們應該設置“狀態”為“失敗” 13. if (state == ExecutorState.RUNNING) { 14. state = ExecutorState.FAILED 15. } 16. killProcess(Some("Worker shutting down")) } 17. }
上面代碼中定義了一個Thread,這個Thread的run方法中調用fetchAndRunExecutor方法,fetchAndRunExecutor負責以進程的方式啟動ApplicationDescription中攜帶的org.apache.spark.executor.CoarseGrainedExecutorBackend進程。
其中,fetchAndRunExecutor方法中的CommandUtils.buildProcessBuilder(appDesc.command,傳入的入口類是"org.apache.spark.executor.CoarseGrainedExecutorBackend",當Worker節點中啟動ExecutorRunner時,ExecutorRunner中會啟動CoarseGrainedExecutorBackend進程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver發出RegisterExecutor注冊請求。
CoarseGrainedExecutorBackend的onStart方法的源碼如下。
1. override def onStart() { 2. ....... 3. driver = Some(ref) 4. //向driver發送ask請求,等待driver回應 5. ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) 6. ......
Driver端收到注冊請求,將會注冊Executor的請求。
CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源碼如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 2. 3. case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => 4. ....... 5. executorRef.send(RegisteredExecutor) 6. ......
如上面代碼所示,Driver向CoarseGrainedExecutorBackend發送RegisteredExecutor消息,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后將會新建一個Executor執行器,并為此Executor充當信使,與Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源碼如下所示。
CoarseGrainedExecutorBackend.scala的receive的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredExecutor => 3. logInfo("Successfully registered with driver") 4. try { 5. //收到RegisteredExecutor消息,立即創建Executor 6. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) 7. } catch { 8. case NonFatal(e) => 9. exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) 10. }
從上面的代碼中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,將會新創建一個org.apache.spark.executor.Executor對象,至此Executor創建完畢。
6.4.2 Executor如何把結果交給Application
CoarseGrainedExecutorBackend給DriverEndpoint發送StatusUpdate傳輸執行結果,DriverEndpoint會把執行結果傳遞給TaskSchedulerImpl處理,然后交給TaskResultGetter內部通過線程分別處理Task執行成功和失敗的不同情況,然后告訴DAGScheduler任務處理結束的狀況。
CoarseGrainedSchedulerBackend.scala中DriverEndpoint的receive方法的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case StatusUpdate(executorId, taskId, state, data) => 3. scheduler.statusUpdate(taskId, state, data.value) 4. if (TaskState.isFinished(state)) { 5. executorDataMap.get(executorId) match { 6. case Some(executorInfo) => 7. executorInfo.freeCores += scheduler.CPUS_PER_TASK 8. makeOffers(executorId) 9. case None => 10. //忽略更新,因為我們不知道Executor 11. logWarning(s"Ignored task status update ($taskId state $state)"+ 12. s"from unknown executor with ID $executorId") 13. } 14. }
DriverEndpoint的receive方法中的StatusUpdate調用scheduler.statusUpdate,然后釋放資源,再次進行資源調度makeOffers(executorId)。
TaskSchedulerImpl的statusUpdate中:
如果是TaskState.LOST,則記錄原因,將Executor清理掉。
如果是TaskState.isFinished,則從taskSet中運行的任務中清除掉,調用taskResultGetter. enqueueSuccessfulTask處理。
如果是TaskState.FAILED、TaskState.KILLED、TaskState.LOST,調用taskResultGetter. enqueueFailedTask處理。