- Spark大數(shù)據(jù)商業(yè)實(shí)戰(zhàn)三部曲:內(nèi)核解密|商業(yè)案例|性能調(diào)優(yōu)
- 王家林
- 7503字
- 2019-12-12 17:29:57
5.1 Master啟動(dòng)原理和源碼詳解
本節(jié)講解Master啟動(dòng)的原理和源碼;Master HA雙機(jī)切換;Master的注冊(cè)機(jī)制和狀態(tài)管理解密等內(nèi)容。
5.1.1 Master啟動(dòng)的原理詳解
Spark應(yīng)用程序作為獨(dú)立的集群進(jìn)程運(yùn)行,由主程序中的SparkContext對(duì)象(稱為驅(qū)動(dòng)程序)協(xié)調(diào)。Spark集群部署組件圖5-1所示。

圖5-1 Spark集群部署組件圖
其中各個(gè)術(shù)語(yǔ)及相關(guān)術(shù)語(yǔ)的描述如下。
(1)Driver Program:運(yùn)行Application的main函數(shù)并新建SparkContext實(shí)例的程序,稱為驅(qū)動(dòng)程序(Driver Program)。通常可以使用SparkContext代表驅(qū)動(dòng)程序。
(2)Cluster Manager:集群管理器(Cluster Manager)是集群資源管理的外部服務(wù)。Spark上現(xiàn)在主要有Standalone、YARN、Mesos 3種集群資源管理器。Spark自帶的Standalone模式能夠滿足絕大部分純粹的Spark計(jì)算環(huán)境中對(duì)集群資源管理的需求,基本上只有在集群中運(yùn)行多套計(jì)算框架的時(shí)候才建議考慮YARN和Mesos。
(3)Worker Node:集群中可以運(yùn)行Application代碼的工作節(jié)點(diǎn)(Worker Node),相當(dāng)于Hadoop的Slave節(jié)點(diǎn)。
(4)Executor:在Worker Node上為Application啟動(dòng)的一個(gè)工作進(jìn)程,在進(jìn)程中負(fù)責(zé)任務(wù)(Task)的運(yùn)行,并且負(fù)責(zé)將數(shù)據(jù)存放在內(nèi)存或磁盤上,在Executor內(nèi)部通過(guò)多線程的方式(即線程池)并發(fā)處理應(yīng)用程序的具體任務(wù)。
每個(gè)Application都有各自獨(dú)立的Executors,因此應(yīng)用程序之間是相互隔離的。
(5)Task:任務(wù)(Task)是指被Driver送到Executor上的工作單元。通常,一個(gè)任務(wù)會(huì)處理一個(gè)Partition的數(shù)據(jù),每個(gè)Partition一般是一個(gè)HDFS的Block塊的大小。
(6)Application:是創(chuàng)建了SparkContext實(shí)例對(duì)象的Spark用戶程序,包含了一個(gè)Driver program和集群中多個(gè)Worker上的Executor。
(7)Job:和Spark的action對(duì)應(yīng),每個(gè)action,如count、savaAsTextFile等都會(huì)對(duì)應(yīng)一個(gè)Job實(shí)例,每個(gè)Job會(huì)拆分成多個(gè)Stages,一個(gè)Stage中包含一個(gè)任務(wù)集(TaskSet),任務(wù)集中的各個(gè)任務(wù)通過(guò)一定的調(diào)度機(jī)制發(fā)送到工作單位(Executor)上并行執(zhí)行。
Spark Standalone集群的部署采用典型的Master/Slave架構(gòu)。其中,Master節(jié)點(diǎn)負(fù)責(zé)整個(gè)集群的資源管理與調(diào)度,Worker節(jié)點(diǎn)(也可以稱Slave節(jié)點(diǎn))在Master節(jié)點(diǎn)的調(diào)度下啟動(dòng)Executor,負(fù)責(zé)執(zhí)行具體工作(包括應(yīng)用程序以及應(yīng)用程序提交的任務(wù))。
5.1.2 Master啟動(dòng)的源碼詳解
Spark中各個(gè)組件是通過(guò)腳本來(lái)啟動(dòng)部署的。下面以腳本為入口點(diǎn)開始分析Master的部署。每個(gè)組件對(duì)應(yīng)提供了啟動(dòng)的腳本,同時(shí)也會(huì)提供停止的腳本。停止腳本比較簡(jiǎn)單,在此僅分析啟動(dòng)腳本。
1.Master部署的啟動(dòng)腳本解析
首先看一下Master的啟動(dòng)腳本./sbin/start-master.sh,內(nèi)容如下。
1. # 在腳本的執(zhí)行節(jié)點(diǎn)啟動(dòng)Master組件 2. 3. #如果沒有設(shè)置環(huán)境變量SPARK_HOME,會(huì)根據(jù)腳本所在位置自動(dòng)設(shè)置 4. if [ -z "${SPARK_HOME}" ]; then 5. export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" 6. fi 7. 8. #注:提取的類名必須和SparkSubmit的類相匹配。任何變化都需在類中進(jìn)行反映 9. 10. # Master 組件對(duì)應(yīng)的類 11. CLASS="org.apache.spark.deploy.master.Master" 12. 13. #腳本的幫助信息 14. 15. if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then 16. echo "Usage: ./sbin/start-master.sh [options]" 17. pattern="Usage:" 18. pattern+="\|Using Spark's default log4j profile:" 19. pattern+="\|Registered signal handlers for" 20. 21. # 通過(guò)腳本spark-class執(zhí)行指定的Master類,參數(shù)為--help 22. "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2 23. exit 1 24. fi 25. 26. ORIGINAL_ARGS="$@" 27. 28. #控制啟動(dòng)Master時(shí),是否同時(shí)啟動(dòng) Tachyon的Master組件 29. START_TACHYON=false 30. 31. while (( "$#" )); do 32. case $1 in 33. --with-tachyon) 34. if [ ! -e "${SPARK_HOME}"/tachyon/bin/tachyon ]; then 35. echo "Error: --with-tachyon specified, but tachyon not found." 36. exit -1 37. fi 38. START_TACHYON=true 39. ;; 40. esac 41. shift 42. done 43. 44. . "${SPARK_HOME}/sbin/spark-config.sh" 45. 46. . "${SPARK_HOME}/bin/load-spark-env.sh" 47. 48. #下面的一些參數(shù)對(duì)應(yīng)的默認(rèn)配置屬性 49. if [ "$SPARK_MASTER_PORT" = "" ]; then 50. SPARK_MASTER_PORT=7077 51. fi 52. 53. //用于MasterURL,所以當(dāng)沒有設(shè)置時(shí),默認(rèn)使用hostname,而不是IP地址 54. //該MasterURL在Worker注冊(cè)或應(yīng)用程序提交時(shí)使用 55. if [ "$SPARK_MASTER_IP" = "" ]; then 56. SPARK_MASTER_IP=`hostname` 57. fi 58. 59. if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then 60. SPARK_MASTER_WEBUI_PORT=8080 61. fi 62. 63. #通過(guò)啟動(dòng)后臺(tái)進(jìn)程的腳本spark-daemon.sh來(lái)啟動(dòng)Master組件 64. "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \ 65. --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_ MASTER_WEBUI_PORT \ 66. $ORIGINAL_ARGS 67. 68. #需要時(shí)同時(shí)啟動(dòng)Tachyon,此時(shí)Tachyon是編譯在Spark內(nèi)的 69. if [ "$START_TACHYON" == "true" ]; then 70. "${SPARK_HOME}"/tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP 71. "${SPARK_HOME}"/tachyon/bin/tachyon format -s 72. "${SPARK_HOME}"/tachyon/bin/tachyon-start.sh master 73. fi
通過(guò)腳本的簡(jiǎn)單分析,可以看出Master組件是以后臺(tái)守護(hù)進(jìn)程的方式啟動(dòng)的,對(duì)應(yīng)的后臺(tái)守護(hù)進(jìn)程的啟動(dòng)腳本spark-daemon.sh,在后臺(tái)守護(hù)進(jìn)程的啟動(dòng)腳本spark-daemon.sh內(nèi)部,通過(guò)腳本spark-class啟動(dòng)一個(gè)指定主類的JVM進(jìn)程,相關(guān)代碼如下所示。
1. case "$mode" in 2. #這里對(duì)應(yīng)的是啟動(dòng)一個(gè)Spark類 3. (class) 4. nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null & 5. newpid="$!" 6. ;; 7. #這里對(duì)應(yīng)提交一個(gè)Spark 應(yīng)用程序 8. (submit) 9. nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-submit --class $command "$@" >> "$log" 2>&1 < /dev/null & 10. newpid="$!" 11. ;; 12. 13. (*) 14. echo "unknown mode: $mode" 15. exit 1 16. ;;
通過(guò)腳本的分析,可以知道最終執(zhí)行的是Master類(對(duì)應(yīng)的代碼為前面的CLASS="org.apache.spark.deploy.master.Master"),對(duì)應(yīng)的入口點(diǎn)是Master伴生對(duì)象中的main方法。下面以該方法作為入口點(diǎn)進(jìn)一步解析Master部署框架。
部署Master組件時(shí),最簡(jiǎn)單的方式是直接啟動(dòng)腳本,不帶任何選項(xiàng)參數(shù),命令如下所示。
1. ./sbin/start-master.sh
如需設(shè)置選項(xiàng)參數(shù),可以查看幫助信息,根據(jù)自己的需要進(jìn)行設(shè)置。
2.Master的源碼解析
首先查看Master伴生對(duì)象中的main方法。
Master.scala的源碼如下。
1. def main(argStrings: Array[String]) { 2. Utils.initDaemon(log) 3. val conf = new SparkConf 4. //構(gòu)建參數(shù)解析的實(shí)例 5. val args = new MasterArguments(argStrings, conf) 6. //啟動(dòng)RPC通信環(huán)境以及Master的RPC通信終端 7. val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf) 8. rpcEnv.awaitTermination() 9. }
和其他類(如SparkSubmit)一樣,Master類的入口點(diǎn)處也包含了對(duì)應(yīng)的參數(shù)類MasterArguments。MasterArguments類包括Spark屬性配置相關(guān)的一些解析。
MasterArguments.scala的源碼如下。
1. private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging { 2. var host = Utils.localHostName() 3. var port = 7077 4. var webUiPort = 8080 5. var propertiesFile: String = null 6. 7. //讀取啟動(dòng)腳本中設(shè)置的環(huán)境變量 8. if (System.getenv("SPARK_MASTER_IP") != null) { 9. logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_ HOST") 10. host = System.getenv("SPARK_MASTER_IP") 11. } 12. 13. if (System.getenv("SPARK_MASTER_HOST") != null) { 14. host = System.getenv("SPARK_MASTER_HOST") 15. } 16. if (System.getenv("SPARK_MASTER_PORT") != null) { 17. port = System.getenv("SPARK_MASTER_PORT").toInt 18. } 19. if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) { 20. webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt 21. } 22. //命令行選項(xiàng)參數(shù)的解析 23. parse(args.toList) 24. 25. //加載SparkConf文件,所有的訪問(wèn)必須經(jīng)過(guò)此行 26. propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) 27. 28. if (conf.contains("spark.master.ui.port")) { 29. webUiPort = conf.get("spark.master.ui.port").toInt 30. } 31. ......
MasterArguments中的printUsageAndExit方法對(duì)應(yīng)的就是命令行中的幫助信息。
MasterArguments.scala的源碼如下。
1. private def printUsageAndExit(exitCode: Int) { 2. //scalastyle:off println 3. System.err.println( 4. "Usage: Master [options]\n" + 5. "\n" + 6. "Options:\n" + 7. " -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" + 8. " -h HOST, --host HOST Hostname to listen on\n" + 9. " -p PORT, --port PORT Port to listen on (default: 7077)\n" + 10. " --webui-port PORT Port for web UI (default: 8080)\n" + 11. " --properties-file FILE Path to a custom Spark properties file.\n" + 12. " Default is conf/spark-defaults.conf.") 13. //scalastyle:on println 14. System.exit(exitCode) 15. }
解析完Master的參數(shù)后,調(diào)用startRpcEnvAndEndpoin方法啟動(dòng)RPC通信環(huán)境以及Master的RPC通信終端。
Spark 2.1.1版本的Master.scala的startRpcEnvAndEndpoint的源碼如下。
1. 2. /** 3. * 啟動(dòng)Master并返回一個(gè)三元組: 4. * (1) The Master RpcEnv 5. * (2) The web UI bound port 6. * (3) The REST server bound port, if any 7. */ 8. 9. def startRpcEnvAndEndpoint( 10. host: String, 11. port: Int, 12. webUiPort: Int, 13. conf: SparkConf): (RpcEnv, Int, Option[Int]) = { 14. val securityMgr = new SecurityManager(conf) 15. //構(gòu)建RPC通信環(huán)境 16. val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) 17. //構(gòu)建RPC通信終端,實(shí)例化Master 18. val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, 19. new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) 20. 21. //向Master的通信終端發(fā)送請(qǐng)求,獲取綁定的端口號(hào) 22. //包含Master的Web UI監(jiān)聽端口號(hào)和REST的監(jiān)聽端口號(hào) 23. 24. val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse] (BoundPortsRequest) 25. (rpcEnv, portsResponse.webUIPort, portsResponse.restPort) 26. } 27. }
Spark 2.2.0版本的Master.scala的startRpcEnvAndEndpoint的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第24行MasterEndpoint的askWithRetry方法調(diào)整為askSync方法。
1. ...... 2. val portsResponse = masterEndpoint.askSync[BoundPortsResponse] (BoundPortsRequest) 3. ......
startRpcEnvAndEndpoint方法中定義的ENDPOINT_NAME如下。
Master.scala的源碼如下。
1. private[deploy] object Master extends Logging { 2. val SYSTEM_NAME = "sparkMaster" 3. val ENDPOINT_NAME = "Master" 4. .......
startRpcEnvAndEndpoint方法中通過(guò)masterEndpoint.askWithRetry[BoundPortsResponse] (BoundPortsRequest)給Master自己發(fā)送一個(gè)消息BoundPortsRequest,是一個(gè)case object。發(fā)送消息BoundPortsRequest給自己,確保masterEndpoint正常啟動(dòng)起來(lái)。返回消息的類型是BoundPortsResponse,是一個(gè)case class。
MasterMessages.scala的源碼如下。
1. private[master] object MasterMessages { 2. ...... 3. case object BoundPortsRequest 4. case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int]) 5. }
Master收到消息BoundPortsRequest,發(fā)送返回消息BoundPortsResponse。
Master.scala的源碼如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 2. ...... 3. case BoundPortsRequest => 4. context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
在BoundPortsResponse傳入的參數(shù)restServerBoundPort是在Master的onStart方法中定義的。
Master.scala的源碼如下。
1. ...... 2. private var restServerBoundPort: Option[Int] = None 3. 4. override def onStart(): Unit = { 5. ...... 6. restServerBoundPort = restServer.map(_.start()) 7. .......
而restServerBoundPort是通過(guò)restServer進(jìn)行map操作啟動(dòng)賦值。下面看一下restServer。
Master.scala的源碼如下。
1. private var restServer: Option[StandaloneRestServer] = None 2. ...... 3. if (restServerEnabled) { 4. val port = conf.getInt("spark.master.rest.port", 6066) 5. restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl)) 6. } 7. ......
其中調(diào)用new()函數(shù)創(chuàng)建一個(gè)StandaloneRestServer。StandaloneRestServer服務(wù)器響應(yīng)請(qǐng)求提交的[RestSubmissionClient],將被嵌入到standalone Master中,僅用于集群模式。服務(wù)器根據(jù)不同的情況使用不同的HTTP代碼進(jìn)行響應(yīng)。
200 OK-請(qǐng)求已成功處理。
400錯(cuò)誤請(qǐng)求:請(qǐng)求格式錯(cuò)誤,未成功驗(yàn)證,或意外類型。
468未知協(xié)議版本:請(qǐng)求指定了此服務(wù)器不支持的協(xié)議。
500內(nèi)部服務(wù)器錯(cuò)誤:服務(wù)器在處理請(qǐng)求時(shí)引發(fā)內(nèi)部異常。
服務(wù)器在HTTP主體中總包含一個(gè)JSON表示的[SubmitRestProtocolResponse]。如果發(fā)生錯(cuò)誤,服務(wù)器將包括一個(gè)[ErrorResponse]。如果構(gòu)造了這個(gè)錯(cuò)誤響應(yīng)內(nèi)部失敗時(shí),響應(yīng)將由一個(gè)空體組成。響應(yīng)體指示內(nèi)部服務(wù)器錯(cuò)誤。
StandaloneRestServer.scala的源碼如下。
1. private[deploy] class StandaloneRestServer( 2. host: String, 3. requestedPort: Int, 4. masterConf: SparkConf, 5. masterEndpoint: RpcEndpointRef, 6. masterUrl: String) 7. extends RestSubmissionServer(host, requestedPort, masterConf) {
下面看一下RestSubmissionClient客戶端??蛻舳颂峤簧暾?qǐng)[RestSubmissionServer]。在協(xié)議版本V1中,REST URL以表單形式出現(xiàn)http://[host:port]/v1/submissions/[action],[action]可以是create、kill或狀態(tài)中的其中一種。每種請(qǐng)求類型都表示為發(fā)送到以下前綴的HTTP消息:
(1)submit - POST to /submissions/create
(2)kill - POST /submissions/kill/[submissionId]
(3)status - GET /submissions/status/[submissionId]
在(1)情況下,參數(shù)以JSON字段的形式發(fā)布到HTTP主體中。否則,URL指定按客戶端的預(yù)期操作。由于該協(xié)議預(yù)計(jì)將在Spark版本中保持穩(wěn)定,因此現(xiàn)有字段不能添加或刪除,但可以添加新的可選字段。如在少見的事件中向前或向后兼容性被破壞,Spark須引入一個(gè)新的協(xié)議版本(如V2)??蛻魴C(jī)和服務(wù)器必須使用協(xié)議的同一版本進(jìn)行通信。如果不匹配,服務(wù)器將用它支持的最高協(xié)議版本進(jìn)行響應(yīng)。此客戶機(jī)的實(shí)現(xiàn)可以用指定的版本使用該信息重試。
RestSubmissionClient.scala的源碼如下。
1. private[spark] class RestSubmissionClient(master: String) extends Logging { 2. import RestSubmissionClient._ 3. private val supportedMasterPrefixes = Seq("spark://", "mesos://") 4. ......
Restful把一切都看成是資源。利用Restful API可以對(duì)Spark進(jìn)行監(jiān)控。程序運(yùn)行的每一個(gè)步驟、Task的計(jì)算步驟都可以可視化,對(duì)Spark的運(yùn)行進(jìn)行詳細(xì)監(jiān)控。
回到startRpcEnvAndEndpoint方法中,新創(chuàng)建了一個(gè)Master實(shí)例。Master實(shí)例化時(shí)會(huì)對(duì)所有的成員進(jìn)行初始化,如默認(rèn)的Cores個(gè)數(shù)等。
Master.scala的源碼如下。
1. private[deploy] class Master( 2. override val rpcEnv: RpcEnv, 3. address: RpcAddress, 4. webUiPort: Int, 5. val securityMgr: SecurityManager, 6. val conf: SparkConf) 7. extends ThreadSafeRpcEndpoint with Logging with LeaderElectable { 8. ....... 9. //缺省maxCores時(shí)默認(rèn)應(yīng)用程序沒有指定(通過(guò)Int.MaxValue) 10. private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue) 11. val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false) 12. if (defaultCores < 1) { 13. throw new SparkException("spark.deploy.defaultCores must be positive") 14. } 15. ...... 16.
Master繼承了ThreadSafeRpcEndpoint和LeaderElectable,其中繼承LeaderElectable涉及Master的高可用性(High Availability,HA)機(jī)制。這里先關(guān)注ThreadSafeRpcEndpoint,繼承該類后,Master作為一個(gè)RpcEndpoint,實(shí)例化后首先會(huì)調(diào)用onStart方法。
Master.scala的源碼如下。
1. override def onStart(): Unit = { 2. logInfo("Starting Spark master at " + masterUrl) 3. logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") 4. //構(gòu)建一個(gè)Master的Web UI,查看向Master提交的應(yīng)用程序等信息 5. webUi = new MasterWebUI(this, webUiPort) 6. webUi.bind() 7. masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort 8. if (reverseProxy) { 9. masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl) 10. logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " + 11. s"Applications UIs are available at $masterWebUiUrl") 12. } 13. //在一個(gè)守護(hù)線程中,啟動(dòng)調(diào)度機(jī)制,周期性檢查Worker是否超時(shí),當(dāng)Worker節(jié)點(diǎn)超時(shí)后, //會(huì)修改其狀態(tài)或從Master中移除其相關(guān)的操作 14. 15. checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate (new Runnable { 16. override def run(): Unit = Utils.tryLogNonFatalError { 17. self.send(CheckForWorkerTimeOut) 18. } 19. }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) 20. 21. //默認(rèn)情況下會(huì)啟動(dòng)Rest服務(wù),可以通過(guò)該服務(wù)向Master提交各種請(qǐng)求 22. if (restServerEnabled) { 23. val port = conf.getInt("spark.master.rest.port", 6066) 24. restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl)) 25. } 26. restServerBoundPort = restServer.map(_.start()) 27. //度量(Metroics)相關(guān)的操作,用于監(jiān)控 28. masterMetricsSystem.registerSource(masterSource) 29. masterMetricsSystem.start() 30. applicationMetricsSystem.start() 31. //度量系統(tǒng)啟動(dòng)后,將主程序和應(yīng)用程序度量handler處理程序附加到Web UI中 32. masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) 33. applicationMetricsSystem.getServletHandlers.foreach(webUi. attachHandler) 34. //Master HA相關(guān)的操作 35. val serializer = new JavaSerializer(conf) 36. val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { 37. case "ZOOKEEPER" => 38. logInfo("Persisting recovery state to ZooKeeper") 39. val zkFactory = 40. new ZooKeeperRecoveryModeFactory(conf, serializer) 41. (zkFactory.createPersistenceEngine(), zkFactory. createLeaderElectionAgent(this)) 42. case "FILESYSTEM" => 43. val fsFactory = 44. new FileSystemRecoveryModeFactory(conf, serializer) 45. (fsFactory.createPersistenceEngine(), fsFactory.createLeader- ElectionAgent(this)) 46. case "CUSTOM" => 47. val clazz = Utils.classForName(conf.get("spark.deploy. recoveryMode.factory")) 48. val factory = clazz.getConstructor(classOf[SparkConf], classOf [Serializer]) 49. .newInstance(conf, serializer) 50. .asInstanceOf[StandaloneRecoveryModeFactory] 51. (factory.createPersistenceEngine(), factory.createLeaderElectionAgent (this)) 52. case _ => 53. (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this)) 54. } 55. persistenceEngine = persistenceEngine_ 56. leaderElectionAgent = leaderElectionAgent_ 57. }
其中在Master的onStart方法中用new()函數(shù)創(chuàng)建MasterWebUI,啟動(dòng)一個(gè)webServer。
Master.scala的源碼如下。
1. override def onStart(): Unit = { 2. ...... 3. webUi = new MasterWebUI(this, webUiPort) 4. webUi.bind() 5. masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi. boundPort
如MasterWebUI的spark.ui.killEnabled設(shè)置為True,可以通過(guò)WebUI頁(yè)面把Spark的進(jìn)程Kill掉。
MasterWebUI.scala的源碼如下。
1. private[master] 2. class MasterWebUI( 3. val master: Master, 4. requestedPort: Int) 5. extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions ("standalone"), 6. requestedPort, master.conf, name = "MasterUI") with Logging { 7. val masterEndpointRef = master.self 8. val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) 9. ....... 10. initialize() 11. 12. /**初始化所有的服務(wù)器組件 */ 13. def initialize() { 14. val masterPage = new MasterPage(this) 15. attachPage(new ApplicationPage(this)) 16. attachPage(masterPage) 17. attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) 18. attachHandler(createRedirectHandler( 19. "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST"))) 20. attachHandler(createRedirectHandler( 21. "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST"))) 22. }
MasterWebUI中在初始化時(shí)用new()函數(shù)創(chuàng)建MasterPage,在MasterPage中通過(guò)代碼去寫Web頁(yè)面。
MasterPage.scala的源碼如下。
1. private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { 2. ...... 3. override def renderJson(request: HttpServletRequest): JValue = { 4. JsonProtocol.writeMasterState(getMasterState) 5. } 6. ..... 7. val content = 8. <div class="row-fluid"> 9. <div class="span12"> 10. <ul class="unstyled"> 11. <li><strong>URL:</strong> {state.uri}</li> 12. { 13. state.restUri.map { uri => 14. <li> 15. <strong>REST URL:</strong> {uri} 16. <span class="rest-uri"> (cluster mode)</span> 17. </li> 18. }.getOrElse { Seq.empty } 19. } 20. <li><strong>Alive Workers:</strong> {aliveWorkers.length}</li> 21. <li><strong>Cores in use:</strong> {aliveWorkers.map (_.cores).sum} Total, 22. {aliveWorkers.map(_.coresUsed).sum} Used</li> 23. <li><strong>Memory in use:</strong> 24. {Utils.megabytesToString(aliveWorkers.map(_.memory).sum)} Total, 25. {Utils.megabytesToString(aliveWorkers.map(_.memoryUsed) .sum)} Used</li> 26. <li><strong>Applications:</strong> 27. {state.activeApps.length} <a href="#running-app">Running</a>, 28. {state.completedApps.length} <a href="#completed-app"> Completed</a> </li> 29. <li><strong>Drivers:</strong> 30. {state.activeDrivers.length} Running, 31. {state.completedDrivers.length} Completed </li> 32. <li><strong>Status:</strong> {state.status}</li> 33. </ul> 34. </div> 35. </div> 36. ........
回到MasterWebUI.scala的initialize()方法,其中調(diào)用了attachPage方法,在WebUI中增加Web頁(yè)面。
WebUI.scala的源碼如下。
1. def attachPage(page: WebUIPage) { 2. val pagePath = "/" + page.prefix 3. val renderHandler = createServletHandler(pagePath, 4. (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath) 5. val renderJsonHandler = createServletHandler(pagePath.stripSuffix ("/") + "/json", 6. (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath) 7. attachHandler(renderHandler) 8. attachHandler(renderJsonHandler) 9. val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer [ServletContextHandler]()) 10. handlers += renderHandler 11. }
在WebUI的bind方法中啟用了JettyServer。 WebUI.scala的bind的源碼如下。
1. def bind() { 2. assert(!serverInfo.isDefined, s"Attempted to bind $className more than once!") 3. try { 4. val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse ("0.0.0.0") 5. serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name)) 6. logInfo(s"Bound $className to $host, and started at $webUrl") 7. } catch { 8. case e: Exception => 9. logError(s"Failed to bind $className", e) 10. System.exit(1) 11. } 12. }
JettyUtils.scala的startJettyServer嘗試將Jetty服務(wù)器綁定到所提供的主機(jī)名、端口。
startJettyServer的源碼如下。
1. def startJettyServer( 2. hostName: String, 3. port: Int, 4. sslOptions: SSLOptions, 5. handlers: Seq[ServletContextHandler], 6. conf: SparkConf, 7. serverName: String = ""): ServerInfo = {
5.1.3 Master HA雙機(jī)切換
Spark生產(chǎn)環(huán)境下一般采用ZooKeeper作HA,且建議為3臺(tái)Master,ZooKeeper會(huì)自動(dòng)化管理Masters的切換。
采用ZooKeeper作HA時(shí),ZooKeeper會(huì)保存整個(gè)Spark集群運(yùn)行時(shí)的元數(shù)據(jù),包括Workers、Drivers、Applications、Executors。
ZooKeeper遇到當(dāng)前Active級(jí)別的Master出現(xiàn)故障時(shí)會(huì)從Standby Masters中選取出一臺(tái)作為Active Master,但是要注意,被選舉后到成為真正的Active Master之前需要從ZooKeeper中獲取集群當(dāng)前運(yùn)行狀態(tài)的元數(shù)據(jù)信息并進(jìn)行恢復(fù)。
在Master切換的過(guò)程中,所有已經(jīng)在運(yùn)行的程序皆正常運(yùn)行。因?yàn)镾park Application在運(yùn)行前就已經(jīng)通過(guò)Cluster Manager獲得了計(jì)算資源,所以在運(yùn)行時(shí),Job本身的調(diào)度和處理和Master是沒有任何關(guān)系的。
在Master的切換過(guò)程中唯一的影響是不能提交新的Job:一方面不能夠提交新的應(yīng)用程序給集群,因?yàn)橹挥蠥ctive Master才能接收新的程序提交請(qǐng)求;另一方面,已經(jīng)運(yùn)行的程序中也不能因?yàn)锳ction操作觸發(fā)新的Job提交請(qǐng)求。
ZooKeeper下Master HA的基本流程如圖5-2所示。
ZooKeeper下Master HA的基本流程如下。
(1)使用ZooKeeperPersistenceEngine讀取集群的狀態(tài)數(shù)據(jù),包括Drivers、Applications、Workers、Executors等信息。

