官术网_书友最值得收藏!

3.7 Spark RDD中Runtime流程解析

本節(jié)講解Spark的Runtime架構(gòu)圖,并從一個(gè)作業(yè)的視角通過Driver、Master、Worker、Executor等角色透視Spark的Runtime生命周期。

3.7.1 Runtime架構(gòu)圖

(1)從Spark Runtime的角度講,包括五大核心對象:Master、Worker、Executor、Driver、CoarseGrainedExecutorBackend。

(2)Spark在做分布式集群系統(tǒng)設(shè)計(jì)的時(shí)候:最大化功能獨(dú)立、模塊化封裝具體獨(dú)立的對象、強(qiáng)內(nèi)聚松耦合。Spark運(yùn)行架構(gòu)圖如圖3-7所示。

圖3-7 Spark運(yùn)行架構(gòu)圖

(3)當(dāng)Driver中的SparkContext初始化時(shí)會提交程序給Master,Master如果接受該程序在Spark中運(yùn)行,就會為當(dāng)前的程序分配AppID,同時(shí)會分配具體的計(jì)算資源。需要特別注意的是,Master是根據(jù)當(dāng)前提交程序的配置信息來給集群中的Worker發(fā)指令分配具體的計(jì)算資源,但是,Master發(fā)出指令后并不關(guān)心具體的資源是否已經(jīng)分配,換言之,Master是發(fā)指令后就記錄了分配的資源,以后客戶端再次提交其他的程序,就不能使用該資源了。其弊端是可能會導(dǎo)致其他要提交的程序無法分配到本來應(yīng)該可以分配到的計(jì)算資源;最終的優(yōu)勢是Spark分布式系統(tǒng)功能在耦合的基礎(chǔ)上最快地運(yùn)行系統(tǒng)(否則如果Master要等到資源最終分配成功后才通知Driver,就會造成Driver阻塞,不能夠最大化并行計(jì)算資源的使用率)。需要補(bǔ)充說明的是:Spark在默認(rèn)情況下由于集群中一般都只有一個(gè)Application在運(yùn)行,所有Master分配資源策略的弊端就沒有那么明顯了。

3.7.2 生命周期

本節(jié)對Spark Runtime(Driver、Master、Worker、Executor)內(nèi)幕解密,從Spark Runtime全局的角度看Spark具體是怎么工作的,從一個(gè)作業(yè)的視角通過Driver、Master、Worker、Executor等角色來透視Spark的Runtime生命周期。

Job提交過程源碼解密中一個(gè)非常重要的技巧是通過在spark-shell中運(yùn)行一個(gè)Job來了解Job提交的過程,然后再用源碼驗(yàn)證這個(gè)過程。我們可以在spark-shell中運(yùn)行一個(gè)程序,從控制臺觀察日志。

