官术网_书友最值得收藏!

6.4 從Application提交的角度重新審視Executor

本節從Application提交的角度重新審視Executor,徹底解密Executor到底是什么時候啟動的,以及Executor如何把結果交給Application。

6.4.1 Executor到底是什么時候啟動的

SparkContext啟動后,StandaloneSchedulerBackend中會調用new()函數創建一個StandaloneAppClient,StandaloneAppClient中有一個名叫ClientEndPoint的內部類,在創建ClientEndpoint時會傳入Command來指定具體為當前應用程序啟動的Executor進行的入口類的名稱為CoarseGrainedExecutorBackend。ClientEndPoint繼承自ThreadSafeRpcEndpoint,其通過RPC機制完成和Master的通信。在ClientEndPoint的start方法中,會通過registerWithMaster方法向Master發送RegisterApplication請求,Master收到該請求消息后,首先通過registerApplication方法完成信息登記,之后將會調用schedule方法,在Worker上啟動Executor。Master對RegisterApplication請求處理源碼如下所示。

Master.scala的源碼如下。

1.        case RegisterApplication(description, driver) =>
2.       //待辦事項:防止driver程序重復注冊
3.      //Master處于STANDBY(備用)狀態,不作處理
4.       if (state == RecoveryState.STANDBY) {
5.         //忽略,不發送響應
6.       } else {
7.         logInfo("Registering app " + description.name)
8.     //由description描述,構建ApplicationInfo
9.         val app = createApplication(description, driver)
10.        registerApplication(app)
11.        logInfo("Registered app " + description.name + " with ID " + app.id)
12.     //在持久化引擎中加入application
13.        persistenceEngine.addApplication(app)
14.        driver.send(RegisteredApplication(app.id, self))
15.  //調用schedule方法,在worker節點上啟動Executor
16.        schedule()
17.      }

在上面的代碼中,Master匹配到RegisterApplication請求,先判斷Master的狀態是否為STANDBY(備用)狀態,如果不是,說明Master為ALIVE狀態,在這種狀態下調用createApplication(description,sender)方法創建ApplicationInfo,完成之后調用persistenceEngine. addApplication(app)方法,將新創建的ApplicationInfo持久化,以便錯誤恢復。完成這兩步操作后,通過driver.send(RegisteredApplication(app.id, self))向StandaloneAppClient返回注冊成功后ApplicationInfo的Id和master的url地址。

ApplicationInfo對象是對application的描述,下面先來看createApplication方法的源碼。

Master.scala的源碼如下。

1.  private def createApplication(desc: ApplicationDescription, driver:
        RpcEndpointRef):
2.        ApplicationInfo = {
3.   //ApplicationInfo創建時間
4.      val now = System.currentTimeMillis()
5.      val date = new Date(now)
6.    //由date生成application id
7.      val appId = newApplicationId(date)
8.   //創建ApplicationInfo
9.      new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
10.   }

上面的代碼中,createApplication方法接收ApplicationDescription和ActorRef兩種類型的參數,并調用newApplicationId方法生成appId,關鍵代碼如下所示。

1. val appId = "app-%s-%04d".format(createDateFormat.format(submitDate),
   nextAppNumber)

由代碼所決定,appid的格式形如:app-20160429101010-0001。desc對象中包含一些基本的配置,包括從系統中傳入的一些配置信息,如appname、maxCores、memoryPerExecutorMB等。最后使用desc、date、driver、defaultCores等作為參數構造一個ApplicatiOnInfo對象并返回。函數返回之后,調用registerApplication方法,完成application的注冊,該方法是如何完成注冊的?方法代碼如下所示。

Master.scala的源碼如下。

1.    private def registerApplication(app: ApplicationInfo): Unit = {
2.    //Driver的地址,用于Master和Driver通信
3.      val appAddress = app.driver.address
4.    //如果addressToApp中已經有了該Driver地址,說明該Driver已經注冊過了,直接
      //return
5.
6.      if (addressToApp.contains(appAddress)) {
7.        logInfo("Attempted to re-register application at same address: " +
          appAddress)
8.        return
9.      }
10.  //向度量系統注冊
11.     applicationMetricsSystem.registerSource(app.appSource)
12.  //apps是一個HashSet,保存數據不能重復,向HashSet中加入app
13.     apps += app
14. //idToApp是一個HashMap,該HashMap用于保存id和app的對應關系
15.     idToApp(app.id) = app
16. //endpointToApp是一個HashMap,該HashMap用于保存driver和app的對應關系
17.     endpointToApp(app.driver) = app
18. //addressToApp是一個HashMap,記錄app Driver的地址和app的對應關系
19.     addressToApp(appAddress) = app
20. /waitingApps是一個數組,記錄等待調度的app記錄
21.     waitingApps += app
22.     if (reverseProxy) {
23.       webUi.addProxyTargets(app.id, app.desc.appUiUrl)
24.     }
25.   }

