- Spark大數(shù)據(jù)商業(yè)實(shí)戰(zhàn)三部曲:內(nèi)核解密|商業(yè)案例|性能調(diào)優(yōu)
- 王家林
- 5745字
- 2019-12-12 17:29:54
3.7 Spark RDD中Runtime流程解析
本節(jié)講解Spark的Runtime架構(gòu)圖,并從一個(gè)作業(yè)的視角通過Driver、Master、Worker、Executor等角色透視Spark的Runtime生命周期。
3.7.1 Runtime架構(gòu)圖
(1)從Spark Runtime的角度講,包括五大核心對象:Master、Worker、Executor、Driver、CoarseGrainedExecutorBackend。
(2)Spark在做分布式集群系統(tǒng)設(shè)計(jì)的時(shí)候:最大化功能獨(dú)立、模塊化封裝具體獨(dú)立的對象、強(qiáng)內(nèi)聚松耦合。Spark運(yùn)行架構(gòu)圖如圖3-7所示。

圖3-7 Spark運(yùn)行架構(gòu)圖
(3)當(dāng)Driver中的SparkContext初始化時(shí)會提交程序給Master,Master如果接受該程序在Spark中運(yùn)行,就會為當(dāng)前的程序分配AppID,同時(shí)會分配具體的計(jì)算資源。需要特別注意的是,Master是根據(jù)當(dāng)前提交程序的配置信息來給集群中的Worker發(fā)指令分配具體的計(jì)算資源,但是,Master發(fā)出指令后并不關(guān)心具體的資源是否已經(jīng)分配,換言之,Master是發(fā)指令后就記錄了分配的資源,以后客戶端再次提交其他的程序,就不能使用該資源了。其弊端是可能會導(dǎo)致其他要提交的程序無法分配到本來應(yīng)該可以分配到的計(jì)算資源;最終的優(yōu)勢是Spark分布式系統(tǒng)功能在耦合的基礎(chǔ)上最快地運(yùn)行系統(tǒng)(否則如果Master要等到資源最終分配成功后才通知Driver,就會造成Driver阻塞,不能夠最大化并行計(jì)算資源的使用率)。需要補(bǔ)充說明的是:Spark在默認(rèn)情況下由于集群中一般都只有一個(gè)Application在運(yùn)行,所有Master分配資源策略的弊端就沒有那么明顯了。
3.7.2 生命周期
本節(jié)對Spark Runtime(Driver、Master、Worker、Executor)內(nèi)幕解密,從Spark Runtime全局的角度看Spark具體是怎么工作的,從一個(gè)作業(yè)的視角通過Driver、Master、Worker、Executor等角色來透視Spark的Runtime生命周期。
Job提交過程源碼解密中一個(gè)非常重要的技巧是通過在spark-shell中運(yùn)行一個(gè)Job來了解Job提交的過程,然后再用源碼驗(yàn)證這個(gè)過程。我們可以在spark-shell中運(yùn)行一個(gè)程序,從控制臺觀察日志。
1. ,sc.textFile("/library/dataforSortedShufffle").flatMap(_.split(" ")).map (word => (word, 1).reduceByKey(_+_)saveAsTextFile("/library/dataoutput2")
這里我們編寫WordCountJobRuntime.scala代碼,從IDEA中觀察日志。讀入的數(shù)據(jù)源文件內(nèi)容如下。
1. Hello Spark Hello Scala 2. Hello Hadoop 3. Hello Flink 4. Spark is Awesome
WordCountJobRuntime.scala的代碼如下。
1. package com.dt.spark.sparksql 2. 3. import org.apache.log4j.{Level, Logger} 4. import org.apache.spark.{SparkConf, SparkContext} 5. 6. /** 7. * 使用Scala開發(fā)本地測試的Spark WordCount程序 8. * @author DT大數(shù)據(jù)夢工廠 9. * 新浪微博:http://weibo.com/ilovepains/ 10. */ 11. object WordCountJobRuntime { 12. def main(args: Array[String]){ 13. Logger.getLogger("org").setLevel(Level.ALL) 14. /** 15. * 第1步:創(chuàng)建Spark的配置對象SparkConf,設(shè)置Spark程序運(yùn)行時(shí)的配置信息, * 例如,通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置 * 為local,則代表Spark程序在本地運(yùn)行,特別適合于機(jī)器配置非常差(如只有1GB的內(nèi) * 存)的初學(xué)者 * 16. */ 17. 18. 19. val conf = new SparkConf() //創(chuàng)建SparkConf對象 20. conf.setAppName("Wow,WordCountJobRuntime!") //設(shè)置應(yīng)用程序的名稱,在程序運(yùn)行的監(jiān)控界面可以看到名稱 21. conf.setMaster("local") //此時(shí),程序在本地運(yùn)行,不需要安裝Spark集群 22. 23. /** 24. * 第2步:創(chuàng)建SparkContext對象 25. * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、 * R等,都必須有一個(gè)SparkContext 26. * SparkContext 核心作用:初始化 Spark 應(yīng)用程序運(yùn)行所需要的核心組件,包括 * DAGScheduler、TaskScheduler、SchedulerBackend,同時(shí)還會負(fù)責(zé)Spark程序 * 往Master注冊程序等 27. * SparkContext是整個(gè)Spark應(yīng)用程序中至關(guān)重要的一個(gè)對象 28. */ 29. val sc = new SparkContext(conf) //創(chuàng)建SparkContext對象,通過傳入SparkConf //實(shí)例來定制Spark運(yùn)行的具體參數(shù)和配置信息 30. 31. /** 32. * 第 3 步:根據(jù)具體的數(shù)據(jù)來源(如 HDFS、HBase、Local FS、DB、S3 等)通過 * SparkContext創(chuàng)建RDD 33. * RDD的創(chuàng)建有3種方式:根據(jù)外部的數(shù)據(jù)來源(如HDFS)、根據(jù)Scala集合、由其他 * 的RDD操作 34. * 數(shù)據(jù)會被RDD劃分為一系列的Partitions,分配到每個(gè)Partition的數(shù)據(jù)屬于一 * 個(gè)Task的處理范疇 35. */ 36. val lines = sc.textFile("data/wordcount/helloSpark.txt") 37. /** 38. * 第4步:對初始的RDD進(jìn)行Transformation級別的處理,如通過map、filter等 * 高階函數(shù)等的編程,進(jìn)行具體的數(shù)據(jù)計(jì)算 39. * 第4.1步:將每一行的字符串拆分成單個(gè)單詞 40. */ 41. 42. val words = lines.flatMap { line => line.split(" ")} //對每一行的字符串進(jìn)行單詞拆分,并把所有行的拆 //分結(jié)果通過flat合并成為一個(gè)大的單詞集合 43. 44. /** 45. * 第4步:對初始的RDD進(jìn)行Transformation級別的處理,如通過map、filter等 * 高階函數(shù)等的編程,進(jìn)行具體的數(shù)據(jù)計(jì)算 46. * 第4.2步:在單詞拆分的基礎(chǔ)上對每個(gè)單詞實(shí)例計(jì)數(shù)為1,也就是word => (word, 1) 47. */ 48. val pairs = words.map { word => (word, 1) } 49. 50. /** 51. * 第4步:對初始的RDD進(jìn)行Transformation級別的處理,如通過map、filter等 * 高階函數(shù)等的編程,進(jìn)行具體的數(shù)據(jù)計(jì)算 52. * 第4.3步:在每個(gè)單詞實(shí)例計(jì)數(shù)為1的基礎(chǔ)上統(tǒng)計(jì)每個(gè)單詞在文件中出現(xiàn)的總次數(shù) 53. */ 54. val wordCountsOdered = pairs.reduceByKey(_+_).saveAsTextFile("data/ wordcount/wordCountResult.log") 55. 56. while(true){ 57. 58. } 59. sc.stop() 60. 61. } 62. }
在IDEA中運(yùn)行,WordCountJobRuntime的運(yùn)行結(jié)果保存在data/wordcount/ wordCountResult.log目錄的part-00000中。
1. (Awesome,1) 2. (Flink,1) 3. (Spark,2) 4. (is,1) 5. (Hello,4) 6. (Scala,1) 7. (Hadoop,1)
在IDEA的控制臺中觀察WordCountJobRuntime.scala運(yùn)行日志,這里Spark版本是version 2.1.0。其中,MemoryStore是從Storge內(nèi)存角度來看的,Storge是磁盤管理和內(nèi)存管理。這里,Spark讀取了Hadoop的HDFS,因此使用了Hadoop的內(nèi)容,如FileInputFormat,日志中顯示FileInputFormat: Total input paths to process : 1說明有一個(gè)文件要處理。
1. Using Spark's default log4j profile: org/apache/spark/log4j-defaults. properties 2. 17/05/24 05:48:20 INFO SparkContext: Running Spark version 2.1.0 3. ...... 4. 17/05/24 05:48:24 DEBUG DiskBlockManager: Adding shutdown hook 5. 17/05/24 05:48:24 DEBUG ShutdownHookManager: Adding shutdown hook 6. 17/05/24 05:48:24 INFO MemoryStore: MemoryStore started with capacity 637.2 MB 7. 17/05/24 05:48:24 INFO SparkEnv: Registering OutputCommitCoordinator 8. ...... 9. 17/05/24 05:48:27 DEBUG HadoopRDD: Creating new JobConf and caching it for later re-use 10. 17/05/24 05:48:27 DEBUG FileInputFormat: Time taken to get FileStatuses: 28 11. 17/05/24 05:48:27 INFO FileInputFormat: Total input paths to process : 1 12. 17/05/24 05:48:27 DEBUG FileInputFormat: Total # of splits generated by getSplits: 1, TimeTaken: 48 13. ......
在Spark中,所有的Action都會觸發(fā)至少一個(gè)Job,在WordCountJobRuntime.scala代碼中,是通過saveAsTextFile來觸發(fā)Job的。在日志中查看SparkContext: Starting job: saveAsTextFile觸發(fā)saveAsTextFile。緊接著交給DAGScheduler,日志中顯示DAGScheduler: Registering RDD,因?yàn)檫@里有兩個(gè)Stage,從具體計(jì)算的角度,前面Stage計(jì)算的時(shí)候保留輸出。然后是DAGScheduler獲得了job的ID(job 0)。
1. 17/05/24 05:48:28 INFO SparkContext: Starting job: saveAsTextFile at WordCountJobRuntime.scala:61 2. 17/05/24 05:48:28 DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle 0 because an aggregator is defined 3. 17/05/24 05:48:28 INFO DAGScheduler: Registering RDD 3 (map at WordCountJobRuntime.scala:55) 4. 17/05/24 05:48:28 INFO DAGScheduler: Got job 0 (saveAsTextFile at WordCountJobRuntime.scala:61) with 1 output partitions 5. ......
SparkContext在實(shí)例化的時(shí)候會構(gòu)造StandaloneSchedulerBackend(Spark 2.0版本將之前的SparkDeploySchedulerBackend名字更新為StandaloneSchedulerBackend)、DAGScheduler、TaskSchedulerImpl、MapOutputTrackerMaster等對象。
其中,StandaloneSchedulerBackend負(fù)責(zé)集群計(jì)算資源的管理和調(diào)度,這是從作業(yè)的角度來考慮的,注冊給Master的時(shí)候,Master給我們分配資源,資源從Executor本身轉(zhuǎn)過來向StandaloneSchedulerBackend注冊,這是從作業(yè)調(diào)度的角度來考慮的,不是從整個(gè)集群來考慮,整個(gè)集群是Master來管理計(jì)算資源的。
DAGScheduler負(fù)責(zé)高層調(diào)度(如Job中Stage的劃分、數(shù)據(jù)本地性等內(nèi)容)。
TaskSchedulerImple負(fù)責(zé)具體Stage內(nèi)部的底層調(diào)度(如具體每個(gè)Task的調(diào)度、Task的容錯(cuò)等)。
MapOutputTrackerMaster負(fù)責(zé)Shuffle中數(shù)據(jù)輸出和讀取的管理。Shuffle的時(shí)候?qū)?shù)據(jù)寫到本地,下一個(gè)Stage要使用上一個(gè)Stage的數(shù)據(jù),因此寫數(shù)據(jù)的時(shí)候要告訴Driver中的MapOutputTrackerMaster具體寫到哪里,下一個(gè)Stage讀取數(shù)據(jù)的時(shí)候也要訪問Driver的MapOutputTrackerMaster獲取數(shù)據(jù)的具體位置。
MapOutputTrackerMaster的源碼如下。
1. private[spark] class MapOutputTrackerMaster(conf: SparkConf, 2. broadcastManager: BroadcastManager, isLocal: Boolean) 3. extends MapOutputTracker(conf) {
DAGScheduler是面向Stage調(diào)度的高層調(diào)度實(shí)現(xiàn)。它為每一個(gè)Job計(jì)算DAG,跟蹤RDDS及Stage輸出結(jié)果進(jìn)行物化,并找到一個(gè)最小的計(jì)劃去運(yùn)行Job,然后提交stages中的TaskSets到底層調(diào)度器TaskScheduler提交集群運(yùn)行,TaskSet包含完全獨(dú)立的任務(wù),基于集群上已存在的數(shù)據(jù)運(yùn)行(如從上一個(gè)Stage輸出的文件),如果這個(gè)數(shù)據(jù)不可用,獲取數(shù)據(jù)可能會失敗。
Spark Stages根據(jù)RDD圖中Shuffle的邊界來創(chuàng)建,如果RDD的操作是窄依賴,如map()和filter(),在每個(gè)Stages中將一系列tasks組合成流水線執(zhí)行。但是,如果是寬依賴,Shuffle依賴需要多個(gè)Stages(上一個(gè)Stage進(jìn)行map輸出寫入文件,下一個(gè)Stage讀取數(shù)據(jù)文件),每個(gè)Stage依賴于其他的Stage,其中進(jìn)行多個(gè)算子操作。算子操作在各種類型的RDDS(如MappedRDD、FilteredRDD)的RDD.compute()中實(shí)際執(zhí)行。
在DAG階段,DAGScheduler根據(jù)當(dāng)前緩存狀態(tài)決定每個(gè)任務(wù)運(yùn)行的位置,并將任務(wù)傳遞給底層的任務(wù)調(diào)度器TaskScheduler。此外,它處理Shuffle輸出文件丟失的故障,在這種情況下,以前的Stage可能需要重新提交。Stage中不引起Shuffle文件丟失的故障由任務(wù)調(diào)度器TaskScheduler處理,在取消整個(gè)Stage前,將重試幾次任務(wù)。
當(dāng)瀏覽這個(gè)代碼時(shí),有幾個(gè)關(guān)鍵概念:
Jobs作業(yè)(表現(xiàn)為[ActiveJob])作為頂級工作項(xiàng)提交給調(diào)度程序。當(dāng)用戶調(diào)用一個(gè)action,如count()算子,Job將通過submitJob進(jìn)行提交。每個(gè)作業(yè)可能需要執(zhí)行多個(gè)stages來構(gòu)建中間數(shù)據(jù)。
Stages ([Stage])是一組任務(wù)的集合,在相同的RDD分區(qū)上,每個(gè)任務(wù)計(jì)算相同的功能,計(jì)算Jobs的中間結(jié)果。Stage根據(jù)Shuffle劃分邊界,我們必須等待前一階段Stage完成輸出。有兩種類型的Stage:[ResultStage]是執(zhí)行action的最后一個(gè)Stage,[ShuffleMapStage]是Shuffle Stages通過map寫入輸出文件中的。如果Jobs重用相同的RDDs,Stages可以跨越多個(gè)Jobs共享。
Tasks任務(wù)是單獨(dú)的工作單位,每個(gè)任務(wù)發(fā)送到一個(gè)分布式節(jié)點(diǎn)。
緩存跟蹤:DAGScheduler記錄哪些RDDS被緩存,避免重復(fù)計(jì)算,以及記錄Shuffle map Stages已經(jīng)生成的輸出文件,避免在map端重新計(jì)算。
數(shù)據(jù)本地化:DAGScheduler基于RDDS的數(shù)據(jù)本地性、緩存位置,或Shuffle數(shù)據(jù)在Stage中運(yùn)行每一個(gè)任務(wù)的Task。
清理:當(dāng)依賴于它們的運(yùn)行作業(yè)完成時(shí),所有數(shù)據(jù)結(jié)構(gòu)將被清除,防止在長期運(yùn)行的應(yīng)用程序中內(nèi)存泄漏。
為了從故障中恢復(fù),同一個(gè)Stage可能需要運(yùn)行多次,這被稱為重試“attempts”。如在上一個(gè)Stage中的輸出文件丟失,TaskScheduler中將報(bào)告任務(wù)失敗,DAGScheduler通過檢測CompletionEvent與FetchFailed或ExecutorLost事件重新提交丟失的Stage。DAGScheduler將等待看是否有其他節(jié)點(diǎn)或任務(wù)失敗,然后在丟失計(jì)算任務(wù)的階段Stage中重新提交TaskSets。在這個(gè)過程中,可能須創(chuàng)建之前被清理的Stage。舊Stage的任務(wù)仍然可以運(yùn)行,但必須在正確的Stage中接收事件并進(jìn)行操作。
做改變或者回顧時(shí)需要看的清單有:
Job運(yùn)行結(jié)束時(shí),所有的數(shù)據(jù)結(jié)構(gòu)將被清理,及清理程序運(yùn)行中的狀態(tài)。
添加一個(gè)新的數(shù)據(jù)結(jié)構(gòu)時(shí),在新結(jié)構(gòu)中更新'DAGSchedulerSuite.assertDataStructuresEmpty',包括新結(jié)構(gòu),將有助于捕獲內(nèi)存泄漏。
DAGScheduler.scala的源碼如下。
1. private[spark] 2. class DAGScheduler( 3. private[scheduler] val sc: SparkContext, 4. private[scheduler] val taskScheduler: TaskScheduler, 5. listenerBus: LiveListenerBus, 6. mapOutputTracker: MapOutputTrackerMaster, 7. blockManagerMaster: BlockManagerMaster, 8. env: SparkEnv, 9. clock: Clock = new SystemClock()) 10. extends Logging {
回到運(yùn)行日志,SparkContext在實(shí)例化的時(shí)候會構(gòu)造StandaloneSchedulerBackend、DAGScheduler、TaskSchedulerImpl、MapOutputTrackerMaster四大核心對象,DAGScheduler獲得Job ID,日志中顯示DAGScheduler: Final stage: ResultStage 1,F(xiàn)inal stage是ResultStage;Parents of final stage是ShuffleMapStage,DAGScheduler是面向Stage的。日志中顯示兩個(gè)Stage:Stage 1是Final stage,Stage 0是ShuffleMapStage。
接下來序號改變,運(yùn)行時(shí)最左側(cè)從0開始,日志中顯示DAGScheduler: missing: List(ShuffleMapStage 0),父Stage是ShuffleMapStage,DAGScheduler調(diào)度時(shí)必須先計(jì)算父Stage,因此首先提交的是ShuffleMapStage 0,這里RDD是MapPartitionsRDD,只有Stage中的最后一個(gè)算子是真正有效的,Stage 0中的最后一個(gè)操作是map,因此生成了MapPartitionsRDD。Stage 0無父Stage,因此提交,提交時(shí)進(jìn)行廣播等內(nèi)容,然后提交作業(yè)。
1. ...... 2. 17/05/24 05:48:28 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at WordCountJobRuntime.scala:61) 3. 17/05/24 05:48:28 INFO DAGScheduler: Parents of final stage: List (ShuffleMapStage 0) 4. 17/05/24 05:48:28 INFO DAGScheduler: Missing parents: List (ShuffleMapStage 0) 5. 17/05/24 05:48:28 DEBUG DAGScheduler: submitStage(ResultStage 1) 6. 17/05/24 05:48:28 DEBUG DAGScheduler: missing: List(ShuffleMapStage 0) 7. 17/05/24 05:48:28 DEBUG DAGScheduler: submitStage(ShuffleMapStage 0) 8. 17/05/24 05:48:28 DEBUG DAGScheduler: missing: List() 9. 17/05/24 05:48:28 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCountJobRuntime.scala:55), which has no missing parents 10. 17/05/24 05:48:28 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 0) 11. 17/05/24 05:48:28 TRACE BlockInfoManager: Task -1024 trying to put broadcast_1 12. ......
我們從Web UI的角度看一下,如圖3-8所示,Web UI中顯示生成兩個(gè)Stage:Stage 0、Stage 1。