1.  ,sc.textFile("/library/dataforSortedShufffle").flatMap(_.split(" ")).map
    (word => (word, 1).reduceByKey(_+_)saveAsTextFile("/library/dataoutput2")

這里我們編寫WordCountJobRuntime.scala代碼,從IDEA中觀察日志。讀入的數(shù)據(jù)源文件內(nèi)容如下。

1.  Hello Spark Hello Scala
2.  Hello Hadoop
3.  Hello Flink
4.  Spark is Awesome

WordCountJobRuntime.scala的代碼如下。

1.   package com.dt.spark.sparksql
2.
3.  import org.apache.log4j.{Level, Logger}
4.  import org.apache.spark.{SparkConf, SparkContext}
5.
6.  /**
7.    * 使用Scala開發(fā)本地測試的Spark WordCount程序
8.    * @author DT大數(shù)據(jù)夢工廠
9.    * 新浪微博:http://weibo.com/ilovepains/
10.   */
11. object WordCountJobRuntime {
12.   def main(args: Array[String]){
13.     Logger.getLogger("org").setLevel(Level.ALL)
14.     /**
15.       * 第1步:創(chuàng)建Spark的配置對象SparkConf,設(shè)置Spark程序運(yùn)行時(shí)的配置信息,
          * 例如,通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置
     * 為local,則代表Spark程序在本地運(yùn)行,特別適合于機(jī)器配置非常差(如只有1GB的內(nèi)
     * 存)的初學(xué)者       *
16.  */
17.
18.
19.  val conf = new SparkConf() //創(chuàng)建SparkConf對象
20.  conf.setAppName("Wow,WordCountJobRuntime!")
                  //設(shè)置應(yīng)用程序的名稱,在程序運(yùn)行的監(jiān)控界面可以看到名稱
21.  conf.setMaster("local") //此時(shí),程序在本地運(yùn)行,不需要安裝Spark集群
22.
23.  /**
24.    * 第2步:創(chuàng)建SparkContext對象
25.    * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、
       * R等,都必須有一個(gè)SparkContext
26.    * SparkContext    核心作用:初始化  Spark  應(yīng)用程序運(yùn)行所需要的核心組件,包括
       * DAGScheduler、TaskScheduler、SchedulerBackend,同時(shí)還會負(fù)責(zé)Spark程序
       * 往Master注冊程序等
27.    * SparkContext是整個(gè)Spark應(yīng)用程序中至關(guān)重要的一個(gè)對象
28.    */
29.  val sc = new SparkContext(conf)
                                       //創(chuàng)建SparkContext對象,通過傳入SparkConf
                                       //實(shí)例來定制Spark運(yùn)行的具體參數(shù)和配置信息
30.
31.  /**
32.    * 第 3  步:根據(jù)具體的數(shù)據(jù)來源(如  HDFS、HBase、Local FS、DB、S3      等)通過
       * SparkContext創(chuàng)建RDD
33.    * RDD的創(chuàng)建有3種方式:根據(jù)外部的數(shù)據(jù)來源(如HDFS)、根據(jù)Scala集合、由其他
       * 的RDD操作
34.    * 數(shù)據(jù)會被RDD劃分為一系列的Partitions,分配到每個(gè)Partition的數(shù)據(jù)屬于一
       * 個(gè)Task的處理范疇
35.    */
36.   val lines = sc.textFile("data/wordcount/helloSpark.txt")
37.  /**
38.    * 第4步:對初始的RDD進(jìn)行Transformation級別的處理,如通過map、filter等
       * 高階函數(shù)等的編程,進(jìn)行具體的數(shù)據(jù)計(jì)算
39.    *  第4.1步:將每一行的字符串拆分成單個(gè)單詞
40.    */
41.
42.  val words = lines.flatMap { line => line.split(" ")}
                                       //對每一行的字符串進(jìn)行單詞拆分,并把所有行的拆
                                       //分結(jié)果通過flat合并成為一個(gè)大的單詞集合
43.
44.  /**
45.    * 第4步:對初始的RDD進(jìn)行Transformation級別的處理,如通過map、filter等
       * 高階函數(shù)等的編程,進(jìn)行具體的數(shù)據(jù)計(jì)算
46.    *  第4.2步:在單詞拆分的基礎(chǔ)上對每個(gè)單詞實(shí)例計(jì)數(shù)為1,也就是word => (word, 1)
47.    */
48.  val pairs = words.map { word => (word, 1) }
49.
50.  /**
51.    * 第4步:對初始的RDD進(jìn)行Transformation級別的處理,如通過map、filter等
       * 高階函數(shù)等的編程,進(jìn)行具體的數(shù)據(jù)計(jì)算
52.       *  第4.3步:在每個(gè)單詞實(shí)例計(jì)數(shù)為1的基礎(chǔ)上統(tǒng)計(jì)每個(gè)單詞在文件中出現(xiàn)的總次數(shù)
53.       */
54.     val wordCountsOdered = pairs.reduceByKey(_+_).saveAsTextFile("data/
        wordcount/wordCountResult.log")
55.
56.     while(true){
57.
58.     }
59.     sc.stop()
60.
61.   }
62. }

在IDEA中運(yùn)行,WordCountJobRuntime的運(yùn)行結(jié)果保存在data/wordcount/ wordCountResult.log目錄的part-00000中。

1.   (Awesome,1)
2.  (Flink,1)
3.  (Spark,2)
4.  (is,1)
5.  (Hello,4)
6.  (Scala,1)
7.  (Hadoop,1)

在IDEA的控制臺中觀察WordCountJobRuntime.scala運(yùn)行日志,這里Spark版本是version 2.1.0。其中,MemoryStore是從Storge內(nèi)存角度來看的,Storge是磁盤管理和內(nèi)存管理。這里,Spark讀取了Hadoop的HDFS,因此使用了Hadoop的內(nèi)容,如FileInputFormat,日志中顯示FileInputFormat: Total input paths to process : 1說明有一個(gè)文件要處理。

1.   Using Spark's default log4j profile: org/apache/spark/log4j-defaults.
     properties
2.  17/05/24 05:48:20 INFO SparkContext: Running Spark version 2.1.0
3.  ......
4.  17/05/24 05:48:24 DEBUG DiskBlockManager: Adding shutdown hook
5.  17/05/24 05:48:24 DEBUG ShutdownHookManager: Adding shutdown hook
6.  17/05/24 05:48:24 INFO MemoryStore: MemoryStore started with capacity
    637.2 MB
7.  17/05/24 05:48:24 INFO SparkEnv: Registering OutputCommitCoordinator
8.  ......
9.  17/05/24 05:48:27 DEBUG HadoopRDD: Creating new JobConf and caching it
    for later re-use
10. 17/05/24 05:48:27 DEBUG FileInputFormat: Time taken to get FileStatuses: 28
11. 17/05/24 05:48:27 INFO FileInputFormat: Total input paths to process : 1
12. 17/05/24 05:48:27 DEBUG FileInputFormat: Total # of splits generated by
    getSplits: 1, TimeTaken: 48
13. ......

在Spark中,所有的Action都會觸發(fā)至少一個(gè)Job,在WordCountJobRuntime.scala代碼中,是通過saveAsTextFile來觸發(fā)Job的。在日志中查看SparkContext: Starting job: saveAsTextFile觸發(fā)saveAsTextFile。緊接著交給DAGScheduler,日志中顯示DAGScheduler: Registering RDD,因?yàn)檫@里有兩個(gè)Stage,從具體計(jì)算的角度,前面Stage計(jì)算的時(shí)候保留輸出。然后是DAGScheduler獲得了job的ID(job 0)。

1.   17/05/24 05:48:28 INFO SparkContext: Starting job: saveAsTextFile at
     WordCountJobRuntime.scala:61
2.  17/05/24 05:48:28 DEBUG SortShuffleManager: Can't use serialized shuffle
    for shuffle 0 because an aggregator is defined
3.  17/05/24    05:48:28    INFO    DAGScheduler:     Registering     RDD  3  (map  at
    WordCountJobRuntime.scala:55)
4.  17/05/24 05:48:28 INFO DAGScheduler: Got job 0 (saveAsTextFile at
    WordCountJobRuntime.scala:61) with 1 output partitions
5.  ......

SparkContext在實(shí)例化的時(shí)候會構(gòu)造StandaloneSchedulerBackend(Spark 2.0版本將之前的SparkDeploySchedulerBackend名字更新為StandaloneSchedulerBackend)、DAGScheduler、TaskSchedulerImpl、MapOutputTrackerMaster等對象。

 其中,StandaloneSchedulerBackend負(fù)責(zé)集群計(jì)算資源的管理和調(diào)度,這是從作業(yè)的角度來考慮的,注冊給Master的時(shí)候,Master給我們分配資源,資源從Executor本身轉(zhuǎn)過來向StandaloneSchedulerBackend注冊,這是從作業(yè)調(diào)度的角度來考慮的,不是從整個(gè)集群來考慮,整個(gè)集群是Master來管理計(jì)算資源的。

 DAGScheduler負(fù)責(zé)高層調(diào)度(如Job中Stage的劃分、數(shù)據(jù)本地性等內(nèi)容)。

 TaskSchedulerImple負(fù)責(zé)具體Stage內(nèi)部的底層調(diào)度(如具體每個(gè)Task的調(diào)度、Task的容錯(cuò)等)。

 MapOutputTrackerMaster負(fù)責(zé)Shuffle中數(shù)據(jù)輸出和讀取的管理。Shuffle的時(shí)候?qū)?shù)據(jù)寫到本地,下一個(gè)Stage要使用上一個(gè)Stage的數(shù)據(jù),因此寫數(shù)據(jù)的時(shí)候要告訴Driver中的MapOutputTrackerMaster具體寫到哪里,下一個(gè)Stage讀取數(shù)據(jù)的時(shí)候也要訪問Driver的MapOutputTrackerMaster獲取數(shù)據(jù)的具體位置。

