官术网_书友最值得收藏!

5.4 Executor中任務(wù)的執(zhí)行

本節(jié)講解Executor中任務(wù)的加載,通過(guò)launchTask()方法加載任務(wù),將任務(wù)以TaskRunner的形式放入線程池中運(yùn)行;Executor中的任務(wù)線程池可以減少在創(chuàng)建和銷毀線程上所花的時(shí)間和系統(tǒng)資源開(kāi)銷;TaskRunner任務(wù)執(zhí)行失敗處理以及TaskRunner的運(yùn)行內(nèi)幕等內(nèi)容。

5.4.1 Executor中任務(wù)的加載

Executor是基于線程池的任務(wù)執(zhí)行器。通過(guò)launchTask方法加載任務(wù),將任務(wù)以TaskRunner的形式放入線程池中運(yùn)行。

DAGScheduler劃分好Stage通過(guò)submitMissingTasks方法分配好任務(wù),并把任務(wù)交由TaskSchedulerImpl的submitTasks方法,將任務(wù)加入調(diào)度池,之后調(diào)用CoarseGrainedScheduler-Backend的riviveOffers方法為Task分配資源,指定Executor。任務(wù)資源都分配好之后,CoarseGrainedSchedulerBackend將向CoarseGranedExecutorBackend發(fā)送LaunchTask消息,將具體的任務(wù)發(fā)送到Executor上進(jìn)行計(jì)算。

CoarseGranedExecutorBackend匹配到LaunchTask(data)消息之后,將會(huì)調(diào)用Executor的launchTask方法。launchTask方法中將會(huì)構(gòu)建TaskRunner對(duì)象,并放入線程池中執(zhí)行。

Executor中Task的加載時(shí)序圖如圖5-5所示。

圖5-5 Executor中Task的加載時(shí)序圖

任務(wù)加載好后,在Executor中將會(huì)把構(gòu)建好的TaskRunner放入線程池運(yùn)行,至此任務(wù)完成加載,開(kāi)始運(yùn)行。

5.4.2 Executor中的任務(wù)線程池

Executor是構(gòu)建在線程池之上的任務(wù)執(zhí)行器。在Executor中使用線程池的好處是顯而易見(jiàn)的,使用線程池可以減少在創(chuàng)建和銷毀線程上所花的時(shí)間和系統(tǒng)資源開(kāi)銷。如果不使用線程池,可能造成系統(tǒng)創(chuàng)建大量的線程而導(dǎo)致消耗完系統(tǒng)內(nèi)存以及出現(xiàn)“過(guò)度切換”。

為什么Executor中需要線程池?使用線程池基于以下原因:首先,在Executor端執(zhí)行的任務(wù)處理時(shí)間都比較短,需要頻繁地創(chuàng)建和銷毀線程,這樣就帶來(lái)了巨大的創(chuàng)建和銷毀線程的開(kāi)銷,造成額外的系統(tǒng)資源開(kāi)銷;其次,Executor中處理的任務(wù)數(shù)量巨大,如果每個(gè)任務(wù)都創(chuàng)建一個(gè)線程,將導(dǎo)致消耗完系統(tǒng)內(nèi)存,出現(xiàn)“過(guò)度切換”。

首先來(lái)看Executor中的線程池。Executor中使用的是CachedThreadPool,使用這種類型線程池的好處是:任務(wù)比較多時(shí)可以自動(dòng)新增處理線程,而任務(wù)比較少時(shí)自動(dòng)回收空閑線程。

CoarseGrainedExecutorBackend調(diào)用Executor的launchTask方法,將會(huì)新建TaskRunner,然后放入線程池進(jìn)行處理。

從上面的源碼中可以看到,新建的TaskRunner對(duì)象首先放入runningTasks這樣一個(gè)ConcurrentHashMap里面,然后使用線程池的Execute方法運(yùn)行TaskRunner。Execute方法將會(huì)調(diào)用TaskRunner的run方法。在TaskRunner的run方法中執(zhí)行計(jì)算任務(wù)。

5.4.3 任務(wù)執(zhí)行失敗處理