圖3-8 Stage劃分
日志中顯示DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0,DAGScheduler提交作業(yè),顯示提交一個(gè)須計(jì)算的任務(wù),ShuffleMapStage在本地運(yùn)行是一個(gè)并行度,交給TaskSchedulerImpl運(yùn)行。這里是一個(gè)并行度,提交底層的調(diào)度器TaskScheduler,TaskScheduler收到任務(wù)后,就發(fā)布任務(wù)到集群中運(yùn)行,由TaskSetManager進(jìn)行管理:日志中顯示TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6012 bytes),顯示具體運(yùn)行的位置,及worker運(yùn)行了哪些任務(wù)。這里在本地只運(yùn)行了一個(gè)任務(wù)。
1. 17/05/24 05:48:28 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCountJobRuntime.scala:55), which has no missing parents 2. 17/05/24 05:48:28 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 0) 3. ...... 4. 17/05/24 05:48:28 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCountJobRuntime.scala:55) 5. 17/05/24 05:48:28 DEBUG DAGScheduler: New pending partitions: Set(0) 6. 17/05/24 05:48:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 7. 17/05/24 05:48:28 DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0 8. 17/05/24 05:48:28 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY 9. 17/05/24 05:48:28 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0 10. 17/05/24 05:48:28 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY 11. 17/05/24 05:48:28 DEBUG SecurityManager: user=null aclsEnabled=false viewAcls=dell viewAclsGroups= 12. 17/05/24 05:48:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6012 bytes) 13. 17/05/24 05:48:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 14. 17/05/24 05:48:28 DEBUG Executor: Task 0's epoch is 0
然后是完成作業(yè),日志中顯示TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 327 ms on localhost (executor driver),在本地機(jī)器上完成作業(yè)。當(dāng)Stage的一個(gè)任務(wù)完成后,ShuffleMapStage就已完成。Task任務(wù)運(yùn)行完后向DAGScheduler匯報(bào),DAGScheduler查看曾經(jīng)提交了幾個(gè)Task,計(jì)算Task的數(shù)量如果等于Task的總數(shù)量,那Stage也就完成了。這個(gè)Stage完成以后,下一個(gè)Stage開始運(yùn)行。
1. ...... 2. 17/05/24 05:48:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1744 bytes result sent to driver 3. 17/05/24 05:48:29 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0 4. 17/05/24 05:48:29 DEBUG TaskSetManager: No tasks for locality level NO_PREF, so moving to locality level ANY 5. 17/05/24 05:48:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 327 ms on localhost (executor driver) (1/1) 6. 17/05/24 05:48:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 7. 17/05/24 05:48:29 DEBUG DAGScheduler: ShuffleMapTask finished on driver 8. 17/05/24 05:48:29 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCountJobRuntime.scala:55) finished in 0.358 s
ShuffleMapStage完成后,將運(yùn)行下一個(gè)Stage。日志中顯示DAGScheduler: looking for newly runnable stages,這里一共有兩個(gè)Stage,ShuffleMapStage運(yùn)行完成,那只有一個(gè)ResultStage將運(yùn)行。DAGScheduler又提交最后一個(gè)Stage的一個(gè)任務(wù),默認(rèn)并行度是繼承的。同樣,發(fā)布任務(wù)給Executor進(jìn)行計(jì)算。
1. ...... 2. 17/05/24 05:48:29 INFO DAGScheduler: looking for newly runnable stages 3. 17/05/24 05:48:29 INFO DAGScheduler: running: Set() 4. 17/05/24 05:48:29 INFO DAGScheduler: waiting: Set(ResultStage 1) 5. 17/05/24 05:48:29 INFO DAGScheduler: failed: Set() 6. 17/05/24 05:48:29 DEBUG MapOutputTrackerMaster: Increasing epoch to 1 7. 17/05/24 05:48:29 TRACE DAGScheduler: Checking if any dependencies of ShuffleMapStage 0 are now runnable 8. 17/05/24 05:48:29 TRACE DAGScheduler: running: Set() 9. 17/05/24 05:48:29 TRACE DAGScheduler: waiting: Set(ResultStage 1) 10. 17/05/24 05:48:29 TRACE DAGScheduler: failed: Set() 11. 17/05/24 05:48:29 DEBUG DAGScheduler: submitStage(ResultStage 1) 12. 17/05/24 05:48:29 DEBUG DAGScheduler: missing: List() 13. 17/05/24 05:48:29 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCountJobRuntime.scala:61), which has no missing parents 14. 17/05/24 05:48:29 DEBUG DAGScheduler: submitMissingTasks(ResultStage 1) 15. ...... 16. 17/05/24 05:48:29 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCountJobRuntime.scala:61) 17. 17/05/24 05:48:29 DEBUG DAGScheduler: New pending partitions: Set(0) 18. 17/05/24 05:48:29 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 19. ....
Task任務(wù)運(yùn)行完后向DAGScheduler匯報(bào),DAGScheduler計(jì)算曾經(jīng)提交了幾個(gè)Task,如果Task的數(shù)量等于Task的總數(shù)量,ResultStage也運(yùn)行完成。然后進(jìn)行相關(guān)的清理工作,兩個(gè)Stage(ShuffleMapStage、ResultStage)完成,Job也就完成。
1. ...... 2. 17/05/24 05:48:29 DEBUG MapOutputTrackerMaster: Fetching outputs for shuffle 0, partitions 0-1 3. 17/05/24 05:48:29 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 4. 17/05/24 05:48:29 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 5. 17/05/24 05:48:29 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms 6. 17/05/24 05:48:29 DEBUG ShuffleBlockFetcherIterator: Got local blocks in 12 ms 7. 17/05/24 05:48:29 DEBUG TaskMemoryManager: Task 1 release 0.0 B from org.apache.spark.util.collection.ExternalAppendOnlyMap@3da8eddf 8. ...... 9. 17/05/24 05:48:29 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 409 ms on localhost (executor driver) (1/1) 10. 17/05/24 05:48:29 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 11. 17/05/24 05:48:29 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at WordCountJobRuntime.scala:61) finished in 0.410 s 12. 17/05/24 05:48:29 DEBUG DAGScheduler: After removal of stage 1, remaining stages = 1 13. 17/05/24 05:48:29 DEBUG DAGScheduler: After removal of stage 0, remaining stages = 0 14. 17/05/24 05:48:29 INFO DAGScheduler: Job 0 finished: saveAsTextFile at WordCountJobRuntime.scala:61, took 1.345921 s 15. ......
下面看一下WebUI,ShuffleMapStage中的任務(wù)交給Executor,圖3-9中顯示了任務(wù)的相關(guān)信息,如Shuffle的輸出等,第一個(gè)Stage肯定生成Shuffle的輸出,可以看一下最右側(cè)的Shuffle Write Size/Records。圖3-9中的Input Size/Records是從Hdfs中讀入的文件數(shù)據(jù)。

