官术网_书友最值得收藏!

4.2 DAGScheduler解析

DAGScheduler是面向Stage的高層調度器。本節(jié)講解DAG的定義、DAG的實例化、DAGScheduler劃分Stage的原理、DAGScheduler劃分Stage的具體算法、Stage內部Task獲取最佳位置的算法等內容。

4.2.1 DAG的定義

DAGScheduler是面向Stage的高層級的調度器,DAGScheduler把DAG拆分成很多的Tasks,每組的Tasks都是一個Stage,解析時是以Shuffle為邊界反向解析構建Stage,每當遇到Shuffle,就會產生新的Stage,然后以一個個TaskSet(每個Stage封裝一個TaskSet)的形式提交給底層調度器TaskScheduler。DAGScheduler需要記錄哪些RDD被存入磁盤等物化動作,同時要尋求Task的最優(yōu)化調度,如在Stage內部數(shù)據(jù)的本地性等。DAGScheduler還需要監(jiān)視因為Shuffle跨節(jié)點輸出可能導致的失敗,如果發(fā)現(xiàn)這個Stage失敗,可能就要重新提交該Stage。

為了更好地理解Spark高層調度器DAGScheduler,須綜合理解RDD、Application、Driver Program、Job內容,還需要了解以下概念。

(1)Stage:一個Job需要拆分成多組任務來完成,每組任務由Stage封裝。與一個Job所有涉及的PartitionRDD類似,Stage之間也有依賴關系。

(2)TaskSet:一組任務就是一個TaskSet,對應一個Stage。其中,一個TaskSet的所有Task之間沒有Shuffle依賴,因此互相之間可以并行運行。

(3)Task:一個獨立的工作單元,由Driver Program發(fā)送到Executor上去執(zhí)行。通常情況下,一個Task處理RDD的一個Partition的數(shù)據(jù)。根據(jù)Task返回類型的不同,Task又分為ShuffleMapTask和ResultTask。

4.2.2 DAG的實例化

在Spark源碼中,DAGScheduler是整個Spark Application的入口,即在SparkContext中聲明并實例化。在實例化DAGScheduler之前,已經(jīng)實例化了SchedulerBackend和底層調度器TaskScheduler,而SchedulerBackend和TaskScheduler是通過SparkContext的方法createTaskScheduler實例化的。DAGScheduler在提交TaskSet給底層調度器的時候是面向TaskScheduler接口的,這符合面向對象中依賴抽象,而不依賴具體實現(xiàn)的原則,帶來底層資源調度器的可插拔性,以至于Spark可以運行在眾多的部署模式上,如Standalone、Yarn、Mesos、Local及其他自定義的部署模式。

SparkContext.scala的源碼中相關的代碼如下。

1.   class SparkContext(config: SparkConf) extends Logging {
2.  .......
3.  @volatile private var _dagScheduler: DAGScheduler = _
4.  ......
5.    private[spark] def dagScheduler: DAGScheduler = _dagScheduler
6.    private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
7.      _dagScheduler = ds
8.    }
9.  ......
10.    val (sched, ts) = SparkContext.createTaskScheduler(this, master,
       deployMode)
11.     _schedulerBackend = sched
12.     _taskScheduler = ts
13.      //實例化DAGScheduler時傳入當前的SparkContext實例化對象
14.     _dagScheduler = new DAGScheduler(this)
15.     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
16. ......
17.  _taskScheduler.start()

DAGScheduler.scala的源碼中相關的代碼如下。

1.    private[spark]
2.  class DAGScheduler(
3.      private[scheduler] val sc: SparkContext,
4.      private[scheduler] val taskScheduler: TaskScheduler,
5.      listenerBus: LiveListenerBus,
6.      mapOutputTracker: MapOutputTrackerMaster,
7.      blockManagerMaster: BlockManagerMaster,
8.      env: SparkEnv,
9.      clock: Clock = new SystemClock())
10.   extends Logging {
11. ......
12.   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
13.     this(
14.       sc,
15.      taskScheduler,
16.      sc.listenerBus,
17.      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
18.      sc.env.blockManager.master,
19.      sc.env)
20.  }
21.
22.  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

4.2.3 DAGScheduler劃分Stage的原理

Spark將數(shù)據(jù)在分布式環(huán)境下分區(qū),然后將作業(yè)轉化為DAG,并分階段進行DAG的調度和任務的分布式并行處理。DAG將調度提交給DAGScheduler,DAGScheduler調度時會根據(jù)是否需要經(jīng)過Shuffle過程將Job劃分為多個Stage。

DAG劃分Stage及Stage并行計算示意圖如圖4-3所示。

圖4-3 DAG劃分Stage及Stage并行計算示意圖

其中,實線圓角方框標識的是RDD,方框中的矩形塊為RDD的分區(qū)。

在圖4-3中,RDD A到RDD B之間,以及RDD F到RDD G之間的數(shù)據(jù)需要經(jīng)過Shuffle過程,因此RDD A和RDD F分別是Stage 1跟Stage 3和Stage 2跟Stage 3的劃分點。而RDD B到RDD G之間,以及RDD C到RDD D到RDD F和RDD E到RDD F之間的數(shù)據(jù)不需要經(jīng)過Shuffle過程,因此,RDD G和RDD B的依賴是窄依賴,RDD B和RDD G劃分到同一個Stage 3,RDD F和RDD D和RDD E的依賴以及RDD D和RDD C的依賴是窄依賴,RDD C、RDD D、RDD E和RDD F劃分到同一個Stage 2。Stage 1和Stage 2是相互獨立的,可以并發(fā)執(zhí)行。而由于Stage 3依賴Stage 1和Stage 2的計算結果,所以Stage 3最后執(zhí)行計算。

根據(jù)以上RDD依賴關系的描述,圖4-3中的操作算子中,map和union是窄依賴的操作,因為子RDD(如D)的分區(qū)只依賴父RDD(如C)的一個分區(qū),其他常見的窄依賴的操作如filter、flatMap和join(每個分區(qū)和已知的分區(qū)join)等。groupByKey和join是寬依賴的操作,其他常見的寬依賴的操作如reduceByKey等。

