- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 4134字
- 2019-12-12 17:29:56
4.5 打通Spark系統運行內幕機制循環流程
Spark通過DAGScheduler面向整個Job劃分出了不同的Stage,劃分Stage之后,Stage從后往前劃分,執行的時候從前往后執行,每個Stage內部有一系列的任務,Stage里面的任務是并行計算,并行任務的邏輯是完全相同的,但處理的數據不同。DAGScheduler以TaskSet的方式,把一個DAG構建的Stage中的所有任務提交給底層的調度器TaskScheduler。TaskScheduler是一個接口,與具體的任務解耦合,可以運行在不同的調度模式下,如可運行在Standalone模式,也可運行在Yarn上。
Spark基礎調度(圖4-6)包括RDD Objects、DAGScheduler、TaskScheduler、Worker等內容。

圖4-6 Spark基礎調度圖
DAGScheduler在提交TaskSet給底層調度器的時候是面向接口TaskScheduler的,這符合面向對象中依賴抽象而不依賴具體的原則,帶來底層資源調度器的可插拔性,導致Spark可以運行在眾多的資源調度器模式上,如Standalone、Yarn、Mesos、Local、EC2、其他自定義的資源調度器;在Standalone的模式下我們聚焦于TaskSchedulerImpl。
TaskScheduler是一個接口Trait,底層任務調度接口,由[org.apache.spark.scheduler. TaskSchedulerImpl]實現。這個接口允許插入不同的任務調度程序。每個任務調度器在單獨的SparkContext中調度任務。任務調度程序從每個Stage的DAGScheduler獲得提交的任務集,負責發送任務到集群運行,如果任務運行失敗,將重試,返回DAGScheduler事件。
Spark 2.1.1版本的TaskScheduler.scala的源碼如下。
1. private[spark] trait TaskScheduler { 2. 3. private val appId = "spark-application-" + System.currentTimeMillis 4. 5. def rootPool: Pool 6. 7. def schedulingMode: SchedulingMode 8. 9. def start(): Unit 10. 11. //成功初始化后調用(通常在Spark上下文中)。Yarn使用這個來引導基于優先位置的資源 //分配,等待從節點登記等 12. def postStartHook() { } 13. 14. //從群集斷開連接 15. def stop(): Unit 16. 17. //提交要運行的任務序列 18. def submitTasks(taskSet: TaskSet): Unit 19. 20. //取消Stage 21. def cancelTasks(stageId: Int, interruptThread: Boolean): Unit 22. 23. //系統為upcalls設置DAG調度,這是保證在submitTasks被調用前被設置 24. def setDAGScheduler(dagScheduler: DAGScheduler): Unit 25. 26. //獲取集群中使用的默認并行級別,作為對作業的提示 27. def defaultParallelism(): Int 28. 29. /** * 更新正運行任務,讓master 知道BlockManager 仍活著。如果driver 知道給定的 * 塊管理器,則返回true;否則,返回false,指示塊管理器應重新注冊 30. */ 31. 32. def executorHeartbeatReceived( 33. execId: String, 34. accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], 35. blockManagerId: BlockManagerId): Boolean 36. 37. /** 38. *獲取與作業相關聯的應用程序ID 39. * @return An application ID 40. */ 41. def applicationId(): String = appId 42. 43. /** *處理丟失的 executor 44. */ 45. def executorLost(executorId: String, reason: ExecutorLossReason): Unit 46. 47. /** 48. *獲取與作業相關聯的應用程序的嘗試ID 49. * 50. * @return應用程序的嘗試ID 51. */ 52. def applicationAttemptId(): Option[String] 53. 54. }
Spark 2.2.0版本的TaskScheduler.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第21行之后新增加了killTaskAttempt方法。
1. ...... 2. /** 3. *殺死任務嘗試 4. * 5. * @return任務是否成功被殺死 6. */ 7. def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean 8. .......
DAGScheduler把TaskSet交給底層的接口TaskScheduler,具體實現時有不同的方法。TaskScheduler主要由TaskSchedulerImpl實現。
TaskSchedulerImpl也有自己的子類YarnScheduler。
1. private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { 2. 3. //RackResolver 記錄INFO日志信息時,解析rack的信息 4. if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { 5. Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) 6. } 7. 8. //默認情況下,rack是未知的 9. override def getRackForHost(hostPort: String): Option[String] = { 10. val host = Utils.parseHostPort(hostPort)._1 11. Option(RackResolver.resolve(sc.hadoopConfiguration, host). getNetworkLocation) 12. } 13. }
YarnScheduler的子類YarnClusterScheduler實現如下。
1. private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) { 2. logInfo("Created YarnClusterScheduler") 3. 4. override def postStartHook() { 5. ApplicationMaster.sparkContextInitialized(sc) 6. super.postStartHook() 7. logInfo("YarnClusterScheduler.postStartHook done") 8. } 9. 10. }
默認情況下,我們研究Standalone的模式,所以主要研究TaskSchedulerImpl。DAGScheduler把TaskSet交給TaskScheduler,TaskScheduler中通過TastSetManager管理具體的任務。TaskScheduler的核心任務是提交TaskSet到集群運算,并匯報結果。
為TaskSet創建和維護一個TaskSetManager,并追蹤任務的本地性以及錯誤信息。
遇到延后的Straggle任務,會放到其他節點重試。
向DAGScheduler匯報執行情況,包括在Shuffle輸出lost的時候報告fetch failed錯誤等信息。
TaskSet是一個普通的類,第一個成員是tasks,tasks是一個數組。TaskSet的源碼如下。
1. private[spark] class TaskSet( 2. val tasks: Array[Task[_]], 3. val stageId: Int, 4. val stageAttemptId: Int, 5. val priority: Int, 6. val properties: Properties) { 7. val id: String = stageId + "." + stageAttemptId 8. 9. override def toString: String = "TaskSet " + id 10. }
TaskScheduler內部有SchedulerBackend,SchedulerBackend管理Executor資源。從Standalone的模式來講,具體實現是StandaloneSchedulerBackend(Spark 2.0版本將之前的AppClient名字更新為StandaloneAppClient)。
SchedulerBackend本身是一個接口,是一個trait。SchedulerBackend的源碼如下。
1. private[spark] trait SchedulerBackend { 2. private val appId = "spark-application-" + System.currentTimeMillis 3. 4. def start(): Unit 5. def stop(): Unit 6. def reviveOffers(): Unit 7. def defaultParallelism(): Int 8. 9. def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = 10. throw new UnsupportedOperationException 11. def isReady(): Boolean = true 12. 13. /** 14. *獲取與作業關聯的應用ID 15. * 16. * @return 應用程序 ID 17. */ 18. def applicationId(): String = appId 19. 20. /** 21. *如果集群管理器支持多個嘗試,則獲取此運行的嘗試ID,應用程序運行在客戶端模式將沒 *有嘗試ID 22. * 23. * @return如果可用,返回應用程序嘗試ID 24. */ 25. def applicationAttemptId(): Option[String] = None 26. 27. /** 28. *得到driver 日志的URL。這些URL是用來在用戶界面中顯示鏈接driver的Executors *選項卡 29. * 30. * @return Map 包含日志名稱和URLs 31. */ 32. def getDriverLogUrls: Option[Map[String, String]] = None 33. 34. }
StandaloneSchedulerBackend:專門負責收集Worker的資源信息。接收Worker向Driver注冊的信息,ExecutorBackend啟動的時候進行注冊,為當前應用程序準備計算資源,以進程為單位。
StandaloneSchedulerBackend的源碼如下。
1. private[spark] class StandaloneSchedulerBackend( 2. scheduler: TaskSchedulerImpl, 3. sc: SparkContext, 4. masters: Array[String]) 5. extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) 6. with StandaloneAppClientListener 7. with Logging { 8. private var client: StandaloneAppClient = null 9. ......
StandaloneSchedulerBackend里有一個Client: StandaloneAppClient。
1. private[spark] class StandaloneAppClient( 2. rpcEnv: RpcEnv, 3. masterUrls: Array[String], 4. appDescription: ApplicationDescription, 5. listener: StandaloneAppClientListener, 6. conf: SparkConf) 7. extends Logging {
StandaloneAppClient允許應用程序與Spark standalone集群管理器通信。獲取Master的URL、應用程序描述和集群事件監聽器,當各種事件發生時可以回調監聽器。masterUrls的格式為spark://host:port,StandaloneAppClient需要向Master注冊。
StandaloneAppClient在StandaloneSchedulerBackend.scala的start方法啟動時進行賦值,用new()函數創建一個StandaloneAppClient。
Spark 2.1.1版本的StandaloneSchedulerBackend.scala的源碼如下。
1. private[spark] class StandaloneSchedulerBackend( 2. ...... 3. 4. override def start() { 5. ...... 6. val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) 7. client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) 8. client.start() 9. launcherBackend.setState(SparkAppHandle.State.SUBMITTED) 10. waitForRegistration() 11. launcherBackend.setState(SparkAppHandle.State.RUNNING) 12. }
Spark 2.2.0版本的StandaloneSchedulerBackend.scala的源碼與Spark 2.1.1版本相比具有如下特點:上段代碼中第6行ApplicationDescription傳入的第5個參數appUIAddress更改為webUrl。
1. ...... 2. val appDesc = ApplicationDescription(sc.appName, maxCores, sc. executorMemory, command, webUrl, sc.eventLogDir,sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) 3. .....
StandaloneAppClient.scala中,里面有一個類是ClientEndpoint,核心工作是在啟動時向Master注冊。StandaloneAppClient的start方法啟動時,就調用new函數創建一個ClientEndpoint。
StandaloneAppClient的源碼如下。
1. private[spark] class StandaloneAppClient( 2. ...... 3. private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint 4. with Logging { 5. ...... 6. def start() { 7. //啟動 rpcEndpoint; 將回調listener 8. endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint (rpcEnv))) 9. }
StandaloneSchedulerBackend在啟動時構建StandaloneAppClient實例,并在StandaloneAppClient實例start時啟動了ClientEndpoint消息循環體。ClientEndpoint在啟動時會向Master注冊當前程序。
StandaloneAppClient中ClientEndpoint類的onStart()方法如下。
1. override def onStart(): Unit = { 2. try { 3. registerWithMaster(1) 4. } catch { 5. case e: Exception => 6. logWarning("Failed to connect to master", e) 7. markDisconnected() 8. stop() 9. } 10. }
這是StandaloneSchedulerBackend的第一個注冊的核心功能。StandaloneSchedulerBackend繼承自CoarseGrainedSchedulerBackend。而CoarseGrainedSchedulerBackend在啟動時就創建DriverEndpoint,從實例的角度講,DriverEndpoint也屬于StandaloneSchedulerBackend實例。
1. private[spark] 2. class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv) 3. extends ExecutorAllocationClient with SchedulerBackend with Logging 4. { 5. ...... 6. class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) 7. extends ThreadSafeRpcEndpoint with Logging { 8. ......
StandaloneSchedulerBackend的父類CoarseGrainedSchedulerBackend在start的時候會實例化類型為DriverEndpoint(這就是我們程序運行時的經典對象Driver)的消息循環體。StandaloneSchedulerBackend在運行時向Master注冊申請資源,當Worker的ExecutorBackend啟動時會發送RegisteredExecutor信息向DriverEndpoint注冊,此時StandaloneSchedulerBackend就掌握了當前應用程序擁有的計算資源,TaskScheduler就是通過StandaloneSchedulerBackend擁有的計算資源來具體運行Task的;StandaloneSchedulerBackend不是應用程序的總管,應用程序的總管是DAGScheduler、TaskScheduler,StandaloneSchedulerBackend向應用程序的Task分配具體的計算資源,并把Task發送到集群中。
SparkContext、DAGScheduler、TaskSchedulerImpl、StandaloneSchedulerBackend在應用程序啟動時只實例化一次,應用程序存在期間始終存在這些對象。
這里基于Spark 2.2版本講解:
Spark調度器三大核心資源:SparkContext、DAGScheduler、TaskSchedulerImpl,TaskSchedulerImpl作為具體的底層調度器,運行時需要計算資源,因此需要StandaloneSchedulerBackend。StandaloneSchedulerBackend設計巧妙的地方是啟動時啟動StandaloneAppClient,而StandaloneAppClient在start時有一個ClientEndpoint的消息循環體,ClientEndpoint的消息循環體啟動的時候向Master注冊應用程序。
StandaloneSchedulerBackend的父類CoarseGrainedSchedulerBackend在start啟動的時候會實例化DriverEndpoint,所有的ExecutorBackend啟動的時候都要向DriverEndpoint注冊,注冊最后落到了StandaloneSchedulerBackend的內存數據結構中,表面上看是在CoarseGrainedSchedulerBackend,但是實例化的時候是StandaloneSchedulerBackend,注冊給父類的成員其實就是子類的成員。
作為前提問題:TaskScheduler、StandaloneSchedulerBackend是如何啟動的?TaskSchedulerImpl是什么時候實例化的?
TaskSchedulerImpl是在SparkContext中實例化的。在SparkContext類實例化的時候,只要不是方法體里面的內容,都會被執行,(sched, ts)是SparkContext的成員,將調用createTaskScheduler方法。調用createTaskScheduler方法返回一個Tuple,包括兩個元素:sched是我們的schedulerBackend;ts是taskScheduler。
1. class SparkContext(config: SparkConf) extends Logging { 2. ...... 3. //創建啟動調度器 scheduler 4. val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 5. _schedulerBackend = sched 6. _taskScheduler = ts 7. _dagScheduler = new DAGScheduler(this) 8. _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
createTaskScheduler里有很多運行模式,這里關注Standalone模式,首先調用new()函數創建一個TaskSchedulerImpl,TaskSchedulerImpl和SparkContext是一一對應的,整個程序運行的時候只有一個TaskSchedulerImpl,也只有一個SparkContext;接著實例化StandaloneSchedulerBackend,整個程序運行的時候只有一個StandaloneSchedulerBackend。createTaskScheduler方法如下。
1. private def createTaskScheduler( 2. sc: SparkContext, 3. master: String, 4. deployMode: String): (SchedulerBackend, TaskScheduler) = { 5. import SparkMasterRegex._ 6. ...... 7. master match { 8. ...... 9. case SPARK_REGEX(sparkUrl) => 10. val scheduler = new TaskSchedulerImpl(sc) 11. val masterUrls = sparkUrl.split(",").map("spark://" + _) 12. val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) 13. scheduler.initialize(backend) 14. (backend, scheduler) 15. ......
在SparkContext實例化的時候通過createTaskScheduler來創建TaskSchedulerImpl和StandaloneSchedulerBackend。然后在createTaskScheduler中調用scheduler.initialize(backend)。
initialize的方法參數把StandaloneSchedulerBackend傳進來,schedulingMode模式匹配有兩種方式:FIFO、FAIR。
initialize的方法中調用schedulableBuilder.buildPools()。buildPools方法根據FIFOSchedulableBuilder、FairSchedulableBuilder不同的模式重載方法實現。
1. private[spark] trait SchedulableBuilder { 2. def rootPool: Pool 3. 4. def buildPools(): Unit 5. 6. def addTaskSetManager(manager: Schedulable, properties: Properties): Unit 7. }
initialize的方法把StandaloneSchedulerBackend傳進來了,但還沒有啟動StandaloneSchedulerBackend。在TaskSchedulerImpl的initialize方法中把StandaloneSchedulerBackend傳進來,從而賦值為TaskSchedulerImpl的backend;在TaskSchedulerImpl調用start方法時會調用backend.start方法,在start方法中會最終注冊應用程序。
下面來看SparkContext.scala的taskScheduler的啟動。
1. val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 2. _schedulerBackend = sched 3. _taskScheduler = ts 4. _dagScheduler = new DAGScheduler(this) 5. ...... 6. _taskScheduler.start() 7. _applicationId = _taskScheduler.applicationId() 8. _applicationAttemptId = taskScheduler.applicationAttemptId() 9. _conf.set("spark.app.id", _applicationId) 10. ......
其中調用了_taskScheduler的start方法。
1. private[spark] trait TaskScheduler { 2. ...... 3. 4. def start(): Unit 5. .....
TaskScheduler的start()方法沒有具體實現。TaskScheduler子類的TaskSchedulerImpl的start()方法的源碼如下。
1. override def start() { 2. backend.start() 3. 4. if (!isLocal && conf.getBoolean("spark.speculation", false)) { 5. logInfo("Starting speculative execution thread") 6. speculationScheduler.scheduleAtFixedRate(new Runnable { 7. override def run(): Unit = Utils.tryOrStopSparkContext(sc) { 8. checkSpeculatableTasks() 9. } 10. }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit. MILLISECONDS) 11. } 12. }
TaskSchedulerImpl的start通過backend.start啟動了StandaloneSchedulerBackend的start方法。
StandaloneSchedulerBackend的start方法中,將command封裝注冊給Master,Master轉過來要Worker啟動具體的Executor。command已經封裝好指令,Executor具體要啟動進程入口類CoarseGrainedExecutorBackend。然后調用new()函數創建一個StandaloneAppClient,通過client.start啟動client。
StandaloneAppClient的start方法中調用new()函數創建一個ClientEndpoint:
1. def start() { 2. //啟動 rpcEndpoint; 將回調listener 3. endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint (rpcEnv))) 4. }
ClientEndpoint的源碼如下。
1. private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint 2. with Logging { 3. ...... 4. override def onStart(): Unit = { 5. try { 6. registerWithMaster(1) 7. } catch { 8. case e: Exception => 9. logWarning("Failed to connect to master", e) 10. markDisconnected() 11. stop() 12. } 13. }
ClientEndpoint是一個ThreadSafeRpcEndpoint。ClientEndpoint的onStart方法中調用registerWithMaster(1)進行注冊,向Master注冊程序。registerWithMaster方法如下。
1. private def registerWithMaster(nthRetry: Int) { 2. registerMasterFutures.set(tryRegisterAllMasters()) 3. registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable { 4. override def run(): Unit = { 5. if (registered.get) { 6. registerMasterFutures.get.foreach(_.cancel(true)) 7. registerMasterThreadPool.shutdownNow() 8. } else if (nthRetry >= REGISTRATION_RETRIES) { 9. markDead("All masters are unresponsive! Giving up.") 10. } else { 11. registerMasterFutures.get.foreach(_.cancel(true)) 12. registerWithMaster(nthRetry + 1) 13. } 14. } 15. }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) 16. }
程序注冊后,Master通過schedule分配資源,通知Worker啟動Executor,Executor啟動的進程是CoarseGrainedExecutorBackend,Executor啟動后又轉過來向Driver注冊,Driver其實是StandaloneSchedulerBackend的父類CoarseGrainedSchedulerBackend的一個消息循環體DriverEndpoint。
總結:
在SparkContext實例化的時候調用createTaskScheduler來創建TaskSchedulerImpl和StandaloneSchedulerBackend,同時在SparkContext實例化的時候會調用TaskSchedulerImpl的start,在start方法中會調用StandaloneSchedulerBackend的start,在該start方法中會創建StandaloneAppClient對象,并調用StandaloneAppClient對象的start方法,在該start方法中會創建ClientEndpoint,創建ClientEndpoint時會傳入Command來指定具體為當前應用程序啟動的Executor的入口類的名稱為CoarseGrainedExecutorBackend,然后ClientEndpoint啟動并通過tryRegisterMaster來注冊當前的應用程序到Master中,Master接收到注冊信息后如果可以運行程序,為該程序生產Job ID并通過schedule來分配計算資源,具體計算資源的分配是通過應用程序的運行方式、Memory、cores等配置信息決定的。最后,Master會發送指令給Worker,Worker為當前應用程序分配計算資源時會首先分配ExecutorRunner。ExecutorRunner內部會通過Thread的方式構建ProcessBuilder來啟動另外一個JVM進程,這個JVM進程啟動時加載的main方法所在的類的名稱就是在創建ClientEndpoint時傳入的Command來指定具體名稱為CoarseGrainedExecutorBackend的類,此時JVM在通過ProcessBuilder啟動時獲得了CoarseGrainedExecutorBackend后加載并調用其中的main方法,在main方法中會實例化CoarseGrainedExecutorBackend本身這個消息循環體,而CoarseGrainedExecutorBackend在實例化時會通過回調onStart向DriverEndpoint發送RegisterExecutor來注冊當前的CoarseGrainedExecutorBackend,此時DriverEndpoint收到該注冊信息并保存在StandaloneSchedulerBackend實例的內存數據結構中,這樣Driver就獲得了計算資源。
CoarseGrainedExecutorBackend.scala的main方法如下。
1. def main(args: Array[String]) { 2. var driverUrl: String = null 3. var executorId: String = null 4. var hostname: String = null 5. var cores: Int = 0 6. var appId: String = null 7. var workerUrl: Option[String] = None 8. val userClassPath = new mutable.ListBuffer[URL]() 9. 10. var argv = args.toList 11. ...... 12. run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath) 13. System.exit(0) 14. }
CoarseGrainedExecutorBackend的main然后開始調用run方法。
1. private def run( 2. driverUrl: String, 3. executorId: String, 4. hostname: String, 5. cores: Int, 6. appId: String, 7. workerUrl: Option[String], 8. userClassPath: Seq[URL]) { 9. ...... 10. env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( 11. env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env)) 12. ......
在CoarseGrainedExecutorBackend的main方法中,通過env.rpcEnv.setupEndpoint ("Executor", new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))構建了CoarseGrainedExecutorBackend實例本身。
- 基于LabWindows/CVI的虛擬儀器設計與應用
- Learning Social Media Analytics with R
- Windows程序設計與架構
- iClone 4.31 3D Animation Beginner's Guide
- 數據挖掘方法及天體光譜挖掘技術
- 學會VBA,菜鳥也高飛!
- 走近大數據
- Pentaho Analytics for MongoDB
- IBM? SmartCloud? Essentials
- 智慧未來
- Getting Started with Tableau 2018.x
- ARM嵌入式系統開發完全入門與主流實踐
- Learn SOLIDWORKS 2020
- 巧學活用WPS
- 樂高機器人:Scratch與WeDo編程基礎實戰應用