官术网_书友最值得收藏!

6.1 Spark Application到底是如何提交給集群的

本節(jié)講解Application提交參數(shù)配置、Application提交給集群原理、Application提交給集群源碼等內(nèi)容,將徹底解密Spark Application到底是如何提交給集群的。

6.1.1 Application提交參數(shù)配置詳解

用戶應(yīng)用程序可以使用bin/spark-submit腳本來啟動(dòng)。spark-submit腳本負(fù)責(zé)使用Spark及其依賴關(guān)系設(shè)置類路徑,并可支持Spark支持的不同群集管理器和部署模式。

bin/spark-submit腳本示例如下。

1.   ./bin/spark-submit \
2.  --class <main-class> \
3.  --master <master-url> \
4.  --deploy-mode <deploy-mode> \
5.  --conf <key>=<value> \
6.  ... # other options
7.  <application-jar> \
8.  [application-arguments]

spark-submit腳本提交參數(shù)配置中一些常用的選項(xiàng)。

--class:應(yīng)用程序的入口點(diǎn)(如org.apache.spark.examples.SparkPi)。

--master:集群的主URL(如spark://23.195.26.187:7077)。

--deploy-mode:將Driver程序部署在集群Worker節(jié)點(diǎn)(cluster);或作為外部客戶端(client)部署在本地(默認(rèn)值:client)。

--conf:任意Spark配置屬性,使用key = value格式。對于包含空格的值,用引號括起來,如“key = value”。

application-jar:包含應(yīng)用程序和所有依賴關(guān)系Jar包的路徑。該URL必須在集群內(nèi)全局可見。例如,所有節(jié)點(diǎn)上存在的hdfs://路徑或file://路徑。

application-arguments:傳遞給主類的main方法的參數(shù)。

6.1.2 Application提交給集群原理詳解

在Spark官網(wǎng)部署頁面(http://spark.apache.org/docs/latest/cluster-overview.html),可以看到當(dāng)前集群支持以下3種集群管理器(cluster manager)。

(1)Standalone:Spark原生的簡單集群管理器。使用Standalone可以很方便地搭建一個(gè)集群。

(2)Apache Mesos:一個(gè)通用的集群管理器,可以在上面運(yùn)行HadoopMapReduce和一些服務(wù)型的應(yīng)用。

(3)Hadoop YARN:在Hadoop 2中提供的資源管理器。

另外,Spark提供的EC2啟動(dòng)腳本,可以很方便地在Amazon EC2上啟動(dòng)一個(gè)Standalone集群。

實(shí)際上,除了上面這些通用的集群管理器外,Spark內(nèi)部也提供一些方便我們測試、學(xué)習(xí)的簡單集群部署模式。為了更全面地理解,我們會(huì)從Spark應(yīng)用程序部署點(diǎn)切入,也就是從提交一個(gè)Spark應(yīng)用程序開始,引出并詳細(xì)解析各種部署模式。

說明:下面涉及類的描述時(shí),如果可以通過類名唯一確定一個(gè)類,將直接給出類名,如果不能,會(huì)先給出全路徑的類名,然后在不出現(xiàn)歧義的地方再簡寫為類名。

為了簡化應(yīng)用程序提交的復(fù)雜性,Spark提供了各種應(yīng)用程序提交的統(tǒng)一入口,即spark-submit腳本,應(yīng)用程序的提交都間接或直接地調(diào)用了該腳本。下面簡單分析幾個(gè)腳本,包含./bin/spark-shell、./bin/pyspark、./bin/sparkR、./bin/spark-sql、./bin/run-example、./bin/speak-submit,以及所有腳本最終都調(diào)用到的一個(gè)執(zhí)行Java類的腳本./bin/spark-class。

1.腳本./bin/spark-shell

通過該腳本可以打開使用Scala語言進(jìn)行開發(fā)、調(diào)試的交互式界面,腳本的代碼如下所示。

1.  ......
2.  function main() {
3.  ......
4.  "${SPARK_HOME}"/bin/spark-submit  --class  org.apache.spark.repl.Main
    --name "Spark shell" "$@"
5.  sttyicanon echo > /dev/null 2>&1
6.  else
7.  export SPARK_SUBMIT_OPTS
8.  "${SPARK_HOME}"/bin/spark-submit  --class  org.apache.spark.repl.Main
    --name "Spark shell" "$@"
9.  fi
10. }
11. ......

對應(yīng)在第4行和第8行處,調(diào)用了應(yīng)用程序提交腳本./bin/spark-submit。腳本./bin/spark-shell的基本用法如下所示:

1.  "Usage: ./bin/spark-shell [options]"

其他腳本類似。下面分別針對各個(gè)腳本的用法(具體用法可查看腳本的幫助信息,如通過--help選項(xiàng)來獲取)與關(guān)鍵執(zhí)行語句等進(jìn)行簡單解析。了解工具(如腳本)如何使用,最根本的是先查看其幫助信息,然后在此基礎(chǔ)上進(jìn)行擴(kuò)展。

2.腳本./bin/pyspark

通過該腳本可以打開使用Python語言開發(fā)、調(diào)試的交互式界面。

(1)該腳本的用法如下。

1.  "Usage: ./bin/pyspark [options]"

(2)該腳本的執(zhí)行語句如下。

1.  exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name
    "PySparkShell" "$@"
3.腳本./bin/sparkR

通過該腳本可以打開使用sparkR開發(fā)、調(diào)試的交互式界面。

(1)該腳本的用法如下。

1.  "Usage: ./bin/sparkR [options]"

(2)該腳本的執(zhí)行語句如下。

1.  exec "${SPARK_HOME}"/bin/spark-submit sparkr-shell-main "$@"
4.腳本./bin/spark-sql

通過該腳本可以打開使用SparkSql開發(fā)、調(diào)試的交互式界面。

(1)該腳本的用法如下。

1.  "Usage: ./bin/spark-sql [options] [cli option]"

(2)該腳本的執(zhí)行語句如下。

1.  exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive.
    thriftserver.SparkSQLCLIDriver "$@"
5.腳本./bin/run-example

可以通過該腳本運(yùn)行Spark自帶的案例代碼。該腳本中會(huì)自動(dòng)補(bǔ)全案例類的路徑。

(1)該腳本的用法如下。

1.  echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
2.  echo "  - set MASTER=XX to use a specific master" 1>&2
3.  echo "  - can use abbreviated example class name relative to com.apache.
    spark.examples" 1>&2
4.  echo " (e.g. SparkPi, mllib.LinearRegression, streaming.
    KinesisWordCountASL)" 1>&2

(2)該腳本的執(zhí)行語句如下。

1.  exec "${SPARK_HOME}"/bin/spark-submit \
2.    --master $EXAMPLE_MASTER \
3.    --class $EXAMPLE_CLASS \
4.    "$SPARK_EXAMPLES_JAR" \
5.    "$@"
6.腳本./bin/spark-submit

./bin/spark-submit是提交Spark應(yīng)用程序最常用的一個(gè)腳本。從前面各個(gè)腳本的解析可以看出,各個(gè)腳本最終都調(diào)用了./bin/spark-submit腳本。

(1)該腳本的用法如下。

該腳本的用法需要從源碼中獲取,具體源碼位置參考SparkSubmitArguments類的方法printUsageAndExit,代碼如下。

1.  val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
2.  """Usage: spark-submit [options] <app jar | python file> [app arguments]
3.      |Usage: spark-submit --kill [submission ID] --master [spark://...]
4.      |Usage: spark-submit --status [submission ID] --master [spark:
        //...]""".stripMargin)

