官术网_书友最值得收藏!

5.3 ExecutorBackend啟動原理和源碼詳解

ExecutorBackend是Executor向集群發送更新消息的一個可插拔的接口。ExecutorBackend擁有不同的實現。Standalone模式下ExecutorBackend的默認實現是CoarseGrainedExecutorBackend;在Local模式下,ExecutorBackend的默認實現是LocalBackend。在Mesos調度模式下,ExecutorBackend的默認實現是MesosExecutorBackend。本節主要探索Standalone模式下的ExecutorBackend,通過源碼深入理解ExecutorBackend接口設計的精髓。

5.3.1 ExecutorBackend接口與Executor的關系

本節將詳細分析Standalone模式下ExecutorBackend和Executor的關系。在StandaloneSchedulerBackend中會實例化一個StandaloneAppClient。StandaloneAppClient中攜帶了command信息,command信息中指定了要啟動的ExecutorBackend的實現類,Standalone模式下,該ExecutorBackend的實現類是org.apache.spark.executor.CoarseGrainedExecutorBackend類。

StandaloneSchedulerBackend.scala的start方法中構建了一個Command對象,該對象的第一個參數是mainClass,即進程的主類。該類在Standalone模式下為org.apache.spark.executor. CoarseGrainedExecutorBackend。分別將sparkJavaopts、javaOpts、command、appUiAddress、coresPerExecutor、appDes傳入StandaloneAppClient構造函數。StandaloneAppClient將會向Master發送RegisterApplication注冊請求,Master受理后通過launchExecutor方法在Worker節點啟動一個ExecutorRunner對象,該對象用于管理一個Executor進程。在ExecutorRunner中將通過CommandUtil構建一個ProcessBuilder,調用ProcessBuilder的start方法將會以進程的方式啟動org.apache.spark.executor.CoarseGrainedExecutorBackend。在CoarseGrainedExecotorBackend的onStart方法中,將會向Driver端發送RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls)消息請求注冊,完成注冊后將立即返回一個RegisteredExecutor(executorAddress. host)消息,CoarseGraiendExecutorBackend收到該消息,馬上實例化出一個Executor。源碼如下所示。

CoarseGrainedExecutorBackend.scala的源碼如下。

1.  override def receive: PartialFunction[Any, Unit] = {
2.  case RegisteredExecutor =>
3.    logInfo("Successfully registered with driver")
4.    try {
5.      executor = new Executor(executorId, hostname, env, userClassPath,
        isLocal = false)
6.    } catch {
7.      case NonFatal(e) =>
8.        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
9.    }

從這里可以看出,CoarseGrainedExecutorBackend比Executor先實例化。CoarseGrained-ExecutorBackend負責與集群通信,而Executor則專注于任務的處理,它們是一對一的關系,在集群中各司其職。

每個Worker節點上可以啟動多個CoarseGrainedExecutorBackend進程,每個進程對應一個Executor。

5.3.2 ExecutorBackend的不同實現

ExecutorBackend是與集群交互的接口,該接口在不同的調度模式下有不同的實現。圖5-3是ExecutorBackend及其實現的關系類圖。

圖5-3 ExecutorBackend及其實現的關系類圖

不同模式下,ExecutorRunner啟動的進程不一樣。在Standalone模式下啟動的是org.apache.spark.executor.CoarseGrainedExecutorBackend進程;在Local模式下,啟動的是org.apache.spark.executor.LocalExecutorBackend進程;在Mesos模式下,啟動的是org.apache. spark.executor.MesosExecutorBackend進程。

下面來看Standalone模式下CoarseGrainedExecutorBackend的啟動。在Standalone模式下,會啟動org.apache.spark.deploy.Client類,該類將向Master發送RequestSubmitDriver (driverDescription)消息,Master中匹配到RequestSubmitDriver(driverDescription)后,將會調用schedule方法。該調用的源碼如下所示。

Master.scala的receiveAndReply的源碼如下。

1.  override def receiveAndReply(context: RpcCallContext): PartialFunction
    [Any, Unit] = {
2.  ......
3.   case RequestSubmitDriver(description) =>
4.   //若state不為ALIVE,直接向Client返回SubmitDriverResponse(self,false,
     //None,msg)消息
5.        if (state != RecoveryState.ALIVE) {
6.          val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
7.            "Can only accept driver submissions in ALIVE state."
8.          context.reply(SubmitDriverResponse(self, false, None, msg))
9.        } else {
10.         logInfo("Driver submitted " + description.command.mainClass)
11. //使用description創建driver,該方法返回DriverDescription
12.         val driver = createDriver(description)
13.         persistenceEngine.addDriver(driver)
14.         waitingDrivers += driver
15.  //waitingDrivers等待在調度數組中加入該driver
16.         drivers.add(driver)
17.  //用schedule方法調度資源
18.         schedule()
19.  //向ClientEndpoint回復SubmitDriverResponse消息
20.
21.         context.reply(SubmitDriverResponse(self, true, Some(driver.id),
22.           s"Driver successfully submitted as ${driver.id}"))
23.       }

Master的receiveAndReply收到RequestSubmitDriver消息后,調用schedule方法。

Master的schedule的源碼如下。

1.      private def schedule(): Unit = {
2.   if (state != RecoveryState.ALIVE) {
3.     return
4.   }
5.   //Drivers 優先于executors
6.   val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter
     (_.state == WorkerState.ALIVE))
7.   val numWorkersAlive = shuffledAliveWorkers.size
8.   var curPos = 0
9.   for (driver <- waitingDrivers.toList) { //遍歷waitingDrivers
10.    //以循環的方式給每個等候的driver分配Worker。對于每個driver,我們從分配
       //給driver的最后一個Worker開始,繼續前進,直到所有活躍的Worker節點
11.
12.    var launched = false
13.    var numWorkersVisited = 0
14.    while (numWorkersVisited < numWorkersAlive && !launched) {
15.      val worker = shuffledAliveWorkers(curPos)
16.      numWorkersVisited += 1
17.      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >=
         driver.desc.cores) {
18.        launchDriver(worker, driver)
19.        waitingDrivers -= driver
20.        launched = true
21.        }
22.        curPos = (curPos + 1) % numWorkersAlive
23.      }
24.    }
25.    startExecutorsOnWorkers()
26.  }