由此可見,在DAGScheduler的調度過程中,Stage階段的劃分是根據(jù)是否有Shuffle過程,也就是當存在ShuffleDependency的寬依賴時,需要進行Shuffle,這時才會將作業(yè)(Job)劃分成多個Stage。

4.2.4 DAGScheduler劃分Stage的具體算法

Spark作業(yè)調度的時候,在Job提交過程中進行Stage劃分以及確定Task的最佳位置。Stage的劃分是DAGScheduler工作的核心,涉及作業(yè)在集群中怎么運行,Task最佳位置數(shù)據(jù)本地性的內容。Spark算子的構建是鏈式的,涉及怎么進行計算,首先是劃分Stage,Stage劃分以后才是計算的本身;分布式大數(shù)據(jù)系統(tǒng)追求最大化的數(shù)據(jù)本地性。數(shù)據(jù)本地性是指數(shù)據(jù)進行計算的時候,數(shù)據(jù)就在內存中,甚至不用計算就直接獲得結果。

Spark Application中可以因為不同的Action觸發(fā)眾多的Job。也就是說,一個Application中可以有很多的Job,每個Job是由一個或者多個Stage構成的,后面的Stage依賴于前面的Stage。也就是說,只有前面依賴的Stage計算完畢后,后面的Stage才會運行。

Stage劃分的根據(jù)是寬依賴。什么時候產生寬依賴呢?例如,reducByKey、groupByKey等。

我們從RDD的collect()方法開始,collect算子是一個Action,會觸發(fā)Job的運行。

RDD.scala的collect方法的源碼調用了runJob方法。

1.  def collect(): Array[T] = withScope {
2.    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
3.    Array.concat(results: _*)
4.  }

進入SparkContext.scala的runJob方法如下。

1.  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
    = {
2.    runJob(rdd, func, 0 until rdd.partitions.length)
3.  }

繼續(xù)重載runJob方法:

1.  def runJob[T, U: ClassTag](
2.      rdd: RDD[T],
3.      func: Iterator[T] => U,
4.      partitions: Seq[Int]): Array[U] = {
5.    val cleanedFunc = clean(func)
6.    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it),
      partitions)
7.  }

繼續(xù)重載runJob方法:

SparkContext.scala的源碼如下。

1.     def runJob[T, U: ClassTag](
2.      rdd: RDD[T],
3.      processPartition: Iterator[T] => U,
4.      resultHandler: (Int, U) => Unit)
5.  {
6.    val processFunc = (context: TaskContext, iter: Iterator[T]) =>
      processPartition(iter)
7.    runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length,
      resultHandler)
8.  }

繼續(xù)重載runJob方法:

1.       def runJob[T, U: ClassTag](
2.       rdd: RDD[T],
3.       func: (TaskContext, Iterator[T]) => U,
4.       partitions: Seq[Int],
5.       resultHandler: (Int, U) => Unit): Unit = {
6.     if (stopped.get()) {
7.       throw new IllegalStateException("SparkContext has been shutdown")
8.     }
9.     val callSite = getCallSite
10.    val cleanedFunc = clean(func)
11.    logInfo("Starting job: " + callSite.shortForm)
12.    if (conf.getBoolean("spark.logLineage", false)) {
13.      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
14.    }
15.    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite,
       resultHandler, localProperties.get)
16.    progressBar.foreach(_.finishAll())
17.    rdd.doCheckpoint()
18.  }

進入DAGScheduler.scala的runJob方法:

Spark 2.1.1版本的DAGScheduler.scala的源碼如下。

1.   def runJob[T, U](
2.        rdd: RDD[T],
3.        func: (TaskContext, Iterator[T]) => U,
4.        partitions: Seq[Int],
5.        callSite: CallSite,
6.        resultHandler: (Int, U) => Unit,
7.        properties: Properties): Unit = {
8.      val start = System.nanoTime
9.      val waiter = submitJob(rdd, func, partitions, callSite, resultHandler,
        properties)
10.     //注意:不要調用 Await.ready(future),因為它調用scala.concurrent.blocking,
        //如果使用fork-join pool連接池,則并發(fā)SQL執(zhí)行失敗。注意,由于Scala的特質,
        //awaitPermission實際上沒有在任何地方使用,所以這里安全地傳遞null。更多的細
        //節(jié),可參閱SPARK-13747
11.     val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
12.     waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
13.     waiter.completionFuture.value.get match {
14.       case scala.util.Success(_) =>
15.         logInfo("Job %d finished: %s, took %f s".format
16.         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
17.       case scala.util.Failure(exception) =>
18.         logInfo("Job %d failed: %s, took %f s".format
19.         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
20.         //SPARK-8644:包括來自用戶DAGScheduler異常堆棧跟蹤
21.         val callerStackTrace = Thread.currentThread().getStackTrace.tail
22.         exception.setStackTrace(exception.getStackTrace ++
            callerStackTrace)
23.         throw exception
24.     }
25.   }

Spark 2.2.0版本DAGScheduler.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第10~12行代碼刪除。

 上段代碼中第13行代碼之前新增一行代碼:

ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)

Spark 2.2.0版本刪掉了null.asInstanceOf[scala.concurrent.CanAwait]的使用,調整為使用ThreadUtils.awaitReady (waiter.completionFuture, Duration.Inf)方法。

1.   ......
2.    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
3.  ......

DAGScheduler runJob的時候就交給了submitJob,waiter等待作業(yè)調度的結果,作業(yè)成功或者失敗,打印相關的日志信息。進入DAGScheduler的submitJob方法如下。

1.     def submitJob[T, U](
2.       rdd: RDD[T],
3.       func: (TaskContext, Iterator[T]) => U,
4.       partitions: Seq[Int],
5.       callSite: CallSite,
6.       resultHandler: (Int, U) => Unit,
7.       properties: Properties): JobWaiter[U] = {
8.     //檢查,以確保我們不在不存在的分區(qū)上啟動任務
9.     val maxPartitions = rdd.partitions.length
10.    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
11.      throw new IllegalArgumentException(
12.        "Attempting to access a non-existent partition: " + p + ". " +
13.          "Total number of partitions: " + maxPartitions)
14.    }
15.
16.    val jobId = nextJobId.getAndIncrement()
17.    if (partitions.size == 0) {
18.      //如果作業(yè)正在運行0個任務,則立即返回
19.      return new JobWaiter[U](this, jobId, 0, resultHandler)
20.    }
21.
22.    assert(partitions.size > 0)
23.    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
24.    val waiter = new JobWaiter(this, jobId, partitions.size,
       resultHandler)
25.    eventProcessLoop.post(JobSubmitted(
26.      jobId, rdd, func2, partitions.toArray, callSite, waiter,
27.      SerializationUtils.clone(properties)))
28.    waiter
29.  }

