- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 1298字
- 2019-12-12 17:29:56
4.4 SchedulerBackend解析
本節講解SchedulerBackend原理剖析、SchedulerBackend源碼解析、Spark程序的注冊機制、Spark程序對計算資源Executor的管理等內容。
4.4.1 SchedulerBackend原理剖析
以Spark Standalone部署方式為例,StandaloneSchedulerBackend在啟動的時候構造了StandaloneAppClient實例,并在該實例start的時候啟動了ClientEndpoint消息循環體,ClientEndpoint在啟動的時候會向Master注冊當前程序。而StandaloneSchedulerBackend的父類CoarseGrainedSchedulerBackend在start的時候會實例化類型為DriverEndPoint(這就是程序運行時的經典的對象Driver)的消息循環體,StandaloneSchedulerBackend專門負責收集Worker上的資源信息,當ExecutorBackend啟動的時候,會發送RegisteredExecutor信息向DriverEndpoint注冊,此時StandaloneSchedulerBackend就掌握了當前應用程序擁有的計算資源,TaskScheduler就是通過StandaloneSchedulerBackend擁有的計算資源來具體運行Task的。
4.4.2 SchedulerBackend源碼解析
StandaloneSchedulerBackend收集和分配資源給調度的Task使用。
StandaloneSchedulerBackend.scala的源碼如下。
1. private[spark] class StandaloneSchedulerBackend( 2. ...... 3. override def start() { 4. ...... 5. val command = Command("org.apache.spark.executor. CoarseGrainedExecutorBackend", 6. args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) 7. ...... 8. client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) 9. client.start() 10. ......
在StandaloneAppClient的start方法中調用new()函數創建一個ClientEndpoint,將在ClientEndpoint中向Master注冊。
StandaloneAppClient.scala的源碼如下。
1. ...... 2. def start() { 3. //啟動 rpcEndpoint; 將直接回調到listener 4. endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint (rpcEnv))) 5. }
4.4.3 Spark程序的注冊機制
在上面的源碼分析中,StandaloneAppClient在啟動的時候創建了StandaloneAppClient內部類ClientEndpoint的實例對象作為消息循環體,以便向Master注冊當前的Application。既然ClientEndpoint是RpcEndpoint的子類,那么就會有這樣的生命周期:constructor -> onStart -> receive ->onStop。根據這個原理,我們來看ClientEndpoint的onStart方法代碼。
StandaloneAppClient.scala的源碼如下。
1. override def onStart(): Unit = { 2. try { 3. registerWithMaster(1) 4. } catch { 5. .......
ClientEndpoint在啟動時就立即調用registerWithMaster來注冊Application,繼續查看registerWithMaster方法代碼。
StandaloneAppClient.scala的源碼如下。
1. private def registerWithMaster(nthRetry: Int) { 2. //向所有Master異步地嘗試注冊Application 3. registerMasterFutures.set(tryRegisterAllMasters()) 4. ...... 5. }
ClientEndpoint在tryRegiesterAllMasters方法中會向所有的Master嘗試注冊Application。向Master發送RegisterApplication消息。
StandaloneAppClient.scala的源碼如下。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. ...... 3. val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master. ENDPOINT_NAME) 4. masterRef.send(RegisterApplication(appDescription, self)) 5. ......
Master也是RpcEndpoint的子類,所以可以通過receive方法接收DeployMessage類型的消息RegisterApplication。
Master.scala的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case RegisterApplication(description, driver) => 4. ....... 5. registerApplication(app) 6. ....... 7. driver.send(RegisteredApplication(app.id, self)) 8. .......
ClientEndpoint最后在receive方法中得到來自Master注冊好Application的確認消息RegisteredApplication。
StandaloneAppClient.scala的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredApplication(appId_, masterRef) => 3. ...... 4. appId.set(appId_) 5. registered.set(true) 6. ......
至此,Application向Master注冊完畢。在上面的RegisterApplication中,調用了schedule方法,這個方法將完成Application的調度,并在Worker節點上啟動分配好的Executor給Application使用。
4.4.4 Spark程序對計算資源Executor的管理
從TaskSchedulerImpl的submitTasks的方法中我們知道,Spark Standalone部署模式調用StandaloneSchedulerBackend的reviveOffers方法進行TaskSet所需要資源的分配,得到足夠的資源后,將TaskSet中的Task逐個發送到Executor去執行。下面來看這里的資源,即Executor是如何得到和分配的。
StandaloneSchedulerBackend的reviveOffers方法很簡單,就是發送一個ReviveOffers消息給內部類DriverEndpoint,代碼如下所示。
CoarseGrainedSchedulerBackend.scala的源碼如下。
1. override def reviveOffers() { 2. driverEndpoint.send(ReviveOffers) 3. }
DriverEndpoint的receive方法處理ReviveOffers消息也很簡單,就是調用makeOffers方法。receive方法部分關鍵代碼如下所示。
CoarseGrainedSchedulerBackend.scala的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case ReviveOffers => 4. makeOffers() 5. ......
DriverEndpoint的makeOffers方法首先過濾出Alive狀態的Executor放到activeExecutorsHahMap變量中,然后使用id、ExecutorData.ExecutorHost、ExecutorData.freeCores構建代表Executor可用資源的WorkerOffer。然后是最重要的兩個方法調用。先是調用TaskSchedulerImpl的resourceOffers得到TaskDescription的二維數組,包含Task ID、Executor ID、Task Index等Task執行需要的信息。然后回調DriverEndpoint的launchTask給每個Task對應的Executor發執行Task的LaunchTask消息(其實是由CourseGrainedExecutorBackend轉發LauchTask消息)。
TaskSchedulerImpl的resourceOffers方法返回二維數組TaskDescription后作為DriverEndpoint的launchTasks方法的參數,DriverEndpoint的launchTasks方法中首先對傳入的tasks進行扁平化操作(例如,將多維數組降維成一維數組),得到所有的Task,然后遍歷所有的Task。在遍歷過程中,調用serialize()方法對task進行序列化,得到serializedTask。判斷如果serializedTask大于等于Akka幀減去Akka預留空間大小,則調用TaskSetManager的abort方法終止該任務的執行,否則將LaunchTask(new SerializableBuffer(serializedTask))消息發送到CoarseGrainedExecutorBackend。
CoarseGrainedExecutorBackend匹配到LaunchTask(data)消息后,首先調用deserialized方法,反序列化出task,然后調用Executor的lauchTask方法執行Task的處理。
- Hands-On Deep Learning with Apache Spark
- Visualforce Development Cookbook(Second Edition)
- Circos Data Visualization How-to
- 7天精通Dreamweaver CS5網頁設計與制作
- 微型計算機控制技術
- 中國戰略性新興產業研究與發展·智能制造
- 計算機網絡安全
- 21天學通C語言
- Mastering Ansible(Second Edition)
- 電腦故障排除與維護終極技巧金典
- 簡明學中文版Flash動畫制作
- Hands-On Business Intelligence with Qlik Sense
- Practical AWS Networking
- 單片機與微機原理及應用
- Qt中的C++技術