圖3-9 ShuffleMapStage運(yùn)行
接下來看一下第二個(gè)Stage。第二個(gè)Stage同樣顯示Executor的信息,圖3-10最右側(cè)顯示Shuffle Read Size/Records。如果在分布式集群運(yùn)行,須遠(yuǎn)程讀取數(shù)據(jù),例如,原來是4個(gè)Executor計(jì)算,在第二個(gè)Stage中是兩個(gè)Executor計(jì)算,因此一部分?jǐn)?shù)據(jù)是本地的,一部分是遠(yuǎn)程的,或從遠(yuǎn)程節(jié)點(diǎn)拉取數(shù)據(jù)。ResultStage最后要產(chǎn)生輸出,輸出到文件保存。

圖3-10 ResultStage運(yùn)行
Task的運(yùn)行解密:
(1)Task是運(yùn)行在Executor中的,而Executor又是位于CoarseGrainedExecutorBackend中的,且CoarseGrainedExecutorBackend和Executor是一一對應(yīng)的;計(jì)算運(yùn)行于Executor,而Executor位于CoarseGrainedExecutorBackend中,CoarseGrainedExecutorBackend是進(jìn)程。發(fā)任務(wù)消息也是在CoarseGrainedExecutorBackend。
(2)當(dāng)CoarseGrainedExecutorBackend接收到TaskSetManager發(fā)過來的LaunchTask消息后會反序列化TaskDescription,然后使用CoarseGrainedExecutorBackend中唯一的Executor來執(zhí)行任務(wù)。
CoarseGrainedExecutorBackend收到Driver發(fā)送的LaunchTask任務(wù)消息,其中LaunchTask是case class,而不是case object,是因?yàn)槊總€(gè)消息是一個(gè)消息實(shí)例,每個(gè)消息狀態(tài)不一樣,而case object是唯一的,因此使用case class。
1. //Driver節(jié)點(diǎn)到 executors節(jié)點(diǎn) 2. case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
Executor.scala的源碼如下。
Spark 2.1.1版本的Executor.scala的launchTask的源碼如下。
1. ...... 2. //維護(hù)正在運(yùn)行的任務(wù)列表 3. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] 4. ...... 5. def launchTask( 6. context: ExecutorBackend, 7. taskId: Long, 8. attemptNumber: Int, 9. taskName: String, 10. serializedTask: ByteBuffer): Unit = { 11. val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) 12. runningTasks.put(taskId, tr) 13. threadPool.execute(tr) 14. } 15. ...... 16. 17. class TaskRunner( 18. execBackend: ExecutorBackend, 19. val taskId: Long, 20. val attemptNumber: Int, 21. taskName: String, 22. serializedTask: ByteBuffer) 23. extends Runnable { 24. .......
Spark 2.2.0版本的Executor.scala的launchTask的源碼與Spark 2.1.1版本相比具有如下特點(diǎn)。
上段代碼中第7~10行調(diào)整launchTask方法的第二個(gè)參數(shù):傳入封裝的taskDescription任務(wù)描述信息。
上段代碼中第11行構(gòu)建TaskRunner實(shí)例傳入的也是taskDescription參數(shù)。
上段代碼中第19~22行TaskRunner的第二個(gè)成員變量更新為TaskDescription類型。
1. ...... 2. def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { 3. val tr = new TaskRunner(context, taskDescription) 4. ..... 5. } 6. ...... 7. class TaskRunner( 8. ..... 9. private val taskDescription: TaskDescription) 10. ...
在Executor.scala中單擊launchTask,運(yùn)行的任務(wù)使用了ConcurrentHashMap數(shù)據(jù)結(jié)構(gòu),運(yùn)行l(wèi)aunchTask的時(shí)候構(gòu)建了一個(gè)TaskRunner,TaskRunner是一個(gè)Runnable,而Runnable是Java中的接口,Scala可以直接調(diào)用Java的代碼,run方法中包括任務(wù)的反序列化等內(nèi)容。通過Runnable封裝任務(wù),然后放入到runningTasks中,在threadPool中執(zhí)行任務(wù)。threadPool是一個(gè)newDaemonCachedThreadPool。任務(wù)交給Executor的線程池中的線程去執(zhí)行,執(zhí)行的時(shí)候下載資源、數(shù)據(jù)等內(nèi)容。
Spark 2.1.1版本的Executor.scala的threadPool的源碼如下。
1. //啟動worker線程池 2. private val threadPool = ThreadUtils.newDaemonCachedThreadPool ("Executor task launch worker")
Spark 2.2.0版本的Executor.scala的threadPool的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第2行Executor的線程池由ThreadUtils.newDaemonCachedThreadPool方式調(diào)整為Executors.newCachedThreadPool(threadFactory)線程池的方式。
1. //啟動worker線程池 2. private val threadPool = { 3. val threadFactory = new ThreadFactoryBuilder() 4. .setDaemon(true) 5. .setNameFormat("Executor task launch worker-%d") 6. .setThreadFactory(new ThreadFactory { 7. override def newThread(r: Runnable): Thread = 8. //使用UninterruptibleThread 運(yùn)行任務(wù),這樣我們就可以允許運(yùn)行代碼不被 //Thread.interrupt()線程中斷。例如,KAFKA-1894、HADOOP-10622, //如果某些方法被中斷,程序?qū)恢睊炱? 9. new UninterruptibleThread(r, "unused") //thread name will be set by ThreadFactoryBuilder 10. }) 11. .build() 12. Executors.newCachedThreadPool(threadFactory).asInstanceOf [ThreadPoolExecutor] 13. }
- Design for the Future
- 工業(yè)機(jī)器人產(chǎn)品應(yīng)用實(shí)戰(zhàn)
- Hands-On Data Science with SQL Server 2017
- CorelDRAW X4中文版平面設(shè)計(jì)50例
- 最簡數(shù)據(jù)挖掘
- 永磁同步電動機(jī)變頻調(diào)速系統(tǒng)及其控制(第2版)
- WordPress Theme Development Beginner's Guide(Third Edition)
- 四向穿梭式自動化密集倉儲系統(tǒng)的設(shè)計(jì)與控制
- 21天學(xué)通Visual C++
- 計(jì)算機(jī)與信息技術(shù)基礎(chǔ)上機(jī)指導(dǎo)
- Visual FoxPro程序設(shè)計(jì)
- Visual Studio 2010 (C#) Windows數(shù)據(jù)庫項(xiàng)目開發(fā)
- 經(jīng)典Java EE企業(yè)應(yīng)用實(shí)戰(zhàn)
- 基于RPA技術(shù)財(cái)務(wù)機(jī)器人的應(yīng)用與研究
- 人工智能:智能人機(jī)交互