官术网_书友最值得收藏!

5.1 Master啟動(dòng)原理和源碼詳解

本節(jié)講解Master啟動(dòng)的原理和源碼;Master HA雙機(jī)切換;Master的注冊(cè)機(jī)制和狀態(tài)管理解密等內(nèi)容。

5.1.1 Master啟動(dòng)的原理詳解

Spark應(yīng)用程序作為獨(dú)立的集群進(jìn)程運(yùn)行,由主程序中的SparkContext對(duì)象(稱為驅(qū)動(dòng)程序)協(xié)調(diào)。Spark集群部署組件圖5-1所示。

圖5-1 Spark集群部署組件圖

其中各個(gè)術(shù)語(yǔ)及相關(guān)術(shù)語(yǔ)的描述如下。

(1)Driver Program:運(yùn)行Application的main函數(shù)并新建SparkContext實(shí)例的程序,稱為驅(qū)動(dòng)程序(Driver Program)。通常可以使用SparkContext代表驅(qū)動(dòng)程序。

(2)Cluster Manager:集群管理器(Cluster Manager)是集群資源管理的外部服務(wù)。Spark上現(xiàn)在主要有Standalone、YARN、Mesos 3種集群資源管理器。Spark自帶的Standalone模式能夠滿足絕大部分純粹的Spark計(jì)算環(huán)境中對(duì)集群資源管理的需求,基本上只有在集群中運(yùn)行多套計(jì)算框架的時(shí)候才建議考慮YARN和Mesos。

(3)Worker Node:集群中可以運(yùn)行Application代碼的工作節(jié)點(diǎn)(Worker Node),相當(dāng)于Hadoop的Slave節(jié)點(diǎn)。

(4)Executor:在Worker Node上為Application啟動(dòng)的一個(gè)工作進(jìn)程,在進(jìn)程中負(fù)責(zé)任務(wù)(Task)的運(yùn)行,并且負(fù)責(zé)將數(shù)據(jù)存放在內(nèi)存或磁盤上,在Executor內(nèi)部通過(guò)多線程的方式(即線程池)并發(fā)處理應(yīng)用程序的具體任務(wù)。

每個(gè)Application都有各自獨(dú)立的Executors,因此應(yīng)用程序之間是相互隔離的。

(5)Task:任務(wù)(Task)是指被Driver送到Executor上的工作單元。通常,一個(gè)任務(wù)會(huì)處理一個(gè)Partition的數(shù)據(jù),每個(gè)Partition一般是一個(gè)HDFS的Block塊的大小。

(6)Application:是創(chuàng)建了SparkContext實(shí)例對(duì)象的Spark用戶程序,包含了一個(gè)Driver program和集群中多個(gè)Worker上的Executor。

(7)Job:和Spark的action對(duì)應(yīng),每個(gè)action,如count、savaAsTextFile等都會(huì)對(duì)應(yīng)一個(gè)Job實(shí)例,每個(gè)Job會(huì)拆分成多個(gè)Stages,一個(gè)Stage中包含一個(gè)任務(wù)集(TaskSet),任務(wù)集中的各個(gè)任務(wù)通過(guò)一定的調(diào)度機(jī)制發(fā)送到工作單位(Executor)上并行執(zhí)行。

Spark Standalone集群的部署采用典型的Master/Slave架構(gòu)。其中,Master節(jié)點(diǎn)負(fù)責(zé)整個(gè)集群的資源管理與調(diào)度,Worker節(jié)點(diǎn)(也可以稱Slave節(jié)點(diǎn))在Master節(jié)點(diǎn)的調(diào)度下啟動(dòng)Executor,負(fù)責(zé)執(zhí)行具體工作(包括應(yīng)用程序以及應(yīng)用程序提交的任務(wù))。

5.1.2 Master啟動(dòng)的源碼詳解

Spark中各個(gè)組件是通過(guò)腳本來(lái)啟動(dòng)部署的。下面以腳本為入口點(diǎn)開始分析Master的部署。每個(gè)組件對(duì)應(yīng)提供了啟動(dòng)的腳本,同時(shí)也會(huì)提供停止的腳本。停止腳本比較簡(jiǎn)單,在此僅分析啟動(dòng)腳本。

1.Master部署的啟動(dòng)腳本解析

首先看一下Master的啟動(dòng)腳本./sbin/start-master.sh,內(nèi)容如下。

1.  # 在腳本的執(zhí)行節(jié)點(diǎn)啟動(dòng)Master組件
2.
3.  #如果沒有設(shè)置環(huán)境變量SPARK_HOME,會(huì)根據(jù)腳本所在位置自動(dòng)設(shè)置
4.  if [ -z "${SPARK_HOME}" ]; then
5.    export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
6.  fi
7.
8.  #注:提取的類名必須和SparkSubmit的類相匹配。任何變化都需在類中進(jìn)行反映
9.
10. # Master 組件對(duì)應(yīng)的類
11. CLASS="org.apache.spark.deploy.master.Master"
12.
13. #腳本的幫助信息
14.
15. if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
16.   echo "Usage: ./sbin/start-master.sh [options]"
17.   pattern="Usage:"
18.   pattern+="\|Using Spark's default log4j profile:"
19.   pattern+="\|Registered signal handlers for"
20.
21. # 通過(guò)腳本spark-class執(zhí)行指定的Master類,參數(shù)為--help
22.   "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern"
      1>&2
23.   exit 1
24. fi
25.
26. ORIGINAL_ARGS="$@"
27.
28. #控制啟動(dòng)Master時(shí),是否同時(shí)啟動(dòng) Tachyon的Master組件
29. START_TACHYON=false
30.
31. while (( "$#" )); do
32. case $1 in
33.     --with-tachyon)
34.       if [ ! -e "${SPARK_HOME}"/tachyon/bin/tachyon ]; then
35.         echo "Error: --with-tachyon specified, but tachyon not found."
36.         exit -1
37.       fi
38.       START_TACHYON=true
39.       ;;
40. esac
41. shift
42. done
43.
44. . "${SPARK_HOME}/sbin/spark-config.sh"
45.
46. . "${SPARK_HOME}/bin/load-spark-env.sh"
47.
48. #下面的一些參數(shù)對(duì)應(yīng)的默認(rèn)配置屬性
49. if [ "$SPARK_MASTER_PORT" = "" ]; then
50.   SPARK_MASTER_PORT=7077
51. fi
52.
53. //用于MasterURL,所以當(dāng)沒有設(shè)置時(shí),默認(rèn)使用hostname,而不是IP地址
54. //該MasterURL在Worker注冊(cè)或應(yīng)用程序提交時(shí)使用
55. if [ "$SPARK_MASTER_IP" = "" ]; then
56.   SPARK_MASTER_IP=`hostname`
57. fi
58.
59. if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
60.   SPARK_MASTER_WEBUI_PORT=8080
61. fi
62.
63. #通過(guò)啟動(dòng)后臺(tái)進(jìn)程的腳本spark-daemon.sh來(lái)啟動(dòng)Master組件
64. "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
65.   --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_
      MASTER_WEBUI_PORT \
66.   $ORIGINAL_ARGS
67.
68. #需要時(shí)同時(shí)啟動(dòng)Tachyon,此時(shí)Tachyon是編譯在Spark內(nèi)的
69. if [ "$START_TACHYON" == "true" ]; then
70.   "${SPARK_HOME}"/tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP
71.   "${SPARK_HOME}"/tachyon/bin/tachyon format -s
72.   "${SPARK_HOME}"/tachyon/bin/tachyon-start.sh master
73. fi