submitJob方法中,submitJob首先獲取rdd.partitions.length,校驗運行的時候partitions是否存在。submitJob方法關鍵的代碼是eventProcessLoop.post(JobSubmitted的JobSubmitted,JobSubmitted是一個case class,而不是一個case object,因為application中有很多的Job,不同的Job的JobSubmitted實例不一樣,如果使用case object,case object展示的內容是一樣的,就像全局唯一變量,而現(xiàn)在我們需要不同的實例,因此使用case class。JobSubmitted的成員finalRDD是最后一個RDD。

由Action(如collect)導致SparkContext.runJob的執(zhí)行,最終導致DAGScheduler中的submitJob的執(zhí)行,其核心是通過發(fā)送一個case class JobSubmitted對象給eventProcessLoop。其中,JobSubmitted的源碼如下。

1.   private[scheduler] case class JobSubmitted(
2.    jobId: Int,
3.    finalRDD: RDD[_],
4.    func: (TaskContext, Iterator[_]) => _,
5.    partitions: Array[Int],
6.    callSite: CallSite,
7.    listener: JobListener,
8.    properties: Properties = null)
9.  extends DAGSchedulerEvent

JobSubmitted是private[scheduler]級別的,用戶不可直接調用它。JobSubmitted封裝了jobId,封裝了最后一個finalRDD,封裝了具體對RDD操作的函數(shù)func,封裝了有哪些partitions要進行計算,也封裝了作業(yè)監(jiān)聽器listener、狀態(tài)等內容。

DAGScheduler的submitJob方法關鍵代碼eventProcessLoop.post(JobSubmitted中,將JobSubmitted放入到eventProcessLoop。post就是Java中的post,往一個線程中發(fā)一個消息。eventProcessLoop的源碼如下。

1.  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop
    (this)

DAGSchedulerEventProcessLoop繼承自EventLoop。

1.   private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler:
     DAGScheduler)
2.  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with
    Logging {

EventLoop中開啟了一個線程eventThread,線程設置成Daemon后臺運行的方式;run方法里面調用了onReceive(event)方法。post方法就是往eventQueue.put事件隊列中放入一個元素。EventLoop的源碼如下。

1.   private[spark] abstract class EventLoop[E](name: String) extends Logging {
2.
3.    private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
4.
5.    private val stopped = new AtomicBoolean(false)
6.
7.    private val eventThread = new Thread(name) {
8.      setDaemon(true)
9.
10.     override def run(): Unit = {
11.       try {
12.         while (!stopped.get) {
13.           val event = eventQueue.take()
14.           try {
15.             onReceive(event)
16.           } catch {
17.             case NonFatal(e) =>
18.               try {
19.                 onError(e)
20.               } catch {
21.                 case NonFatal(e) => logError("Unexpected error in " + name, e)
22.               }
23.           }
24.         }
25.       } catch {
26.         case ie: InterruptedException => //即使eventQueue不為空,退出
27.         case NonFatal(e) => logError("Unexpected error in " + name, e)
28.       }
29.     }
30.
31.   }
32.
33.   def start(): Unit = {
34.     if (stopped.get) {
35.       throw new IllegalStateException(name + " has already been stopped")
36.     }
37.     //調用OnStart啟動事件線程,確保其發(fā)生在onReceive方法前
38.     onStart()
39.     eventThread.start()
40.   }
41. ......
42. def post(event: E): Unit = {
43.     eventQueue.put(event)
44.   }

eventProcessLoop是DAGSchedulerEventProcessLoo實例,DAGSchedulerEventProcessLoop繼承自EventLoop,具體實現(xiàn)onReceive方法,onReceive方法又調用doOnReceive方法。

doOnReceive收到消息后開始處理。

Spark 2.1.1版本的DAGScheduler.scala的源碼如下。

1.   private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
2.     case JobSubmitted(jobId, rdd, func, partitions, callSite, listener,
       properties) =>
3.       dagScheduler.handleJobSubmitted(jobId,           rdd,    func,    partitions,
         callSite, listener, properties)
4.
5.     case MapStageSubmitted(jobId, dependency, callSite, listener,
       properties) =>
6.       dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite,
         listener, properties)
7.
8.     case StageCancelled(stageId) =>
9.       dagScheduler.handleStageCancellation(stageId)
10.
11.    case JobCancelled(jobId) =>
12.      dagScheduler.handleJobCancellation(jobId)
13.
14.    case JobGroupCancelled(groupId) =>
15.      dagScheduler.handleJobGroupCancelled(groupId)
16.
17.    case AllJobsCancelled =>
18.      dagScheduler.doCancelAllJobs()
19.
20.    case ExecutorAdded(execId, host) =>
21.      dagScheduler.handleExecutorAdded(execId, host)
22.
23.    case ExecutorLost(execId, reason) =>
24.      val filesLost = reason match {
25.        case SlaveLost(_, true) => true
26.        case _ => false
27.      }
28.      dagScheduler.handleExecutorLost(execId, filesLost)
29.
30.    case BeginEvent(task, taskInfo) =>
31.      dagScheduler.handleBeginEvent(task, taskInfo)
32.
33.    case GettingResultEvent(taskInfo) =>
34.      dagScheduler.handleGetTaskResult(taskInfo)
35.
36.    case completion: CompletionEvent =>
37.      dagScheduler.handleTaskCompletion(completion)
38.
39.    case TaskSetFailed(taskSet, reason, exception) =>
40.      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
41.
42.    case ResubmitFailedStages =>
43.      dagScheduler.resubmitFailedStages()
44.  }

Spark 2.2.0版本的DAGScheduler.scala的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第8~9行StageCancelled增加一個成員變量reason,說明Stage取消的原因。

 上段代碼中第11~12行JobCancelled增加一個成員變量reason,說明Job取消的原因。

1.   ....
2.      case StageCancelled(stageId, reason) =>
3.        dagScheduler.handleStageCancellation(stageId, reason)
4.
5.      case JobCancelled(jobId, reason) =>
6.        dagScheduler.handleJobCancellation(jobId, reason)
7.  .......

總結:EventLoop里面開啟一個線程,線程里面不斷循環(huán)一個隊列,post的時候就是將消息放到隊列中,由于消息放到隊列中,在不斷循環(huán),所以可以拿到這個消息,轉過來回調方法onReceive(event),在onReceive處理的時候就調用了doOnReceive方法。

關于線程的異步通信:為什么要新開辟一條線程?例如,在DAGScheduler發(fā)送消息為何不直接調用doOnReceive,而需要一個消息循環(huán)器。DAGScheduler這里自己給自己發(fā)消息,不管是自己發(fā)消息,還是別人發(fā)消息,都采用一條線程去處理,兩者處理的邏輯是一致的,擴展性就非常好。使用消息循環(huán)器,就能統(tǒng)一處理所有的消息,保證處理的業(yè)務邏輯都是一致的。

eventProcessLoop是DAGSchedulerEventProcessLoop的具體實例,而DAGScheduler-EventProcessLoop是EventLoop的子類,具體實現(xiàn)EventLoop的onReceive方法,onReceive方法轉過來回調doOnReceive。

在doOnReceive中通過模式匹配的方式把執(zhí)行路由到case JobSubmitted,調用dagScheduler.handleJobSubmitted方法。

1.  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
2.     case JobSubmitted(jobId, rdd, func, partitions, callSite, listener,
       properties) =>
3.       dagScheduler.handleJobSubmitted(jobId,           rdd,    func,    partitions,
         callSite, listener, properties)

DAGScheduler的handleJobSubmitted的源碼如下。

1.    private[scheduler] def handleJobSubmitted(jobId: Int,
2.       finalRDD: RDD[_],
3.       func: (TaskContext, Iterator[_]) => _,
4.       partitions: Array[Int],
5.       callSite: CallSite,
6.       listener: JobListener,
7.       properties: Properties) {
8.     var finalStage: ResultStage = null
9.     try {
10.      //如果作業(yè)運行在HadoopRDD上,而底層HDFS的文件已被刪除,那么在創(chuàng)建新的Stage
         //時將會跑出一個異常
11.      finalStage = createResultStage(finalRDD, func, partitions, jobId,
         callSite)
12.    } catch {
13.      case e: Exception =>
14.        logWarning("Creating new stage failed due to exception - job: " +
           jobId, e)
15.        listener.jobFailed(e)
16.        return
17.    }
18.
19.    val job = new ActiveJob(jobId, finalStage, callSite, listener,
       properties)
20.    clearCacheLocs()
21.    logInfo("Got job %s (%s) with %d output partitions".format(
22.      job.jobId, callSite.shortForm, partitions.length))
23.    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
24.    logInfo("Parents of final stage: " + finalStage.parents)
25.    logInfo("Missing parents: " + getMissingParentStages(finalStage))
26.
27.    val jobSubmissionTime = clock.getTimeMillis()
28.    jobIdToActiveJob(jobId) = job
29.    activeJobs += job
30.    finalStage.setActiveJob(job)
31.    val stageIds = jobIdToStageIds(jobId).toArray
32.    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).
       map(_.latestInfo))