MapOutputTrackerMaster的源碼如下。

1.  private[spark] class MapOutputTrackerMaster(conf: SparkConf,
2.     broadcastManager: BroadcastManager, isLocal: Boolean)
3.   extends MapOutputTracker(conf) {

DAGScheduler是面向Stage調(diào)度的高層調(diào)度實(shí)現(xiàn)。它為每一個(gè)Job計(jì)算DAG,跟蹤RDDS及Stage輸出結(jié)果進(jìn)行物化,并找到一個(gè)最小的計(jì)劃去運(yùn)行Job,然后提交stages中的TaskSets到底層調(diào)度器TaskScheduler提交集群運(yùn)行,TaskSet包含完全獨(dú)立的任務(wù),基于集群上已存在的數(shù)據(jù)運(yùn)行(如從上一個(gè)Stage輸出的文件),如果這個(gè)數(shù)據(jù)不可用,獲取數(shù)據(jù)可能會失敗。

Spark Stages根據(jù)RDD圖中Shuffle的邊界來創(chuàng)建,如果RDD的操作是窄依賴,如map()和filter(),在每個(gè)Stages中將一系列tasks組合成流水線執(zhí)行。但是,如果是寬依賴,Shuffle依賴需要多個(gè)Stages(上一個(gè)Stage進(jìn)行map輸出寫入文件,下一個(gè)Stage讀取數(shù)據(jù)文件),每個(gè)Stage依賴于其他的Stage,其中進(jìn)行多個(gè)算子操作。算子操作在各種類型的RDDS(如MappedRDD、FilteredRDD)的RDD.compute()中實(shí)際執(zhí)行。

在DAG階段,DAGScheduler根據(jù)當(dāng)前緩存狀態(tài)決定每個(gè)任務(wù)運(yùn)行的位置,并將任務(wù)傳遞給底層的任務(wù)調(diào)度器TaskScheduler。此外,它處理Shuffle輸出文件丟失的故障,在這種情況下,以前的Stage可能需要重新提交。Stage中不引起Shuffle文件丟失的故障由任務(wù)調(diào)度器TaskScheduler處理,在取消整個(gè)Stage前,將重試幾次任務(wù)。

當(dāng)瀏覽這個(gè)代碼時(shí),有幾個(gè)關(guān)鍵概念:

 Jobs作業(yè)(表現(xiàn)為[ActiveJob])作為頂級工作項(xiàng)提交給調(diào)度程序。當(dāng)用戶調(diào)用一個(gè)action,如count()算子,Job將通過submitJob進(jìn)行提交。每個(gè)作業(yè)可能需要執(zhí)行多個(gè)stages來構(gòu)建中間數(shù)據(jù)。

 Stages ([Stage])是一組任務(wù)的集合,在相同的RDD分區(qū)上,每個(gè)任務(wù)計(jì)算相同的功能,計(jì)算Jobs的中間結(jié)果。Stage根據(jù)Shuffle劃分邊界,我們必須等待前一階段Stage完成輸出。有兩種類型的Stage:[ResultStage]是執(zhí)行action的最后一個(gè)Stage,[ShuffleMapStage]是Shuffle Stages通過map寫入輸出文件中的。如果Jobs重用相同的RDDs,Stages可以跨越多個(gè)Jobs共享。

 Tasks任務(wù)是單獨(dú)的工作單位,每個(gè)任務(wù)發(fā)送到一個(gè)分布式節(jié)點(diǎn)。

 緩存跟蹤:DAGScheduler記錄哪些RDDS被緩存,避免重復(fù)計(jì)算,以及記錄Shuffle map Stages已經(jīng)生成的輸出文件,避免在map端重新計(jì)算。

 數(shù)據(jù)本地化:DAGScheduler基于RDDS的數(shù)據(jù)本地性、緩存位置,或Shuffle數(shù)據(jù)在Stage中運(yùn)行每一個(gè)任務(wù)的Task。

 清理:當(dāng)依賴于它們的運(yùn)行作業(yè)完成時(shí),所有數(shù)據(jù)結(jié)構(gòu)將被清除,防止在長期運(yùn)行的應(yīng)用程序中內(nèi)存泄漏。

 為了從故障中恢復(fù),同一個(gè)Stage可能需要運(yùn)行多次,這被稱為重試“attempts”。如在上一個(gè)Stage中的輸出文件丟失,TaskScheduler中將報(bào)告任務(wù)失敗,DAGScheduler通過檢測CompletionEvent與FetchFailed或ExecutorLost事件重新提交丟失的Stage。DAGScheduler將等待看是否有其他節(jié)點(diǎn)或任務(wù)失敗,然后在丟失計(jì)算任務(wù)的階段Stage中重新提交TaskSets。在這個(gè)過程中,可能須創(chuàng)建之前被清理的Stage。舊Stage的任務(wù)仍然可以運(yùn)行,但必須在正確的Stage中接收事件并進(jìn)行操作。

做改變或者回顧時(shí)需要看的清單有:

 Job運(yùn)行結(jié)束時(shí),所有的數(shù)據(jù)結(jié)構(gòu)將被清理,及清理程序運(yùn)行中的狀態(tài)。

 添加一個(gè)新的數(shù)據(jù)結(jié)構(gòu)時(shí),在新結(jié)構(gòu)中更新'DAGSchedulerSuite.assertDataStructuresEmpty',包括新結(jié)構(gòu),將有助于捕獲內(nèi)存泄漏。

DAGScheduler.scala的源碼如下。

1.   private[spark]
2.  class DAGScheduler(
3.      private[scheduler] val sc: SparkContext,
4.      private[scheduler] val taskScheduler: TaskScheduler,
5.      listenerBus: LiveListenerBus,
6.      mapOutputTracker: MapOutputTrackerMaster,
7.      blockManagerMaster: BlockManagerMaster,
8.      env: SparkEnv,
9.      clock: Clock = new SystemClock())
10.   extends Logging {

回到運(yùn)行日志,SparkContext在實(shí)例化的時(shí)候會構(gòu)造StandaloneSchedulerBackend、DAGScheduler、TaskSchedulerImpl、MapOutputTrackerMaster四大核心對象,DAGScheduler獲得Job ID,日志中顯示DAGScheduler: Final stage: ResultStage 1,F(xiàn)inal stage是ResultStage;Parents of final stage是ShuffleMapStage,DAGScheduler是面向Stage的。日志中顯示兩個(gè)Stage:Stage 1是Final stage,Stage 0是ShuffleMapStage。

接下來序號改變,運(yùn)行時(shí)最左側(cè)從0開始,日志中顯示DAGScheduler: missing: List(ShuffleMapStage 0),父Stage是ShuffleMapStage,DAGScheduler調(diào)度時(shí)必須先計(jì)算父Stage,因此首先提交的是ShuffleMapStage 0,這里RDD是MapPartitionsRDD,只有Stage中的最后一個(gè)算子是真正有效的,Stage 0中的最后一個(gè)操作是map,因此生成了MapPartitionsRDD。Stage 0無父Stage,因此提交,提交時(shí)進(jìn)行廣播等內(nèi)容,然后提交作業(yè)。

1.  ......
2.  17/05/24    05:48:28    INFO    DAGScheduler:     Final   stage:    ResultStage  1
    (saveAsTextFile at WordCountJobRuntime.scala:61)
3.  17/05/24 05:48:28 INFO DAGScheduler: Parents of final stage: List
    (ShuffleMapStage 0)
4.  17/05/24 05:48:28 INFO DAGScheduler: Missing parents: List
    (ShuffleMapStage 0)
5.  17/05/24 05:48:28 DEBUG DAGScheduler: submitStage(ResultStage 1)
6.  17/05/24 05:48:28 DEBUG DAGScheduler: missing: List(ShuffleMapStage 0)
7.  17/05/24 05:48:28 DEBUG DAGScheduler: submitStage(ShuffleMapStage 0)
8.  17/05/24 05:48:28 DEBUG DAGScheduler: missing: List()
9.  17/05/24    05:48:28    INFO   DAGScheduler:     Submitting     ShuffleMapStage  0
    (MapPartitionsRDD[3] at map at WordCountJobRuntime.scala:55), which has
    no missing parents
10. 17/05/24 05:48:28 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage
    0)
11. 17/05/24 05:48:28 TRACE BlockInfoManager: Task -1024 trying to put
    broadcast_1
12. ......

我們從Web UI的角度看一下,如圖3-8所示,Web UI中顯示生成兩個(gè)Stage:Stage 0、Stage 1。

圖3-8 Stage劃分

日志中顯示DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0,DAGScheduler提交作業(yè),顯示提交一個(gè)須計(jì)算的任務(wù),ShuffleMapStage在本地運(yùn)行是一個(gè)并行度,交給TaskSchedulerImpl運(yùn)行。這里是一個(gè)并行度,提交底層的調(diào)度器TaskScheduler,TaskScheduler收到任務(wù)后,就發(fā)布任務(wù)到集群中運(yùn)行,由TaskSetManager進(jìn)行管理:日志中顯示TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6012 bytes),顯示具體運(yùn)行的位置,及worker運(yùn)行了哪些任務(wù)。這里在本地只運(yùn)行了一個(gè)任務(wù)。