(2)該腳本的執(zhí)行語句如下。

1.  exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.
    SparkSubmit "$@"
7.腳本./bin/spark-class

該腳本是所有其他腳本最終都調(diào)用到的一個(gè)執(zhí)行Java類的腳本。其中關(guān)鍵的執(zhí)行語句如下所示。

1.  CMD=()
2.  while IFS= read -d '' -r ARG; do
3.    CMD+=("$ARG")
4.  done <<("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main
    "$@")
5.  exec "${CMD[@]}"

其中,負(fù)責(zé)運(yùn)行的RUNNER變量設(shè)置如下。

1.  # Find the java binary
2.  //將RUNNER設(shè)置為Java
3.  if [ -n "${JAVA_HOME}" ]; then
4.    RUNNER="${JAVA_HOME}/bin/java"
5.  else
6.    if [ `command -v java` ]; then
7.      RUNNER="java"
8.    else
9.      echo "JAVA_HOME is not set" >&2
10.     exit 1
11.   fi
12. fi

在腳本中,LAUNCH_CLASSPATH變量對應(yīng)Java命令運(yùn)行時(shí)所需的classpath信息。最終Java命令啟動(dòng)的類是org.apache.spark.launcher.Main。Main類的入口函數(shù)main,會(huì)根據(jù)輸入?yún)?shù)構(gòu)建出最終執(zhí)行的命令,即這里返回的${CMD[@]}信息,然后通過exec執(zhí)行。