33.    listenerBus.post(
34.      SparkListenerJobStart(job.jobId,          jobSubmissionTime,      stageInfos,
         properties))
35.    submitStage(finalStage)
36.  }

Stage開始:每次調用一個runJob就產生一個Job;finalStage是一個ResultStage,最后一個Stage是ResultStage,前面的Stage是ShuffleMapStage。

在handleJobSubmitted中首先創(chuàng)建finalStage,創(chuàng)建finalStage時會建立父Stage的依賴鏈條。

通過createResultStage創(chuàng)建finalStage,傳入的參數(shù)包括最后一個finalRDD,操作的函數(shù)func,分區(qū)partitions、jobId、callSite等內容。創(chuàng)建過程中可能捕獲異常。例如,在Hadoop上,底層的hdfs文件被刪除了或者被修改了,就出現(xiàn)異常。

createResultStage的源碼如下。

1.   private def createResultStage(
2.        rdd: RDD[_],
3.        func: (TaskContext, Iterator[_]) => _,
4.        partitions: Array[Int],
5.        jobId: Int,
6.        callSite: CallSite): ResultStage = {
7.      val parents = getOrCreateParentStages(rdd, jobId)
8.      val id = nextStageId.getAndIncrement()
9.      val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
        callSite)
10.     stageIdToStage(id) = stage
11.     updateJobIdStageIdMaps(jobId, stage)
12.     stage
13.   }

createResultStage中,基于作業(yè)ID,作業(yè)ID(jobId)是作為第三個參數(shù)傳進來的,創(chuàng)建了ResultStage。

createResultStage的getOrCreateParentStages獲取或創(chuàng)建一個給定RDD的父Stages列表,新的Stages將提供firstJobId創(chuàng)建。

getOrCreateParentStages的源碼如下。

1.     private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int):
       List[Stage] = {
2.    getShuffleDependencies(rdd).map { shuffleDep =>
3.      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
4.    }.toList
5.  }

getOrCreateParentStages調用了getShuffleDependencies(rdd),getShuffleDependencies返回給定RDD的父節(jié)點中直接的shuffle依賴。這個函數(shù)不會返回更遠祖先節(jié)點的依賴。例如,如果C shuffle依賴于B,B shuffle依賴于A:A <-- B <-- C。在RDD C中調用getShuffleDependencies函數(shù),將只返回B <-- C的依賴。此功能可用作單元測試。

下面根據(jù)DAG劃分Stage示意圖,如圖4-4所示。

圖4-4 DAG劃分Stage示意圖