TaskRunner在計(jì)算的過(guò)程中可能發(fā)生各種異常,甚至錯(cuò)誤,如抓取shuffle結(jié)果失敗、任務(wù)被殺死、沒(méi)權(quán)限向HDFS寫入數(shù)據(jù)等。當(dāng)TaskRunner的run方法運(yùn)行的時(shí)候,代碼中通過(guò)try-catch語(yǔ)句捕獲這些異常,并通過(guò)調(diào)用CoarseGrainedExecutorBackend的statusUpdate方法向CoarseGrainedSchedulerBackend匯報(bào)。

下面是CoarseGrainedExecutorBackend的statusUpdate方法的源碼如下。

1.      override def statusUpdate(taskId: Long, state: TaskState, data:
        ByteBuffer) {
2.    val msg = StatusUpdate(executorId, taskId, state, data)
3.    driver match {
4.      case Some(driverRef) => driverRef.send(msg)
5.      case None => logWarning(s"Drop $msg because has not yet connected
        to driver")
6.    }
7.  }

在statusUpdate方法中,通過(guò)方法的參數(shù)taskId、state、data構(gòu)建一個(gè)StatusUpdate對(duì)象,并通過(guò)driverRef的send方法將該對(duì)象發(fā)送回CoarseGrainedSheduleBackend。CoarseGrainedScheduleBackend匹配到StatusUpdate時(shí),將根據(jù)StatusUpdate對(duì)象中的state值對(duì)該Task的執(zhí)行情況做出判斷,并執(zhí)行不同的處理邏輯。

從源碼中可以發(fā)現(xiàn)有TaskState對(duì)象,其實(shí)這里的TaskState是一個(gè)枚舉變量,該枚舉變量中包括LAUNCHING、RUNNING、FINISHED、FAILED、KILLED、LOST這些枚舉值,分別對(duì)應(yīng)任務(wù)執(zhí)行的不同狀態(tài)。Executor根據(jù)任務(wù)執(zhí)行的不同狀態(tài),通過(guò)statusUpdate方法返回特定的TaskState值,該值通過(guò)ExecutorBackend返回給SchedulerBackend,在SchedulerBackend中根據(jù)TaskState中的值進(jìn)行處理。

TaskState.scala的源碼如下。

1.  private[spark] object TaskState extends Enumeration {
2.
3.   val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
4.
5.   private val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
6.
7.   type TaskState = Value
8.
9.   def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED ==
      state)
10.
11.   def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains
      (state)
12. }
13.

以TaskState.FAILED這種情況為例,在Executor的run方法中,如果發(fā)生FetchFailed-Exeception、CommitDeniedExeception或其他Throwable的子類的異常,就會(huì)返回TaskState. FAILED狀態(tài),該狀態(tài)通過(guò)CoaseGainedExecutorBackend返回。在CoaseGaiendScheduler-Backend中,匹配到StatusUpdate消息,將進(jìn)行相應(yīng)的處理,匹配代碼如下所示。

CoarseGrainedSchedulerBackend.scala的StatusUpdate的源碼如下。

1.     override def receive: PartialFunction[Any, Unit] = {
2.        case StatusUpdate(executorId, taskId, state, data) =>
3.  //調(diào)用TaskSchedulerImpl更新?tīng)顟B(tài)
4.          scheduler.statusUpdate(taskId, state, data.value)
5.  //若狀態(tài)為FINISHED,則從executorDataMap中取出executorId對(duì)應(yīng)的ExecutorInfo
6.          if (TaskState.isFinished(state)) {
7.            executorDataMap.get(executorId) match {
8.              case Some(executorInfo) =>
9.                executorInfo.freeCores += scheduler.CPUS_PER_TASK
10. //用makeOffers方法重新分配資源
11.               makeOffers(executorId)
12.             case None =>
13.               //因?yàn)椴恢纄xecutor,忽略更新
14.               logWarning(s"Ignored task status update ($taskId state $state) " +
15.                 s"from unknown executor with ID $executorId")
16.
17.           }
18.         }

上面的代碼中,首先調(diào)用TaskSchedulerImpl的statusUpdate方法,該方法用于更新taskId對(duì)應(yīng)任務(wù)的狀態(tài)。完成更新之后,判斷state狀態(tài)是否FINISHED,若狀態(tài)為FINISHED,則從executorDataMap這個(gè)哈希表中取出executorId對(duì)應(yīng)的ExecutorData對(duì)象,修改該對(duì)象中的freeCores。因?yàn)闋顟B(tài)已經(jīng)為FINISHED,因此ExecutorData中的freeCores會(huì)增加CPUS_PER_TASK個(gè),這里的CPU_PER_TASK為每個(gè)任務(wù)占用的CPU核的個(gè)數(shù),該個(gè)數(shù)可以通過(guò)spark.task.cpus配置項(xiàng)進(jìn)行配置。

