官术网_书友最值得收藏!

8.1 Job到底在什么時候產生

典型的Job邏輯執行圖如圖8-1所示,經過下面四個步驟可以得到最終執行結果。

圖8-1 典型的Job邏輯執行圖

(1)從數據源(可以是本地file、內存數據結構、HDFS、HBase等)讀取數據創建最初的RDD。

(2)對RDD進行一系列的transformation()操作,每個transformation()會產生一個或多個包含不同類型T的RDD[T]。T可以是Scala里面的基本類型或數據結構,不限于(K, V)。

(3)對最后的final RDD進行action()操作,每個partition計算后產生結果result。

(4)將result回送到driver端,進行最后的f(list[result])計算。RDD可以被Cache到內存或者checkpoint到磁盤上。RDD中的partition個數不固定,通常由用戶設定。RDD和RDD間的partition的依賴關系可以不是1對1,如圖8-1所示,既有1對1關系,也有多對多關系。

8.1.1 觸發Job的原理和源碼解析

對于Spark Job觸發流程的源碼,以RDD的count方法為例開始。RDD的count方法代碼如下所示。

1.  /**
     * 返回RDD中元素的數量
2.   */
3.  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

從上面的代碼可以看出,count方法觸發SparkContext的runJob方法的調用。SparkContext的runJob方法代碼如下所示。

1.    /**
       * 觸發一個Job處理一個RDD的所有partitions,并且把處理結果返回到一個數組
2.     */
3.    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] ={
4.  runJob(rdd, func, 0 until rdd.partitions.length)
5.    }

進入SparkContext的runJob方法的同名重載方法,代碼如下所示。

1.    /**
       *觸發一個Job處理一個RDD的指定部分的partitions,并且把處理結果返回到一個數組
       *比第一個runJob方法多了一個partitions數組參數
2.     */
3.    def runJob[T, U: ClassTag](
4.  rdd: RDD[T],
5.  func: Iterator[T] => U,
6.        partitions: Seq[Int]): Array[U] = {
7.      val cleanedFunc = clean(func)
8.  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) =>cleanedFunc(it),
    partitions)
9.    }

再進入SparkContext的runJob方法的另一個同名重載方法,代碼如下所示。

1.    /**
       *觸發一個Job處理一個RDD的指定部分的partitions,并且把處理結果返回到一個數組
       *比第一個runJob方法多了一個partitions數組參數,并且func的類型不同
2.     */
3.    def runJob[T, U: ClassTag](
4.  rdd: RDD[T],
5.  func: (TaskContext, Iterator[T]) => U,
6.        partitions: Seq[Int]): Array[U] = {
7.      val results = new Array[U](partitions.size)
8.  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
9.      results
10.   }

最后一次進入SparkContext的runJob方法的另一個同名重載方法,代碼如下所示。

1.    /**
       *觸發一個Job處理一個RDD的指定部分的partitions,并把處理結果給指定的handler
       *函數,這是Spark所有Action的主入口
2.     */
3.    def runJob[T, U: ClassTag](
4.  rdd: RDD[T],
5.  func: (TaskContext, Iterator[T]) => U,
6.        partitions: Seq[Int],
7.  resultHandler: (Int, U) => Unit): Unit = {
8.      if (stopped.get()) {
9.        throw new IllegalStateException("SparkContext has been shutdown")
10.     }
11. //記錄了方法調用的方法棧
12.     val callSite = getCallSite
13. //清除閉包,為了函數能夠序列化
14.     val cleanedFunc = clean(func)
15. logInfo("Starting Job: " + callSite.shortForm)
16.     if (conf.getBoolean("spark.logLineage", false)) {
17. logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
18.     }
19.     //向高層調度器(DAGScheduler)提交Job,從而獲得Job執行結果
20. dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,
    localProperties.get)
21. progressBar.foreach(_.finishAll())
22. rdd.doCheckpoint()
23.   }

8.1.2 觸發Job的算子案例

Spark Application里可以產生一個或者多個Job,例如,spark-shell默認啟動時內部就沒有Job,只是作為資源的分配程序,可以在spark-shell里面寫代碼產生若干個Job,普通程序中一般可以有不同的Action,每個Action一般也會觸發一個Job。

給定Job的邏輯執行圖,如何生成物理執行圖,也就是給定這樣一個復雜數據依賴圖,如何合理劃分Stage,并確定Task的類型和個數?