通過(guò)腳本的簡(jiǎn)單分析,可以看出Master組件是以后臺(tái)守護(hù)進(jìn)程的方式啟動(dòng)的,對(duì)應(yīng)的后臺(tái)守護(hù)進(jìn)程的啟動(dòng)腳本spark-daemon.sh,在后臺(tái)守護(hù)進(jìn)程的啟動(dòng)腳本spark-daemon.sh內(nèi)部,通過(guò)腳本spark-class啟動(dòng)一個(gè)指定主類的JVM進(jìn)程,相關(guān)代碼如下所示。

1.  case "$mode" in
2.  #這里對(duì)應(yīng)的是啟動(dòng)一個(gè)Spark類
3.      (class)
4.  nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command
    "$@" >> "$log" 2>&1 < /dev/null &
5.  newpid="$!"
6.        ;;
7.  #這里對(duì)應(yīng)提交一個(gè)Spark 應(yīng)用程序
8.      (submit)
9.  nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-submit --class
    $command "$@" >> "$log" 2>&1 < /dev/null &
10. newpid="$!"
11.       ;;
12.
13.     (*)
14.       echo "unknown mode: $mode"
15.       exit 1
16.       ;;

通過(guò)腳本的分析,可以知道最終執(zhí)行的是Master類(對(duì)應(yīng)的代碼為前面的CLASS="org.apache.spark.deploy.master.Master"),對(duì)應(yīng)的入口點(diǎn)是Master伴生對(duì)象中的main方法。下面以該方法作為入口點(diǎn)進(jìn)一步解析Master部署框架。

部署Master組件時(shí),最簡(jiǎn)單的方式是直接啟動(dòng)腳本,不帶任何選項(xiàng)參數(shù),命令如下所示。

1.  ./sbin/start-master.sh

如需設(shè)置選項(xiàng)參數(shù),可以查看幫助信息,根據(jù)自己的需要進(jìn)行設(shè)置。

2.Master的源碼解析

首先查看Master伴生對(duì)象中的main方法。

Master.scala的源碼如下。

1.  def main(argStrings: Array[String]) {
2.    Utils.initDaemon(log)
3.    val conf = new SparkConf
4.  //構(gòu)建參數(shù)解析的實(shí)例
5.    val args = new MasterArguments(argStrings, conf)
6.   //啟動(dòng)RPC通信環(huán)境以及Master的RPC通信終端
7.    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port,
      args.webUiPort, conf)
8.    rpcEnv.awaitTermination()
9.  }

和其他類(如SparkSubmit)一樣,Master類的入口點(diǎn)處也包含了對(duì)應(yīng)的參數(shù)類MasterArguments。MasterArguments類包括Spark屬性配置相關(guān)的一些解析。

MasterArguments.scala的源碼如下。

1.  private[master] class MasterArguments(args: Array[String], conf:
    SparkConf) extends Logging {
2.   var host = Utils.localHostName()
3.   var port = 7077
4.   var webUiPort = 8080
5.   var propertiesFile: String = null
6.
7.   //讀取啟動(dòng)腳本中設(shè)置的環(huán)境變量
8.    if (System.getenv("SPARK_MASTER_IP") != null) {
9.      logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_
        HOST")
10.     host = System.getenv("SPARK_MASTER_IP")
11.   }
12.
13.   if (System.getenv("SPARK_MASTER_HOST") != null) {
14.     host = System.getenv("SPARK_MASTER_HOST")
15.   }
16.   if (System.getenv("SPARK_MASTER_PORT") != null) {
17.     port = System.getenv("SPARK_MASTER_PORT").toInt
18.   }
19.   if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
20.     webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
21.   }
22.  //命令行選項(xiàng)參數(shù)的解析
23.   parse(args.toList)
24.
25.   //加載SparkConf文件,所有的訪問(wèn)必須經(jīng)過(guò)此行
26.   propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
27.
28.   if (conf.contains("spark.master.ui.port")) {
29.     webUiPort = conf.get("spark.master.ui.port").toInt
30.   }
31. ......

MasterArguments中的printUsageAndExit方法對(duì)應(yīng)的就是命令行中的幫助信息。

MasterArguments.scala的源碼如下。

1.   private def printUsageAndExit(exitCode: Int) {
2.     //scalastyle:off println
3.     System.err.println(
4.       "Usage: Master [options]\n" +
5.       "\n" +
6.       "Options:\n" +
7.       "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please
         use --host or -h) \n" +
8.       "  -h HOST, --host HOST   Hostname to listen on\n" +
9.       "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
10.      "  --webui-port PORT      Port for web UI (default: 8080)\n" +
11.      "  --properties-file FILE Path to a custom Spark properties file.\n" +
12.      "                         Default is conf/spark-defaults.conf.")
13.    //scalastyle:on println
14.    System.exit(exitCode)
15.  }

解析完Master的參數(shù)后,調(diào)用startRpcEnvAndEndpoin方法啟動(dòng)RPC通信環(huán)境以及Master的RPC通信終端。

Spark 2.1.1版本的Master.scala的startRpcEnvAndEndpoint的源碼如下。

1.
2.  /**
3.    * 啟動(dòng)Master并返回一個(gè)三元組:
4.    *   (1) The Master RpcEnv
5.    *   (2) The web UI bound port
6.     *   (3) The REST server bound port, if any
7.     */
8.
9.  def startRpcEnvAndEndpoint(
10.       host: String,
11.       port: Int,
12.       webUiPort: Int,
13.       conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
14.     val securityMgr = new SecurityManager(conf)
15.    //構(gòu)建RPC通信環(huán)境
16.     val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
17.    //構(gòu)建RPC通信終端,實(shí)例化Master
18.     val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
19.       new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
20.
21.  //向Master的通信終端發(fā)送請(qǐng)求,獲取綁定的端口號(hào)
22.  //包含Master的Web UI監(jiān)聽端口號(hào)和REST的監(jiān)聽端口號(hào)
23.
24.     val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse]
        (BoundPortsRequest)
25.     (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
26.   }
27. }

Spark 2.2.0版本的Master.scala的startRpcEnvAndEndpoint的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第24行MasterEndpoint的askWithRetry方法調(diào)整為askSync方法。

1.      ......
2.     val portsResponse = masterEndpoint.askSync[BoundPortsResponse]
       (BoundPortsRequest)
3.  ......

startRpcEnvAndEndpoint方法中定義的ENDPOINT_NAME如下。

Master.scala的源碼如下。

1.    private[deploy] object Master extends Logging {
2.    val SYSTEM_NAME = "sparkMaster"
3.    val ENDPOINT_NAME = "Master"
4.  .......

startRpcEnvAndEndpoint方法中通過(guò)masterEndpoint.askWithRetry[BoundPortsResponse] (BoundPortsRequest)給Master自己發(fā)送一個(gè)消息BoundPortsRequest,是一個(gè)case object。發(fā)送消息BoundPortsRequest給自己,確保masterEndpoint正常啟動(dòng)起來(lái)。返回消息的類型是BoundPortsResponse,是一個(gè)case class。

MasterMessages.scala的源碼如下。

1.   private[master] object MasterMessages {
2.  ......
3.    case object BoundPortsRequest
4.    case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int,
      restPort: Option[Int])
5.  }

Master收到消息BoundPortsRequest,發(fā)送返回消息BoundPortsResponse。

Master.scala的源碼如下。