6.1.3 Application提交給集群源碼詳解

本節(jié)從應(yīng)用部署的角度解析相關(guān)的源碼,主要包括腳本提交時(shí)對應(yīng)JVM進(jìn)程啟動(dòng)的主類org.apache.spark.launcher.Main、定義應(yīng)用程序提交的行為類型的類org.apache.spark.deploy. SparkSubmitAction、應(yīng)用程序封裝底層集群管理器和部署模式的類org.apache.spark.deploy. SparkSubmit,以及代表一個(gè)應(yīng)用程序的驅(qū)動(dòng)程序的類org.apache.spark.SparkContext。

1.Main解析

從前面的腳本分析,得出最終都是通過org.apache.spark.launcher.Main類(下面簡稱Main類)啟動(dòng)應(yīng)用程序的。因此,首先解析一下Main類。

在Main類的源碼中,類的注釋如下。

1.  /**
2.    * Spark 啟動(dòng)器的命令行接口。在Spark腳本內(nèi)部使用
3.    *
4.    */

對應(yīng)地,在Main對象的入口方法main的注釋如下。

Main.java源碼如下。

1.
2.   /**
3.     * Usage: Main [class] [class args]
4.     * <p>
5.     * 命令行界面工作在兩種模式下:
6.     * <ul>
7.     *   <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.
       *   SparkSubmit", the {@link SparkLauncher} class is used to launch a
       *   Spark application. </li>
8.     *   <li>"spark-class": if another class is provided, an internal Spark
       *   class is run.</li>
9.     * </ul>
10.  .......
11. public static void main(String[] argsArray) throws Exception {
12. ......

Main類主要有兩種工作模式,分別描述如下。

(1)spark-submit

啟動(dòng)器要啟動(dòng)的類為org.apache.spark.deploy.SparkSubmit時(shí),對應(yīng)為spark-submit工作模式。此時(shí),使用SparkSubmitCommandBuilder類來構(gòu)建啟動(dòng)命令。

(2)spark-class

啟動(dòng)器要啟動(dòng)的類是除SparkSubmit之外的其他類時(shí),對應(yīng)為spark-class工作模式。此時(shí)使用SparkClassCommandBuilder類的buildCommand方法來構(gòu)建啟動(dòng)命令。

Main.java的源碼如下。

1.  public static void main(String[] argsArray) throws Exception {
2.  .......
3.  String className = args.remove(0);
4.  .......
5.      if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
6.        try {
7.          builder = new SparkSubmitCommandBuilder(args);
8.  .......
9.   } else {
10.       builder = new SparkClassCommandBuilder(className, args);
11.     }
12. ......

以spark-submit工作模式為例,對應(yīng)的在構(gòu)建啟動(dòng)命令的SparkSubmitCommandBuilder類中,上述調(diào)用的SparkClassCommandBuilder構(gòu)造函數(shù)定義如下。

SparkSubmitCommandBuilder.java的源碼如下。

1.     SparkSubmitCommandBuilder(List<String> args) {
2.      this.allowsMixedArguments = false;
3.      this.sparkArgs = new ArrayList<>();
4.      boolean isExample = false;
5.      List<String> submitArgs = args;
6.   //根據(jù)輸入的第一個(gè)參數(shù)設(shè)置,包括主資源appResource等
7.      if (args.size() > 0) {
8.        switch (args.get(0)) {
9.          case PYSPARK_SHELL:
10.           this.allowsMixedArguments = true;
11.           appResource = PYSPARK_SHELL;
12.           submitArgs = args.subList(1, args.size());
13.           break;
14.
15.         case SPARKR_SHELL:
16.           this.allowsMixedArguments = true;
17.           appResource = SPARKR_SHELL;
18.           submitArgs = args.subList(1, args.size());
19.           break;
20.
21.         case RUN_EXAMPLE:
22.           isExample = true;
23.           submitArgs = args.subList(1, args.size());
24.       }
25.
26.       this.isExample = isExample;
27.       OptionParser parser = new OptionParser();
28.       parser.parse(submitArgs);
29.       this.isAppResourceReq = parser.isAppResourceReq;
30.     }  else {
31.       this.isExample = isExample;
32.       this.isAppResourceReq = false;
33.     }
34.   }

從這些初步的參數(shù)解析可以看出,前面腳本中的參數(shù)與最終對應(yīng)的主資源間的對應(yīng)關(guān)系見表6-1。

表6-1 腳本中的參數(shù)與主資源間的對應(yīng)關(guān)系

如果繼續(xù)跟蹤appResource賦值的源碼,可以跟蹤到一些特殊類的類名與最終對應(yīng)的主資源間的對應(yīng)關(guān)系,見表6-2。

表6-2 特殊類的類名與主資源間的對應(yīng)關(guān)系

如果有興趣,可以繼續(xù)跟蹤SparkClassCommandBuilder類的buildCommand方法的源碼,查看構(gòu)建的命令具體有哪些。

通過Main類的簡單解析,可以將前面的腳本分析結(jié)果與后面即將進(jìn)行分析的SparkSubmit類關(guān)聯(lián)起來,以便進(jìn)一步解析與應(yīng)用程序提交相關(guān)的其他源碼。

從前面的腳本分析可以看到,提交應(yīng)用程序時(shí),Main啟動(dòng)的類,也就是用戶最終提交執(zhí)行的類是org.apache.spark.deploy.SparkSubmit。因此,下面開始解析SparkSubmit相關(guān)的源碼,包括提交行為的定義、提交時(shí)的參數(shù)解析以及最終提交運(yùn)行的代碼解析。

2.SparkSubmitAction解析

SparkSubmitAction定義了提交應(yīng)用程序的行為類型,源碼如下所示。

SparkSubmit.scala的源碼如下。

1.    private[deploy] object SparkSubmitAction extends Enumeration {
2.    type SparkSubmitAction = Value
3.    val SUBMIT, KILL, REQUEST_STATUS = Value
4.  }

從源碼中可以看到,分別定義了SUBMIT、KILL、REQUEST_STATUS這3種行為類型,對應(yīng)提交應(yīng)用、停止應(yīng)用、查詢應(yīng)用的狀態(tài)。

3.SparkSubmit解析

SparkSubmit的全路徑為org.apache.spark.deploy.SparkSubmit。從SparkSubmit類的注釋可以看出,SparkSubmit是啟動(dòng)一個(gè)Spark應(yīng)用程序的主入口點(diǎn),這和前面從腳本分析得到的結(jié)論一致。首先看一下SparkSubmit類的注釋,如下所示。

1.  /**
2.    *啟動(dòng)一個(gè)Spark應(yīng)用程序的主入口點(diǎn)
3.    *
4.    *
5.    *這個(gè)程序處理與Spark依賴相關(guān)的類路徑設(shè)置,提供Spark支持的在不同集群管理器的部
      *署模式
6.    *
7.    */