RDD G在getOrCreateParentStages的getShuffleDependencies的時候同時依賴于RDD B,RDD F;看依賴關系,RDD G和RDD B在同一個Stage里,RDD G和RDD F不在同一個Stage里,根據(jù)Shuffle依賴產生了一個新的Stage。如果不是Shuffle級別的依賴,就將其加入waitingForVisit.push(dependency.rdd),waitingForVisit是一個棧Stack,把當前依賴的RDD push進去。然后進行while循環(huán),當waitingForVisit不是空的情況下,將waitingForVisit.pop()的內容彈出來放入到toVisit,如果已經(jīng)訪問過的數(shù)據(jù)結構visited中沒有訪問記錄,那么toVisit.dependencies再次循環(huán)遍歷:如果是Shuffle依賴,就加入到parents數(shù)據(jù)結構;如果是窄依賴,就加入到waitingForVisit。

例如,首先將RDD G放入到waitingForVisit,然后看RDD G的依賴關系,依賴RDD B、RDD F;RDD G和RDD F構成的是寬依賴,所以就加入父Stage里,是一個新的Stage。但如果是窄依賴,就把RDD B放入到棧waitingForVisit中,RDD G和RDD B在同一個Stage中。棧waitingForVisit現(xiàn)在又有新的元素RDD B,然后再次進行循環(huán),獲取到寬依賴RDD A,將構成一個新的Stage。RDD G的getShuffleDependencies最終返回HashSet (ShuffleDependency(RDD F),ShuffleDependency(RDD A))。然后getShuffleDependencies(rdd). map遍歷調用getOrCreateShuffleMapStage直接創(chuàng)建父Stage。

getShuffleDependencies的源碼如下。

1.       private[scheduler] def getShuffleDependencies(
2.       rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
3.     val parents = new HashSet[ShuffleDependency[_, _, _]]
4.     val visited = new HashSet[RDD[_]]
5.     val waitingForVisit = new Stack[RDD[_]]
6.     waitingForVisit.push(rdd)
7.     while (waitingForVisit.nonEmpty) {
8.       val toVisit = waitingForVisit.pop()
9.       if (!visited(toVisit)) {
10.        visited += toVisit
11.        toVisit.dependencies.foreach {
12.          case shuffleDep: ShuffleDependency[_, _, _] =>
13.            parents += shuffleDep
14.          case dependency =>
15.            waitingForVisit.push(dependency.rdd)
16.        }
17.      }
18.    }
19.    parents
20.  }

getOrCreateParentStages方法中通過getShuffleDependencies(rdd).map進行map轉換時用了getOrCreateShuffleMapStage方法。如果在shuffleIdToMapStage數(shù)據(jù)結構中shuffleId已經(jīng)存在,那就獲取一個shuffle map stage,否則,如果shuffle map stage不存在,除了即將進行計算的更遠祖先節(jié)點的shuffle map stage,還將創(chuàng)建一個自己的shuffle map stage。

getOrCreateShuffleMapStage的源碼如下。

1.   private def getOrCreateShuffleMapStage(
2.       shuffleDep: ShuffleDependency[_, _, _],
3.       firstJobId: Int): ShuffleMapStage = {
4.     shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
5.       case Some(stage) =>
6.         stage
7.
8.       case None =>
9.         //創(chuàng)建所有即將計算的祖先shuffle依賴的階段
10.        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach
           { dep =>
11.    //盡管getMissingAncestorShuffleDependencies 只返回shuffle 的依賴,其已
       //不在shuffleIdToMapStage中。我們在foreach循環(huán)中得到一個特定的依賴是可能
       //的,將被增加到shuffleIdToMapStage依賴中,其是通過早期的依賴關系創(chuàng)建的階段,
       //參考SPARK-13902
12.          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
13.            createShuffleMapStage(dep, firstJobId)
14.          }
15.        }
16.        //最后,為給定的shuffle 依賴創(chuàng)建一個階段
17.        createShuffleMapStage(shuffleDep, firstJobId)
18.    }
19.  }

getOrCreateShuffleMapStage方法中:

 如果根據(jù)shuffleId模式匹配獲取到Stage,就返回Stage。首先從shuffleIdToMapStage中根據(jù)shuffleId獲取Stage。shuffleIdToMapStage是一個HashMap數(shù)據(jù)結構,將Shuffle dependency ID對應到ShuffleMapStage的映射關系,shuffleIdToMapStage只包含當前運行作業(yè)的映射數(shù)據(jù),當Shuffle Stage作業(yè)完成時,Shuffle映射數(shù)據(jù)將被刪除,Shuffle的數(shù)據(jù)將記錄在MapOutputTracker中。

 如果根據(jù)shuffleId模式匹配沒有獲取到Stage,調用getMissingAncestorShuffle-Dependencies方法,createShuffleMapStage創(chuàng)建所有即將進行計算的祖先shuffle依賴的Stages。

getMissingAncestorShuffleDependencies查找shuffle依賴中還沒有進行shuffleToMapStage注冊的祖先節(jié)點。

getMissingAncestorShuffleDependencies的源碼如下。

1.      private def getMissingAncestorShuffleDependencies(
2.       rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
3.     val ancestors = new Stack[ShuffleDependency[_, _, _]]
4.     val visited = new HashSet[RDD[_]]
5.     //手動維護堆棧來防止通過遞歸訪問造成的堆棧溢出異常
6.     val waitingForVisit = new Stack[RDD[_]]
7.     waitingForVisit.push(rdd)
8.     while (waitingForVisit.nonEmpty) {
9.       val toVisit = waitingForVisit.pop()
10.      if (!visited(toVisit)) {
11.        visited += toVisit
12.        getShuffleDependencies(toVisit).foreach { shuffleDep =>
13.          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
14.            ancestors.push(shuffleDep)
15.            waitingForVisit.push(shuffleDep.rdd)
16.          } //依賴關系及其已經(jīng)注冊的祖先
17.        }
18.      }
19.    }
20.    ancestors
21.  }

createShuffleMapStage根據(jù)Shuffle依賴的分區(qū)創(chuàng)建一個ShuffleMapStage,如果前一個Stage已生成相同的Shuffle數(shù)據(jù),那Shuffle數(shù)據(jù)仍是可用的,createShuffleMapStage方法將復制Shuffle數(shù)據(jù)的位置信息去獲取數(shù)據(jù),無須再重新生成一次數(shù)據(jù)。createShuffleMapStage的源碼如下。