上面代碼中,RecoveryState若不為ALIVE,則直接返回,否則使用Random.shuffle將Workers集合打亂,過濾出ALIVE的Worker,生成新的集合shuffledAliveWorkers,盡量考慮到選擇Driver的負載均衡。在for語句中遍歷waitingDrivers隊列,判斷Worker剩余內存和剩余物理核是否滿足Driver需求,如滿足,則調用launchDriver(worker,driver)方法在選中的Worker上啟動Driver進程。

實例化SparkContext時,在SparkContext中將實例化出DAGScheduler、StandaloneSchedulerBackend。Driver在Worker節點上啟動之后,在StandaloneSchedulerBackend中將會調用new()函數創建一個StandaloneAppClient。StandaloneAppClient中有一個ClientEndpoint,在其onStart方法中將向Master發送RegisterApplication請求注冊application,注冊好application后,Master又會調用schedule方法,在滿足條件的Worker上為application啟動Executor,首先會啟動ExecutorRunner,在ExecutorRunner中啟動CoarseGrainedExecutor-Backend,啟動后將會實例化出Executor。為什么在Standalone模式下會啟動CoarseGrained-ExecutorBackend呢?在什么地方設置要啟動的CoarseGrainedExecutorBackend進程呢?其實,在實例化StandaloneAppClient的時候就已經傳入了。

StandaloneSchedulerBackend.scala的start方法代碼中設置了Command對象。Command對象的第一個參數是啟動進程的mainClass。因此,ExecutorRunner中啟動進程時,啟動的是org.apache.spark.executor.CoarseGrainedExecutorBackend。

5.3.3 ExecutorBackend中的通信

ExecutorBackend是一個被Executor使用的可插拔的與集群通信的接口。在ExecutorBackend中有statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)方法,通過這個方法向集群發送Task執行的各種信息,如果任務執行失敗,則返回失敗的信息;如果執行成功,則返回任務執行的結果。本節重點講解在Standalone模式下CoarseGrainedExecutor-Backend中的通信。CoarseGrainedExecutorBackend在整個集群中的通信如圖5-4所示。

在圖5-4中,Executor與CoarseGrainedExecutorBackend協作,將任務計算的結果通過CoarseGrainedExecutorBackend的statusUpdate方法將taskId、TaskState以及結果數據發送給Driver。Driver收到StatusUpdate(executorId,tasked,state,data)消息,通過判斷state的不同狀態,進行不同的處理。例如,當state的狀態為TaskState.LOST時,Driver端會移除Executor;當state的狀態為TaskState.FINISHED時,Driver端會調用enqueueSuccessfulTask進行處理。

這里主要看CoarseGrainedExecutorBackend與Driver之間的通信。當Worker節點中啟動ExecutorRunner時,ExecutorRunner中會啟動CoarseGrainedExecutorBackend進程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver發出RegisterExecutor注冊請求。源碼如下所示。

圖5-4 CoarseGrainedExecutorBackend在整個集群中的通信

CoarseGrainedExecutorBackend的onStart方法的源碼如下。

1.      override def onStart() {
2.      logInfo("Connecting to driver: " + driverUrl)
3.      rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
4.        //這是一個非常快的行動,所以我們可以用"ThreadUtils.sameThread"
5.        driver = Some(ref)
6.   //向Driver發送ask請求,等待Driver回應
7.        ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores,
          extractLogUrls))
8.      }(ThreadUtils.sameThread).onComplete {
9.        //這是一個非常快的行動,所以我們可以用"ThreadUtils.sameThread"
10.       case Success(msg) =>
11.         //經常收到true,可以忽略
12.       case Failure(e) =>
13.         exitExecutor(1, s"Cannot register with driver: $driverUrl", e,
            notifyDriver = false)
14.     }(ThreadUtils.sameThread)
15.   }