一個直觀的想法是將前后關聯的RDDs組成一個Stage,每個Stage生成一個Task。這樣雖然可以解決問題,但效率不高。除了效率問題,這個想法還有一個更嚴重的問題:大量中間數據需要存儲。對于task來說,其執行結果要么存到磁盤,要么存到內存,或者兩者皆有。如果每個箭頭都是Task,每個RDD里面的數據都需要存起來,占用空間可想而知。

仔細觀察一下邏輯執行圖會發現:在每個RDD中,每個Partition是獨立的。也就是說,在RDD內部,每個Partition的數據依賴各自不會相互干擾。因此,一個大膽的想法是將整個流程圖看成一個Stage,為最后一個finalRDD中的每個Partition分配一個Task。

Spark算法構造和物理執行時最基本的核心:最大化Pipeline?;赑ipeline的思想,數據被使用的時候才開始計算,從數據流動的視角來說,是數據流動到計算的位置。實質上,從邏輯的角度看,是算子在數據上流動。從算法構建的角度而言:肯定是算子作用于數據,所以是算子在數據上流動;方便算法的構建。

從物理執行的角度而言:是數據流動到計算的位置;方便系統最為高效地運行。對于Pipeline而言,數據計算的位置就是每個Stage中最后的RDD,一個震撼人心的內幕真相是:每個Stage中除了最后一個RDD算子是真實的外,前面的算子都是假的。計算的Lazy特性導致計算從后往前回溯,形成Computing Chain,導致的結果是需要首先計算出具體一個Stage內部左側的RDD中本次計算依賴的Partition,如圖8-2所示。

整個Computing Chain根據數據依賴關系自后向前建立,遇到ShuffleDependency后形成Stage。在每個Stage中,每個RDD中的compute()調用parentRDD.iter()將parent RDDs中的records一個個fetch過來。

圖8-2 Stage示意圖

例如,collect前面的RDD是transformation級別的,不會立即執行。從后往前推,回溯時如果是窄依賴,則在內存中迭代,否則把中間結果寫出到磁盤,暫存給后面的計算使用。

依賴分為窄依賴和寬依賴。例如,現實生活中,工作依賴一個對象,是窄依賴,依賴很多對象,是寬依賴。窄依賴除了一對一,還有range級別的依賴,依賴固定的個數,隨著數據的規模擴大而改變。如果是寬依賴,DAGScheduler會劃分成不同的Stage,Stage內部是基于內存迭代的,也可以基于磁盤迭代,Stage內部計算的邏輯是完全一樣的,只是計算的數據不同而已。具體的任務就是計算一個數據分片,一個Partition的大小是128MB。一個partition不是完全精準地等于一個block的大小,一般最后一條記錄跨兩個block。

Spark程序的運行有兩種部署方式:Client和Cluster。

默認情況下建議使用Client模式,此模式下可以看到更多的交互性信息,及運行過程的信息。此時要專門使用一臺機器來提交我們的Spark程序,配置和普通的Worker配置一樣,而且要和Cluster Manager在同樣的網絡環境中,因為要指揮所有的Worker去工作,Worker里的線程要和Driver不斷地交互。由于Driver要驅動整個集群,頻繁地和所有為當前程序分配的Executor去交互,頻繁地進行網絡通信,所以必須在同樣的網絡中。

也可以指定部署方式為Cluster,這樣真正的Driver會由Master決定在Worker中的某一臺機器。Master為你分配的第一個Executor就是Driver級別的Executor。不推薦學習、開發的時候使用Cluster,因為Cluster無法直接看到一些日志信息,所以建議使用Client方式。

主站蜘蛛池模板: 白城市| 亚东县| 苍山县| 日土县| 高青县| 珲春市| 刚察县| 沾化县| 邢台县| 聂拉木县| 旌德县| 广宗县| 海安县| 乐山市| 农安县| 奉化市| 同心县| 高安市| 水城县| 睢宁县| 娄底市| 区。| 聊城市| 汝南县| 尼木县| 迁西县| 宁武县| 宁波市| 邹城市| 墨竹工卡县| 水富县| 高陵县| 历史| 定兴县| 吉林市| 崇文区| 宜君县| 神农架林区| 雷山县| 平果县| 醴陵市|