1.   17/05/24 05:48:28 INFO DAGScheduler: Submitting ShuffleMapStage 0
     (MapPartitionsRDD[3] at map at WordCountJobRuntime.scala:55), which has
     no missing parents
2.  17/05/24 05:48:28 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 0)
3.  ......
4.  17/05/24 05:48:28 INFO DAGScheduler: Submitting 1 missing tasks from
    ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCountJobRuntime.scala:55)
5.  17/05/24 05:48:28 DEBUG DAGScheduler: New pending partitions: Set(0)
6.  17/05/24 05:48:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
7.  17/05/24 05:48:28 DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0
8.  17/05/24 05:48:28 DEBUG TaskSetManager: Valid locality levels for TaskSet
    0.0: NO_PREF, ANY
9. 17/05/24 05:48:28 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0,
    runningTasks: 0
10. 17/05/24 05:48:28 DEBUG TaskSetManager: Valid locality levels for TaskSet
    0.0: NO_PREF, ANY
11. 17/05/24 05:48:28 DEBUG SecurityManager: user=null aclsEnabled=false
    viewAcls=dell viewAclsGroups=
12. 17/05/24 05:48:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
    0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6012 bytes)
13. 17/05/24 05:48:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
14. 17/05/24 05:48:28 DEBUG Executor: Task 0's epoch is 0

