- Spark大數(shù)據(jù)商業(yè)實(shí)戰(zhàn)三部曲:內(nèi)核解密|商業(yè)案例|性能調(diào)優(yōu)
- 王家林
- 2765字
- 2019-12-12 17:29:58
5.4 Executor中任務(wù)的執(zhí)行
本節(jié)講解Executor中任務(wù)的加載,通過(guò)launchTask()方法加載任務(wù),將任務(wù)以TaskRunner的形式放入線程池中運(yùn)行;Executor中的任務(wù)線程池可以減少在創(chuàng)建和銷毀線程上所花的時(shí)間和系統(tǒng)資源開(kāi)銷;TaskRunner任務(wù)執(zhí)行失敗處理以及TaskRunner的運(yùn)行內(nèi)幕等內(nèi)容。
5.4.1 Executor中任務(wù)的加載
Executor是基于線程池的任務(wù)執(zhí)行器。通過(guò)launchTask方法加載任務(wù),將任務(wù)以TaskRunner的形式放入線程池中運(yùn)行。
DAGScheduler劃分好Stage通過(guò)submitMissingTasks方法分配好任務(wù),并把任務(wù)交由TaskSchedulerImpl的submitTasks方法,將任務(wù)加入調(diào)度池,之后調(diào)用CoarseGrainedScheduler-Backend的riviveOffers方法為Task分配資源,指定Executor。任務(wù)資源都分配好之后,CoarseGrainedSchedulerBackend將向CoarseGranedExecutorBackend發(fā)送LaunchTask消息,將具體的任務(wù)發(fā)送到Executor上進(jìn)行計(jì)算。
CoarseGranedExecutorBackend匹配到LaunchTask(data)消息之后,將會(huì)調(diào)用Executor的launchTask方法。launchTask方法中將會(huì)構(gòu)建TaskRunner對(duì)象,并放入線程池中執(zhí)行。
Executor中Task的加載時(shí)序圖如圖5-5所示。

