- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 2193字
- 2019-12-12 17:30:03
8.1 Job到底在什么時候產生
典型的Job邏輯執行圖如圖8-1所示,經過下面四個步驟可以得到最終執行結果。

圖8-1 典型的Job邏輯執行圖
(1)從數據源(可以是本地file、內存數據結構、HDFS、HBase等)讀取數據創建最初的RDD。
(2)對RDD進行一系列的transformation()操作,每個transformation()會產生一個或多個包含不同類型T的RDD[T]。T可以是Scala里面的基本類型或數據結構,不限于(K, V)。
(3)對最后的final RDD進行action()操作,每個partition計算后產生結果result。
(4)將result回送到driver端,進行最后的f(list[result])計算。RDD可以被Cache到內存或者checkpoint到磁盤上。RDD中的partition個數不固定,通常由用戶設定。RDD和RDD間的partition的依賴關系可以不是1對1,如圖8-1所示,既有1對1關系,也有多對多關系。
8.1.1 觸發Job的原理和源碼解析
對于Spark Job觸發流程的源碼,以RDD的count方法為例開始。RDD的count方法代碼如下所示。
1. /** * 返回RDD中元素的數量 2. */ 3. def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
從上面的代碼可以看出,count方法觸發SparkContext的runJob方法的調用。SparkContext的runJob方法代碼如下所示。
1. /** * 觸發一個Job處理一個RDD的所有partitions,并且把處理結果返回到一個數組 2. */ 3. def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] ={ 4. runJob(rdd, func, 0 until rdd.partitions.length) 5. }
進入SparkContext的runJob方法的同名重載方法,代碼如下所示。
1. /** *觸發一個Job處理一個RDD的指定部分的partitions,并且把處理結果返回到一個數組 *比第一個runJob方法多了一個partitions數組參數 2. */ 3. def runJob[T, U: ClassTag]( 4. rdd: RDD[T], 5. func: Iterator[T] => U, 6. partitions: Seq[Int]): Array[U] = { 7. val cleanedFunc = clean(func) 8. runJob(rdd, (ctx: TaskContext, it: Iterator[T]) =>cleanedFunc(it), partitions) 9. }
再進入SparkContext的runJob方法的另一個同名重載方法,代碼如下所示。
1. /** *觸發一個Job處理一個RDD的指定部分的partitions,并且把處理結果返回到一個數組 *比第一個runJob方法多了一個partitions數組參數,并且func的類型不同 2. */ 3. def runJob[T, U: ClassTag]( 4. rdd: RDD[T], 5. func: (TaskContext, Iterator[T]) => U, 6. partitions: Seq[Int]): Array[U] = { 7. val results = new Array[U](partitions.size) 8. runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res) 9. results 10. }
最后一次進入SparkContext的runJob方法的另一個同名重載方法,代碼如下所示。
1. /** *觸發一個Job處理一個RDD的指定部分的partitions,并把處理結果給指定的handler *函數,這是Spark所有Action的主入口 2. */ 3. def runJob[T, U: ClassTag]( 4. rdd: RDD[T], 5. func: (TaskContext, Iterator[T]) => U, 6. partitions: Seq[Int], 7. resultHandler: (Int, U) => Unit): Unit = { 8. if (stopped.get()) { 9. throw new IllegalStateException("SparkContext has been shutdown") 10. } 11. //記錄了方法調用的方法棧 12. val callSite = getCallSite 13. //清除閉包,為了函數能夠序列化 14. val cleanedFunc = clean(func) 15. logInfo("Starting Job: " + callSite.shortForm) 16. if (conf.getBoolean("spark.logLineage", false)) { 17. logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) 18. } 19. //向高層調度器(DAGScheduler)提交Job,從而獲得Job執行結果 20. dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) 21. progressBar.foreach(_.finishAll()) 22. rdd.doCheckpoint() 23. }
8.1.2 觸發Job的算子案例
Spark Application里可以產生一個或者多個Job,例如,spark-shell默認啟動時內部就沒有Job,只是作為資源的分配程序,可以在spark-shell里面寫代碼產生若干個Job,普通程序中一般可以有不同的Action,每個Action一般也會觸發一個Job。
給定Job的邏輯執行圖,如何生成物理執行圖,也就是給定這樣一個復雜數據依賴圖,如何合理劃分Stage,并確定Task的類型和個數?
一個直觀的想法是將前后關聯的RDDs組成一個Stage,每個Stage生成一個Task。這樣雖然可以解決問題,但效率不高。除了效率問題,這個想法還有一個更嚴重的問題:大量中間數據需要存儲。對于task來說,其執行結果要么存到磁盤,要么存到內存,或者兩者皆有。如果每個箭頭都是Task,每個RDD里面的數據都需要存起來,占用空間可想而知。
仔細觀察一下邏輯執行圖會發現:在每個RDD中,每個Partition是獨立的。也就是說,在RDD內部,每個Partition的數據依賴各自不會相互干擾。因此,一個大膽的想法是將整個流程圖看成一個Stage,為最后一個finalRDD中的每個Partition分配一個Task。
Spark算法構造和物理執行時最基本的核心:最大化Pipeline?;赑ipeline的思想,數據被使用的時候才開始計算,從數據流動的視角來說,是數據流動到計算的位置。實質上,從邏輯的角度看,是算子在數據上流動。從算法構建的角度而言:肯定是算子作用于數據,所以是算子在數據上流動;方便算法的構建。
從物理執行的角度而言:是數據流動到計算的位置;方便系統最為高效地運行。對于Pipeline而言,數據計算的位置就是每個Stage中最后的RDD,一個震撼人心的內幕真相是:每個Stage中除了最后一個RDD算子是真實的外,前面的算子都是假的。計算的Lazy特性導致計算從后往前回溯,形成Computing Chain,導致的結果是需要首先計算出具體一個Stage內部左側的RDD中本次計算依賴的Partition,如圖8-2所示。
整個Computing Chain根據數據依賴關系自后向前建立,遇到ShuffleDependency后形成Stage。在每個Stage中,每個RDD中的compute()調用parentRDD.iter()將parent RDDs中的records一個個fetch過來。

圖8-2 Stage示意圖
例如,collect前面的RDD是transformation級別的,不會立即執行。從后往前推,回溯時如果是窄依賴,則在內存中迭代,否則把中間結果寫出到磁盤,暫存給后面的計算使用。
依賴分為窄依賴和寬依賴。例如,現實生活中,工作依賴一個對象,是窄依賴,依賴很多對象,是寬依賴。窄依賴除了一對一,還有range級別的依賴,依賴固定的個數,隨著數據的規模擴大而改變。如果是寬依賴,DAGScheduler會劃分成不同的Stage,Stage內部是基于內存迭代的,也可以基于磁盤迭代,Stage內部計算的邏輯是完全一樣的,只是計算的數據不同而已。具體的任務就是計算一個數據分片,一個Partition的大小是128MB。一個partition不是完全精準地等于一個block的大小,一般最后一條記錄跨兩個block。
Spark程序的運行有兩種部署方式:Client和Cluster。
默認情況下建議使用Client模式,此模式下可以看到更多的交互性信息,及運行過程的信息。此時要專門使用一臺機器來提交我們的Spark程序,配置和普通的Worker配置一樣,而且要和Cluster Manager在同樣的網絡環境中,因為要指揮所有的Worker去工作,Worker里的線程要和Driver不斷地交互。由于Driver要驅動整個集群,頻繁地和所有為當前程序分配的Executor去交互,頻繁地進行網絡通信,所以必須在同樣的網絡中。
也可以指定部署方式為Cluster,這樣真正的Driver會由Master決定在Worker中的某一臺機器。Master為你分配的第一個Executor就是Driver級別的Executor。不推薦學習、開發的時候使用Cluster,因為Cluster無法直接看到一些日志信息,所以建議使用Client方式。