然后是完成作業(yè),日志中顯示TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 327 ms on localhost (executor driver),在本地機(jī)器上完成作業(yè)。當(dāng)Stage的一個(gè)任務(wù)完成后,ShuffleMapStage就已完成。Task任務(wù)運(yùn)行完后向DAGScheduler匯報(bào),DAGScheduler查看曾經(jīng)提交了幾個(gè)Task,計(jì)算Task的數(shù)量如果等于Task的總數(shù)量,那Stage也就完成了。這個(gè)Stage完成以后,下一個(gè)Stage開始運(yùn)行。

1.  ......
2.  17/05/24 05:48:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0).
    1744 bytes result sent to driver
3. 17/05/24 05:48:29 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0,
    runningTasks: 0
4. 17/05/24 05:48:29 DEBUG TaskSetManager: No tasks for locality level NO_PREF,
    so moving to locality level ANY
5.  17/05/24 05:48:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID
    0) in 327 ms on localhost (executor driver) (1/1)
6.  17/05/24 05:48:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
    have all completed, from pool
7.  17/05/24 05:48:29 DEBUG DAGScheduler: ShuffleMapTask finished on driver
8.  17/05/24     05:48:29    INFO   DAGScheduler:      ShuffleMapStage      0  (map  at
    WordCountJobRuntime.scala:55) finished in 0.358 s