1.   override def receiveAndReply(context: RpcCallContext): PartialFunction
     [Any, Unit] = {
2.  ......
3.  case BoundPortsRequest =>
4.  context.reply(BoundPortsResponse(address.port, webUi.boundPort,
    restServerBoundPort))

在BoundPortsResponse傳入的參數(shù)restServerBoundPort是在Master的onStart方法中定義的。

Master.scala的源碼如下。

1.   ......
2.    private var restServerBoundPort: Option[Int] = None
3.
4.    override def onStart(): Unit = {
5.  ......
6.     restServerBoundPort = restServer.map(_.start())
7.  .......

而restServerBoundPort是通過(guò)restServer進(jìn)行map操作啟動(dòng)賦值。下面看一下restServer。

Master.scala的源碼如下。

1.    private var restServer: Option[StandaloneRestServer] = None
2.  ......
3.     if (restServerEnabled) {
4.        val port = conf.getInt("spark.master.rest.port", 6066)
5.       restServer = Some(new StandaloneRestServer(address.host, port, conf,
         self, masterUrl))
6.      }
7.  ......

其中調(diào)用new()函數(shù)創(chuàng)建一個(gè)StandaloneRestServer。StandaloneRestServer服務(wù)器響應(yīng)請(qǐng)求提交的[RestSubmissionClient],將被嵌入到standalone Master中,僅用于集群模式。服務(wù)器根據(jù)不同的情況使用不同的HTTP代碼進(jìn)行響應(yīng)。

 200 OK-請(qǐng)求已成功處理。

 400錯(cuò)誤請(qǐng)求:請(qǐng)求格式錯(cuò)誤,未成功驗(yàn)證,或意外類型。

 468未知協(xié)議版本:請(qǐng)求指定了此服務(wù)器不支持的協(xié)議。

 500內(nèi)部服務(wù)器錯(cuò)誤:服務(wù)器在處理請(qǐng)求時(shí)引發(fā)內(nèi)部異常。

服務(wù)器在HTTP主體中總包含一個(gè)JSON表示的[SubmitRestProtocolResponse]。如果發(fā)生錯(cuò)誤,服務(wù)器將包括一個(gè)[ErrorResponse]。如果構(gòu)造了這個(gè)錯(cuò)誤響應(yīng)內(nèi)部失敗時(shí),響應(yīng)將由一個(gè)空體組成。響應(yīng)體指示內(nèi)部服務(wù)器錯(cuò)誤。

StandaloneRestServer.scala的源碼如下。

1.  private[deploy] class StandaloneRestServer(
2.     host: String,
3.     requestedPort: Int,
4.     masterConf: SparkConf,
5.     masterEndpoint: RpcEndpointRef,
6.     masterUrl: String)
7.   extends RestSubmissionServer(host, requestedPort, masterConf) {

下面看一下RestSubmissionClient客戶端??蛻舳颂峤簧暾?qǐng)[RestSubmissionServer]。在協(xié)議版本V1中,REST URL以表單形式出現(xiàn)http://[host:port]/v1/submissions/[action],[action]可以是create、kill或狀態(tài)中的其中一種。每種請(qǐng)求類型都表示為發(fā)送到以下前綴的HTTP消息:

(1)submit - POST to /submissions/create

(2)kill - POST /submissions/kill/[submissionId]

(3)status - GET /submissions/status/[submissionId]

在(1)情況下,參數(shù)以JSON字段的形式發(fā)布到HTTP主體中。否則,URL指定按客戶端的預(yù)期操作。由于該協(xié)議預(yù)計(jì)將在Spark版本中保持穩(wěn)定,因此現(xiàn)有字段不能添加或刪除,但可以添加新的可選字段。如在少見的事件中向前或向后兼容性被破壞,Spark須引入一個(gè)新的協(xié)議版本(如V2)??蛻魴C(jī)和服務(wù)器必須使用協(xié)議的同一版本進(jìn)行通信。如果不匹配,服務(wù)器將用它支持的最高協(xié)議版本進(jìn)行響應(yīng)。此客戶機(jī)的實(shí)現(xiàn)可以用指定的版本使用該信息重試。

RestSubmissionClient.scala的源碼如下。

1.   private[spark] class RestSubmissionClient(master: String) extends
     Logging {
2.    import RestSubmissionClient._
3.    private val supportedMasterPrefixes = Seq("spark://", "mesos://")
4.  ......

Restful把一切都看成是資源。利用Restful API可以對(duì)Spark進(jìn)行監(jiān)控。程序運(yùn)行的每一個(gè)步驟、Task的計(jì)算步驟都可以可視化,對(duì)Spark的運(yùn)行進(jìn)行詳細(xì)監(jiān)控。

回到startRpcEnvAndEndpoint方法中,新創(chuàng)建了一個(gè)Master實(shí)例。Master實(shí)例化時(shí)會(huì)對(duì)所有的成員進(jìn)行初始化,如默認(rèn)的Cores個(gè)數(shù)等。

Master.scala的源碼如下。

1.   private[deploy] class Master(
2.      override val rpcEnv: RpcEnv,
3.      address: RpcAddress,
4.      webUiPort: Int,
5.      val securityMgr: SecurityManager,
6.      val conf: SparkConf)
7.    extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
8.  .......
9.    //缺省maxCores時(shí)默認(rèn)應(yīng)用程序沒有指定(通過(guò)Int.MaxValue)
10.   private val defaultCores = conf.getInt("spark.deploy.defaultCores",
      Int.MaxValue)
11.   val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
12.   if (defaultCores < 1) {
13.     throw new SparkException("spark.deploy.defaultCores must be positive")
14.   }
15. ......
16.

Master繼承了ThreadSafeRpcEndpoint和LeaderElectable,其中繼承LeaderElectable涉及Master的高可用性(High Availability,HA)機(jī)制。這里先關(guān)注ThreadSafeRpcEndpoint,繼承該類后,Master作為一個(gè)RpcEndpoint,實(shí)例化后首先會(huì)調(diào)用onStart方法。

Master.scala的源碼如下。

1.  override def onStart(): Unit = {
2.    logInfo("Starting Spark master at " + masterUrl)
3.     logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
4.     //構(gòu)建一個(gè)Master的Web UI,查看向Master提交的應(yīng)用程序等信息
5.     webUi = new MasterWebUI(this, webUiPort)
6.     webUi.bind()
7.     masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
8.     if (reverseProxy) {
9.       masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
10.      logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers
         and " +
11.       s"Applications UIs are available at $masterWebUiUrl")
12.    }
13.  //在一個(gè)守護(hù)線程中,啟動(dòng)調(diào)度機(jī)制,周期性檢查Worker是否超時(shí),當(dāng)Worker節(jié)點(diǎn)超時(shí)后,
     //會(huì)修改其狀態(tài)或從Master中移除其相關(guān)的操作
14.
15.    checkForWorkerTimeOutTask      =   forwardMessageThread.scheduleAtFixedRate
       (new Runnable {
16.      override def run(): Unit = Utils.tryLogNonFatalError {
17.        self.send(CheckForWorkerTimeOut)
18.      }
19.    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
20.
21.  //默認(rèn)情況下會(huì)啟動(dòng)Rest服務(wù),可以通過(guò)該服務(wù)向Master提交各種請(qǐng)求
22.    if (restServerEnabled) {
23.      val port = conf.getInt("spark.master.rest.port", 6066)
24.     restServer = Some(new StandaloneRestServer(address.host, port, conf,
        self, masterUrl))
25.    }
26.    restServerBoundPort = restServer.map(_.start())
27.  //度量(Metroics)相關(guān)的操作,用于監(jiān)控
28.    masterMetricsSystem.registerSource(masterSource)
29.    masterMetricsSystem.start()
30.    applicationMetricsSystem.start()
31.   //度量系統(tǒng)啟動(dòng)后,將主程序和應(yīng)用程序度量handler處理程序附加到Web UI中
32.    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
33.    applicationMetricsSystem.getServletHandlers.foreach(webUi.
       attachHandler)
34.   //Master HA相關(guān)的操作
35.    val serializer = new JavaSerializer(conf)
36.    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
37.      case "ZOOKEEPER" =>
38.        logInfo("Persisting recovery state to ZooKeeper")
39.        val zkFactory =
40.          new ZooKeeperRecoveryModeFactory(conf, serializer)
41.        (zkFactory.createPersistenceEngine(), zkFactory.
           createLeaderElectionAgent(this))
42.      case "FILESYSTEM" =>
43.        val fsFactory =
44.          new FileSystemRecoveryModeFactory(conf, serializer)
45.        (fsFactory.createPersistenceEngine(), fsFactory.createLeader-
           ElectionAgent(this))
46.      case "CUSTOM" =>
47.        val clazz = Utils.classForName(conf.get("spark.deploy.
           recoveryMode.factory"))
48.        val factory = clazz.getConstructor(classOf[SparkConf], classOf
           [Serializer])
49.          .newInstance(conf, serializer)
50.          .asInstanceOf[StandaloneRecoveryModeFactory]
51.        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent
           (this))
52.      case _ =>
53.        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
54.    }
55.    persistenceEngine = persistenceEngine_
56.    leaderElectionAgent = leaderElectionAgent_
57.  }

其中在Master的onStart方法中用new()函數(shù)創(chuàng)建MasterWebUI,啟動(dòng)一個(gè)webServer。

Master.scala的源碼如下。

1.    override def onStart(): Unit = {
2.  ......
3.   webUi = new MasterWebUI(this, webUiPort)
4.      webUi.bind()
5.      masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.
        boundPort

如MasterWebUI的spark.ui.killEnabled設(shè)置為True,可以通過(guò)WebUI頁(yè)面把Spark的進(jìn)程Kill掉。

MasterWebUI.scala的源碼如下。

1.     private[master]
2.  class MasterWebUI(
3.      val master: Master,
4.      requestedPort: Int)
5.    extends    WebUI(master.securityMgr,        master.securityMgr.getSSLOptions
      ("standalone"),
6.      requestedPort, master.conf, name = "MasterUI") with Logging {
7.    val masterEndpointRef = master.self
8.    val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
9.  .......
10. initialize()
11.
12.   /**初始化所有的服務(wù)器組件 */
13.   def initialize() {
14.     val masterPage = new MasterPage(this)
15.     attachPage(new ApplicationPage(this))
16.     attachPage(masterPage)
17.     attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR,
        "/static"))
18.     attachHandler(createRedirectHandler(
19.       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods =
          Set("POST")))
20.     attachHandler(createRedirectHandler(
21.       "/driver/kill", "/", masterPage.handleDriverKillRequest,
          httpMethods = Set("POST")))
22.   }

MasterWebUI中在初始化時(shí)用new()函數(shù)創(chuàng)建MasterPage,在MasterPage中通過(guò)代碼去寫Web頁(yè)面。

MasterPage.scala的源碼如下。

1.  private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
2.  ......
3.   override def renderJson(request: HttpServletRequest): JValue = {
4.     JsonProtocol.writeMasterState(getMasterState)
5.    }
6.  .....
7.    val content =
8.          <div class="row-fluid">
9.            <div class="span12">
10.             <ul class="unstyled">
11.               <li><strong>URL:</strong> {state.uri}</li>
12.               {
13.                 state.restUri.map { uri =>
14.                   <li>
15.                     <strong>REST URL:</strong> {uri}
16.                     <span class="rest-uri"> (cluster mode)</span>
17.                   </li>
18.                 }.getOrElse { Seq.empty }
19.               }
20.               <li><strong>Alive Workers:</strong> {aliveWorkers.length}</li>
21.               <li><strong>Cores in use:</strong> {aliveWorkers.map
                 (_.cores).sum} Total,
22.                 {aliveWorkers.map(_.coresUsed).sum} Used</li>
23.               <li><strong>Memory in use:</strong>
24.                 {Utils.megabytesToString(aliveWorkers.map(_.memory).sum)}
                   Total,
25.                 {Utils.megabytesToString(aliveWorkers.map(_.memoryUsed)
                    .sum)} Used</li>
26.               <li><strong>Applications:</strong>
27.                 {state.activeApps.length} <a href="#running-app">Running</a>,
28.                 {state.completedApps.length}         <a   href="#completed-app">
                    Completed</a> </li>
29.               <li><strong>Drivers:</strong>
30.                 {state.activeDrivers.length} Running,
31.                 {state.completedDrivers.length} Completed </li>
32.               <li><strong>Status:</strong> {state.status}</li>
33.             </ul>
34.           </div>
35.         </div>
36. ........

回到MasterWebUI.scala的initialize()方法,其中調(diào)用了attachPage方法,在WebUI中增加Web頁(yè)面。

WebUI.scala的源碼如下。

1.     def attachPage(page: WebUIPage) {
2.     val pagePath = "/" + page.prefix
3.     val renderHandler = createServletHandler(pagePath,
4.       (request: HttpServletRequest) => page.render(request),
         securityManager, conf, basePath)
5.     val renderJsonHandler = createServletHandler(pagePath.stripSuffix
       ("/") + "/json",
6.       (request: HttpServletRequest) => page.renderJson(request),
         securityManager, conf, basePath)
7.     attachHandler(renderHandler)
8.     attachHandler(renderJsonHandler)
9.     val   handlers    =   pageToHandlers.getOrElseUpdate(page,         ArrayBuffer
       [ServletContextHandler]())
10.    handlers += renderHandler
11.  }

在WebUI的bind方法中啟用了JettyServer。 WebUI.scala的bind的源碼如下。

1.   def bind() {
2.     assert(!serverInfo.isDefined, s"Attempted to bind $className more than
       once!")
3.     try {
4.       val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse
         ("0.0.0.0")
5.      serverInfo = Some(startJettyServer(host, port, sslOptions, handlers,
        conf, name))
6.       logInfo(s"Bound $className to $host, and started at $webUrl")
7.     } catch {
8.       case e: Exception =>
9.         logError(s"Failed to bind $className", e)
10.        System.exit(1)
11.    }
12.  }

JettyUtils.scala的startJettyServer嘗試將Jetty服務(wù)器綁定到所提供的主機(jī)名、端口。

startJettyServer的源碼如下。

1.  def startJettyServer(
2.        hostName: String,
3.        port: Int,
4.        sslOptions: SSLOptions,
5.        handlers: Seq[ServletContextHandler],
6.        conf: SparkConf,
7.        serverName: String = ""): ServerInfo = {

5.1.3 Master HA雙機(jī)切換

Spark生產(chǎn)環(huán)境下一般采用ZooKeeper作HA,且建議為3臺(tái)Master,ZooKeeper會(huì)自動(dòng)化管理Masters的切換。

采用ZooKeeper作HA時(shí),ZooKeeper會(huì)保存整個(gè)Spark集群運(yùn)行時(shí)的元數(shù)據(jù),包括Workers、Drivers、Applications、Executors。

ZooKeeper遇到當(dāng)前Active級(jí)別的Master出現(xiàn)故障時(shí)會(huì)從Standby Masters中選取出一臺(tái)作為Active Master,但是要注意,被選舉后到成為真正的Active Master之前需要從ZooKeeper中獲取集群當(dāng)前運(yùn)行狀態(tài)的元數(shù)據(jù)信息并進(jìn)行恢復(fù)。

在Master切換的過(guò)程中,所有已經(jīng)在運(yùn)行的程序皆正常運(yùn)行。因?yàn)镾park Application在運(yùn)行前就已經(jīng)通過(guò)Cluster Manager獲得了計(jì)算資源,所以在運(yùn)行時(shí),Job本身的調(diào)度和處理和Master是沒有任何關(guān)系的。

在Master的切換過(guò)程中唯一的影響是不能提交新的Job:一方面不能夠提交新的應(yīng)用程序給集群,因?yàn)橹挥蠥ctive Master才能接收新的程序提交請(qǐng)求;另一方面,已經(jīng)運(yùn)行的程序中也不能因?yàn)锳ction操作觸發(fā)新的Job提交請(qǐng)求。

ZooKeeper下Master HA的基本流程如圖5-2所示。

ZooKeeper下Master HA的基本流程如下。

(1)使用ZooKeeperPersistenceEngine讀取集群的狀態(tài)數(shù)據(jù),包括Drivers、Applications、Workers、Executors等信息。

圖5-2 ZooKeeper下Master HA的基本流程

(2)判斷元數(shù)據(jù)信息是否有空的內(nèi)容。

(3)把通過(guò)ZooKeeper持久化引擎獲得了Drivers、Applications、Workers、Executors等信息,重新注冊(cè)到Master的內(nèi)存中緩存起來(lái)。

(4)驗(yàn)證獲得的信息和當(dāng)前正在運(yùn)行的集群狀態(tài)的一致性。

(5)將Application和Workers的狀態(tài)標(biāo)識(shí)為Unknown,然后向Application中的Driver以及Workers發(fā)送現(xiàn)在是Leader的Standby模式的Master的地址信息。

(6)當(dāng)Driver和Workers收到新的Master的地址信息后會(huì)響應(yīng)該信息。

(7)Master接收到來(lái)自Drivers和Workers響應(yīng)的信息后會(huì)使用一個(gè)關(guān)鍵的方法:completeRecovery()來(lái)對(duì)沒有響應(yīng)的Applications (Drivers)、Workers (Executors)進(jìn)行處理。處理完畢后,Master的State會(huì)變成RecoveryState.ALIVE,從而開始對(duì)外提供服務(wù)。

(8)此時(shí)Master使用自己的Schedule方法對(duì)正在等待的Application和Drivers進(jìn)行資源調(diào)度。

Master HA的4大方式分別是ZOOKEEPER、FILESYSTEM、CUSTOM、NONE。

需要說(shuō)明的是:

(a)ZOOKEEPER是自動(dòng)管理Master。

(b)FILESYSTEM的方式在Master出現(xiàn)故障后需要手動(dòng)重新啟動(dòng)機(jī)器,機(jī)器啟動(dòng)后會(huì)立即成為Active級(jí)別的Master來(lái)對(duì)外提供服務(wù)(接收應(yīng)用程序提交的請(qǐng)求、接收新的Job運(yùn)行的請(qǐng)求)。

(c)CUSTOM的方式允許用戶自定義Master HA的實(shí)現(xiàn),這對(duì)高級(jí)用戶特別有用。

(d)NONE,這是默認(rèn)情況,Spark集群中就采用這種方式,該方式不會(huì)持久化集群的數(shù)據(jù),Master啟動(dòng)后立即管理集群。

Master.scala的HA的源碼如下。

1.    override def onStart(): Unit = {
2.  ......
3.  val serializer = new JavaSerializer(conf)
4.      val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
5.        case "ZOOKEEPER" =>
6.          logInfo("Persisting recovery state to ZooKeeper")
7.          val zkFactory =
8.            new ZooKeeperRecoveryModeFactory(conf, serializer)
9.          (zkFactory.createPersistenceEngine(), zkFactory.createLeader-
             ElectionAgent(this))
10.       case "FILESYSTEM" =>
11.         val fsFactory =
12.           new FileSystemRecoveryModeFactory(conf, serializer)
13.         (fsFactory.createPersistenceEngine(), fsFactory.
            createLeaderElectionAgent(this))
14.       case "CUSTOM" =>
15.         val clazz = Utils.classForName(conf.get("spark.deploy.
            recoveryMode.factory"))
16.         val factory = clazz.getConstructor(classOf[SparkConf], classOf
            [Serializer])
17.           .newInstance(conf, serializer)
18.           .asInstanceOf[StandaloneRecoveryModeFactory]
19.         (factory.createPersistenceEngine(), factory.createLeaderElectionAgent
            (this))
20.       case _ =>
21.         (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
22.     }
23.     persistenceEngine = persistenceEngine_
24.     leaderElectionAgent = leaderElectionAgent_
25.   }
26. ......

Spark默認(rèn)的HA方式是NONE。

1.  private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")

如使用ZOOKEEPER的HA方式,ZooKeeperRecoveryModeFactory.scala的源碼如下。

1.    private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf,
      serializer: Serializer)
2.    extends StandaloneRecoveryModeFactory(conf, serializer) {
3.
4.    def createPersistenceEngine(): PersistenceEngine = {
5.      new ZooKeeperPersistenceEngine(conf, serializer)
6.    }
7.
8.    def createLeaderElectionAgent(master: LeaderElectable):
      LeaderElectionAgent = {
9.      new ZooKeeperLeaderElectionAgent(master, conf)
10.   }
11. }

通過(guò)調(diào)用zkFactory.createPersistenceEngine()用new()函數(shù)創(chuàng)建一個(gè)ZooKeeper-PersistenceEngine。

ZooKeeperPersistenceEngine.scala的源碼如下。

1.   private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val
     serializer: Serializer)
2.  extends PersistenceEngine
3.  with Logging {
4.
5.  private    val   WORKING_DIR     =   conf.get("spark.deploy.zookeeper.dir",
    "/spark") + "/master_status"
6.    private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
7.
8.    SparkCuratorUtil.mkdir(zk, WORKING_DIR)
9.
10.
11.   override def persist(name: String, obj: Object): Unit = {
12.     serializeIntoFile(WORKING_DIR + "/" + name, obj)
13.   }
14.
15.   override def unpersist(name: String): Unit = {
16.     zk.delete().forPath(WORKING_DIR + "/" + name)
17.   }
18.
19.   override def read[T: ClassTag](prefix: String): Seq[T] = {
20.     zk.getChildren.forPath(WORKING_DIR).asScala
21.       .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
22.   }
23.
24.   override def close() {
25.     zk.close()
26.   }
27.
28.   private def serializeIntoFile(path: String, value: AnyRef) {
29.     val serialized = serializer.newInstance().serialize(value)
30.     val bytes = new Array[Byte](serialized.remaining())
31.     serialized.get(bytes)
32.     zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
33.   }
34.
35.   private    def   deserializeFromFile[T](filename:          String)(implicit  m:
      ClassTag[T]): Option[T] = {
36.     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
37.     try {
38.       Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap
          (fileData)))
39.     } catch {
40.       case e: Exception =>
41.         logWarning("Exception while reading persisted file, deleting", e)
42.         zk.delete().forPath(WORKING_DIR + "/" + filename)
43.         None
44.     }
45.   }
46. }

PersistenceEngine中有至關(guān)重要的方法persist來(lái)實(shí)現(xiàn)數(shù)據(jù)持久化,readPersistedData來(lái)恢復(fù)集群中的元數(shù)據(jù)。

PersistenceEngine.scala的源碼如下。

1.   def persist(name: String, obj: Object): Unit
2.  ......
3.    final def readPersistedData(
4.        rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq
          [WorkerInfo]) = {
5.      rpcEnv.deserialize { () =>
6.        (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read
          [WorkerInfo]("worker_"))
7.      }
8.    }

下面來(lái)看createdLeaderElectionAgent方法。在createdLeaderElectionAgent方法中調(diào)用new()函數(shù)創(chuàng)建ZooKeeperLeaderElectionAgent實(shí)例。

StandaloneRecoveryModeFactory.scala的源碼如下。

1.    def createLeaderElectionAgent(master: LeaderElectable):
      LeaderElectionAgent = {
2.      new ZooKeeperLeaderElectionAgent(master, conf)
3.    }
4.  }