上面的代碼中,首先通過app.driver.path.address得到Driver的地址,然后查看appAddress映射表中是否已經存在這個路徑,如果存在,表示該application已經注冊,直接返回;如果不存在,則在waitingApps數組中加入該application,同時在idToApp、endpointToApp、addressToApp映射表中加入映射關系。加入waitingApps數組中的application等待schedule方法的調度。

schedule方法有兩個作用:第一,完成Driver的調度,將waitingDrivers數組中的Driver發送到滿足條件的Worker上運行;第二,在滿足條件的Worker節點上為application啟動Executor。

Master.scala的schedule方法的源碼如下。

1.  private def schedule(): Unit = {
2.  .......
3.       launchDriver(worker, driver)
4.      .......
5.      startExecutorsOnWorkers()
6.    }

在Master中,schedule方法是一個很重要的方法,每一次新的Driver的注冊、application的注冊,或者可用資源發生變動,都將調用schedule方法。schedule方法用于為當前等待調度的application調度可用的資源,在滿足條件的Worker節點上啟動Executor。這個方法還有另外一個作用,就是當有Driver提交的時候,負責將Driver發送到一個可用資源滿足Driver需求的Worker節點上運行。launchDriver(worker,driver)方法負責完成這一任務。

application調度成功之后,Master將會為application在Worker節點上啟動Executors,調用startExecutorsOnWorkers方法完成此操作。

在scheduleExecutorsOnWorkers方法中,有兩種啟動Executor的策略:第一種是輪流均攤策略(round-robin),采用圓桌算法依次輪流均攤,直到滿足資源需求,輪流均攤策略通常會有更好的數據本地性,因此它是默認的選擇策略;第二種是依次全占,在usableWorkers中,依次獲取每個Worker上的全部資源,直到滿足資源需求。

scheduleExecutorsOnWorkers方法為application分配好邏輯意義上的資源后,還不能真正在Worker節點為application分配出資源,需要調用動作函數為application真正地分配資源。allocateWorkerResourceToExecutors方法的調用,將會在Worker節點上實際分配資源。下面是allocateWorkerResourceToExecutors的源碼。

Master.scala的源碼如下。

1. private def allocateWorkerResourceToExecutors(
2.    ......
3.       launchExecutor(worker, exec)
4.    .......

上面代碼調用了launchExecutor(worker,exec)方法,這個方法有兩個參數:第一個參數是滿足條件的WorkerInfo信息;第二個參數是描述Executor的ExecutorDesc對象。這個方法將會向Worker節點發送LaunchExecutor的請求,Worker節點收到該請求之后,將會負責啟動Executor。launchExecutor方法代碼清單如下所示。

Master.scala的源碼如下。

1.      private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc):
        Unit = {
2.      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
3.  //向WorkerInfo中加入exec這個描述Executor的ExecutorDesc對象
4.      worker.addExecutor(exec)
5.  //向worker發送LaunchExecutor消息,加載Executor消息中攜帶了masterUrl地址、
    //application id、Executor id、Executor描述desc、Executor核的個數、Executor
    //分配的內存大小
6.
7.      worker.endpoint.send(LaunchExecutor(masterUrl,
8.        exec.application.id, exec.id, exec.application.desc, exec.cores,
          exec.memory))
9.    //向Driver發回ExecutorAdded消息,消息攜帶worker的id號、worker的host和
      //port、分配的核的個數和內存大小
10.     exec.application.driver.send(
11.       ExecutorAdded(exec.id,       worker.id,     worker.hostPort,      exec.cores,
          exec.memory))
12.   }

launchExecutor有兩個參數,第一個參數是worker:WorkerInfo,代表Worker的基本信息;第二個參數是exec:ExecutorDesc,這個參數保存了Executor的基本配置信息,如memory、cores等。此方法中有worker.endpoint.send(LaunchExecutor(...)),向Worker發送LaunchExecutor請求,Worker收到該請求之后,將會調用方法啟動Executor。

向Worker發送LaunchExecutor消息的同時,通過exec.application.driver.send (ExecutorAdded(…))向Driver發送ExecutorAdded消息,該消息為Driver反饋Master都在哪些Worker上啟動了Executor,Executor的編號是多少,為每個Executor分配了多少個核,多大的內存以及Worker的聯系hostport等消息。

Worker收到LaunchExecutor消息會做相應的處理。首先判斷傳過來的masterUrl是否和activeMasterUrl相同,如果不相同,說明收到的不是處于ALIVE狀態的Master發送過來的請求,這種情況直接打印警告信息。如果相同,則說明該請求來自ALIVE Master,于是為Executor創建工作目錄,創建好工作目錄之后,使用appid、execid、appDes等參數創建ExecutorRunner。顧名思義,ExecutorRunner是Executor運行的地方,在ExecutorRunner中有一個工作線程,這個線程負責下載依賴的文件,并啟動CoarseGaindExecutorBackend進程,該進程單獨在一個JVM上運行。下面是ExecutorRunner中的線程啟動的源碼。

ExecutorRunner.scala的源碼如下。