圖5-2 ZooKeeper下Master HA的基本流程
(2)判斷元數(shù)據(jù)信息是否有空的內(nèi)容。
(3)把通過(guò)ZooKeeper持久化引擎獲得了Drivers、Applications、Workers、Executors等信息,重新注冊(cè)到Master的內(nèi)存中緩存起來(lái)。
(4)驗(yàn)證獲得的信息和當(dāng)前正在運(yùn)行的集群狀態(tài)的一致性。
(5)將Application和Workers的狀態(tài)標(biāo)識(shí)為Unknown,然后向Application中的Driver以及Workers發(fā)送現(xiàn)在是Leader的Standby模式的Master的地址信息。
(6)當(dāng)Driver和Workers收到新的Master的地址信息后會(huì)響應(yīng)該信息。
(7)Master接收到來(lái)自Drivers和Workers響應(yīng)的信息后會(huì)使用一個(gè)關(guān)鍵的方法:completeRecovery()來(lái)對(duì)沒有響應(yīng)的Applications (Drivers)、Workers (Executors)進(jìn)行處理。處理完畢后,Master的State會(huì)變成RecoveryState.ALIVE,從而開始對(duì)外提供服務(wù)。
(8)此時(shí)Master使用自己的Schedule方法對(duì)正在等待的Application和Drivers進(jìn)行資源調(diào)度。
Master HA的4大方式分別是ZOOKEEPER、FILESYSTEM、CUSTOM、NONE。
需要說(shuō)明的是:
(a)ZOOKEEPER是自動(dòng)管理Master。
(b)FILESYSTEM的方式在Master出現(xiàn)故障后需要手動(dòng)重新啟動(dòng)機(jī)器,機(jī)器啟動(dòng)后會(huì)立即成為Active級(jí)別的Master來(lái)對(duì)外提供服務(wù)(接收應(yīng)用程序提交的請(qǐng)求、接收新的Job運(yùn)行的請(qǐng)求)。
(c)CUSTOM的方式允許用戶自定義Master HA的實(shí)現(xiàn),這對(duì)高級(jí)用戶特別有用。
(d)NONE,這是默認(rèn)情況,Spark集群中就采用這種方式,該方式不會(huì)持久化集群的數(shù)據(jù),Master啟動(dòng)后立即管理集群。
Master.scala的HA的源碼如下。
1. override def onStart(): Unit = { 2. ...... 3. val serializer = new JavaSerializer(conf) 4. val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { 5. case "ZOOKEEPER" => 6. logInfo("Persisting recovery state to ZooKeeper") 7. val zkFactory = 8. new ZooKeeperRecoveryModeFactory(conf, serializer) 9. (zkFactory.createPersistenceEngine(), zkFactory.createLeader- ElectionAgent(this)) 10. case "FILESYSTEM" => 11. val fsFactory = 12. new FileSystemRecoveryModeFactory(conf, serializer) 13. (fsFactory.createPersistenceEngine(), fsFactory. createLeaderElectionAgent(this)) 14. case "CUSTOM" => 15. val clazz = Utils.classForName(conf.get("spark.deploy. recoveryMode.factory")) 16. val factory = clazz.getConstructor(classOf[SparkConf], classOf [Serializer]) 17. .newInstance(conf, serializer) 18. .asInstanceOf[StandaloneRecoveryModeFactory] 19. (factory.createPersistenceEngine(), factory.createLeaderElectionAgent (this)) 20. case _ => 21. (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this)) 22. } 23. persistenceEngine = persistenceEngine_ 24. leaderElectionAgent = leaderElectionAgent_ 25. } 26. ......
Spark默認(rèn)的HA方式是NONE。
1. private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
如使用ZOOKEEPER的HA方式,ZooKeeperRecoveryModeFactory.scala的源碼如下。
1. private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer) 2. extends StandaloneRecoveryModeFactory(conf, serializer) { 3. 4. def createPersistenceEngine(): PersistenceEngine = { 5. new ZooKeeperPersistenceEngine(conf, serializer) 6. } 7. 8. def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = { 9. new ZooKeeperLeaderElectionAgent(master, conf) 10. } 11. }
通過(guò)調(diào)用zkFactory.createPersistenceEngine()用new()函數(shù)創(chuàng)建一個(gè)ZooKeeper-PersistenceEngine。
ZooKeeperPersistenceEngine.scala的源碼如下。
1. private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer) 2. extends PersistenceEngine 3. with Logging { 4. 5. private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status" 6. private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf) 7. 8. SparkCuratorUtil.mkdir(zk, WORKING_DIR) 9. 10. 11. override def persist(name: String, obj: Object): Unit = { 12. serializeIntoFile(WORKING_DIR + "/" + name, obj) 13. } 14. 15. override def unpersist(name: String): Unit = { 16. zk.delete().forPath(WORKING_DIR + "/" + name) 17. } 18. 19. override def read[T: ClassTag](prefix: String): Seq[T] = { 20. zk.getChildren.forPath(WORKING_DIR).asScala 21. .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T]) 22. } 23. 24. override def close() { 25. zk.close() 26. } 27. 28. private def serializeIntoFile(path: String, value: AnyRef) { 29. val serialized = serializer.newInstance().serialize(value) 30. val bytes = new Array[Byte](serialized.remaining()) 31. serialized.get(bytes) 32. zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes) 33. } 34. 35. private def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = { 36. val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename) 37. try { 38. Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap (fileData))) 39. } catch { 40. case e: Exception => 41. logWarning("Exception while reading persisted file, deleting", e) 42. zk.delete().forPath(WORKING_DIR + "/" + filename) 43. None 44. } 45. } 46. }
PersistenceEngine中有至關(guān)重要的方法persist來(lái)實(shí)現(xiàn)數(shù)據(jù)持久化,readPersistedData來(lái)恢復(fù)集群中的元數(shù)據(jù)。
PersistenceEngine.scala的源碼如下。
1. def persist(name: String, obj: Object): Unit 2. ...... 3. final def readPersistedData( 4. rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq [WorkerInfo]) = { 5. rpcEnv.deserialize { () => 6. (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read [WorkerInfo]("worker_")) 7. } 8. }
下面來(lái)看createdLeaderElectionAgent方法。在createdLeaderElectionAgent方法中調(diào)用new()函數(shù)創(chuàng)建ZooKeeperLeaderElectionAgent實(shí)例。
StandaloneRecoveryModeFactory.scala的源碼如下。
1. def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = { 2. new ZooKeeperLeaderElectionAgent(master, conf) 3. } 4. }
ZooKeeperLeaderElectionAgent的源碼如下。
1. private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable, 2. conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging { 3. 4. val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" 5. 6. private var zk: CuratorFramework = _ 7. private var leaderLatch: LeaderLatch = _ 8. private var status = LeadershipStatus.NOT_LEADER 9. 10. start() 11. 12. private def start() { 13. logInfo("Starting ZooKeeper LeaderElection agent") 14. zk = SparkCuratorUtil.newClient(conf) 15. leaderLatch = new LeaderLatch(zk, WORKING_DIR) 16. leaderLatch.addListener(this) 17. leaderLatch.start() 18. } 19. 20. override def stop() { 21. leaderLatch.close() 22. zk.close() 23. } 24. 25. override def isLeader() { 26. synchronized { 27. //可以取得領(lǐng)導(dǎo)權(quán) 28. if (!leaderLatch.hasLeadership) { 29. return 30. } 31. 32. logInfo("We have gained leadership") 33. updateLeadershipStatus(true) 34. } 35. } 36. 37. override def notLeader() { 38. synchronized { 39. //可以取得領(lǐng)導(dǎo)權(quán) 40. if (leaderLatch.hasLeadership) { 41. return 42. } 43. 44. logInfo("We have lost leadership") 45. updateLeadershipStatus(false) 46. } 47. } 48. 49. private def updateLeadershipStatus(isLeader: Boolean) { 50. if (isLeader && status == LeadershipStatus.NOT_LEADER) { 51. status = LeadershipStatus.LEADER 52. masterInstance.electedLeader() 53. } else if (!isLeader && status == LeadershipStatus.LEADER) { 54. status = LeadershipStatus.NOT_LEADER 55. masterInstance.revokedLeadership() 56. } 57. } 58. 59. private object LeadershipStatus extends Enumeration { 60. type LeadershipStatus = Value 61. val LEADER, NOT_LEADER = Value 62. } 63. }
FILESYSTEM和NONE的方式采用MonarchyLeaderAgent的方式來(lái)完成Leader的選舉,其實(shí)現(xiàn)是直接把傳入的Master作為L(zhǎng)eader。
LeaderElectionAgent.scala的源碼如下。
1. private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable) 2. extends LeaderElectionAgent { 3. masterInstance.electedLeader() 4. }
FileSystemRecoveryModeFactory.scala的源碼如下。
1. private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serializer) 2. extends StandaloneRecoveryModeFactory(conf, serializer) with Logging { 3. 4. val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "") 5. 6. def createPersistenceEngine(): PersistenceEngine = { 7. logInfo("Persisting recovery state to directory: " + RECOVERY_DIR) 8. new FileSystemPersistenceEngine(RECOVERY_DIR, serializer) 9. } 10. 11. def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = { 12. new MonarchyLeaderAgent(master) 13. } 14. }
如果WorkerState狀態(tài)為UNKNOWN(Worker不響應(yīng)),就把它刪除,如果以集群方式運(yùn)行,driver失敗后可以重新啟動(dòng),最后把狀態(tài)變回ALIVE,注意,這里要加入--supervise這個(gè)參數(shù)。
Master.scala的源碼如下。
1. private def completeRecovery() { 2. //使用短同步周期確?!皁nly-once”恢復(fù)一次語(yǔ)義 3. if (state != RecoveryState.RECOVERING) { return } 4. state = RecoveryState.COMPLETING_RECOVERY 5. 6. //殺掉不響應(yīng)消息的workers 和apps 7. workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) 8. apps.filter(_.state == ApplicationState.UNKNOWN).foreach (finishApplication) 9. 10. //重新調(diào)度drivers,其未被任何workers聲明 11. drivers.filter(_.worker.isEmpty).foreach { d => 12. logWarning(s"Driver ${d.id} was not found after master recovery") 13. if (d.desc.supervise) { 14. logWarning(s"Re-launching ${d.id}") 15. relaunchDriver(d) 16. } else { 17. removeDriver(d.id, DriverState.ERROR, None) 18. logWarning(s"Did not re-launch ${d.id} because it was not supervised") 19. } 20. }
5.1.4 Master的注冊(cè)機(jī)制和狀態(tài)管理解密
1.Master對(duì)其他組件注冊(cè)的處理
Master接收注冊(cè)的對(duì)象主要是Driver、Application、Worker; Executor不會(huì)注冊(cè)給Master,Executor是注冊(cè)給Driver中的SchedulerBackend的。
Worker是在啟動(dòng)后主動(dòng)向Master注冊(cè)的,所以如果在生產(chǎn)環(huán)境下加入新的Worker到正在運(yùn)行的Spark集群上,此時(shí)不需要重新啟動(dòng)Spark集群就能夠使用新加入的Worker,以提升處理能力。假如在生產(chǎn)環(huán)境中的集群中有500臺(tái)機(jī)器,可能又新加入100臺(tái)機(jī)器,這時(shí)不需要重新啟動(dòng)整個(gè)集群,就可以將100臺(tái)新機(jī)器加入到集群。
Worker的源碼如下。
1. private[deploy] class Worker( 2. override val rpcEnv: RpcEnv, 3. webUiPort: Int, 4. cores: Int, 5. memory: Int, 6. masterRpcAddresses: Array[RpcAddress], 7. endpointName: String, 8. workDirPath: String = null, 9. val conf: SparkConf, 10. val securityMgr: SecurityManager) 11. extends ThreadSafeRpcEndpoint with Logging {
Worker是一個(gè)消息循環(huán)體,繼承自ThreadSafeRpcEndpoint,可以收消息,也可以發(fā)消息。Worker的onStart方法如下。
1. override def onStart() { 2. ...... 3. workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}" 4. registerWithMaster() 5. ...... 6. }
Worker的onStart方法中調(diào)用了registerWithMaster()。
1. private def registerWithMaster() { 2. ....... 3. registrationRetryTimer match { 4. case None => 5. registered = false 6. registerMasterFutures = tryRegisterAllMasters() 7. ......
registerWithMaster方法中調(diào)用了tryRegisterAllMasters,向所有的Master進(jìn)行注冊(cè)。
Spark 2.1.1版本的Worker.scala的源碼如下。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. masterRpcAddresses.map { masterAddress => 3. registerMasterThreadPool.submit(new Runnable { 4. override def run(): Unit = { 5. try { 6. logInfo("Connecting to master " + masterAddress + "...") 7. val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) 8. registerWithMaster(masterEndpoint) 9. } catch { 10. case ie: InterruptedException => //Cancelled 11. case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) 12. } 13. } 14. }) 15. } 16. }
Spark 2.2.0版本的Worker.scala的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第8行registerWithMaster(masterEndpoint)方法調(diào)整為sendRegisterMessageToMaster(masterEndpoint)。
1. ...... 2. sendRegisterMessageToMaster(masterEndpoint) 3. ......
tryRegisterAllMasters方法中,由于實(shí)際運(yùn)行時(shí)有很多Master,因此使用線程池的線程進(jìn)行提交,然后獲取masterEndpoint。masterEndpoint是一個(gè)RpcEndpointRef,通過(guò)registerWithMaster (masterEndpoint)進(jìn)行注冊(cè)。
Spark 2.1.1版本的Worker.scala的registerWithMaster的源碼如下。
1. private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = { 2. masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker( 3. workerId, host, port, self, cores, memory, workerWebUiUrl)) 4. .onComplete { 5. //這是一個(gè)非常快的動(dòng)作,所以可以用"ThreadUtils.sameThread" 6. case Success(msg) => 7. Utils.tryLogNonFatalError { 8. handleRegisterResponse(msg) 9. } 10. case Failure(e) => 11. logError(s"Cannot register with master: ${masterEndpoint .address}", e) 12. System.exit(1) 13. }(ThreadUtils.sameThread) 14. }
Spark 2.1.1版本的registerWithMaster(masterEndpoint)方法調(diào)整為Spark 2.2.0版本的sendRegisterMessageToMaster(masterEndpoint)。sendRegisterMessageToMaster方法僅將RegisterWorker消息發(fā)送給Master消息循環(huán)體。sendRegisterMessageToMaster方法內(nèi)部不作其他處理。
1. private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = { 2. masterEndpoint.send(RegisterWorker( 3. workerId, 4. host, 5. port, 6. self, 7. cores, 8. memory, 9. workerWebUiUrl, 10. masterEndpoint.address)) 11. }
registerWithMaster方法中的masterEndpoint.ask[RegisterWorkerResponse]傳進(jìn)去的是RegisterWorker。RegisterWorker是一個(gè)case class,包括id、host、port、worker、cores、memory等信息,這里Worker是自己的引用RpcEndpointRef,Master通過(guò)Ref通worker通信。
Spark 2.1.1版本的RegisterWorker.scala的源碼如下。
1. case class RegisterWorker( 2. id: String, 3. host: String, 4. port: Int, 5. worker: RpcEndpointRef, 6. cores: Int, 7. memory: Int, 8. workerWebUiUrl: String) 9. extends DeployMessage { 10. Utils.checkHost(host, "Required hostname") 11. assert (port > 0) 12. }
Spark 2.2.0版本的RegisterWorker.scala的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第8行之后新增加masterAddress的成員變量:Master的地址,用于Worker節(jié)點(diǎn)連接Master節(jié)點(diǎn)。
1. ...... 2. masterAddress: RpcAddress) 3. ......
Worker通過(guò)registerWithMaster向Master發(fā)送了RegisterWorker消息,Master收到RegisterWorker請(qǐng)求后,進(jìn)行相應(yīng)的處理。
Spark 2.1.1版本的Master.scala的receiveAndReply的源碼如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 2. case RegisterWorker( 3. id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => 4. logInfo("Registering worker %s:%d with %d cores, %s RAM".format( 5. workerHost, workerPort, cores, Utils.megabytesToString(memory))) 6. if (state == RecoveryState.STANDBY) { 7. context.reply(MasterInStandby) 8. } else if (idToWorker.contains(id)) { 9. context.reply(RegisterWorkerFailed("Duplicate worker ID")) 10. } else { 11. val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, 12. workerRef, workerWebUiUrl) 13. if (registerWorker(worker)) { 14. persistenceEngine.addWorker(worker) 15. context.reply(RegisteredWorker(self, masterWebUiUrl)) 16. schedule() 17. } else { 18. val workerAddress = worker.endpoint.address 19. logWarning("Worker registration failed. Attempted to re-register worker at same " + 20. "address: " + workerAddress) 21. context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: " 22. + workerAddress)) 23. } 24. }
Spark 2.2.0版本的Master.scala的receiveAndReply的源碼與Spark 2.1.1版本相比具有如下特點(diǎn)。
上段代碼中第1行,RegisterWorker消息的模式匹配從receiveAndReply方法中調(diào)整到receive方法。因?yàn)閃orker向Master提交RegisterWorker消息,無(wú)須同步等待Master的答復(fù)。
上段代碼中第3行RegisterWorker的傳入?yún)?shù)新增加了masterAddress。
上段代碼中第9行context.reply方法調(diào)整為workerRef.send方法。
上段代碼中第15行context.reply方法調(diào)整為workerRef.send方法,以及RegisteredWorker中新增了一個(gè)參數(shù)masterAddress。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case RegisterWorker( 4. id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) 5. ...... 6. workerRef.send(RegisterWorkerFailed("Duplicate worker ID")) 7. ...... 8. workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress)) 9. ......
RegisterWorker中,Master接收到Worker的注冊(cè)請(qǐng)求后,首先判斷當(dāng)前的Master是否是Standby的模式,如果是,就不處理;Master的idToWorker包含了所有已經(jīng)注冊(cè)的Worker的信息,然后會(huì)判斷當(dāng)前Master的內(nèi)存數(shù)據(jù)結(jié)構(gòu)idToWorker中是否已經(jīng)有該Worker的注冊(cè),如果有,此時(shí)不會(huì)重復(fù)注冊(cè);其中idToWorker是一個(gè)HashMap,Key是String代表Worker的字符描述,Value是WorkerInfo。
1. private val idToWorker = new HashMap[String, WorkerInfo]
WorkerInfo包括id、host、port 、cores、memory、endpoint等內(nèi)容。
1. private[spark] class WorkerInfo( 2. val id: String, 3. val host: String, 4. val port: Int, 5. val cores: Int, 6. val memory: Int, 7. val endpoint: RpcEndpointRef, 8. val webUiAddress: String) 9. extends Serializable {
Master如果決定接收注冊(cè)的Worker,首先會(huì)創(chuàng)建WorkerInfo對(duì)象來(lái)保存注冊(cè)的Worker的信息,然后調(diào)用registerWorker執(zhí)行具體的注冊(cè)的過(guò)程,如果Worker的狀態(tài)是DEAD的狀態(tài),則直接過(guò)濾掉。對(duì)于UNKNOWN的內(nèi)容,調(diào)用removeWorker進(jìn)行清理(包括清理該Worker下的Executors和Drivers)。其中,UNKNOWN的情況:Master進(jìn)行切換時(shí),先對(duì)Worker發(fā)UNKNOWN消息,只有當(dāng)Master收到Worker正確的回復(fù)消息,才將狀態(tài)標(biāo)識(shí)為正常。
registerWorker的源碼如下。
1. private def registerWorker(worker: WorkerInfo): Boolean = { 2. //在同一節(jié)點(diǎn)上可能有一個(gè)或多個(gè)指向掛掉的workers節(jié)點(diǎn)的引用(不同ID),須刪除它們 3. workers.filter { w => 4. (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD) 5. }.foreach { w => 6. workers -= w 7. } 8. 9. val workerAddress = worker.endpoint.address 10. if (addressToWorker.contains(workerAddress)) { 11. val oldWorker = addressToWorker(workerAddress) 12. if (oldWorker.state == WorkerState.UNKNOWN) { 13. //未知狀態(tài)的worker 意味著在恢復(fù)過(guò)程中worker 被重新啟動(dòng)。舊的worker節(jié)點(diǎn) //掛掉,須刪掉舊節(jié)點(diǎn),接收新worker節(jié)點(diǎn) 14. removeWorker(oldWorker) 15. } else { 16. logInfo("Attempted to re-register worker at same address: " + workerAddress) 17. return false 18. } 19. } 20. 21. workers += worker 22. idToWorker(worker.id) = worker 23. addressToWorker(workerAddress) = worker 24. if (reverseProxy) { 25. webUi.addProxyTargets(worker.id, worker.webUiAddress) 26. } 27. true 28. }
在registerWorker方法中,Worker注冊(cè)完成后,把注冊(cè)的Worker加入到Master的內(nèi)存數(shù)據(jù)結(jié)構(gòu)中。
1. val workers = new HashSet[WorkerInfo] 2. private val idToWorker = new HashMap[String, WorkerInfo] 3. private val addressToWorker = new HashMap[RpcAddress, WorkerInfo] 4. ...... 5. 6. workers += worker 7. idToWorker(worker.id) = worker 8. addressToWorker(workerAddress) = worker
回到Master.scala的receiveAndReply方法,Worker注冊(cè)完成后,調(diào)用persistenceEngine.addWorker (worker),PersistenceEngine是持久化引擎,在Zookeeper下就是Zookeeper的持久化引擎,把注冊(cè)的數(shù)據(jù)進(jìn)行持久化。
PersistenceEngine.scala的addWorker方法如下。
1. final def addWorker(worker: WorkerInfo): Unit = { 2. persist("worker_" + worker.id, worker) 3. }
ZooKeeperPersistenceEngine是PersistenceEngine的一個(gè)具體實(shí)現(xiàn)子類,其persist方法如下。
1. private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer) 2. extends PersistenceEngine 3. ...... 4. override def persist(name: String, obj: Object): Unit = { 5. serializeIntoFile(WORKING_DIR + "/" + name, obj) 6. } 7. ...... 8. private def serializeIntoFile(path: String, value: AnyRef) { 9. val serialized = serializer.newInstance().serialize(value) 10. val bytes = new Array[Byte](serialized.remaining()) 11. serialized.get(bytes) 12. zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes) 13. }
回到Master.scala的receiveAndReply方法,注冊(cè)的Worker數(shù)據(jù)持久化后,進(jìn)行schedule()。至此,Worker的注冊(cè)完成。
同樣,Driver的注冊(cè)過(guò)程:Driver提交給Master進(jìn)行注冊(cè),Master會(huì)將Driver的信息放入內(nèi)存緩存中,加入等待調(diào)度的隊(duì)列,通過(guò)持久化引擎(如ZooKeeper)把注冊(cè)信息持久化,然后進(jìn)行Schedule。
Application的注冊(cè)過(guò)程:Application提交給Master進(jìn)行注冊(cè),Driver啟動(dòng)后會(huì)執(zhí)行SparkContext的初始化,進(jìn)而導(dǎo)致StandaloneSchedulerBackend的產(chǎn)生,其內(nèi)部有StandaloneAppClient。StandaloneAppClient內(nèi)部有ClientEndpoint。ClientEndpoint來(lái)發(fā)送RegisterApplication信息給Master。Master會(huì)將Application的信息放入內(nèi)存緩存中,把Application加入等待調(diào)度的Application隊(duì)列,通過(guò)持久化引擎(如ZooKeeper)把注冊(cè)信息持久化,然后進(jìn)行Schedule。
2.Master對(duì)Driver和Executor狀態(tài)變化的處理
1)對(duì)Driver狀態(tài)變化的處理
如果Driver的各個(gè)狀態(tài)是DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED,就將其清理掉。其他情況則報(bào)異常。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case DriverStateChanged(driverId, state, exception) => 4. state match { 5. case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => 6. removeDriver(driverId, state, exception) 7. case _ => 8. throw new Exception(s"Received unexpected state update for driver $driverId: $state") 9. }
removeDriver清理掉Driver后,再次調(diào)用schedule方法,removeDriver的源碼如下。
1. private def removeDriver( 2. driverId: String, 3. finalState: DriverState, 4. exception: Option[Exception]) { 5. drivers.find(d => d.id == driverId) match { 6. case Some(driver) => 7. logInfo(s"Removing driver: $driverId") 8. drivers -= driver 9. if (completedDrivers.size >= RETAINED_DRIVERS) { 10. val toRemove = math.max(RETAINED_DRIVERS / 10, 1) 11. completedDrivers.trimStart(toRemove) 12. } 13. completedDrivers += driver 14. persistenceEngine.removeDriver(driver) 15. driver.state = finalState 16. driver.exception = exception 17. driver.worker.foreach(w => w.removeDriver(driver)) 18. schedule() 19. case None => 20. logWarning(s"Asked to remove unknown driver: $driverId") 21. } 22. } 23. }
2)對(duì)Executor狀態(tài)變化的處理
ExecutorStateChanged的源碼如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case ExecutorStateChanged(appId, execId, state, message, exitStatus) => 4. val execOption = idToApp.get(appId).flatMap(app => app.executors. get(execId)) 5. execOption match { 6. case Some(exec) => 7. val appInfo = idToApp(appId) 8. val oldState = exec.state 9. exec.state = state 10. 11. if (state == ExecutorState.RUNNING) { 12. assert(oldState == ExecutorState.LAUNCHING, 13. s"executor $execId state transfer from $oldState to RUNNING is illegal") 14. appInfo.resetRetryCount() 15. } 16. 17. exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false)) 18. 19. if (ExecutorState.isFinished(state)) { 20. //從worker 和app中刪掉executor 21. logInfo(s"Removing executor ${exec.fullId} because it is $state") 22. //如果應(yīng)用程序已經(jīng)完成,保存其狀態(tài)及在UI上正確顯示其信息 23. if (!appInfo.isFinished) { 24. appInfo.removeExecutor(exec) 25. } 26. exec.worker.removeExecutor(exec) 27. 28. val normalExit = exitStatus == Some(0) 29. //只重試一定次數(shù),這樣就不會(huì)進(jìn)入無(wú)限循環(huán)。重要提示:這個(gè)代碼路徑不是通過(guò) //測(cè)試執(zhí)行的,所以改變if條件時(shí)必須小心 30. if (!normalExit 31. && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES 32. && MAX_EXECUTOR_RETRIES >= 0) { //< 0 disables this application-killing path 33. val execs = appInfo.executors.values 34. if (!execs.exists(_.state == ExecutorState.RUNNING)) { 35. logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + 36. s"${appInfo.retryCount} times; removing it") 37. removeApplication(appInfo, ApplicationState.FAILED) 38. } 39. } 40. } 41. schedule() 42. case None => 43. logWarning(s"Got status update for unknown executor $appId/ $execId") 44. }
Executor掛掉時(shí)系統(tǒng)會(huì)嘗試一定次數(shù)的重啟(最多重啟10次)。
1. private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy. maxExecutorRetries", 10)
- 亮劍.NET:.NET深入體驗(yàn)與實(shí)戰(zhàn)精要
- 腦動(dòng)力:Linux指令速查效率手冊(cè)
- 大數(shù)據(jù)戰(zhàn)爭(zhēng):人工智能時(shí)代不能不說(shuō)的事
- Getting Started with MariaDB
- TestStand工業(yè)自動(dòng)化測(cè)試管理(典藏版)
- Getting Started with Containerization
- 機(jī)器學(xué)習(xí)與大數(shù)據(jù)技術(shù)
- 群體智能與數(shù)據(jù)挖掘
- 分布式多媒體計(jì)算機(jī)系統(tǒng)
- 大學(xué)計(jì)算機(jī)應(yīng)用基礎(chǔ)
- Statistics for Data Science
- 網(wǎng)站入侵與腳本攻防修煉
- 工業(yè)機(jī)器人實(shí)操進(jìn)階手冊(cè)
- 基于RPA技術(shù)財(cái)務(wù)機(jī)器人的應(yīng)用與研究
- 計(jì)算機(jī)應(yīng)用基礎(chǔ)實(shí)訓(xùn)(職業(yè)模塊)