ZooKeeperLeaderElectionAgent的源碼如下。

1.   private[master] class ZooKeeperLeaderElectionAgent(val masterInstance:
     LeaderElectable,
2.     conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent
       with Logging  {
3.
4.   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") +
     "/leader_election"
5.
6.   private var zk: CuratorFramework = _
7.   private var leaderLatch: LeaderLatch = _
8.   private var status = LeadershipStatus.NOT_LEADER
9.
10.  start()
11.
12.  private def start() {
13.    logInfo("Starting ZooKeeper LeaderElection agent")
14.    zk = SparkCuratorUtil.newClient(conf)
15.    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
16.    leaderLatch.addListener(this)
17.    leaderLatch.start()
18.  }
19.
20.  override def stop() {
21.    leaderLatch.close()
22.    zk.close()
23.  }
24.
25.  override def isLeader() {
26.    synchronized {
27.      //可以取得領(lǐng)導(dǎo)權(quán)
28.      if (!leaderLatch.hasLeadership) {
29.        return
30.      }
31.
32.      logInfo("We have gained leadership")
33.      updateLeadershipStatus(true)
34.    }
35.  }
36.
37.  override def notLeader() {
38.    synchronized {
39.      //可以取得領(lǐng)導(dǎo)權(quán)
40.      if (leaderLatch.hasLeadership) {
41.        return
42.      }
43.
44.       logInfo("We have lost leadership")
45.       updateLeadershipStatus(false)
46.     }
47.   }
48.
49.   private def updateLeadershipStatus(isLeader: Boolean) {
50.     if (isLeader && status == LeadershipStatus.NOT_LEADER) {
51.       status = LeadershipStatus.LEADER
52.       masterInstance.electedLeader()
53.     } else if (!isLeader && status == LeadershipStatus.LEADER) {
54.       status = LeadershipStatus.NOT_LEADER
55.       masterInstance.revokedLeadership()
56.     }
57.   }
58.
59.   private object LeadershipStatus extends Enumeration {
60.     type LeadershipStatus = Value
61.     val LEADER, NOT_LEADER = Value
62.   }
63. }

FILESYSTEM和NONE的方式采用MonarchyLeaderAgent的方式來(lái)完成Leader的選舉,其實(shí)現(xiàn)是直接把傳入的Master作為L(zhǎng)eader。

LeaderElectionAgent.scala的源碼如下。

1.   private[spark] class MonarchyLeaderAgent(val masterInstance:
     LeaderElectable)
2.    extends LeaderElectionAgent {
3.    masterInstance.electedLeader()
4.  }

FileSystemRecoveryModeFactory.scala的源碼如下。

1.  private[master] class FileSystemRecoveryModeFactory(conf: SparkConf,
    serializer: Serializer)
2.    extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
3.
4.    val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
5.
6.    def createPersistenceEngine(): PersistenceEngine = {
7.      logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
8.      new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
9.    }
10.
11.   def createLeaderElectionAgent(master: LeaderElectable):
      LeaderElectionAgent = {
12.     new MonarchyLeaderAgent(master)
13.   }
14. }