上面的代碼中,Some(ref)得到Driver的引用,通過ask方法返回Future[Boolean],然后在Future對象上調用onComplete方法進行額外的處理。Driver端收到注冊請求,將會注冊Executor的請求,并向ListenerBus中發送SparkListenerExecutorAdded事件。

如果executorDataMap中已經存在該Executor的id,就返回RegisterExecutorFailed,如果不存在該Executor的id,則在executorDataMap中加入該Executor的id,并返回RegisteredExecutor消息且向listenerBus中添加SparkListenerExecutorAdded事件。CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,將會新建一個Executor執行器,并為此Executor充當信使,與Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源碼如下所示。

CoarseGrainedExecutorBackend.scala的receive的源碼如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.      case RegisteredExecutor =>
3.        logInfo("Successfully registered with driver")
4.        try {
5.    //收到RegisteredExecutor消息,立即創建Executor
6.          executor = new Executor(executorId, hostname, env, userClassPath,
            isLocal = false)
7.        } catch {
8.          case NonFatal(e) =>
9.            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
10.       }

從上面的代碼中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,將會新建一個Executor。由此可見,Executor在CoarseGrainedExecutorBackend后實例化,這與Executor和CoarseGrainedExecutorBackend的不同職責有關,Executor主要負責計算,而CoarseGrainedExecutorBackend主要負責通信,通信環境準備好了,架起同CoarseGrainedSchedulerBackend通信的橋梁,就可以接收CoarseGrainedSchedulerBackend中調用launchTask方法發送的LaunchTask消息了,因此通信在前,計算在后。

Executor中的計算結果是通過CoarseGrainedExecutorBackend的statusUpdate方法返回給CoarseGrainedExecutorBackend的。statusUpdate方法的代碼如下所示。

CoarseGrainedExecutorBackend.scala的源碼如下。

1.      override def statusUpdate(taskId: Long, state: TaskState, data:
        ByteBuffer) {
2.      val msg = StatusUpdate(executorId, taskId, state, data)
3.      driver match {
4.  //向Driver發送StatusUpdate消息
5.        case Some(driverRef) => driverRef.send(msg)
6.        case None => logWarning(s"Drop $msg because has not yet connected
          to driver")
7.      }
8.    }

上面源碼中,通過參數taskId、state、data構建一個StatusUpdate對象,該對象將被當作消息發送到Driver端,Driver根據返回結果的需要,將會向CoarseGrainedExecutorBackend發送新的指令消息,如LaunchTask、KillTask、StopExecutors、Shutdown等。

5.3.4 ExecutorBackend的異常處理

若CoarseGrainedExecutorBackend在運行中出現異常,將調用exitExecutor方法進行處理,處理以后,系統退出。exitExecutor函數可以由其他子類重載來處理,Executor執行的退出方式不同。例如,當Executor掛掉了,后臺程序可能不會讓父進程也掛掉。如果須通知Driver,Driver將清理掛掉的Executor的數據。

CoarseGrainedExecutorBackend的exitExecutor方法的源碼如下。

1.  protected def exitExecutor(code: Int,
2.                           reason: String,
3.                               throwable: Throwable = null,
4.                               notifyDriver: Boolean = true) = {
5.      val message = "Executor self-exiting due to : " + reason
6.      if (throwable != null) {
7.        logError(message, throwable)
8.      } else {
9.        logError(message)
10.     }
11.
12.     if (notifyDriver && driver.nonEmpty) {
13.       driver.get.ask[Boolean](
14.         RemoveExecutor(executorId, new ExecutorLossReason(reason))
15.       ).onFailure { case e =>
16.         logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
17.       }(ThreadUtils.sameThread)
18.     }
19.
20.     System.exit(code)
21.   }
22. }

CoarseGrainedExecutorBackend在運行中一旦出現異常情況,將調用exitExecutor方法處理。

 Executor向Driver注冊RegisterExecutor失敗。

 Executor收到Driver的RegisteredExecutor注冊成功消息以后,創建Executor實例失敗。

 Driver返回Executor注冊失敗消息RegisterExecutorFailed。

 Executor收到Driver的LaunchTask啟動任務消息,但是Executor為null。

 Executor收到Driver的KillTask消息,但是Executor為null。

 Executor和Driver失去連接。

主站蜘蛛池模板: 江川县| 德庆县| 海南省| 东明县| 渝北区| 岚皋县| 新营市| 石泉县| 分宜县| 临泉县| 藁城市| 洛南县| 新建县| 图们市| 天峻县| 新乡市| 杨浦区| 容城县| 桂东县| 都安| 辉县市| 广元市| 宁晋县| 四平市| 石渠县| 平乡县| 景德镇市| 拜城县| 泗洪县| 乌拉特中旗| 皋兰县| 吉安市| 衡山县| 腾冲县| 克拉玛依市| 济阳县| 龙南县| 东平县| 乌拉特后旗| 五大连池市| 阿鲁科尔沁旗|