1.   def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId:
     Int): ShuffleMapStage = {
2.      val rdd = shuffleDep.rdd
3.      val numTasks = rdd.partitions.length
4.      val parents = getOrCreateParentStages(rdd, jobId)
5.      val id = nextStageId.getAndIncrement()
6.      val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId,
        rdd.creationSite, shuffleDep)
7.      stageIdToStage(id) = stage
8.      shuffleIdToMapStage(shuffleDep.shuffleId) = stage
9.      updateJobIdStageIdMaps(jobId, stage)
10.     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
11.       //以前運行的階段為這個shuffle生成的分區(qū),對于每個輸出仍然可用,將輸出位置的
          //信息復制到新階段(所以沒必要重新計算數(shù)據(jù))
12.       val    serLocs    =   mapOutputTracker.getSerializedMapOutputStatuses
          (shuffleDep.shuffleId)
13.       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
14.       (0 until locs.length).foreach { i =>
15.         if (locs(i) ne null) {
16.           //locs(i) will be null if missing
17.           stage.addOutputLoc(i, locs(i))
18.         }
19.       }
20.     } else {
21.       //這里需要注冊RDDS與緩存和map輸出跟蹤器,不能在RDD構造函數(shù)實現(xiàn),因為分區(qū)
          //是未知的
22.       logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
23.       mapOutputTracker.registerShuffle(shuffleDep.shuffleId,
          rdd.partitions.length)
24.     }
25.     stage
26.   }

回到handleJobSubmitted,創(chuàng)建finalStage以后將提交finalStage。

1.     private[scheduler] def handleJobSubmitted(jobId: Int,
2.  ......
3.    finalStage = createResultStage(finalRDD, func, partitions, jobId,
      callSite)
4.  ......
5.    submitStage(finalStage)
6.    }

submitStage提交Stage,首先遞歸提交即將計算的父Stage。

submitStage的源碼如下。

1.  private def submitStage(stage: Stage) {
2.    val jobId = activeJobForStage(stage)
3.    if (jobId.isDefined) {
4.      logDebug("submitStage(" + stage + ")")
5.      if (!waitingStages(stage) && !runningStages(stage) && !failedStages
        (stage)) {
6.        val missing = getMissingParentStages(stage).sortBy(_.id)
7.        logDebug("missing: " + missing)
8.        if (missing.isEmpty) {
9.          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has
            no missing parents")
10.          submitMissingTasks(stage, jobId.get)
11.        } else {
12.          for (parent <- missing) {
13.            submitStage(parent)
14.          }
15.          waitingStages += stage
16.        }
17.      }
18.    } else {
19.      abortStage(stage, "No active job for stage " + stage.id, None)
20.    }
21.  }

其中調用了getMissingParentStages,源碼如下。

1.     private def getMissingParentStages(stage: Stage): List[Stage] = {
2.     val missing = new HashSet[Stage]
3.     val visited = new HashSet[RDD[_]]
4.     //人工維護堆棧來防止通過遞歸訪問造成的堆棧溢出異常
5.     val waitingForVisit = new Stack[RDD[_]]
6.     def visit(rdd: RDD[_]) {
7.       if (!visited(rdd)) {
8.         visited += rdd
9.         val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
10.        if (rddHasUncachedPartitions) {
11.          for (dep <- rdd.dependencies) {
12.            dep match {
13.              case shufDep: ShuffleDependency[_, _, _] =>
14.                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.
                   firstJobId)
15.                if (!mapStage.isAvailable) {
16.                  missing += mapStage
17.                }
18.              case narrowDep: NarrowDependency[_] =>
19.                waitingForVisit.push(narrowDep.rdd)
20.            }
21.          }
22.        }
23.      }
24.    }
25.    waitingForVisit.push(stage.rdd)
26.    while (waitingForVisit.nonEmpty) {
27.      visit(waitingForVisit.pop())
28.    }
29.    missing.toList
30.  }

接下來,我們結合Spark DAG劃分Stage示意(圖4-5)進行詳細闡述。

RDD A到RDD B,以及RDD F到RDD G之間的數(shù)據(jù)需要經(jīng)過Shuffle過程,因此,RDD A和RDD F分別是Stage 1跟Stage 3、Stage 2跟Stage 3的劃分點。而RDD B到RDD G沒有Shuffle,因此,RDD G和RDD B的依賴是窄依賴,RDD B和RDD G劃分到同一個Stage 3;RDD C到RDD D、RDD F,RDD E到RDD F之間的數(shù)據(jù)不需要經(jīng)過Shuffle,RDD F和RDD D加RDD E的依賴、RDD D和RDD C的依賴是窄依賴,因此,RDD C、RDD D、RDD E和RDD F劃分到同一個Stage 2。Stage 1和Stage 2是相互獨立的,可以并發(fā)執(zhí)行。而由于Stage 3依賴Stage 1和Stage 2的計算結果,所以Stage 3最后執(zhí)行計算。

 createResultStage:基于作業(yè)ID(jobId)創(chuàng)建ResultStage。調用getOrCreateParentStages創(chuàng)建所有父Stage,返回parents: List[Stage]作為父Stage,將parents傳入ResultStage,實例化生成ResultStage。

在DAG劃分Stage示意圖中,對RDD G調用createResultStage,通過getOrCreate-ParentStages獲取所有父List[Stage]:Stage 1、Stage 2,然后創(chuàng)建自己的Stage 3。

 getOrCreateParentStages:獲取或創(chuàng)建給定RDD的父Stage列表。將根據(jù)提供的firstJobId創(chuàng)建新的Stages。

圖4-5 DAG劃分Stage示意圖