如果WorkerState狀態(tài)為UNKNOWN(Worker不響應(yīng)),就把它刪除,如果以集群方式運(yùn)行,driver失敗后可以重新啟動(dòng),最后把狀態(tài)變回ALIVE,注意,這里要加入--supervise這個(gè)參數(shù)。

Master.scala的源碼如下。

1.  private def completeRecovery() {
2.     //使用短同步周期確?!皁nly-once”恢復(fù)一次語(yǔ)義
3.   if (state != RecoveryState.RECOVERING) { return }
4.   state = RecoveryState.COMPLETING_RECOVERY
5.
6.   //殺掉不響應(yīng)消息的workers 和apps
7.   workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
8.   apps.filter(_.state == ApplicationState.UNKNOWN).foreach
     (finishApplication)
9.
10.  //重新調(diào)度drivers,其未被任何workers聲明
11.  drivers.filter(_.worker.isEmpty).foreach { d =>
12.    logWarning(s"Driver ${d.id} was not found after master recovery")
13.    if (d.desc.supervise) {
14.      logWarning(s"Re-launching ${d.id}")
15.      relaunchDriver(d)
16.    } else {
17.      removeDriver(d.id, DriverState.ERROR, None)
18.      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
19.    }
20.  }

5.1.4 Master的注冊(cè)機(jī)制和狀態(tài)管理解密

1.Master對(duì)其他組件注冊(cè)的處理

Master接收注冊(cè)的對(duì)象主要是Driver、Application、Worker; Executor不會(huì)注冊(cè)給Master,Executor是注冊(cè)給Driver中的SchedulerBackend的。

Worker是在啟動(dòng)后主動(dòng)向Master注冊(cè)的,所以如果在生產(chǎn)環(huán)境下加入新的Worker到正在運(yùn)行的Spark集群上,此時(shí)不需要重新啟動(dòng)Spark集群就能夠使用新加入的Worker,以提升處理能力。假如在生產(chǎn)環(huán)境中的集群中有500臺(tái)機(jī)器,可能又新加入100臺(tái)機(jī)器,這時(shí)不需要重新啟動(dòng)整個(gè)集群,就可以將100臺(tái)新機(jī)器加入到集群。

Worker的源碼如下。

1.   private[deploy] class Worker(
2.      override val rpcEnv: RpcEnv,
3.      webUiPort: Int,
4.      cores: Int,
5.      memory: Int,
6.      masterRpcAddresses: Array[RpcAddress],
7.      endpointName: String,
8.      workDirPath: String = null,
9.      val conf: SparkConf,
10.     val securityMgr: SecurityManager)
11.   extends ThreadSafeRpcEndpoint with Logging {

Worker是一個(gè)消息循環(huán)體,繼承自ThreadSafeRpcEndpoint,可以收消息,也可以發(fā)消息。Worker的onStart方法如下。

1.    override def onStart() {
2.  ......
3.    workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
4.    registerWithMaster()
5.    ......
6.  }

Worker的onStart方法中調(diào)用了registerWithMaster()。

1.  private def registerWithMaster() {
2.  .......
3.  registrationRetryTimer match {
4.    case None =>
5.      registered = false
6.      registerMasterFutures = tryRegisterAllMasters()
7.   ......

registerWithMaster方法中調(diào)用了tryRegisterAllMasters,向所有的Master進(jìn)行注冊(cè)。

Spark 2.1.1版本的Worker.scala的源碼如下。

1.          private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.     masterRpcAddresses.map { masterAddress =>
3.       registerMasterThreadPool.submit(new Runnable {
4.         override def run(): Unit = {
5.           try {
6.             logInfo("Connecting to master " + masterAddress + "...")
7.             val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress,
               Master.ENDPOINT_NAME)
8.             registerWithMaster(masterEndpoint)
9.           } catch {
10.            case ie: InterruptedException => //Cancelled
11.            case NonFatal(e) => logWarning(s"Failed to connect to master
               $masterAddress", e)
12.          }
13.        }
14.      })
15.    }
16.  }

Spark 2.2.0版本的Worker.scala的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第8行registerWithMaster(masterEndpoint)方法調(diào)整為sendRegisterMessageToMaster(masterEndpoint)。

1.  ......
2.          sendRegisterMessageToMaster(masterEndpoint)
3.  ......

tryRegisterAllMasters方法中,由于實(shí)際運(yùn)行時(shí)有很多Master,因此使用線程池的線程進(jìn)行提交,然后獲取masterEndpoint。masterEndpoint是一個(gè)RpcEndpointRef,通過(guò)registerWithMaster (masterEndpoint)進(jìn)行注冊(cè)。

Spark 2.1.1版本的Worker.scala的registerWithMaster的源碼如下。

1.   private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
2.     masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
3.       workerId, host, port, self, cores, memory, workerWebUiUrl))
4.       .onComplete {
5.         //這是一個(gè)非常快的動(dòng)作,所以可以用"ThreadUtils.sameThread"
6.         case Success(msg) =>
7.           Utils.tryLogNonFatalError {
8.             handleRegisterResponse(msg)
9.           }
10.        case Failure(e) =>
11.          logError(s"Cannot register with master: ${masterEndpoint
             .address}", e)
12.          System.exit(1)
13.     }(ThreadUtils.sameThread)
14.  }

