- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 498字
- 2019-12-12 17:30:03
8.2 Stage劃分內幕
本節講解Stage劃分原理及Stage劃分源碼。一個Application中,每個Job由一個或多個Stage構成,Stage根據寬依賴(如reducByKey、groupByKey算子等)進行劃分。
8.2.1 Stage劃分原理詳解
Spark Application中可以因為不同的Action觸發眾多的Job。也就是說,一個Application中可以有很多的Job,每個Job是由一個或者多個Stage構成的,后面的Stage依賴于前面的Stage。也就是說,只有前面依賴的Stage計算完畢后,后面的Stage才會運行。
Stage劃分的依據就是寬依賴,什么時候產生寬依賴呢?例如,reducByKey、groupByKey等;Action(如collect)導致SparkContext.runJob的執行,最終導致DAGScheduler中的submitJob的執行,其核心是通過發送一個case class JobSubmitted對象給eventProcessLoop。
eventProcessLoop是DAGSchedulerEventProcessLoop的具體實例,而DAGSchedulerEvent-ProcessLoop是EventLoop的子類,具體實現EventLoop的onReceive方法。onReceive方法轉過來回調doOnReceive。在doOnReceive中通過模式匹配的方式把執行路由到JobSubmitted,在handleJobSubmitted中首先創建finalStage,創建finalStage時會建立父Stage的依賴鏈條。
8.2.2 Stage劃分源碼詳解
Spark的Action算子執行SparkContext.runJob,提交至DAGScheduler中的submitJob,submitJob發送JobSubmitted對象到eventProcessLoop循環消息隊列,提交該任務,其中JobSubmitted的源碼如下。
DAGSchedulerEvent.scala的源碼如下。
1. private[scheduler] case class JobSubmitted( 2. jobId: Int, 3. finalRDD: RDD[_], 4. func: (TaskContext, Iterator[_]) => _, 5. partitions: Array[Int], 6. callSite: CallSite, 7. listener: JobListener, 8. properties: Properties = null) 9. extends DAGSchedulerEvent
eventProcessLoop是DAGSchedulerEventProcessLoop的具體實例,而DAGScheduler-EventProcessLoop是EventLoop的子類,具體實現EventLoop的onReceive方法,onReceive方法轉過來回調doOnReceive。
DAGScheduler.scala的源碼如下。
1. private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { 2. case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => 3. dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) 4. 5. case MapStageSubmitted(jobId, dependency, callSite, listener, properties) => 6. dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) 7. ......