官术网_书友最值得收藏!

4.4 SchedulerBackend解析

本節講解SchedulerBackend原理剖析、SchedulerBackend源碼解析、Spark程序的注冊機制、Spark程序對計算資源Executor的管理等內容。

4.4.1 SchedulerBackend原理剖析

以Spark Standalone部署方式為例,StandaloneSchedulerBackend在啟動的時候構造了StandaloneAppClient實例,并在該實例start的時候啟動了ClientEndpoint消息循環體,ClientEndpoint在啟動的時候會向Master注冊當前程序。而StandaloneSchedulerBackend的父類CoarseGrainedSchedulerBackend在start的時候會實例化類型為DriverEndPoint(這就是程序運行時的經典的對象Driver)的消息循環體,StandaloneSchedulerBackend專門負責收集Worker上的資源信息,當ExecutorBackend啟動的時候,會發送RegisteredExecutor信息向DriverEndpoint注冊,此時StandaloneSchedulerBackend就掌握了當前應用程序擁有的計算資源,TaskScheduler就是通過StandaloneSchedulerBackend擁有的計算資源來具體運行Task的。

4.4.2 SchedulerBackend源碼解析

StandaloneSchedulerBackend收集和分配資源給調度的Task使用。

StandaloneSchedulerBackend.scala的源碼如下。

1.  private[spark] class StandaloneSchedulerBackend(
2.  ......
3.   override def start() {
4.  ......
5.   val command = Command("org.apache.spark.executor.
     CoarseGrainedExecutorBackend",
6.        args, sc.executorEnvs, classPathEntries ++ testingClassPath,
          libraryPathEntries, javaOpts)
7.  ......
8.    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this,
      conf)
9.      client.start()
10. ......

在StandaloneAppClient的start方法中調用new()函數創建一個ClientEndpoint,將在ClientEndpoint中向Master注冊。

StandaloneAppClient.scala的源碼如下。

1.  ......
2.    def start() {
3.      //啟動 rpcEndpoint; 將直接回調到listener
4.      endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint
        (rpcEnv)))
5.    }

4.4.3 Spark程序的注冊機制

在上面的源碼分析中,StandaloneAppClient在啟動的時候創建了StandaloneAppClient內部類ClientEndpoint的實例對象作為消息循環體,以便向Master注冊當前的Application。既然ClientEndpoint是RpcEndpoint的子類,那么就會有這樣的生命周期:constructor -> onStart -> receive ->onStop。根據這個原理,我們來看ClientEndpoint的onStart方法代碼。

StandaloneAppClient.scala的源碼如下。

1.  override def onStart(): Unit = {
2.    try {
3.      registerWithMaster(1)
4.    } catch {
5.   .......

ClientEndpoint在啟動時就立即調用registerWithMaster來注冊Application,繼續查看registerWithMaster方法代碼。

StandaloneAppClient.scala的源碼如下。

1.    private def registerWithMaster(nthRetry: Int) {
2.  //向所有Master異步地嘗試注冊Application
3.        registerMasterFutures.set(tryRegisterAllMasters())
4.      ......
5.      }

ClientEndpoint在tryRegiesterAllMasters方法中會向所有的Master嘗試注冊Application。向Master發送RegisterApplication消息。

StandaloneAppClient.scala的源碼如下。

1.    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.  ......
3.     val   masterRef     =  rpcEnv.setupEndpointRef(masterAddress,  Master.
       ENDPOINT_NAME)
4.            masterRef.send(RegisterApplication(appDescription, self))
5.  ......

Master也是RpcEndpoint的子類,所以可以通過receive方法接收DeployMessage類型的消息RegisterApplication。

Master.scala的源碼如下。

1.       override def receive: PartialFunction[Any, Unit] = {
2.    ......
3.  case RegisterApplication(description, driver) =>
4.     .......
5.         registerApplication(app)
6.       .......
7.         driver.send(RegisteredApplication(app.id, self))
8.    .......

ClientEndpoint最后在receive方法中得到來自Master注冊好Application的確認消息RegisteredApplication。

StandaloneAppClient.scala的源碼如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.     case RegisteredApplication(appId_, masterRef) =>
3.  ......
4.       appId.set(appId_)
5.       registered.set(true)
6.   ......

至此,Application向Master注冊完畢。在上面的RegisterApplication中,調用了schedule方法,這個方法將完成Application的調度,并在Worker節點上啟動分配好的Executor給Application使用。

4.4.4 Spark程序對計算資源Executor的管理

從TaskSchedulerImpl的submitTasks的方法中我們知道,Spark Standalone部署模式調用StandaloneSchedulerBackend的reviveOffers方法進行TaskSet所需要資源的分配,得到足夠的資源后,將TaskSet中的Task逐個發送到Executor去執行。下面來看這里的資源,即Executor是如何得到和分配的。

StandaloneSchedulerBackend的reviveOffers方法很簡單,就是發送一個ReviveOffers消息給內部類DriverEndpoint,代碼如下所示。

CoarseGrainedSchedulerBackend.scala的源碼如下。

1.  override def reviveOffers() {
2.    driverEndpoint.send(ReviveOffers)
3.  }

DriverEndpoint的receive方法處理ReviveOffers消息也很簡單,就是調用makeOffers方法。receive方法部分關鍵代碼如下所示。

CoarseGrainedSchedulerBackend.scala的源碼如下。

1.  override def receive: PartialFunction[Any, Unit] = {
2.  ......
3.  case ReviveOffers =>
4.          makeOffers()
5.  ......

DriverEndpoint的makeOffers方法首先過濾出Alive狀態的Executor放到activeExecutorsHahMap變量中,然后使用id、ExecutorData.ExecutorHost、ExecutorData.freeCores構建代表Executor可用資源的WorkerOffer。然后是最重要的兩個方法調用。先是調用TaskSchedulerImpl的resourceOffers得到TaskDescription的二維數組,包含Task ID、Executor ID、Task Index等Task執行需要的信息。然后回調DriverEndpoint的launchTask給每個Task對應的Executor發執行Task的LaunchTask消息(其實是由CourseGrainedExecutorBackend轉發LauchTask消息)。

TaskSchedulerImpl的resourceOffers方法返回二維數組TaskDescription后作為DriverEndpoint的launchTasks方法的參數,DriverEndpoint的launchTasks方法中首先對傳入的tasks進行扁平化操作(例如,將多維數組降維成一維數組),得到所有的Task,然后遍歷所有的Task。在遍歷過程中,調用serialize()方法對task進行序列化,得到serializedTask。判斷如果serializedTask大于等于Akka幀減去Akka預留空間大小,則調用TaskSetManager的abort方法終止該任務的執行,否則將LaunchTask(new SerializableBuffer(serializedTask))消息發送到CoarseGrainedExecutorBackend。

CoarseGrainedExecutorBackend匹配到LaunchTask(data)消息后,首先調用deserialized方法,反序列化出task,然后調用Executor的lauchTask方法執行Task的處理。

主站蜘蛛池模板: 淮南市| 扶绥县| 湘潭县| 留坝县| 辽宁省| 靖江市| 平舆县| 尼玛县| 综艺| 虹口区| 通榆县| 通许县| 惠州市| 上栗县| 杨浦区| 永康市| 峨山| 长武县| 绍兴市| 自贡市| 宁南县| 青川县| 海安县| 广西| 文昌市| 安福县| 米林县| 荣昌县| 宁津县| 北宁市| 兴安盟| 武功县| 庆元县| 肥乡县| 香格里拉县| 米易县| 林西县| 稻城县| 湖州市| 南漳县| 马尔康县|