Spark 2.1.1版本的registerWithMaster(masterEndpoint)方法調(diào)整為Spark 2.2.0版本的sendRegisterMessageToMaster(masterEndpoint)。sendRegisterMessageToMaster方法僅將RegisterWorker消息發(fā)送給Master消息循環(huán)體。sendRegisterMessageToMaster方法內(nèi)部不作其他處理。

1.   private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef):
     Unit = {
2.      masterEndpoint.send(RegisterWorker(
3.        workerId,
4.        host,
5.        port,
6.        self,
7.        cores,
8.        memory,
9.        workerWebUiUrl,
10.       masterEndpoint.address))
11.   }

registerWithMaster方法中的masterEndpoint.ask[RegisterWorkerResponse]傳進(jìn)去的是RegisterWorker。RegisterWorker是一個(gè)case class,包括id、host、port、worker、cores、memory等信息,這里Worker是自己的引用RpcEndpointRef,Master通過(guò)Ref通worker通信。

Spark 2.1.1版本的RegisterWorker.scala的源碼如下。

1.   case class RegisterWorker(
2.       id: String,
3.       host: String,
4.       port: Int,
5.       worker: RpcEndpointRef,
6.       cores: Int,
7.       memory: Int,
8.       workerWebUiUrl: String)
9.     extends DeployMessage {
10.    Utils.checkHost(host, "Required hostname")
11.    assert (port > 0)
12.  }