在DAG劃分Stage示意圖中,RDD G的getOrCreateParentStages會調用getShuffleDependencies獲得RDD G所有直接寬依賴集合HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)),這里是RDD F和RDD A的寬依賴集合,然后遍歷集合,對(ShuffleDependency(RDD F), ShuffleDependency(RDD A))分別調用getOrCreateShuffleMapStage。

 對ShuffleDependency(RDD A)調用getOrCreateShuffleMapStage,getOrCreateShuffle-MapStage中根據(jù)shuffleDep.shuffleId模式匹配調用getMissingAncestorShuffle-Dependencies,返回為空;對ShuffleDependency(RDD A)調用createShuffleMapStage,RDD A已無父Stage,因此創(chuàng)建Stage 1。

 對ShuffleDependency(RDD F)調用getOrCreateShuffleMapStage,getOrCreateShuffle-MapStage中根據(jù)shuffleDep.shuffleId模式匹配調用getMissingAncestorShuffle-Dependencies,返回為空;對ShuffleDependency(RDD F)調用createShuffleMapStage,RDD F之前的RDD C到RDD D、RDD F;RDD E到RDD F之間都沒有Shuffle,沒有寬依賴就不會產生Stage。因此,RDD F已無父Stage,創(chuàng)建Stage 2。

 最后,把List(Stage 1,Stage 2)作為Stage 3的父Stage,創(chuàng)建Stage 3。Stage 3是ResultStage。

回到DAGScheduler.scala的handleJobSubmitted方法,首先通過createResultStage構建finalStage。

handleJobSubmitted的源碼如下。

1.  private[scheduler] def handleJobSubmitted(jobId: Int,
2.    .......
3.      finalStage = createResultStage(finalRDD, func, partitions, jobId,
         callSite)
4.    .......
5.     val job = new ActiveJob(jobId, finalStage, callSite, listener,
       properties)
6.    .......
7.     logInfo("Missing parents: " + getMissingParentStages(finalStage))
8.     ......
9.     submitStage(finalStage)
10.  }

handleJobSubmitted方法中的ActiveJob是一個普通的數(shù)據(jù)結構,保存了當前Job的一些信息:

1.  private[spark] class ActiveJob(
2.     val jobId: Int,
3.     val finalStage: Stage,
4.     val callSite: CallSite,
5.     val listener: JobListener,
6.     val properties: Properties) {

handleJobSubmitted方法日志打印信息:getMissingParentStages(finalStage)),getMissing-ParentStages根據(jù)finalStage找父Stage,如果有父Stage,就直接返回;如果沒有父Stage,就進行創(chuàng)建。

handleJobSubmitted方法中的submitStage比較重要。submitStage的源碼如下。

1.   private def submitStage(stage: Stage) {
2.      val jobId = activeJobForStage(stage)
3.      if (jobId.isDefined) {
4.        logDebug("submitStage(" + stage + ")")
5.        if (!waitingStages(stage) && !runningStages(stage) && !failedStages
          (stage)) {
6.          val missing = getMissingParentStages(stage).sortBy(_.id)
7.          logDebug("missing: " + missing)
8.          if (missing.isEmpty) {
9.            logInfo("Submitting " + stage + " (" + stage.rdd + "), which has
              no missing parents")
10.           submitMissingTasks(stage, jobId.get)
11.         } else {
12.           for (parent <- missing) {
13.             submitStage(parent)
14.           }
15.           waitingStages += stage
16.         }
17.       }
18.     } else {
19.       abortStage(stage, "No active job for stage " + stage.id, None)
20.     }
21.   }

submitStage首先從activeJobForStage中獲得JobID;如果JobID已經(jīng)定義isDefined,那就獲得即將計算的Stage(getMissingParentStages),然后進行升序排列。如果父Stage為空,那么提交submitMissingTasks,DAGScheduler把處理的過程交給具體的TaskScheduler去處理。如果父Stage不為空,將循環(huán)遞歸調用submitStage(parent),從后往前回溯。后面的Stage依賴于前面的Stage。也就是說,只有前面依賴的Stage計算完畢后,后面的Stage才會運行。submitStage一直循環(huán)調用,導致的結果是父Stage的父Stage……一直回溯到最左側的父Stage開始計算。

4.2.5 Stage內部Task獲取最佳位置的算法

Task任務本地性算法實現(xiàn):

DAGScheudler的submitMissingTasks方法中體現(xiàn)了如何利用RDD的本地性得到Task的本地性,從而獲取Stage內部Task的最佳位置。接下來看一下submitMissingTasks的源碼,關注Stage本身的算法以及任務本地性。runningStages是一個HashSet[Stage]數(shù)據(jù)結構,表示正在運行的Stages,將當前運行的Stage增加到runningStages中,根據(jù)Stage進行判斷,如果是ShuffleMapStage,則從getPreferredLocs(stage.rdd, id)獲取任務本地性信息;如果是ResultStage,則從getPreferredLocs(stage.rdd, p)獲取任務本地性信息。

DAGScheduler.scala的源碼如下。

1.   private def submitMissingTasks(stage: Stage, jobId: Int) {
2.    ......
3.     runningStages += stage
4.    ......
5.     stage match {
6.       case s: ShuffleMapStage =>
7.         outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId =
           s.numPartitions - 1)
8.       case s: ResultStage =>
9.         outputCommitCoordinator.stageStart(
10.          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
11.    }
12.

在submitMissingTasks中會通過調用以下代碼來獲得任務的本地性。

1.   val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
2.     stage match {
3.       case s: ShuffleMapStage =>
4.         partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd,
           id))}.toMap
5.       case s: ResultStage =>
6.         partitionsToCompute.map { id =>
7.           val p = s.partitions(id)
8.           (id, getPreferredLocs(stage.rdd, p))
9.         }.toMap
10.    }

partitionsToCompute獲得要計算的Partitions的id。

1.  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

如果stage是ShuffleMapStage,在代碼partitionsToCompute.map { id => (id, getPreferredLocs (stage.rdd, id))}.toMap中,id是partitions的id,使用匿名函數(shù)生成一個Tuple,第一個元素值是數(shù)據(jù)分片的id,第二個元素是把rdd和id傳進去,獲取位置getPreferredLocs。然后通過toMap轉換,返回Map[Int, Seq[TaskLocation]]。第一個值是partitions的id,第二個值是TaskLocation。

具體一個Partition中的數(shù)據(jù)本地性的算法實現(xiàn)在下述getPreferredLocs代碼中。

1.  private[spark]
2.  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
3.    getPreferredLocsInternal(rdd, partition, new HashSet)
4.  }