更新完成ExecutorData上的可用CPU后,這些閑置的CPU通過(guò)makeOffers方法再次分配給其他任務(wù)使用。

Spark 2.1.1版本的CoarseGrainedSchedulerBackend.scala的makeOffers的源碼如下。

1.         private def makeOffers(executorId: String) {
2.         //過(guò)濾存活的Executor
3.       if (executorIsAlive(executorId)) {
4.  //從  executorDataMap    這個(gè)哈希表中取出 executorId   對(duì)應(yīng)的  ExecutorData  對(duì)象,
    //ExecutorData表示Executor上的一組資源
5.         val executorData = executorDataMap(executorId)
6.  //使用executorData創(chuàng)建WorkerOffer對(duì)象,該對(duì)象代表Executor上可用的資源
7.      val workOffers= IndexedSeq(
8.           new WorkerOffer(executorId, executorData.executorHost,
             executorData.freeCores))
9.
10.  //調(diào)用TaskSchedulerImpl上的resourceOffers方法,為任務(wù)分配運(yùn)行資源,該方法返
     //回獲得運(yùn)行資源的任務(wù)集合,之后運(yùn)行l(wèi)aunchTasks方法,將這些任務(wù)發(fā)送到Executor
     //上運(yùn)行
11.      launchTasks(scheduler.resourceOffers(workOffers))
12.       }
13.     }

Spark 2.2.0版本的CoarseGrainedSchedulerBackend.scala的makeOffers的源碼與Spark 2.1.1版本相比具有如下特點(diǎn)。

 上段代碼中第3行之前新增加了同步鎖CoarseGrainedSchedulerBackend.this.s ynchronized,在執(zhí)行某項(xiàng)任務(wù)task時(shí),確保沒(méi)有executor被殺死。

 上段代碼中第11行之前增加scheduler.resourceOffers(workOffers)以及Seq.empty為空的情況。

 上段代碼中第11行l(wèi)aunchTasks方法調(diào)整增加taskDescs不為空的邏輯判斷。

1.     ......
2.     //執(zhí)行某項(xiàng)任務(wù)時(shí),確保沒(méi)有Executor被殺死
3.     val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
4.    ......
5.         scheduler.resourceOffers(workOffers)
6.       } else {
7.         Seq.empty
8.       }
9.     }
10.    if (!taskDescs.isEmpty) {
11.      launchTasks(taskDescs)
12.    }
13.  }

每個(gè)Executor上的資源發(fā)生變動(dòng)時(shí),都將調(diào)用makeOffers方法,該方法的作用是為等待執(zhí)行的任務(wù)分配資源,并通過(guò)launchTasks方法將這些任務(wù)發(fā)送到這些Executor上運(yùn)行。這些任務(wù)將被包裝成TaskRenner對(duì)象,運(yùn)行于Executor上的線程池中。

5.4.4 揭秘TaskRunner

TaskRunner位于Executor中,繼承自Runnable接口,代表一個(gè)可執(zhí)行的任務(wù)。Driver端下發(fā)的任務(wù)最終都要在Executor中封裝成TaskRunner。在TaskRunner的run方法中,將會(huì)進(jìn)行任務(wù)的解析,并調(diào)用Task接口的run方法進(jìn)行計(jì)算。TaskRunner定義的代碼如下所示。

Spark 2.1.1版本的Executor.scala的源碼如下。