1.   private[worker] def start() {
2.       //創建線程
3.      workerThread = new Thread("ExecutorRunner for " + fullId) {
4.       //線程run方法中調用fetchAndRunExecutor
5.        override def run() { fetchAndRunExecutor() }
6.      }
7.       //啟動線程
8.      workerThread.start()
9.
10.      //終止回調函數,用于殺死進程
11.     shutdownHook = ShutdownHookManager.addShutdownHook { () =>
12.       //這是可能的,調用fetchAndRunExecutor       之前,state  將是 ExecutorState.
          //RUNNING。在這種情況下,我們應該設置“狀態”為“失敗”
13.       if (state == ExecutorState.RUNNING) {
14.         state = ExecutorState.FAILED
15.       }
16.       killProcess(Some("Worker shutting down")) }
17.   }

上面代碼中定義了一個Thread,這個Thread的run方法中調用fetchAndRunExecutor方法,fetchAndRunExecutor負責以進程的方式啟動ApplicationDescription中攜帶的org.apache.spark.executor.CoarseGrainedExecutorBackend進程。

其中,fetchAndRunExecutor方法中的CommandUtils.buildProcessBuilder(appDesc.command,傳入的入口類是"org.apache.spark.executor.CoarseGrainedExecutorBackend",當Worker節點中啟動ExecutorRunner時,ExecutorRunner中會啟動CoarseGrainedExecutorBackend進程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver發出RegisterExecutor注冊請求。

CoarseGrainedExecutorBackend的onStart方法的源碼如下。

1.      override def onStart() {
2.  .......
3.        driver = Some(ref)
4.   //向driver發送ask請求,等待driver回應
5.        ref.ask[Boolean](RegisterExecutor(executorId, self, hostname,
          cores, extractLogUrls))
6.    ......

Driver端收到注冊請求,將會注冊Executor的請求。

CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源碼如下。

1.     override def receiveAndReply(context: RpcCallContext):
       PartialFunction[Any, Unit] = {
2.
3.       case RegisterExecutor(executorId, executorRef, hostname, cores,
         logUrls) =>
4.  .......
5.           executorRef.send(RegisteredExecutor)
6.   ......

如上面代碼所示,Driver向CoarseGrainedExecutorBackend發送RegisteredExecutor消息,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后將會新建一個Executor執行器,并為此Executor充當信使,與Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源碼如下所示。

CoarseGrainedExecutorBackend.scala的receive的源碼如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.      case RegisteredExecutor =>
3.        logInfo("Successfully registered with driver")
4.        try {
5.    //收到RegisteredExecutor消息,立即創建Executor
6.          executor = new Executor(executorId, hostname, env, userClassPath,
            isLocal = false)
7.        } catch {
8.          case NonFatal(e) =>
9.            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
10.       }

從上面的代碼中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,將會新創建一個org.apache.spark.executor.Executor對象,至此Executor創建完畢。

6.4.2 Executor如何把結果交給Application

CoarseGrainedExecutorBackend給DriverEndpoint發送StatusUpdate傳輸執行結果,DriverEndpoint會把執行結果傳遞給TaskSchedulerImpl處理,然后交給TaskResultGetter內部通過線程分別處理Task執行成功和失敗的不同情況,然后告訴DAGScheduler任務處理結束的狀況。

CoarseGrainedSchedulerBackend.scala中DriverEndpoint的receive方法的源碼如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.      case StatusUpdate(executorId, taskId, state, data) =>
3.        scheduler.statusUpdate(taskId, state, data.value)
4.        if (TaskState.isFinished(state)) {
5.          executorDataMap.get(executorId) match {
6.            case Some(executorInfo) =>
7.              executorInfo.freeCores += scheduler.CPUS_PER_TASK
8.              makeOffers(executorId)
9.            case None =>
10.             //忽略更新,因為我們不知道Executor
11.             logWarning(s"Ignored task status update ($taskId state $state)"+
12.               s"from unknown executor with ID $executorId")
13.         }
14.       }

DriverEndpoint的receive方法中的StatusUpdate調用scheduler.statusUpdate,然后釋放資源,再次進行資源調度makeOffers(executorId)。

TaskSchedulerImpl的statusUpdate中:

 如果是TaskState.LOST,則記錄原因,將Executor清理掉。

 如果是TaskState.isFinished,則從taskSet中運行的任務中清除掉,調用taskResultGetter. enqueueSuccessfulTask處理。

 如果是TaskState.FAILED、TaskState.KILLED、TaskState.LOST,調用taskResultGetter. enqueueFailedTask處理。

主站蜘蛛池模板: 中方县| 卢氏县| 韩城市| 炎陵县| 定边县| 红原县| 潞西市| 石景山区| 德安县| 泌阳县| 三门峡市| 章丘市| 抚远县| 启东市| 宿州市| 磐石市| 额敏县| 伽师县| 五指山市| 德庆县| 静宁县| 墨竹工卡县| 益阳市| 塔河县| 上高县| 五河县| 乌审旗| 边坝县| 项城市| 磐石市| 拉萨市| 长子县| 凭祥市| 沙河市| 永兴县| 玛多县| 梁山县| 合水县| 岳阳县| 汶上县| 西林县|