getPreferredLocsInternal是getPreferredLocs的遞歸實現(xiàn):這個方法是線程安全的,它只能被DAGScheduler通過線程安全方法getCacheLocs()使用。

getPreferredLocsInternal的源碼如下。

1.   private def getPreferredLocsInternal(
2.       rdd: RDD[_],
3.       partition: Int,
4.       visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
5.     //如果分區(qū)已被訪問,則無須重新訪問。這避免了路徑探索。SPARK-695
6.     if (!visited.add((rdd, partition))) {
7.       //已訪問的分區(qū)返回零
8.       return Nil
9.     }
10.    //如果分區(qū)已經(jīng)緩存,返回緩存的位置
11.    val cached = getCacheLocs(rdd)(partition)
12.    if (cached.nonEmpty) {
13.      return cached
14.    }
15.    //如果RDD位置優(yōu)先(輸入RDDs的情況),就獲取它
16.    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition))
       .toList
17.    if (rddPrefs.nonEmpty) {
18.      return rddPrefs.map(TaskLocation(_))
19.    }
20.
21.    //如果RDD是窄依賴,將選擇第一個窄依賴的第一分區(qū)作為位置首選項。理想情況下,我們
       //將基于傳輸大小選擇
22.    rdd.dependencies.foreach {
23.      case n: NarrowDependency[_] =>
24.        for (inPart <- n.getParents(partition)) {
25.          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
26.          if (locs != Nil) {
27.            return locs
28.          }
29.        }
30.
31.      case _ =>
32.    }
33.
34.    Nil
35.  }

getPreferredLocsInternal代碼中:

在visited中把當前的RDD和partition加進去是否能成功,visited是一個HashSet,如果已經(jīng)有就出錯。

如果partition被緩存(partition被緩存是指數(shù)據(jù)已經(jīng)在DAGScheduler中),則在getCacheLocs(rdd)(partition)傳入rdd和partition,獲取緩存的位置信息。如果獲取到緩存位置信息,就返回。

getCacheLocs的源碼如下。

1.  private[scheduler]
2.   def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] =
     cacheLocs.synchronized {
3.     //注意:這個不用getOrElse(),因為方法被調用0(任務數(shù))次
4.     if (!cacheLocs.contains(rdd.id)) {
5.       //注:如果存儲級別為NONE,我們不需要從塊管理器獲取位置信息
6.       val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel
         == StorageLevel.NONE) {
7.         IndexedSeq.fill(rdd.partitions.length)(Nil)
8.       } else {
9.         val blockIds =
10.          rdd.partitions.indices.map(index => RDDBlockId(rdd.id,
             index)).toArray[BlockId]
11.        blockManagerMaster.getLocations(blockIds).map { bms =>
12.          bms.map(bm => TaskLocation(bm.host, bm.executorId))
13.        }
14.      }
15.      cacheLocs(rdd.id) = locs
16.    }
17.    cacheLocs(rdd.id)
18.  }

getCacheLocs中的cacheLocs是一個HashMap,包含每個RDD的分區(qū)上的緩存位置信息。map的key值是RDD的ID,Value是由分區(qū)編號索引的數(shù)組。每個數(shù)組值是RDD分區(qū)緩存位置的集合。

1.  private val cacheLocs = new HashMap [Int, IndexedSeq[Seq[TaskLocation]]]

getPreferredLocsInternal方法在具體算法實現(xiàn)的時候首先查詢DAGScheduler的內存數(shù)據(jù)結構中是否存在當前Partition的數(shù)據(jù)本地性的信息,如果有,則直接返回;如果沒有,首先會調用rdd.getPreferedLocations。

如果自定義RDD,那一定要寫getPreferedLocations,這是RDD的五大特征之一。例如,想讓Spark運行在HBase上或者運行在一種現(xiàn)在還沒有直接支持的數(shù)據(jù)庫上面,此時開發(fā)者需要自定義RDD。為了保證Task計算的數(shù)據(jù)本地性,最關鍵的方式是必須實現(xiàn)RDD的getPreferedLocations。數(shù)據(jù)不動代碼動,以HBase為例,Spark要操作HBase的數(shù)據(jù),要求Spark運行在HBase所在的集群中,HBase是高速數(shù)據(jù)檢索的引擎,數(shù)據(jù)在哪里,Spark也需要運行在哪里。Spark能支持各種來源的數(shù)據(jù),核心就在于getPreferedLocations。如果不實現(xiàn)getPreferedLocations,就要從數(shù)據(jù)庫或HBase中將數(shù)據(jù)抓過來,速度會很慢。

RDD.scala的getPreferedLocations的源碼如下。

1.   final def preferredLocations(split: Partition): Seq[String] = {
2.    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
3.      getPreferredLocations(split)
4.    }
5.  }

這是RDD的getPreferredLocations。

1.  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

這樣,數(shù)據(jù)本地性在運行前就已經(jīng)完成,因為RDD構建的時候已經(jīng)有元數(shù)據(jù)的信息。說明:本節(jié)代碼基于Spark 2.2的源碼版本。

DAGScheduler計算數(shù)據(jù)本地性的時候巧妙地借助了RDD自身的getPreferedLocations中的數(shù)據(jù),最大化地優(yōu)化效率,因為getPreferedLocations中表明了每個Partition的數(shù)據(jù)本地性,雖然當前Partition可能被persist或者checkpoint,但是persist或者checkpoint默認情況下肯定和getPreferedLocations中的Partition的數(shù)據(jù)本地性是一致的,所以這就極大地簡化了Task數(shù)據(jù)本地性算法的實現(xiàn)和效率的優(yōu)化。

主站蜘蛛池模板: 华蓥市| 石家庄市| 蒙阴县| 南漳县| 佛学| 阿图什市| 土默特左旗| 台南县| 崇文区| 恩施市| 安溪县| 四川省| 湄潭县| 乌审旗| 鄯善县| 麟游县| 都昌县| 增城市| 潜山县| 张家川| 彭州市| 淮安市| 松原市| 赞皇县| 化隆| 新丰县| 灵石县| 苗栗县| 那坡县| 乐陵市| 芦山县| 东宁县| 江孜县| 高州市| 班戈县| 射洪县| 成安县| 大连市| 平原县| 永顺县| 怀化市|