SparkSubmit會(huì)幫助我們設(shè)置Spark相關(guān)依賴包的classpath,同時(shí),為了幫助用戶簡化提交應(yīng)用程序的復(fù)雜性,SparkSubmit提供了一個(gè)抽象層,封裝了底層復(fù)雜的集群管理器與部署模式的各種差異點(diǎn),即通過SparkSubmit的封裝,集群管理器與部署模式對用戶是透明的。

在SparkSubmit中體現(xiàn)透明性的集群管理器定義的源碼如下所示。

SparkSubmit.scala的源碼如下。

1.  //集群管理器
2.  //Cluster managers
3.  private val YARN = 1
4.  private val STANDALONE = 2
5.  private val MESOS = 4
6.  private val LOCAL = 8
7.  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

在SparkSubmit中體現(xiàn)透明性的部署模式定義的源碼如下。

1.  //部署模式
2.  //Deploy modes
3.  private val CLIENT = 1
4.  private val CLUSTER = 2
5.  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER

作為提交應(yīng)用程序的入口點(diǎn),SparkSubmit中根據(jù)具體的集群管理器進(jìn)行參數(shù)轉(zhuǎn)換、參數(shù)校驗(yàn)等操作,如對模式的檢查,代碼中給出了針對特定情況,不支持的集群管理器與部署模式,在這些模式下提交應(yīng)用程序會(huì)直接報(bào)錯(cuò)退出。

SparkSubmit.scala的源碼如下。

