官术网_书友最值得收藏!

8.2 Stage劃分內幕

本節講解Stage劃分原理及Stage劃分源碼。一個Application中,每個Job由一個或多個Stage構成,Stage根據寬依賴(如reducByKey、groupByKey算子等)進行劃分。

8.2.1 Stage劃分原理詳解

Spark Application中可以因為不同的Action觸發眾多的Job。也就是說,一個Application中可以有很多的Job,每個Job是由一個或者多個Stage構成的,后面的Stage依賴于前面的Stage。也就是說,只有前面依賴的Stage計算完畢后,后面的Stage才會運行。

Stage劃分的依據就是寬依賴,什么時候產生寬依賴呢?例如,reducByKey、groupByKey等;Action(如collect)導致SparkContext.runJob的執行,最終導致DAGScheduler中的submitJob的執行,其核心是通過發送一個case class JobSubmitted對象給eventProcessLoop。

eventProcessLoop是DAGSchedulerEventProcessLoop的具體實例,而DAGSchedulerEvent-ProcessLoop是EventLoop的子類,具體實現EventLoop的onReceive方法。onReceive方法轉過來回調doOnReceive。在doOnReceive中通過模式匹配的方式把執行路由到JobSubmitted,在handleJobSubmitted中首先創建finalStage,創建finalStage時會建立父Stage的依賴鏈條。

8.2.2 Stage劃分源碼詳解

Spark的Action算子執行SparkContext.runJob,提交至DAGScheduler中的submitJob,submitJob發送JobSubmitted對象到eventProcessLoop循環消息隊列,提交該任務,其中JobSubmitted的源碼如下。

DAGSchedulerEvent.scala的源碼如下。

1.  private[scheduler] case class JobSubmitted(
2.     jobId: Int,
3.     finalRDD: RDD[_],
4.     func: (TaskContext, Iterator[_]) => _,
5.     partitions: Array[Int],
6.     callSite: CallSite,
7.     listener: JobListener,
8.     properties: Properties = null)
9.   extends DAGSchedulerEvent

eventProcessLoop是DAGSchedulerEventProcessLoop的具體實例,而DAGScheduler-EventProcessLoop是EventLoop的子類,具體實現EventLoop的onReceive方法,onReceive方法轉過來回調doOnReceive。

DAGScheduler.scala的源碼如下。

1.  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
2.      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener,
        properties) =>
3.        dagScheduler.handleJobSubmitted(jobId,           rdd,    func,    partitions,
          callSite, listener, properties)
4.
5.      case MapStageSubmitted(jobId, dependency, callSite, listener,
        properties) =>
6.        dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite,
          listener, properties)
7.   ......
主站蜘蛛池模板: 拉萨市| 普兰县| 通渭县| 唐山市| 阿巴嘎旗| 柯坪县| 平和县| 冀州市| 荆州市| 尼木县| 渝北区| 聊城市| 抚顺县| 双鸭山市| 务川| 南宁市| 海阳市| 萨嘎县| 浏阳市| 新竹县| 襄樊市| 牡丹江市| 涞源县| 年辖:市辖区| 红安县| 申扎县| 平和县| 尚志市| 通化市| 乐昌市| 沧源| 上犹县| 运城市| 郴州市| 长岛县| 米泉市| 普兰县| 云阳县| 万盛区| 宝山区| 咸宁市|