ShuffleMapStage完成后,將運(yùn)行下一個(gè)Stage。日志中顯示DAGScheduler: looking for newly runnable stages,這里一共有兩個(gè)Stage,ShuffleMapStage運(yùn)行完成,那只有一個(gè)ResultStage將運(yùn)行。DAGScheduler又提交最后一個(gè)Stage的一個(gè)任務(wù),默認(rèn)并行度是繼承的。同樣,發(fā)布任務(wù)給Executor進(jìn)行計(jì)算。

1.  ......
2.  17/05/24 05:48:29 INFO DAGScheduler: looking for newly runnable stages
3.  17/05/24 05:48:29 INFO DAGScheduler: running: Set()
4.  17/05/24 05:48:29 INFO DAGScheduler: waiting: Set(ResultStage 1)
5.  17/05/24 05:48:29 INFO DAGScheduler: failed: Set()
6.   17/05/24 05:48:29 DEBUG MapOutputTrackerMaster: Increasing epoch to 1
7.  17/05/24 05:48:29 TRACE DAGScheduler: Checking if any dependencies of
    ShuffleMapStage 0 are now runnable
8.  17/05/24 05:48:29 TRACE DAGScheduler: running: Set()
9.  17/05/24 05:48:29 TRACE DAGScheduler: waiting: Set(ResultStage 1)
10. 17/05/24 05:48:29 TRACE DAGScheduler: failed: Set()
11. 17/05/24 05:48:29 DEBUG DAGScheduler: submitStage(ResultStage 1)
12. 17/05/24 05:48:29 DEBUG DAGScheduler: missing: List()
13. 17/05/24     05:48:29     INFO    DAGScheduler:     Submitting      ResultStage   1
    (MapPartitionsRDD[5] at saveAsTextFile at WordCountJobRuntime.scala:61),
    which has no missing parents
14. 17/05/24 05:48:29 DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
15. ......
16. 17/05/24 05:48:29 INFO DAGScheduler: Submitting 1 missing tasks from
    ResultStage       1     (MapPartitionsRDD[5]          at     saveAsTextFile      at
    WordCountJobRuntime.scala:61)
17. 17/05/24 05:48:29 DEBUG DAGScheduler: New pending partitions: Set(0)
18. 17/05/24 05:48:29 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
19. ....