Spark 2.2.0版本的RegisterWorker.scala的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第8行之后新增加masterAddress的成員變量:Master的地址,用于Worker節(jié)點(diǎn)連接Master節(jié)點(diǎn)。

1.  ......
2.     masterAddress: RpcAddress)
3.    ......

Worker通過(guò)registerWithMaster向Master發(fā)送了RegisterWorker消息,Master收到RegisterWorker請(qǐng)求后,進(jìn)行相應(yīng)的處理。

Spark 2.1.1版本的Master.scala的receiveAndReply的源碼如下。

1.  override def receiveAndReply(context: RpcCallContext):
    PartialFunction[Any, Unit] = {
2.  case RegisterWorker(
3.      id, workerHost, workerPort, workerRef, cores, memory,
       workerWebUiUrl) =>
4.   logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
5.     workerHost, workerPort, cores, Utils.megabytesToString(memory)))
6.   if (state == RecoveryState.STANDBY) {
7.     context.reply(MasterInStandby)
8.   } else if (idToWorker.contains(id)) {
9.     context.reply(RegisterWorkerFailed("Duplicate worker ID"))
10.  } else {
11.    val worker = new WorkerInfo(id, workerHost, workerPort, cores,
       memory,
12.      workerRef, workerWebUiUrl)
13.    if (registerWorker(worker)) {
14.      persistenceEngine.addWorker(worker)
15.      context.reply(RegisteredWorker(self, masterWebUiUrl))
16.      schedule()
17.    } else {
18.      val workerAddress = worker.endpoint.address
19.      logWarning("Worker registration failed. Attempted to re-register
         worker at same " +
20.        "address: " + workerAddress)
21.      context.reply(RegisterWorkerFailed("Attempted to              re-register
         worker at same address: "
22.        + workerAddress))
23.    }
24.  }

Spark 2.2.0版本的Master.scala的receiveAndReply的源碼與Spark 2.1.1版本相比具有如下特點(diǎn)。

 上段代碼中第1行,RegisterWorker消息的模式匹配從receiveAndReply方法中調(diào)整到receive方法。因?yàn)閃orker向Master提交RegisterWorker消息,無(wú)須同步等待Master的答復(fù)。

 上段代碼中第3行RegisterWorker的傳入?yún)?shù)新增加了masterAddress。

 上段代碼中第9行context.reply方法調(diào)整為workerRef.send方法。

 上段代碼中第15行context.reply方法調(diào)整為workerRef.send方法,以及RegisteredWorker中新增了一個(gè)參數(shù)masterAddress。