1. class TaskRunner(
2.       execBackend: ExecutorBackend,//通過(guò)execBackend和SchedulerBakend通信
3.       val taskId: Long,
4.       val attemptNumber: Int,
5.       taskName: String,
6.       serializedTask: ByteBuffer)
7.     extends Runnable {

Spark 2.2.0版本的Executor.scala的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第3~6行TaskRunner中的成員變量調(diào)整為新增成員變量TaskDescription。

1.   ......
2.      private val taskDescription: TaskDescription)
3.  .....

TaskRunner的構(gòu)造函數(shù)中有execBackend、taskId、attemptNumber、taskName、serializedTask 5個(gè)參數(shù)。其中,execBackend作為和CoarseGrainedSchedulerBackend通信的使者傳入到TaskRunner中,在任務(wù)計(jì)算狀態(tài)發(fā)生變化的時(shí)候,調(diào)用execBackend的statusUpdate方法向CoarseGrainedSchedulerBackend報(bào)告。傳入taskId是為了使用TaskMemoryManager管理該Task。attemptNumber代表任務(wù)嘗試執(zhí)行的次數(shù),serializedTask是序列化的任務(wù)。序列化的任務(wù)通過(guò)序列化工具反序列化得到任務(wù)對(duì)象。

在TaskRunner中是如何運(yùn)行任務(wù)的?我們知道,在線程池中啟動(dòng)Runnable任務(wù)會(huì)自動(dòng)調(diào)用Runnable的run方法,TaskRunner作為一個(gè)Runnable接口的實(shí)現(xiàn)類,啟動(dòng)時(shí)會(huì)自動(dòng)調(diào)用其run方法。run方法主要完成以下任務(wù)。

 調(diào)用ExecutorBackend的statusUpdate方法向SchedulerBackend發(fā)送任務(wù)狀態(tài)更新消息。

 反序列化出Task和相關(guān)依賴Jar包。

 調(diào)用Task上的run方法運(yùn)行任務(wù)。

 返回Task運(yùn)行結(jié)果。

Task是一個(gè)接口,ResultTask和ShffleMapTask是其兩種實(shí)現(xiàn)。Task接口中提供了run方法,用于運(yùn)行任務(wù)。TaskRunner的run方法中,會(huì)通過(guò)反序列化器反序列化出Task,并調(diào)用Task上的run方法運(yùn)行任務(wù),這里怎么知道是ResultTask,還是ShffleMapTask呢?其實(shí),這里不管是ResultTask,還是ShffleMapTask,都一視同仁,因?yàn)镽esultTask和ShffleMapTask都實(shí)現(xiàn)了Task接口,都有run方法。這正是面向接口編程帶來(lái)的最大的好處,靈活且最大限度地復(fù)用代碼。

Task運(yùn)行結(jié)果的處理情況有3種:第一種情況是resultSize大于maxResultSize,這種情況下構(gòu)建IndirectTaskResult對(duì)象,并返回該IndirectTaskResult對(duì)象,IndirectTaskResult對(duì)象中包含結(jié)果所在的BlockId,在SchedulerBackend中可以通過(guò)BlockManager獲得該BlockId對(duì)應(yīng)的結(jié)果數(shù)據(jù),這里的maxResultSize默認(rèn)為1GB;第二種情況是resultSize大于Akka幀的大小,這種情況下也是構(gòu)建IndirectTaskResult對(duì)象,并返回該IndirectTaskResult對(duì)象,Akka幀的大小為128MB;第三種情況是直接返回DirectTaskResult,這是在resultSize小于Akka幀大小的情況下采取的默認(rèn)返回方式。

主站蜘蛛池模板: 垣曲县| 双辽市| 吉安县| 平果县| 庄河市| 乌鲁木齐市| 中超| 湖南省| 屯门区| 桐乡市| 万盛区| 玛多县| 呼伦贝尔市| 方城县| 东平县| 师宗县| 阿坝县| 茂名市| 奉节县| 西充县| 丘北县| 潞西市| 滁州市| 崇明县| 乌兰察布市| 五常市| 纳雍县| 阿拉善左旗| 红桥区| 延川县| 大兴区| 太原市| 新闻| 大足县| 菏泽市| 娱乐| 连山| 庄浪县| 舞阳县| 长宁县| 正蓝旗|