圖5-5 Executor中Task的加載時(shí)序圖
任務(wù)加載好后,在Executor中將會(huì)把構(gòu)建好的TaskRunner放入線程池運(yùn)行,至此任務(wù)完成加載,開(kāi)始運(yùn)行。
5.4.2 Executor中的任務(wù)線程池
Executor是構(gòu)建在線程池之上的任務(wù)執(zhí)行器。在Executor中使用線程池的好處是顯而易見(jiàn)的,使用線程池可以減少在創(chuàng)建和銷毀線程上所花的時(shí)間和系統(tǒng)資源開(kāi)銷。如果不使用線程池,可能造成系統(tǒng)創(chuàng)建大量的線程而導(dǎo)致消耗完系統(tǒng)內(nèi)存以及出現(xiàn)“過(guò)度切換”。
為什么Executor中需要線程池?使用線程池基于以下原因:首先,在Executor端執(zhí)行的任務(wù)處理時(shí)間都比較短,需要頻繁地創(chuàng)建和銷毀線程,這樣就帶來(lái)了巨大的創(chuàng)建和銷毀線程的開(kāi)銷,造成額外的系統(tǒng)資源開(kāi)銷;其次,Executor中處理的任務(wù)數(shù)量巨大,如果每個(gè)任務(wù)都創(chuàng)建一個(gè)線程,將導(dǎo)致消耗完系統(tǒng)內(nèi)存,出現(xiàn)“過(guò)度切換”。
首先來(lái)看Executor中的線程池。Executor中使用的是CachedThreadPool,使用這種類型線程池的好處是:任務(wù)比較多時(shí)可以自動(dòng)新增處理線程,而任務(wù)比較少時(shí)自動(dòng)回收空閑線程。
CoarseGrainedExecutorBackend調(diào)用Executor的launchTask方法,將會(huì)新建TaskRunner,然后放入線程池進(jìn)行處理。
從上面的源碼中可以看到,新建的TaskRunner對(duì)象首先放入runningTasks這樣一個(gè)ConcurrentHashMap里面,然后使用線程池的Execute方法運(yùn)行TaskRunner。Execute方法將會(huì)調(diào)用TaskRunner的run方法。在TaskRunner的run方法中執(zhí)行計(jì)算任務(wù)。
5.4.3 任務(wù)執(zhí)行失敗處理
TaskRunner在計(jì)算的過(guò)程中可能發(fā)生各種異常,甚至錯(cuò)誤,如抓取shuffle結(jié)果失敗、任務(wù)被殺死、沒(méi)權(quán)限向HDFS寫入數(shù)據(jù)等。當(dāng)TaskRunner的run方法運(yùn)行的時(shí)候,代碼中通過(guò)try-catch語(yǔ)句捕獲這些異常,并通過(guò)調(diào)用CoarseGrainedExecutorBackend的statusUpdate方法向CoarseGrainedSchedulerBackend匯報(bào)。
下面是CoarseGrainedExecutorBackend的statusUpdate方法的源碼如下。
1. override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { 2. val msg = StatusUpdate(executorId, taskId, state, data) 3. driver match { 4. case Some(driverRef) => driverRef.send(msg) 5. case None => logWarning(s"Drop $msg because has not yet connected to driver") 6. } 7. }
在statusUpdate方法中,通過(guò)方法的參數(shù)taskId、state、data構(gòu)建一個(gè)StatusUpdate對(duì)象,并通過(guò)driverRef的send方法將該對(duì)象發(fā)送回CoarseGrainedSheduleBackend。CoarseGrainedScheduleBackend匹配到StatusUpdate時(shí),將根據(jù)StatusUpdate對(duì)象中的state值對(duì)該Task的執(zhí)行情況做出判斷,并執(zhí)行不同的處理邏輯。
從源碼中可以發(fā)現(xiàn)有TaskState對(duì)象,其實(shí)這里的TaskState是一個(gè)枚舉變量,該枚舉變量中包括LAUNCHING、RUNNING、FINISHED、FAILED、KILLED、LOST這些枚舉值,分別對(duì)應(yīng)任務(wù)執(zhí)行的不同狀態(tài)。Executor根據(jù)任務(wù)執(zhí)行的不同狀態(tài),通過(guò)statusUpdate方法返回特定的TaskState值,該值通過(guò)ExecutorBackend返回給SchedulerBackend,在SchedulerBackend中根據(jù)TaskState中的值進(jìn)行處理。
TaskState.scala的源碼如下。
1. private[spark] object TaskState extends Enumeration { 2. 3. val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value 4. 5. private val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST) 6. 7. type TaskState = Value 8. 9. def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state) 10. 11. def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains (state) 12. } 13.
以TaskState.FAILED這種情況為例,在Executor的run方法中,如果發(fā)生FetchFailed-Exeception、CommitDeniedExeception或其他Throwable的子類的異常,就會(huì)返回TaskState. FAILED狀態(tài),該狀態(tài)通過(guò)CoaseGainedExecutorBackend返回。在CoaseGaiendScheduler-Backend中,匹配到StatusUpdate消息,將進(jìn)行相應(yīng)的處理,匹配代碼如下所示。
CoarseGrainedSchedulerBackend.scala的StatusUpdate的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case StatusUpdate(executorId, taskId, state, data) => 3. //調(diào)用TaskSchedulerImpl更新?tīng)顟B(tài) 4. scheduler.statusUpdate(taskId, state, data.value) 5. //若狀態(tài)為FINISHED,則從executorDataMap中取出executorId對(duì)應(yīng)的ExecutorInfo 6. if (TaskState.isFinished(state)) { 7. executorDataMap.get(executorId) match { 8. case Some(executorInfo) => 9. executorInfo.freeCores += scheduler.CPUS_PER_TASK 10. //用makeOffers方法重新分配資源 11. makeOffers(executorId) 12. case None => 13. //因?yàn)椴恢纄xecutor,忽略更新 14. logWarning(s"Ignored task status update ($taskId state $state) " + 15. s"from unknown executor with ID $executorId") 16. 17. } 18. }
上面的代碼中,首先調(diào)用TaskSchedulerImpl的statusUpdate方法,該方法用于更新taskId對(duì)應(yīng)任務(wù)的狀態(tài)。完成更新之后,判斷state狀態(tài)是否FINISHED,若狀態(tài)為FINISHED,則從executorDataMap這個(gè)哈希表中取出executorId對(duì)應(yīng)的ExecutorData對(duì)象,修改該對(duì)象中的freeCores。因?yàn)闋顟B(tài)已經(jīng)為FINISHED,因此ExecutorData中的freeCores會(huì)增加CPUS_PER_TASK個(gè),這里的CPU_PER_TASK為每個(gè)任務(wù)占用的CPU核的個(gè)數(shù),該個(gè)數(shù)可以通過(guò)spark.task.cpus配置項(xiàng)進(jìn)行配置。
更新完成ExecutorData上的可用CPU后,這些閑置的CPU通過(guò)makeOffers方法再次分配給其他任務(wù)使用。
Spark 2.1.1版本的CoarseGrainedSchedulerBackend.scala的makeOffers的源碼如下。
1. private def makeOffers(executorId: String) { 2. //過(guò)濾存活的Executor 3. if (executorIsAlive(executorId)) { 4. //從 executorDataMap 這個(gè)哈希表中取出 executorId 對(duì)應(yīng)的 ExecutorData 對(duì)象, //ExecutorData表示Executor上的一組資源 5. val executorData = executorDataMap(executorId) 6. //使用executorData創(chuàng)建WorkerOffer對(duì)象,該對(duì)象代表Executor上可用的資源 7. val workOffers= IndexedSeq( 8. new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) 9. 10. //調(diào)用TaskSchedulerImpl上的resourceOffers方法,為任務(wù)分配運(yùn)行資源,該方法返 //回獲得運(yùn)行資源的任務(wù)集合,之后運(yùn)行l(wèi)aunchTasks方法,將這些任務(wù)發(fā)送到Executor //上運(yùn)行 11. launchTasks(scheduler.resourceOffers(workOffers)) 12. } 13. }
Spark 2.2.0版本的CoarseGrainedSchedulerBackend.scala的makeOffers的源碼與Spark 2.1.1版本相比具有如下特點(diǎn)。
上段代碼中第3行之前新增加了同步鎖CoarseGrainedSchedulerBackend.this.s ynchronized,在執(zhí)行某項(xiàng)任務(wù)task時(shí),確保沒(méi)有executor被殺死。
上段代碼中第11行之前增加scheduler.resourceOffers(workOffers)以及Seq.empty為空的情況。
上段代碼中第11行l(wèi)aunchTasks方法調(diào)整增加taskDescs不為空的邏輯判斷。
1. ...... 2. //執(zhí)行某項(xiàng)任務(wù)時(shí),確保沒(méi)有Executor被殺死 3. val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { 4. ...... 5. scheduler.resourceOffers(workOffers) 6. } else { 7. Seq.empty 8. } 9. } 10. if (!taskDescs.isEmpty) { 11. launchTasks(taskDescs) 12. } 13. }
每個(gè)Executor上的資源發(fā)生變動(dòng)時(shí),都將調(diào)用makeOffers方法,該方法的作用是為等待執(zhí)行的任務(wù)分配資源,并通過(guò)launchTasks方法將這些任務(wù)發(fā)送到這些Executor上運(yùn)行。這些任務(wù)將被包裝成TaskRenner對(duì)象,運(yùn)行于Executor上的線程池中。
5.4.4 揭秘TaskRunner
TaskRunner位于Executor中,繼承自Runnable接口,代表一個(gè)可執(zhí)行的任務(wù)。Driver端下發(fā)的任務(wù)最終都要在Executor中封裝成TaskRunner。在TaskRunner的run方法中,將會(huì)進(jìn)行任務(wù)的解析,并調(diào)用Task接口的run方法進(jìn)行計(jì)算。TaskRunner定義的代碼如下所示。
Spark 2.1.1版本的Executor.scala的源碼如下。
1. class TaskRunner( 2. execBackend: ExecutorBackend,//通過(guò)execBackend和SchedulerBakend通信 3. val taskId: Long, 4. val attemptNumber: Int, 5. taskName: String, 6. serializedTask: ByteBuffer) 7. extends Runnable {
Spark 2.2.0版本的Executor.scala的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第3~6行TaskRunner中的成員變量調(diào)整為新增成員變量TaskDescription。
1. ...... 2. private val taskDescription: TaskDescription) 3. .....
TaskRunner的構(gòu)造函數(shù)中有execBackend、taskId、attemptNumber、taskName、serializedTask 5個(gè)參數(shù)。其中,execBackend作為和CoarseGrainedSchedulerBackend通信的使者傳入到TaskRunner中,在任務(wù)計(jì)算狀態(tài)發(fā)生變化的時(shí)候,調(diào)用execBackend的statusUpdate方法向CoarseGrainedSchedulerBackend報(bào)告。傳入taskId是為了使用TaskMemoryManager管理該Task。attemptNumber代表任務(wù)嘗試執(zhí)行的次數(shù),serializedTask是序列化的任務(wù)。序列化的任務(wù)通過(guò)序列化工具反序列化得到任務(wù)對(duì)象。
在TaskRunner中是如何運(yùn)行任務(wù)的?我們知道,在線程池中啟動(dòng)Runnable任務(wù)會(huì)自動(dòng)調(diào)用Runnable的run方法,TaskRunner作為一個(gè)Runnable接口的實(shí)現(xiàn)類,啟動(dòng)時(shí)會(huì)自動(dòng)調(diào)用其run方法。run方法主要完成以下任務(wù)。
調(diào)用ExecutorBackend的statusUpdate方法向SchedulerBackend發(fā)送任務(wù)狀態(tài)更新消息。
反序列化出Task和相關(guān)依賴Jar包。
調(diào)用Task上的run方法運(yùn)行任務(wù)。
返回Task運(yùn)行結(jié)果。
Task是一個(gè)接口,ResultTask和ShffleMapTask是其兩種實(shí)現(xiàn)。Task接口中提供了run方法,用于運(yùn)行任務(wù)。TaskRunner的run方法中,會(huì)通過(guò)反序列化器反序列化出Task,并調(diào)用Task上的run方法運(yùn)行任務(wù),這里怎么知道是ResultTask,還是ShffleMapTask呢?其實(shí),這里不管是ResultTask,還是ShffleMapTask,都一視同仁,因?yàn)镽esultTask和ShffleMapTask都實(shí)現(xiàn)了Task接口,都有run方法。這正是面向接口編程帶來(lái)的最大的好處,靈活且最大限度地復(fù)用代碼。
Task運(yùn)行結(jié)果的處理情況有3種:第一種情況是resultSize大于maxResultSize,這種情況下構(gòu)建IndirectTaskResult對(duì)象,并返回該IndirectTaskResult對(duì)象,IndirectTaskResult對(duì)象中包含結(jié)果所在的BlockId,在SchedulerBackend中可以通過(guò)BlockManager獲得該BlockId對(duì)應(yīng)的結(jié)果數(shù)據(jù),這里的maxResultSize默認(rèn)為1GB;第二種情況是resultSize大于Akka幀的大小,這種情況下也是構(gòu)建IndirectTaskResult對(duì)象,并返回該IndirectTaskResult對(duì)象,Akka幀的大小為128MB;第三種情況是直接返回DirectTaskResult,這是在resultSize小于Akka幀大小的情況下采取的默認(rèn)返回方式。
- 構(gòu)建高質(zhì)量的C#代碼
- 精通MATLAB神經(jīng)網(wǎng)絡(luò)
- AWS:Security Best Practices on AWS
- 一本書玩轉(zhuǎn)數(shù)據(jù)分析(雙色圖解版)
- 可編程控制器技術(shù)應(yīng)用(西門子S7系列)
- 悟透AutoCAD 2009完全自學(xué)手冊(cè)
- Docker on Amazon Web Services
- 網(wǎng)站入侵與腳本攻防修煉
- 計(jì)算機(jī)組網(wǎng)技術(shù)
- Mastering pfSense
- Excel 2010函數(shù)與公式速查手冊(cè)
- IBM? SmartCloud? Essentials
- 重估:人工智能與賦能社會(huì)
- 數(shù)據(jù)要素:全球經(jīng)濟(jì)社會(huì)發(fā)展的新動(dòng)力
- FANUC工業(yè)機(jī)器人配置與編程技術(shù)