1.   //不支持的集群管理器與部署模式
2.
3.      (clusterManager, deployMode) match {
4.        case (STANDALONE, CLUSTER) if args.isPython =>
5.          printErrorAndExit("Cluster deploy mode is currently not supported
              for python " +"applications on standalone clusters.")
6.
7.        case (STANDALONE, CLUSTER) if args.isR =>
8.          printErrorAndExit("Cluster deploy mode is currently not supported
              for R " +"applications on standalone clusters.")
9.
10.       case (LOCAL, CLUSTER) =>
11.         printErrorAndExit("Cluster deploy mode is not compatible with
            master \"local\"")
12.       case (_, CLUSTER) if isShell(args.primaryResource) =>
13.         printErrorAndExit("Cluster deploy mode is not applicable to Spark
            shells.")
14.       case (_, CLUSTER) if isSqlShell(args.mainClass) =>
15.         printErrorAndExit("Cluster deploy mode is not applicable to Spark
            SQL shell.")
16.       case (_, CLUSTER) if isThriftServer(args.mainClass) =>
17.         printErrorAndExit("Cluster deploy mode is not applicable to Spark
            Thrift server.")
18.       case _ =>
19.     }

首先,一個(gè)程序運(yùn)行的入口點(diǎn)對應(yīng)單例對象的main函數(shù),因此在執(zhí)行SparkSubmit時(shí),對應(yīng)的入口點(diǎn)是objectSparkSubmit的main函數(shù),具體代碼如下。

SparkSubmit.scala的源碼如下。

1.  //入口點(diǎn)函數(shù)main的定義
2.  def main(args: Array[String]): Unit = {
3.    val appArgs = new SparkSubmitArguments(args)
4.   ......
5.   //根據(jù)3種行為分別進(jìn)行處理
6.  appArgs.action match {
7.      case SparkSubmitAction.SUBMIT => submit(appArgs)
8.      case SparkSubmitAction.KILL => kill(appArgs)
9.      case SparkSubmitAction.REQUEST_STATUS =>requestStatus(appArgs)
10.   }
11. }

其中,SparkSubmitArguments類對應(yīng)用戶調(diào)用提交腳本spark-submit時(shí)傳入的參數(shù)信息。對應(yīng)的腳本的幫助信息(./bin/spark-submit --help),也是由該類的printUsageAndExit方法提供的。

找到上面的入口點(diǎn)代碼之后,就可以開始分析其內(nèi)部的源碼。對應(yīng)參數(shù)信息的SparkSubmitArguments可以參考腳本的幫助信息,來查看具體參數(shù)對應(yīng)的含義。參數(shù)分析后,便是對各種提交行為的具體處理。SparkSubmit支持SparkSubmitAction包含的3種行為,下面以行為SparkSubmitAction.SUBMIT為例進(jìn)行分析,其他行為也可以通過各自的具體處理代碼進(jìn)行分析。

對應(yīng)處理SparkSubmitAction.SUBMIT行為的代碼入口點(diǎn)為submit(appArgs),進(jìn)入該方法,即進(jìn)入提交應(yīng)用程序的處理方法的具體代碼如下所示。

SparkSubmit.scala的源碼如下。

1.     private def submit(args: SparkSubmitArguments): Unit = {
2.   //準(zhǔn)備應(yīng)用程序提交的環(huán)境,該步驟包含了內(nèi)部封裝的各個(gè)細(xì)節(jié)處理
3.      val (childArgs, childClasspath, sysProps, childMainClass) =
        prepareSubmitEnvironment(args)
4.
5.      def doRunMain(): Unit = {
6.        if (args.proxyUser != null) {
7.          val proxyUser = UserGroupInformation.createProxyUser
              (args.proxyUser,UserGroupInformation.getCurrentUser())
8.
9.          try {
10.           proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
11.             override def run(): Unit = {
12.               runMain(childArgs, childClasspath, sysProps, childMainClass,
                  args.verbose)
13.             }
14.           })
15.         } catch {
16.           case e: Exception =>
17.             //hadoop的AuthorizationException抑制異常堆棧跟蹤,通過JVM打印輸
                //出的消息不是很有幫助。這里檢測異常以及空棧,對其采用不同的處理
18.             if (e.getStackTrace().length == 0) {
19.               //scalastyle:off println
20.               printStream.println(s"ERROR:            ${e.getClass().getName()}:
                  ${e.getMessage()}")
21.               //scalastyle:on println
22.               exitFn(1)
23.             } else {
24.               throw e
25.             }
26.         }
27.       } else {
28.         runMain(childArgs,       childClasspath,      sysProps,    childMainClass,
            args.verbose)
29.      }
30.    }
31.
32.   //Standalone 集群模式下,有兩種提交應(yīng)用程序的方式
33.   //1.傳統(tǒng)的RPC網(wǎng)關(guān)方式使用o.a.s.deploy.Client進(jìn)行封裝
34.   //2.Spark 1.3使用新REST-based 網(wǎng)關(guān)方式,作為Spark 1.3的默認(rèn)方法,如果Master
      //節(jié)點(diǎn)不是REST服務(wù)器節(jié)點(diǎn),Spark應(yīng)用程序提交時(shí)會(huì)切換到傳統(tǒng)的網(wǎng)關(guān)模式
35.    if (args.isStandaloneCluster && args.useRest) {
36.      try {
37.        //scalastyle:off println
38.        printStream.println("Running Spark using the REST application
           submission protocol.")
39.        //scalastyle:on println
40.        doRunMain()
41.      } catch {
42.          //如果失敗,則使用傳統(tǒng)的提交方式
43.        case e: SubmitRestConnectionException =>
44.          printWarning(s"Master endpoint ${args.master} was not a REST
             server. " + "Falling back to legacy submission gateway instead.")
45.
46.       //重新設(shè)置提交方式的控制開關(guān)
47.          args.useRest = false
48.          submit(args)
49.      }
50.    //在所有其他模式中,只要準(zhǔn)備好主類就可以
51.    } else {
52.      doRunMain()
53.    }
54.  }

其中,最終運(yùn)行所需的參數(shù)都由prepareSubmitEnvironment方法負(fù)責(zé)解析、轉(zhuǎn)換,然后根據(jù)其結(jié)果執(zhí)行。解析的結(jié)果包含以下4部分。

 子進(jìn)程運(yùn)行所需的參數(shù)。

 子進(jìn)程運(yùn)行時(shí)的classpath列表。

 系統(tǒng)屬性的映射。

 子進(jìn)程運(yùn)行時(shí)的主類。

解析之后調(diào)用runMain方法,該方法中除了一些環(huán)境設(shè)置等操作外,最終會(huì)調(diào)用解析得到的childMainClass的main方法。下面簡單分析一下prepareSubmitEnvironment方法,通過該方法來了解SparkSubmit是如何幫助底層的集群管理器和部署模式的封裝的。里面涉及的各種細(xì)節(jié)比較多,這里以不同集群管理器和部署模式下最終運(yùn)行的childMainClass類的解析為主線進(jìn)行分析。

(1)當(dāng)部署模式為CLIENT時(shí),將childMainClass設(shè)置為傳入的mainClass,對應(yīng)代碼如下所示。

1.     //在CLIENT模式下,直接啟動(dòng)應(yīng)用程序的主類
2.  if (deployMode == CLIENT || isYarnCluster) {
3.       childMainClass = args.mainClass
4.       if (isUserJar(args.primaryResource)) {
5.         childClasspath += args.primaryResource
6.       }
7.       if (args.jars != null) { childClasspath ++= args.jars.split(",") }
8.     }
9.
10.  if (deployMode == CLIENT) {
11.    if (args.childArgs != null) { childArgs ++= args.childArgs }
12.  }

(2)當(dāng)集群管理器為STANDALONE、部署模式為CLUSTER時(shí),根據(jù)提交的兩種方式將childMainClass分別設(shè)置為不同的類,同時(shí)將傳入的args.mainClass(提交應(yīng)用程序時(shí)設(shè)置的主類)及其參數(shù)根據(jù)不同集群管理器與部署模式進(jìn)行轉(zhuǎn)換,并封裝到新的主類所需的參數(shù)中。對應(yīng)的設(shè)置見表6-3。

表6-3 STANDALONE+CLUSTER時(shí)兩種不同提交方式下的childMainClass封裝

其中,表述性狀態(tài)傳遞(Representational State Transfer,REST)是Roy Fielding博士在2000年他的博士論文中提出來的一種軟件架構(gòu)風(fēng)格。

這些設(shè)置的主類相當(dāng)于封裝了應(yīng)用程序提交時(shí)的主類,運(yùn)行后負(fù)責(zé)向Master節(jié)點(diǎn)申請啟動(dòng)提交的應(yīng)用程序。

(3)當(dāng)集群管理器為YARN、部署模式為CLUSTER時(shí),childMainClass以及對應(yīng)的mainClass的設(shè)置見表6-4。

表6-4 YARN+CLUSTER時(shí)childMainClass下的childMainClass封裝

(4)當(dāng)集群管理器為MESOS、部署模式為CLUSTER時(shí),childMainClass以及對應(yīng)的mainClass的設(shè)置見表6-5。

表6-5 MESOS+CLUSTER時(shí)childMainClass下的childMainClass封裝

從上面的分析中可以看到,使用CLIENT部署模式進(jìn)行提交時(shí),由于設(shè)置的childMainClass為應(yīng)用程序提交時(shí)的主類,因此是直接在提交點(diǎn)執(zhí)行設(shè)置的主類,即mainClass,當(dāng)使用CLUSTER部署模式進(jìn)行提交時(shí),則會(huì)根據(jù)具體集群管理器等信息,使用相應(yīng)的封裝類。這些封裝類會(huì)向集群申請?zhí)峤粦?yīng)用程序的請求,然后在由集群調(diào)度分配得到的節(jié)點(diǎn)上,啟動(dòng)所申請的應(yīng)用程序。

以封裝類設(shè)置為org.apache.spark.deploy.Client為例,從該類主入口main方法查看,可以看到構(gòu)建了一個(gè)ClientEndpoint實(shí)例,該實(shí)例構(gòu)建時(shí),會(huì)將提交應(yīng)用程序時(shí)設(shè)置的mainClass等信息封裝到DriverDescription實(shí)例中,然后發(fā)送到Master,申請執(zhí)行用戶提交的應(yīng)用程序。

對應(yīng)各種集群管理器與部署模式的組合,實(shí)際代碼中的處理細(xì)節(jié)非常多。這里僅給出一種源碼閱讀的方式,和對應(yīng)的大數(shù)據(jù)處理一樣,通常采用化繁為簡的方式去閱讀復(fù)雜的源碼。例如,這里在理解整個(gè)大框架的調(diào)用過程后,以childMainClass的設(shè)置作為主線去解讀源碼,對應(yīng)地,在擴(kuò)展閱讀其他源碼時(shí),也可以采用這種方式,以某種集群管理器與部署模式為主線,詳細(xì)閱讀相關(guān)的代碼。最后,在了解各種組合的處理細(xì)節(jié)之后,通過對比、抽象等方法,對整個(gè)SparkSubmit進(jìn)行歸納總結(jié)。

提交的應(yīng)用程序的驅(qū)動(dòng)程序(Driver Program)部分對應(yīng)包含了一個(gè)SparkContext實(shí)例。因此,接下來從該實(shí)例出發(fā),解析驅(qū)動(dòng)程序在不同的集群管理器的部署細(xì)節(jié)。

4.SparkContext解析

在詳細(xì)解析SparkContext實(shí)例前,首先查看一下SparkContext類的注釋部分,具體如下所示。

1.  /**
2.   * Spark功能的主入口點(diǎn)。一個(gè)SparkContext代表連接到Spark集群,并可用于在集群中
     * 創(chuàng)建RDDs、累加器和廣播變量
3.   * ......
4.   * @param 描述應(yīng)用程序配置的配置對象。在該配置的任何設(shè)置將覆蓋默認(rèn)的配置以及系統(tǒng)屬性
5.   */

SparkContext類是Spark功能的主入口點(diǎn)。一個(gè)SparkContext實(shí)例代表了與一個(gè)Spark集群的連接,并且通過該實(shí)例,可以在集群中構(gòu)建RDDs、累加器以及廣播變量。SparkContext實(shí)例的構(gòu)建參數(shù)config描述了應(yīng)用程序的Spark配置。在該參數(shù)中指定的配置屬性會(huì)覆蓋默認(rèn)的配置屬性以及系統(tǒng)屬性。

在SparkContext類文件中定義了一個(gè)描述集群管理器類型的單例對象SparkMasterRegex,在該對象中詳細(xì)給出了當(dāng)前Spark支持的各種集群管理器類型。

SparkContext.scala的源碼如下。

1.  /**
2.   * 定義了從Master信息中抽取集群管理器類型的一個(gè)正則表達(dá)式集合
3.   *
4.   */
5.  private object SparkMasterRegex {
6.   //對應(yīng)Master格式如local[N] 和local[*]的正則表達(dá)式
7.   //對應(yīng)的Master格式如local[N]和local[*]的正則表達(dá)式
8.   val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
9.
10.  //對應(yīng)的Master格式如local[N, maxRetries]的正則表達(dá)式
11.  //這種集群管理器類型用于具有任務(wù)失敗嘗試功能的測試
12.
13.   val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
14.
15. //一種模擬Spark集群的本地模式的正則表達(dá)式,對應(yīng)的Master格式如local-cluster[N,
16. //cores, memory]
17.
18.                                         LOCAL_CLUSTER_REGEX = """local-cluster\
                                             [\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*
                                             ([0-9]+)\s*]""".r
19.
20. //連接Spark部署集群的正則表達(dá)式
21.
22.   val SPARK_REGEX = """spark://(.*)""".r
23. }

在SparkContext類中的主要流程可以歸納如下:

(1)createSparkEnv:創(chuàng)建Spark的執(zhí)行環(huán)境對應(yīng)的SparkEnv實(shí)例。

對應(yīng)代碼如下所示。

1.      //Create the Spark execution environment (cache, map output tracker,
        //etc)
2.      _env = createSparkEnv(_conf, isLocal, listenerBus)
3.  SparkEnv.set(_env)

(2)createTaskScheduler:創(chuàng)建作業(yè)調(diào)度器實(shí)例。

對應(yīng)代碼如下所示。

1.  //創(chuàng)建和啟動(dòng)調(diào)度器scheduler
2.  val (sched, ts) = SparkContext.createTaskScheduler(this, master)
3.  _schedulerBackend = sched
4.  _taskScheduler = ts

其中,TaskScheduler是低層次的任務(wù)調(diào)度器,負(fù)責(zé)任務(wù)的調(diào)度。通過該接口提供可插拔的任務(wù)調(diào)度器。每個(gè)TaskScheduler負(fù)責(zé)調(diào)度一個(gè)SparkContext實(shí)例中的任務(wù),負(fù)責(zé)調(diào)度上層DAG調(diào)度器中每個(gè)Stage提交的任務(wù)集(TaskSet),并將這些任務(wù)提交到集群中運(yùn)行,在任務(wù)提交執(zhí)行時(shí),可以使用失敗重試機(jī)制設(shè)置失敗重試的次數(shù)。上述對應(yīng)高層的DAG調(diào)度器的實(shí)例構(gòu)建參見下一步。

(3)new DAGScheduler:創(chuàng)建高層Stage調(diào)度的DAG調(diào)度器實(shí)例。

對應(yīng)代碼如下。

1.  _dagScheduler = new DAGScheduler(this)

DAGScheduler是高層調(diào)度模塊,負(fù)責(zé)作業(yè)(Job)的Stage拆分,以及最終將Stage對應(yīng)的任務(wù)集提交到低層次的任務(wù)調(diào)度器上。

下面基于這些主要流程,針對SparkMasterRegex單例對象中給出的各種集群部署模式進(jìn)行解析。對應(yīng)不同集群模式,這些流程中構(gòu)建了包括TaskScheduler與SchedulerBackend的不同的具體子類,所構(gòu)建的相關(guān)實(shí)例具體見表6-6。

表6-6 各種情況下TaskScheduler與SchedulerBackend的不同的具體子類

與TaskScheduler和SchedulerBackend不同的是,在不同集群模式中,應(yīng)用程序的高層調(diào)度器DAGScheduler的實(shí)例是相同的,即對應(yīng)在Spark on YARN與Mesos等集群管理器中,應(yīng)用程序內(nèi)部的高層Stage調(diào)度是相同的。

主站蜘蛛池模板: 红桥区| 太原市| 阿荣旗| 象州县| 简阳市| 会泽县| 旌德县| 中西区| 江陵县| 甘孜县| 永胜县| 普洱| 刚察县| 东安县| 隆德县| 蒙城县| 东平县| 弋阳县| 聂荣县| 晋宁县| 康马县| 淄博市| 噶尔县| 崇阳县| 大悟县| 彭州市| 仙游县| 隆德县| 湘阴县| 铅山县| 咸宁市| 永登县| 福贡县| 闽清县| 康马县| 洞头县| 牙克石市| 句容市| 枣阳市| 双峰县| 额尔古纳市|