Task任務(wù)運(yùn)行完后向DAGScheduler匯報(bào),DAGScheduler計(jì)算曾經(jīng)提交了幾個(gè)Task,如果Task的數(shù)量等于Task的總數(shù)量,ResultStage也運(yùn)行完成。然后進(jìn)行相關(guān)的清理工作,兩個(gè)Stage(ShuffleMapStage、ResultStage)完成,Job也就完成。

1.  ......
2.  17/05/24 05:48:29 DEBUG MapOutputTrackerMaster: Fetching outputs for
    shuffle 0, partitions 0-1
3.  17/05/24 05:48:29 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight:
    50331648, targetRequestSize: 10066329
4.  17/05/24 05:48:29 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty
    blocks out of 1 blocks
5.  17/05/24 05:48:29 INFO ShuffleBlockFetcherIterator: Started 0 remote
    fetches in 5 ms
6.  17/05/24 05:48:29 DEBUG ShuffleBlockFetcherIterator: Got local blocks in
    12 ms
7.  17/05/24 05:48:29 DEBUG TaskMemoryManager: Task 1 release 0.0 B from
    org.apache.spark.util.collection.ExternalAppendOnlyMap@3da8eddf
8.  ......
9.  17/05/24 05:48:29 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID
    1) in 409 ms on localhost (executor driver) (1/1)
10. 17/05/24 05:48:29 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks
    have all completed, from pool
11. 17/05/24 05:48:29 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at
    WordCountJobRuntime.scala:61) finished in 0.410 s
12. 17/05/24 05:48:29 DEBUG DAGScheduler: After removal of stage 1, remaining
    stages = 1
13. 17/05/24 05:48:29 DEBUG DAGScheduler: After removal of stage 0, remaining
    stages = 0
14. 17/05/24 05:48:29 INFO DAGScheduler: Job 0 finished: saveAsTextFile at
    WordCountJobRuntime.scala:61, took 1.345921 s
15. ......

下面看一下WebUI,ShuffleMapStage中的任務(wù)交給Executor,圖3-9中顯示了任務(wù)的相關(guān)信息,如Shuffle的輸出等,第一個(gè)Stage肯定生成Shuffle的輸出,可以看一下最右側(cè)的Shuffle Write Size/Records。圖3-9中的Input Size/Records是從Hdfs中讀入的文件數(shù)據(jù)。

圖3-9 ShuffleMapStage運(yùn)行

接下來看一下第二個(gè)Stage。第二個(gè)Stage同樣顯示Executor的信息,圖3-10最右側(cè)顯示Shuffle Read Size/Records。如果在分布式集群運(yùn)行,須遠(yuǎn)程讀取數(shù)據(jù),例如,原來是4個(gè)Executor計(jì)算,在第二個(gè)Stage中是兩個(gè)Executor計(jì)算,因此一部分?jǐn)?shù)據(jù)是本地的,一部分是遠(yuǎn)程的,或從遠(yuǎn)程節(jié)點(diǎn)拉取數(shù)據(jù)。ResultStage最后要產(chǎn)生輸出,輸出到文件保存。

圖3-10 ResultStage運(yùn)行

Task的運(yùn)行解密:

(1)Task是運(yùn)行在Executor中的,而Executor又是位于CoarseGrainedExecutorBackend中的,且CoarseGrainedExecutorBackend和Executor是一一對應(yīng)的;計(jì)算運(yùn)行于Executor,而Executor位于CoarseGrainedExecutorBackend中,CoarseGrainedExecutorBackend是進(jìn)程。發(fā)任務(wù)消息也是在CoarseGrainedExecutorBackend。

(2)當(dāng)CoarseGrainedExecutorBackend接收到TaskSetManager發(fā)過來的LaunchTask消息后會反序列化TaskDescription,然后使用CoarseGrainedExecutorBackend中唯一的Executor來執(zhí)行任務(wù)。

CoarseGrainedExecutorBackend收到Driver發(fā)送的LaunchTask任務(wù)消息,其中LaunchTask是case class,而不是case object,是因?yàn)槊總€(gè)消息是一個(gè)消息實(shí)例,每個(gè)消息狀態(tài)不一樣,而case object是唯一的,因此使用case class。

1.     //Driver節(jié)點(diǎn)到 executors節(jié)點(diǎn)
2.  case class LaunchTask(data: SerializableBuffer) extends
    CoarseGrainedClusterMessage

Executor.scala的源碼如下。