1.  override def receive: PartialFunction[Any, Unit] = {
2.  ......
3.   case RegisterWorker(
4.       id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
         masterAddress)
5.  ......
6.        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
7.  ......
8.        workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
9.   ......

RegisterWorker中,Master接收到Worker的注冊(cè)請(qǐng)求后,首先判斷當(dāng)前的Master是否是Standby的模式,如果是,就不處理;Master的idToWorker包含了所有已經(jīng)注冊(cè)的Worker的信息,然后會(huì)判斷當(dāng)前Master的內(nèi)存數(shù)據(jù)結(jié)構(gòu)idToWorker中是否已經(jīng)有該Worker的注冊(cè),如果有,此時(shí)不會(huì)重復(fù)注冊(cè);其中idToWorker是一個(gè)HashMap,Key是String代表Worker的字符描述,Value是WorkerInfo。

1.  private val idToWorker = new HashMap[String, WorkerInfo]

WorkerInfo包括id、host、port 、cores、memory、endpoint等內(nèi)容。

1.    private[spark] class WorkerInfo(
2.    val id: String,
3.    val host: String,
4.    val port: Int,
5.    val cores: Int,
6.    val memory: Int,
7.    val endpoint: RpcEndpointRef,
8.    val webUiAddress: String)
9.  extends Serializable {

Master如果決定接收注冊(cè)的Worker,首先會(huì)創(chuàng)建WorkerInfo對(duì)象來(lái)保存注冊(cè)的Worker的信息,然后調(diào)用registerWorker執(zhí)行具體的注冊(cè)的過(guò)程,如果Worker的狀態(tài)是DEAD的狀態(tài),則直接過(guò)濾掉。對(duì)于UNKNOWN的內(nèi)容,調(diào)用removeWorker進(jìn)行清理(包括清理該Worker下的Executors和Drivers)。其中,UNKNOWN的情況:Master進(jìn)行切換時(shí),先對(duì)Worker發(fā)UNKNOWN消息,只有當(dāng)Master收到Worker正確的回復(fù)消息,才將狀態(tài)標(biāo)識(shí)為正常。

registerWorker的源碼如下。

1.      private def registerWorker(worker: WorkerInfo): Boolean = {
2.   //在同一節(jié)點(diǎn)上可能有一個(gè)或多個(gè)指向掛掉的workers節(jié)點(diǎn)的引用(不同ID),須刪除它們
3.     workers.filter { w =>
4.       (w.host == worker.host && w.port == worker.port) && (w.state ==
         WorkerState.DEAD)
5.     }.foreach { w =>
6.       workers -= w
7.     }
8.
9.     val workerAddress = worker.endpoint.address
10.    if (addressToWorker.contains(workerAddress)) {
11.      val oldWorker = addressToWorker(workerAddress)
12.      if (oldWorker.state == WorkerState.UNKNOWN) {
13.        //未知狀態(tài)的worker 意味著在恢復(fù)過(guò)程中worker 被重新啟動(dòng)。舊的worker節(jié)點(diǎn)
           //掛掉,須刪掉舊節(jié)點(diǎn),接收新worker節(jié)點(diǎn)
14.        removeWorker(oldWorker)
15.      } else {
16.        logInfo("Attempted to re-register worker at same address: " +
           workerAddress)
17.        return false
18.      }
19.    }
20.
21.    workers += worker
22.    idToWorker(worker.id) = worker
23.    addressToWorker(workerAddress) = worker
24.    if (reverseProxy) {
25.       webUi.addProxyTargets(worker.id, worker.webUiAddress)
26.    }
27.    true
28.  }

在registerWorker方法中,Worker注冊(cè)完成后,把注冊(cè)的Worker加入到Master的內(nèi)存數(shù)據(jù)結(jié)構(gòu)中。

1.  val workers = new HashSet[WorkerInfo]
2.  private val idToWorker = new HashMap[String, WorkerInfo]
3.    private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
4.  ......
5.
6.    workers += worker
7.      idToWorker(worker.id) = worker
8.      addressToWorker(workerAddress) = worker

回到Master.scala的receiveAndReply方法,Worker注冊(cè)完成后,調(diào)用persistenceEngine.addWorker (worker),PersistenceEngine是持久化引擎,在Zookeeper下就是Zookeeper的持久化引擎,把注冊(cè)的數(shù)據(jù)進(jìn)行持久化。

PersistenceEngine.scala的addWorker方法如下。

1.      final def addWorker(worker: WorkerInfo): Unit = {
2.    persist("worker_" + worker.id, worker)
3.  }

ZooKeeperPersistenceEngine是PersistenceEngine的一個(gè)具體實(shí)現(xiàn)子類,其persist方法如下。

1.     private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val
       serializer: Serializer)
2.    extends PersistenceEngine
3.  ......
4.    override def persist(name: String, obj: Object): Unit = {
5.      serializeIntoFile(WORKING_DIR + "/" + name, obj)
6.    }
7.  ......
8.  private def serializeIntoFile(path: String, value: AnyRef) {
9.      val serialized = serializer.newInstance().serialize(value)
10.     val bytes = new Array[Byte](serialized.remaining())
11.     serialized.get(bytes)
12.     zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
13.   }

回到Master.scala的receiveAndReply方法,注冊(cè)的Worker數(shù)據(jù)持久化后,進(jìn)行schedule()。至此,Worker的注冊(cè)完成。

同樣,Driver的注冊(cè)過(guò)程:Driver提交給Master進(jìn)行注冊(cè),Master會(huì)將Driver的信息放入內(nèi)存緩存中,加入等待調(diào)度的隊(duì)列,通過(guò)持久化引擎(如ZooKeeper)把注冊(cè)信息持久化,然后進(jìn)行Schedule。

Application的注冊(cè)過(guò)程:Application提交給Master進(jìn)行注冊(cè),Driver啟動(dòng)后會(huì)執(zhí)行SparkContext的初始化,進(jìn)而導(dǎo)致StandaloneSchedulerBackend的產(chǎn)生,其內(nèi)部有StandaloneAppClient。StandaloneAppClient內(nèi)部有ClientEndpoint。ClientEndpoint來(lái)發(fā)送RegisterApplication信息給Master。Master會(huì)將Application的信息放入內(nèi)存緩存中,把Application加入等待調(diào)度的Application隊(duì)列,通過(guò)持久化引擎(如ZooKeeper)把注冊(cè)信息持久化,然后進(jìn)行Schedule。

2.Master對(duì)Driver和Executor狀態(tài)變化的處理

1)對(duì)Driver狀態(tài)變化的處理

如果Driver的各個(gè)狀態(tài)是DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED,就將其清理掉。其他情況則報(bào)異常。

1.     override def receive: PartialFunction[Any, Unit] = {
2.  ......
3.   case DriverStateChanged(driverId, state, exception) =>
4.        state match {
5.          case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED
            | DriverState.FAILED =>
6.            removeDriver(driverId, state, exception)
7.          case _ =>
8.            throw new Exception(s"Received unexpected state update for driver
              $driverId: $state")
9.        }

removeDriver清理掉Driver后,再次調(diào)用schedule方法,removeDriver的源碼如下。

1.       private def removeDriver(
2.        driverId: String,
3.        finalState: DriverState,
4.        exception: Option[Exception]) {
5.      drivers.find(d => d.id == driverId) match {
6.        case Some(driver) =>
7.          logInfo(s"Removing driver: $driverId")
8.          drivers -= driver
9.          if (completedDrivers.size >= RETAINED_DRIVERS) {
10.           val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
11.           completedDrivers.trimStart(toRemove)
12.         }
13.         completedDrivers += driver
14.         persistenceEngine.removeDriver(driver)
15.         driver.state = finalState
16.         driver.exception = exception
17.         driver.worker.foreach(w => w.removeDriver(driver))
18.         schedule()
19.       case None =>
20.         logWarning(s"Asked to remove unknown driver: $driverId")
21.     }
22.   }
23. }

2)對(duì)Executor狀態(tài)變化的處理

ExecutorStateChanged的源碼如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.  ......
3.  case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
4.        val execOption = idToApp.get(appId).flatMap(app => app.executors.
          get(execId))
5.        execOption match {
6.          case Some(exec) =>
7.            val appInfo = idToApp(appId)
8.            val oldState = exec.state
9.            exec.state = state
10.
11.           if (state == ExecutorState.RUNNING) {
12.             assert(oldState == ExecutorState.LAUNCHING,
13.               s"executor $execId state transfer from $oldState to RUNNING
                  is illegal")
14.             appInfo.resetRetryCount()
15.      }
16.
17.      exec.application.driver.send(ExecutorUpdated(execId,                state,
         message, exitStatus, false))
18.
19.      if (ExecutorState.isFinished(state)) {
20.        //從worker 和app中刪掉executor
21.        logInfo(s"Removing executor ${exec.fullId} because it is
           $state")
22.        //如果應(yīng)用程序已經(jīng)完成,保存其狀態(tài)及在UI上正確顯示其信息
23.        if (!appInfo.isFinished) {
24.          appInfo.removeExecutor(exec)
25.        }
26.        exec.worker.removeExecutor(exec)
27.
28.        val normalExit = exitStatus == Some(0)
29.        //只重試一定次數(shù),這樣就不會(huì)進(jìn)入無(wú)限循環(huán)。重要提示:這個(gè)代碼路徑不是通過(guò)
           //測(cè)試執(zhí)行的,所以改變if條件時(shí)必須小心
30.        if (!normalExit
31.            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
32.            && MAX_EXECUTOR_RETRIES >= 0) { //< 0 disables this
               application-killing path
33.          val execs = appInfo.executors.values
34.          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
35.            logError(s"Application ${appInfo.desc.name} with ID
               ${appInfo.id} failed " +
36.              s"${appInfo.retryCount} times; removing it")
37.            removeApplication(appInfo, ApplicationState.FAILED)
38.          }
39.        }
40.      }
41.      schedule()
42.    case None =>
43.      logWarning(s"Got status update for unknown executor $appId/
         $execId")
44.  }

Executor掛掉時(shí)系統(tǒng)會(huì)嘗試一定次數(shù)的重啟(最多重啟10次)。

1.  private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.
    maxExecutorRetries", 10)
主站蜘蛛池模板: 柳林县| 仙游县| 农安县| 积石山| 南雄市| 扬州市| 渝北区| 娄烦县| 宾阳县| 望江县| 平和县| 商河县| 湛江市| 永福县| 牟定县| 彰化县| 望奎县| 泉州市| 新田县| 广安市| 伊川县| 桂阳县| 五常市| 湘西| 六枝特区| 子长县| 来安县| 东乡族自治县| 昌吉市| 团风县| 龙游县| 龙川县| 长治市| 游戏| 邳州市| 民县| 沾化县| 云阳县| 德钦县| 额尔古纳市| 怀宁县|