Spark 2.1.1版本的Executor.scala的launchTask的源碼如下。

1.  ......
2.     //維護(hù)正在運(yùn)行的任務(wù)列表
3.    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
4.  ......
5.    def  launchTask(
6.        context: ExecutorBackend,
7.        taskId: Long,
8.        attemptNumber: Int,
9.        taskName: String,
10.       serializedTask: ByteBuffer): Unit = {
11.     val tr = new TaskRunner(context, taskId = taskId, attemptNumber =
        attemptNumber, taskName,      serializedTask)
12.     runningTasks.put(taskId, tr)
13.     threadPool.execute(tr)
14.   }
15. ......
16.
17.  class TaskRunner(
18.       execBackend: ExecutorBackend,
19.       val taskId: Long,
20.       val attemptNumber: Int,
21.       taskName: String,
22.       serializedTask: ByteBuffer)
23.     extends Runnable {
24. .......

Spark 2.2.0版本的Executor.scala的launchTask的源碼與Spark 2.1.1版本相比具有如下特點(diǎn)。

 上段代碼中第7~10行調(diào)整launchTask方法的第二個(gè)參數(shù):傳入封裝的taskDescription任務(wù)描述信息。

 上段代碼中第11行構(gòu)建TaskRunner實(shí)例傳入的也是taskDescription參數(shù)。

 上段代碼中第19~22行TaskRunner的第二個(gè)成員變量更新為TaskDescription類型。

1.  ......
2.  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription):
    Unit = {
3.      val tr = new TaskRunner(context, taskDescription)
4.     .....
5.    }
6.  ......
7.    class TaskRunner(
8.    .....
9.      private val taskDescription: TaskDescription)
10.  ...

在Executor.scala中單擊launchTask,運(yùn)行的任務(wù)使用了ConcurrentHashMap數(shù)據(jù)結(jié)構(gòu),運(yùn)行l(wèi)aunchTask的時(shí)候構(gòu)建了一個(gè)TaskRunner,TaskRunner是一個(gè)Runnable,而Runnable是Java中的接口,Scala可以直接調(diào)用Java的代碼,run方法中包括任務(wù)的反序列化等內(nèi)容。通過Runnable封裝任務(wù),然后放入到runningTasks中,在threadPool中執(zhí)行任務(wù)。threadPool是一個(gè)newDaemonCachedThreadPool。任務(wù)交給Executor的線程池中的線程去執(zhí)行,執(zhí)行的時(shí)候下載資源、數(shù)據(jù)等內(nèi)容。

Spark 2.1.1版本的Executor.scala的threadPool的源碼如下。

1.  //啟動worker線程池
2.   private val threadPool = ThreadUtils.newDaemonCachedThreadPool
     ("Executor task launch worker")

Spark 2.2.0版本的Executor.scala的threadPool的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第2行Executor的線程池由ThreadUtils.newDaemonCachedThreadPool方式調(diào)整為Executors.newCachedThreadPool(threadFactory)線程池的方式。

1.    //啟動worker線程池
2.   private val threadPool = {
3.     val threadFactory = new ThreadFactoryBuilder()
4.       .setDaemon(true)
5.       .setNameFormat("Executor task launch worker-%d")
6.       .setThreadFactory(new ThreadFactory {
7.         override def newThread(r: Runnable): Thread =
8.           //使用UninterruptibleThread 運(yùn)行任務(wù),這樣我們就可以允許運(yùn)行代碼不被
             //Thread.interrupt()線程中斷。例如,KAFKA-1894、HADOOP-10622,
             //如果某些方法被中斷,程序?qū)恢睊炱?
9.           new UninterruptibleThread(r, "unused") //thread name will be set
             by ThreadFactoryBuilder
10.      })
11.      .build()
12.    Executors.newCachedThreadPool(threadFactory).asInstanceOf
       [ThreadPoolExecutor]
13.  }
主站蜘蛛池模板: 大安市| 正定县| 东莞市| 伊宁市| 建湖县| 资中县| 鲜城| 奈曼旗| 获嘉县| 盐亭县| 塔城市| 镇原县| 左贡县| 贵阳市| 慈利县| 罗定市| 长岛县| 文水县| 高邑县| 雷山县| 兴文县| 平塘县| 达拉特旗| 改则县| 泸溪县| 潜山县| 壶关县| 聊城市| 威宁| 靖远县| 江门市| 长治市| 台山市| 城固县| 阳曲县| 炎陵县| 师宗县| 佳木斯市| 阳山县| 乐山市| 平南县|