- Spark大數(shù)據(jù)商業(yè)實(shí)戰(zhàn)三部曲:內(nèi)核解密|商業(yè)案例|性能調(diào)優(yōu)
- 王家林
- 6243字
- 2019-12-12 17:29:58
6.1 Spark Application到底是如何提交給集群的
本節(jié)講解Application提交參數(shù)配置、Application提交給集群原理、Application提交給集群源碼等內(nèi)容,將徹底解密Spark Application到底是如何提交給集群的。
6.1.1 Application提交參數(shù)配置詳解
用戶應(yīng)用程序可以使用bin/spark-submit腳本來啟動(dòng)。spark-submit腳本負(fù)責(zé)使用Spark及其依賴關(guān)系設(shè)置類路徑,并可支持Spark支持的不同群集管理器和部署模式。
bin/spark-submit腳本示例如下。
1. ./bin/spark-submit \ 2. --class <main-class> \ 3. --master <master-url> \ 4. --deploy-mode <deploy-mode> \ 5. --conf <key>=<value> \ 6. ... # other options 7. <application-jar> \ 8. [application-arguments]
spark-submit腳本提交參數(shù)配置中一些常用的選項(xiàng)。
--class:應(yīng)用程序的入口點(diǎn)(如org.apache.spark.examples.SparkPi)。
--master:集群的主URL(如spark://23.195.26.187:7077)。
--deploy-mode:將Driver程序部署在集群Worker節(jié)點(diǎn)(cluster);或作為外部客戶端(client)部署在本地(默認(rèn)值:client)。
--conf:任意Spark配置屬性,使用key = value格式。對于包含空格的值,用引號括起來,如“key = value”。
application-jar:包含應(yīng)用程序和所有依賴關(guān)系Jar包的路徑。該URL必須在集群內(nèi)全局可見。例如,所有節(jié)點(diǎn)上存在的hdfs://路徑或file://路徑。
application-arguments:傳遞給主類的main方法的參數(shù)。
6.1.2 Application提交給集群原理詳解
在Spark官網(wǎng)部署頁面(http://spark.apache.org/docs/latest/cluster-overview.html),可以看到當(dāng)前集群支持以下3種集群管理器(cluster manager)。
(1)Standalone:Spark原生的簡單集群管理器。使用Standalone可以很方便地搭建一個(gè)集群。
(2)Apache Mesos:一個(gè)通用的集群管理器,可以在上面運(yùn)行HadoopMapReduce和一些服務(wù)型的應(yīng)用。
(3)Hadoop YARN:在Hadoop 2中提供的資源管理器。
另外,Spark提供的EC2啟動(dòng)腳本,可以很方便地在Amazon EC2上啟動(dòng)一個(gè)Standalone集群。
實(shí)際上,除了上面這些通用的集群管理器外,Spark內(nèi)部也提供一些方便我們測試、學(xué)習(xí)的簡單集群部署模式。為了更全面地理解,我們會(huì)從Spark應(yīng)用程序部署點(diǎn)切入,也就是從提交一個(gè)Spark應(yīng)用程序開始,引出并詳細(xì)解析各種部署模式。
說明:下面涉及類的描述時(shí),如果可以通過類名唯一確定一個(gè)類,將直接給出類名,如果不能,會(huì)先給出全路徑的類名,然后在不出現(xiàn)歧義的地方再簡寫為類名。
為了簡化應(yīng)用程序提交的復(fù)雜性,Spark提供了各種應(yīng)用程序提交的統(tǒng)一入口,即spark-submit腳本,應(yīng)用程序的提交都間接或直接地調(diào)用了該腳本。下面簡單分析幾個(gè)腳本,包含./bin/spark-shell、./bin/pyspark、./bin/sparkR、./bin/spark-sql、./bin/run-example、./bin/speak-submit,以及所有腳本最終都調(diào)用到的一個(gè)執(zhí)行Java類的腳本./bin/spark-class。
1.腳本./bin/spark-shell
通過該腳本可以打開使用Scala語言進(jìn)行開發(fā)、調(diào)試的交互式界面,腳本的代碼如下所示。
1. ...... 2. function main() { 3. ...... 4. "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" 5. sttyicanon echo > /dev/null 2>&1 6. else 7. export SPARK_SUBMIT_OPTS 8. "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" 9. fi 10. } 11. ......
對應(yīng)在第4行和第8行處,調(diào)用了應(yīng)用程序提交腳本./bin/spark-submit。腳本./bin/spark-shell的基本用法如下所示:
1. "Usage: ./bin/spark-shell [options]"
其他腳本類似。下面分別針對各個(gè)腳本的用法(具體用法可查看腳本的幫助信息,如通過--help選項(xiàng)來獲取)與關(guān)鍵執(zhí)行語句等進(jìn)行簡單解析。了解工具(如腳本)如何使用,最根本的是先查看其幫助信息,然后在此基礎(chǔ)上進(jìn)行擴(kuò)展。
2.腳本./bin/pyspark
通過該腳本可以打開使用Python語言開發(fā)、調(diào)試的交互式界面。
(1)該腳本的用法如下。
1. "Usage: ./bin/pyspark [options]"
(2)該腳本的執(zhí)行語句如下。
1. exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"
3.腳本./bin/sparkR
通過該腳本可以打開使用sparkR開發(fā)、調(diào)試的交互式界面。
(1)該腳本的用法如下。
1. "Usage: ./bin/sparkR [options]"
(2)該腳本的執(zhí)行語句如下。
1. exec "${SPARK_HOME}"/bin/spark-submit sparkr-shell-main "$@"
4.腳本./bin/spark-sql
通過該腳本可以打開使用SparkSql開發(fā)、調(diào)試的交互式界面。
(1)該腳本的用法如下。
1. "Usage: ./bin/spark-sql [options] [cli option]"
(2)該腳本的執(zhí)行語句如下。
1. exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.hive. thriftserver.SparkSQLCLIDriver "$@"
5.腳本./bin/run-example
可以通過該腳本運(yùn)行Spark自帶的案例代碼。該腳本中會(huì)自動(dòng)補(bǔ)全案例類的路徑。
(1)該腳本的用法如下。
1. echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2 2. echo " - set MASTER=XX to use a specific master" 1>&2 3. echo " - can use abbreviated example class name relative to com.apache. spark.examples" 1>&2 4. echo " (e.g. SparkPi, mllib.LinearRegression, streaming. KinesisWordCountASL)" 1>&2
(2)該腳本的執(zhí)行語句如下。
1. exec "${SPARK_HOME}"/bin/spark-submit \ 2. --master $EXAMPLE_MASTER \ 3. --class $EXAMPLE_CLASS \ 4. "$SPARK_EXAMPLES_JAR" \ 5. "$@"
6.腳本./bin/spark-submit
./bin/spark-submit是提交Spark應(yīng)用程序最常用的一個(gè)腳本。從前面各個(gè)腳本的解析可以看出,各個(gè)腳本最終都調(diào)用了./bin/spark-submit腳本。
(1)該腳本的用法如下。
該腳本的用法需要從源碼中獲取,具體源碼位置參考SparkSubmitArguments類的方法printUsageAndExit,代碼如下。
1. val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse( 2. """Usage: spark-submit [options] <app jar | python file> [app arguments] 3. |Usage: spark-submit --kill [submission ID] --master [spark://...] 4. |Usage: spark-submit --status [submission ID] --master [spark: //...]""".stripMargin)
(2)該腳本的執(zhí)行語句如下。
1. exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy. SparkSubmit "$@"
7.腳本./bin/spark-class
該腳本是所有其他腳本最終都調(diào)用到的一個(gè)執(zhí)行Java類的腳本。其中關(guān)鍵的執(zhí)行語句如下所示。
1. CMD=() 2. while IFS= read -d '' -r ARG; do 3. CMD+=("$ARG") 4. done <<("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@") 5. exec "${CMD[@]}"
其中,負(fù)責(zé)運(yùn)行的RUNNER變量設(shè)置如下。
1. # Find the java binary 2. //將RUNNER設(shè)置為Java 3. if [ -n "${JAVA_HOME}" ]; then 4. RUNNER="${JAVA_HOME}/bin/java" 5. else 6. if [ `command -v java` ]; then 7. RUNNER="java" 8. else 9. echo "JAVA_HOME is not set" >&2 10. exit 1 11. fi 12. fi
在腳本中,LAUNCH_CLASSPATH變量對應(yīng)Java命令運(yùn)行時(shí)所需的classpath信息。最終Java命令啟動(dòng)的類是org.apache.spark.launcher.Main。Main類的入口函數(shù)main,會(huì)根據(jù)輸入?yún)?shù)構(gòu)建出最終執(zhí)行的命令,即這里返回的${CMD[@]}信息,然后通過exec執(zhí)行。
6.1.3 Application提交給集群源碼詳解
本節(jié)從應(yīng)用部署的角度解析相關(guān)的源碼,主要包括腳本提交時(shí)對應(yīng)JVM進(jìn)程啟動(dòng)的主類org.apache.spark.launcher.Main、定義應(yīng)用程序提交的行為類型的類org.apache.spark.deploy. SparkSubmitAction、應(yīng)用程序封裝底層集群管理器和部署模式的類org.apache.spark.deploy. SparkSubmit,以及代表一個(gè)應(yīng)用程序的驅(qū)動(dòng)程序的類org.apache.spark.SparkContext。
1.Main解析
從前面的腳本分析,得出最終都是通過org.apache.spark.launcher.Main類(下面簡稱Main類)啟動(dòng)應(yīng)用程序的。因此,首先解析一下Main類。
在Main類的源碼中,類的注釋如下。
1. /** 2. * Spark 啟動(dòng)器的命令行接口。在Spark腳本內(nèi)部使用 3. * 4. */
對應(yīng)地,在Main對象的入口方法main的注釋如下。
Main.java源碼如下。
1. 2. /** 3. * Usage: Main [class] [class args] 4. * <p> 5. * 命令行界面工作在兩種模式下: 6. * <ul> 7. * <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy. * SparkSubmit", the {@link SparkLauncher} class is used to launch a * Spark application. </li> 8. * <li>"spark-class": if another class is provided, an internal Spark * class is run.</li> 9. * </ul> 10. ....... 11. public static void main(String[] argsArray) throws Exception { 12. ......
Main類主要有兩種工作模式,分別描述如下。
(1)spark-submit
啟動(dòng)器要啟動(dòng)的類為org.apache.spark.deploy.SparkSubmit時(shí),對應(yīng)為spark-submit工作模式。此時(shí),使用SparkSubmitCommandBuilder類來構(gòu)建啟動(dòng)命令。
(2)spark-class
啟動(dòng)器要啟動(dòng)的類是除SparkSubmit之外的其他類時(shí),對應(yīng)為spark-class工作模式。此時(shí)使用SparkClassCommandBuilder類的buildCommand方法來構(gòu)建啟動(dòng)命令。
Main.java的源碼如下。
1. public static void main(String[] argsArray) throws Exception { 2. ....... 3. String className = args.remove(0); 4. ....... 5. if (className.equals("org.apache.spark.deploy.SparkSubmit")) { 6. try { 7. builder = new SparkSubmitCommandBuilder(args); 8. ....... 9. } else { 10. builder = new SparkClassCommandBuilder(className, args); 11. } 12. ......
以spark-submit工作模式為例,對應(yīng)的在構(gòu)建啟動(dòng)命令的SparkSubmitCommandBuilder類中,上述調(diào)用的SparkClassCommandBuilder構(gòu)造函數(shù)定義如下。
SparkSubmitCommandBuilder.java的源碼如下。
1. SparkSubmitCommandBuilder(List<String> args) { 2. this.allowsMixedArguments = false; 3. this.sparkArgs = new ArrayList<>(); 4. boolean isExample = false; 5. List<String> submitArgs = args; 6. //根據(jù)輸入的第一個(gè)參數(shù)設(shè)置,包括主資源appResource等 7. if (args.size() > 0) { 8. switch (args.get(0)) { 9. case PYSPARK_SHELL: 10. this.allowsMixedArguments = true; 11. appResource = PYSPARK_SHELL; 12. submitArgs = args.subList(1, args.size()); 13. break; 14. 15. case SPARKR_SHELL: 16. this.allowsMixedArguments = true; 17. appResource = SPARKR_SHELL; 18. submitArgs = args.subList(1, args.size()); 19. break; 20. 21. case RUN_EXAMPLE: 22. isExample = true; 23. submitArgs = args.subList(1, args.size()); 24. } 25. 26. this.isExample = isExample; 27. OptionParser parser = new OptionParser(); 28. parser.parse(submitArgs); 29. this.isAppResourceReq = parser.isAppResourceReq; 30. } else { 31. this.isExample = isExample; 32. this.isAppResourceReq = false; 33. } 34. }
從這些初步的參數(shù)解析可以看出,前面腳本中的參數(shù)與最終對應(yīng)的主資源間的對應(yīng)關(guān)系見表6-1。
表6-1 腳本中的參數(shù)與主資源間的對應(yīng)關(guān)系

如果繼續(xù)跟蹤appResource賦值的源碼,可以跟蹤到一些特殊類的類名與最終對應(yīng)的主資源間的對應(yīng)關(guān)系,見表6-2。
表6-2 特殊類的類名與主資源間的對應(yīng)關(guān)系

如果有興趣,可以繼續(xù)跟蹤SparkClassCommandBuilder類的buildCommand方法的源碼,查看構(gòu)建的命令具體有哪些。
通過Main類的簡單解析,可以將前面的腳本分析結(jié)果與后面即將進(jìn)行分析的SparkSubmit類關(guān)聯(lián)起來,以便進(jìn)一步解析與應(yīng)用程序提交相關(guān)的其他源碼。
從前面的腳本分析可以看到,提交應(yīng)用程序時(shí),Main啟動(dòng)的類,也就是用戶最終提交執(zhí)行的類是org.apache.spark.deploy.SparkSubmit。因此,下面開始解析SparkSubmit相關(guān)的源碼,包括提交行為的定義、提交時(shí)的參數(shù)解析以及最終提交運(yùn)行的代碼解析。
2.SparkSubmitAction解析
SparkSubmitAction定義了提交應(yīng)用程序的行為類型,源碼如下所示。
SparkSubmit.scala的源碼如下。
1. private[deploy] object SparkSubmitAction extends Enumeration { 2. type SparkSubmitAction = Value 3. val SUBMIT, KILL, REQUEST_STATUS = Value 4. }
從源碼中可以看到,分別定義了SUBMIT、KILL、REQUEST_STATUS這3種行為類型,對應(yīng)提交應(yīng)用、停止應(yīng)用、查詢應(yīng)用的狀態(tài)。
3.SparkSubmit解析
SparkSubmit的全路徑為org.apache.spark.deploy.SparkSubmit。從SparkSubmit類的注釋可以看出,SparkSubmit是啟動(dòng)一個(gè)Spark應(yīng)用程序的主入口點(diǎn),這和前面從腳本分析得到的結(jié)論一致。首先看一下SparkSubmit類的注釋,如下所示。
1. /** 2. *啟動(dòng)一個(gè)Spark應(yīng)用程序的主入口點(diǎn) 3. * 4. * 5. *這個(gè)程序處理與Spark依賴相關(guān)的類路徑設(shè)置,提供Spark支持的在不同集群管理器的部 *署模式 6. * 7. */
SparkSubmit會(huì)幫助我們設(shè)置Spark相關(guān)依賴包的classpath,同時(shí),為了幫助用戶簡化提交應(yīng)用程序的復(fù)雜性,SparkSubmit提供了一個(gè)抽象層,封裝了底層復(fù)雜的集群管理器與部署模式的各種差異點(diǎn),即通過SparkSubmit的封裝,集群管理器與部署模式對用戶是透明的。
在SparkSubmit中體現(xiàn)透明性的集群管理器定義的源碼如下所示。
SparkSubmit.scala的源碼如下。
1. //集群管理器 2. //Cluster managers 3. private val YARN = 1 4. private val STANDALONE = 2 5. private val MESOS = 4 6. private val LOCAL = 8 7. private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
在SparkSubmit中體現(xiàn)透明性的部署模式定義的源碼如下。
1. //部署模式 2. //Deploy modes 3. private val CLIENT = 1 4. private val CLUSTER = 2 5. private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
作為提交應(yīng)用程序的入口點(diǎn),SparkSubmit中根據(jù)具體的集群管理器進(jìn)行參數(shù)轉(zhuǎn)換、參數(shù)校驗(yàn)等操作,如對模式的檢查,代碼中給出了針對特定情況,不支持的集群管理器與部署模式,在這些模式下提交應(yīng)用程序會(huì)直接報(bào)錯(cuò)退出。
SparkSubmit.scala的源碼如下。
1. //不支持的集群管理器與部署模式 2. 3. (clusterManager, deployMode) match { 4. case (STANDALONE, CLUSTER) if args.isPython => 5. printErrorAndExit("Cluster deploy mode is currently not supported for python " +"applications on standalone clusters.") 6. 7. case (STANDALONE, CLUSTER) if args.isR => 8. printErrorAndExit("Cluster deploy mode is currently not supported for R " +"applications on standalone clusters.") 9. 10. case (LOCAL, CLUSTER) => 11. printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"") 12. case (_, CLUSTER) if isShell(args.primaryResource) => 13. printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.") 14. case (_, CLUSTER) if isSqlShell(args.mainClass) => 15. printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.") 16. case (_, CLUSTER) if isThriftServer(args.mainClass) => 17. printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.") 18. case _ => 19. }
首先,一個(gè)程序運(yùn)行的入口點(diǎn)對應(yīng)單例對象的main函數(shù),因此在執(zhí)行SparkSubmit時(shí),對應(yīng)的入口點(diǎn)是objectSparkSubmit的main函數(shù),具體代碼如下。
SparkSubmit.scala的源碼如下。
1. //入口點(diǎn)函數(shù)main的定義 2. def main(args: Array[String]): Unit = { 3. val appArgs = new SparkSubmitArguments(args) 4. ...... 5. //根據(jù)3種行為分別進(jìn)行處理 6. appArgs.action match { 7. case SparkSubmitAction.SUBMIT => submit(appArgs) 8. case SparkSubmitAction.KILL => kill(appArgs) 9. case SparkSubmitAction.REQUEST_STATUS =>requestStatus(appArgs) 10. } 11. }
其中,SparkSubmitArguments類對應(yīng)用戶調(diào)用提交腳本spark-submit時(shí)傳入的參數(shù)信息。對應(yīng)的腳本的幫助信息(./bin/spark-submit --help),也是由該類的printUsageAndExit方法提供的。
找到上面的入口點(diǎn)代碼之后,就可以開始分析其內(nèi)部的源碼。對應(yīng)參數(shù)信息的SparkSubmitArguments可以參考腳本的幫助信息,來查看具體參數(shù)對應(yīng)的含義。參數(shù)分析后,便是對各種提交行為的具體處理。SparkSubmit支持SparkSubmitAction包含的3種行為,下面以行為SparkSubmitAction.SUBMIT為例進(jìn)行分析,其他行為也可以通過各自的具體處理代碼進(jìn)行分析。
對應(yīng)處理SparkSubmitAction.SUBMIT行為的代碼入口點(diǎn)為submit(appArgs),進(jìn)入該方法,即進(jìn)入提交應(yīng)用程序的處理方法的具體代碼如下所示。
SparkSubmit.scala的源碼如下。
1. private def submit(args: SparkSubmitArguments): Unit = { 2. //準(zhǔn)備應(yīng)用程序提交的環(huán)境,該步驟包含了內(nèi)部封裝的各個(gè)細(xì)節(jié)處理 3. val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) 4. 5. def doRunMain(): Unit = { 6. if (args.proxyUser != null) { 7. val proxyUser = UserGroupInformation.createProxyUser (args.proxyUser,UserGroupInformation.getCurrentUser()) 8. 9. try { 10. proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { 11. override def run(): Unit = { 12. runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) 13. } 14. }) 15. } catch { 16. case e: Exception => 17. //hadoop的AuthorizationException抑制異常堆棧跟蹤,通過JVM打印輸 //出的消息不是很有幫助。這里檢測異常以及空棧,對其采用不同的處理 18. if (e.getStackTrace().length == 0) { 19. //scalastyle:off println 20. printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}") 21. //scalastyle:on println 22. exitFn(1) 23. } else { 24. throw e 25. } 26. } 27. } else { 28. runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) 29. } 30. } 31. 32. //Standalone 集群模式下,有兩種提交應(yīng)用程序的方式 33. //1.傳統(tǒng)的RPC網(wǎng)關(guān)方式使用o.a.s.deploy.Client進(jìn)行封裝 34. //2.Spark 1.3使用新REST-based 網(wǎng)關(guān)方式,作為Spark 1.3的默認(rèn)方法,如果Master //節(jié)點(diǎn)不是REST服務(wù)器節(jié)點(diǎn),Spark應(yīng)用程序提交時(shí)會(huì)切換到傳統(tǒng)的網(wǎng)關(guān)模式 35. if (args.isStandaloneCluster && args.useRest) { 36. try { 37. //scalastyle:off println 38. printStream.println("Running Spark using the REST application submission protocol.") 39. //scalastyle:on println 40. doRunMain() 41. } catch { 42. //如果失敗,則使用傳統(tǒng)的提交方式 43. case e: SubmitRestConnectionException => 44. printWarning(s"Master endpoint ${args.master} was not a REST server. " + "Falling back to legacy submission gateway instead.") 45. 46. //重新設(shè)置提交方式的控制開關(guān) 47. args.useRest = false 48. submit(args) 49. } 50. //在所有其他模式中,只要準(zhǔn)備好主類就可以 51. } else { 52. doRunMain() 53. } 54. }
其中,最終運(yùn)行所需的參數(shù)都由prepareSubmitEnvironment方法負(fù)責(zé)解析、轉(zhuǎn)換,然后根據(jù)其結(jié)果執(zhí)行。解析的結(jié)果包含以下4部分。
子進(jìn)程運(yùn)行所需的參數(shù)。
子進(jìn)程運(yùn)行時(shí)的classpath列表。
系統(tǒng)屬性的映射。
子進(jìn)程運(yùn)行時(shí)的主類。
解析之后調(diào)用runMain方法,該方法中除了一些環(huán)境設(shè)置等操作外,最終會(huì)調(diào)用解析得到的childMainClass的main方法。下面簡單分析一下prepareSubmitEnvironment方法,通過該方法來了解SparkSubmit是如何幫助底層的集群管理器和部署模式的封裝的。里面涉及的各種細(xì)節(jié)比較多,這里以不同集群管理器和部署模式下最終運(yùn)行的childMainClass類的解析為主線進(jìn)行分析。
(1)當(dāng)部署模式為CLIENT時(shí),將childMainClass設(shè)置為傳入的mainClass,對應(yīng)代碼如下所示。
1. //在CLIENT模式下,直接啟動(dòng)應(yīng)用程序的主類 2. if (deployMode == CLIENT || isYarnCluster) { 3. childMainClass = args.mainClass 4. if (isUserJar(args.primaryResource)) { 5. childClasspath += args.primaryResource 6. } 7. if (args.jars != null) { childClasspath ++= args.jars.split(",") } 8. } 9. 10. if (deployMode == CLIENT) { 11. if (args.childArgs != null) { childArgs ++= args.childArgs } 12. }
(2)當(dāng)集群管理器為STANDALONE、部署模式為CLUSTER時(shí),根據(jù)提交的兩種方式將childMainClass分別設(shè)置為不同的類,同時(shí)將傳入的args.mainClass(提交應(yīng)用程序時(shí)設(shè)置的主類)及其參數(shù)根據(jù)不同集群管理器與部署模式進(jìn)行轉(zhuǎn)換,并封裝到新的主類所需的參數(shù)中。對應(yīng)的設(shè)置見表6-3。
表6-3 STANDALONE+CLUSTER時(shí)兩種不同提交方式下的childMainClass封裝

其中,表述性狀態(tài)傳遞(Representational State Transfer,REST)是Roy Fielding博士在2000年他的博士論文中提出來的一種軟件架構(gòu)風(fēng)格。
這些設(shè)置的主類相當(dāng)于封裝了應(yīng)用程序提交時(shí)的主類,運(yùn)行后負(fù)責(zé)向Master節(jié)點(diǎn)申請啟動(dòng)提交的應(yīng)用程序。
(3)當(dāng)集群管理器為YARN、部署模式為CLUSTER時(shí),childMainClass以及對應(yīng)的mainClass的設(shè)置見表6-4。
表6-4 YARN+CLUSTER時(shí)childMainClass下的childMainClass封裝

(4)當(dāng)集群管理器為MESOS、部署模式為CLUSTER時(shí),childMainClass以及對應(yīng)的mainClass的設(shè)置見表6-5。
表6-5 MESOS+CLUSTER時(shí)childMainClass下的childMainClass封裝

從上面的分析中可以看到,使用CLIENT部署模式進(jìn)行提交時(shí),由于設(shè)置的childMainClass為應(yīng)用程序提交時(shí)的主類,因此是直接在提交點(diǎn)執(zhí)行設(shè)置的主類,即mainClass,當(dāng)使用CLUSTER部署模式進(jìn)行提交時(shí),則會(huì)根據(jù)具體集群管理器等信息,使用相應(yīng)的封裝類。這些封裝類會(huì)向集群申請?zhí)峤粦?yīng)用程序的請求,然后在由集群調(diào)度分配得到的節(jié)點(diǎn)上,啟動(dòng)所申請的應(yīng)用程序。
以封裝類設(shè)置為org.apache.spark.deploy.Client為例,從該類主入口main方法查看,可以看到構(gòu)建了一個(gè)ClientEndpoint實(shí)例,該實(shí)例構(gòu)建時(shí),會(huì)將提交應(yīng)用程序時(shí)設(shè)置的mainClass等信息封裝到DriverDescription實(shí)例中,然后發(fā)送到Master,申請執(zhí)行用戶提交的應(yīng)用程序。
對應(yīng)各種集群管理器與部署模式的組合,實(shí)際代碼中的處理細(xì)節(jié)非常多。這里僅給出一種源碼閱讀的方式,和對應(yīng)的大數(shù)據(jù)處理一樣,通常采用化繁為簡的方式去閱讀復(fù)雜的源碼。例如,這里在理解整個(gè)大框架的調(diào)用過程后,以childMainClass的設(shè)置作為主線去解讀源碼,對應(yīng)地,在擴(kuò)展閱讀其他源碼時(shí),也可以采用這種方式,以某種集群管理器與部署模式為主線,詳細(xì)閱讀相關(guān)的代碼。最后,在了解各種組合的處理細(xì)節(jié)之后,通過對比、抽象等方法,對整個(gè)SparkSubmit進(jìn)行歸納總結(jié)。
提交的應(yīng)用程序的驅(qū)動(dòng)程序(Driver Program)部分對應(yīng)包含了一個(gè)SparkContext實(shí)例。因此,接下來從該實(shí)例出發(fā),解析驅(qū)動(dòng)程序在不同的集群管理器的部署細(xì)節(jié)。
4.SparkContext解析
在詳細(xì)解析SparkContext實(shí)例前,首先查看一下SparkContext類的注釋部分,具體如下所示。
1. /** 2. * Spark功能的主入口點(diǎn)。一個(gè)SparkContext代表連接到Spark集群,并可用于在集群中 * 創(chuàng)建RDDs、累加器和廣播變量 3. * ...... 4. * @param 描述應(yīng)用程序配置的配置對象。在該配置的任何設(shè)置將覆蓋默認(rèn)的配置以及系統(tǒng)屬性 5. */
SparkContext類是Spark功能的主入口點(diǎn)。一個(gè)SparkContext實(shí)例代表了與一個(gè)Spark集群的連接,并且通過該實(shí)例,可以在集群中構(gòu)建RDDs、累加器以及廣播變量。SparkContext實(shí)例的構(gòu)建參數(shù)config描述了應(yīng)用程序的Spark配置。在該參數(shù)中指定的配置屬性會(huì)覆蓋默認(rèn)的配置屬性以及系統(tǒng)屬性。
在SparkContext類文件中定義了一個(gè)描述集群管理器類型的單例對象SparkMasterRegex,在該對象中詳細(xì)給出了當(dāng)前Spark支持的各種集群管理器類型。
SparkContext.scala的源碼如下。
1. /** 2. * 定義了從Master信息中抽取集群管理器類型的一個(gè)正則表達(dá)式集合 3. * 4. */ 5. private object SparkMasterRegex { 6. //對應(yīng)Master格式如local[N] 和local[*]的正則表達(dá)式 7. //對應(yīng)的Master格式如local[N]和local[*]的正則表達(dá)式 8. val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r 9. 10. //對應(yīng)的Master格式如local[N, maxRetries]的正則表達(dá)式 11. //這種集群管理器類型用于具有任務(wù)失敗嘗試功能的測試 12. 13. val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r 14. 15. //一種模擬Spark集群的本地模式的正則表達(dá)式,對應(yīng)的Master格式如local-cluster[N, 16. //cores, memory] 17. 18. LOCAL_CLUSTER_REGEX = """local-cluster\ [\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s* ([0-9]+)\s*]""".r 19. 20. //連接Spark部署集群的正則表達(dá)式 21. 22. val SPARK_REGEX = """spark://(.*)""".r 23. }
在SparkContext類中的主要流程可以歸納如下:
(1)createSparkEnv:創(chuàng)建Spark的執(zhí)行環(huán)境對應(yīng)的SparkEnv實(shí)例。
對應(yīng)代碼如下所示。
1. //Create the Spark execution environment (cache, map output tracker, //etc) 2. _env = createSparkEnv(_conf, isLocal, listenerBus) 3. SparkEnv.set(_env)
(2)createTaskScheduler:創(chuàng)建作業(yè)調(diào)度器實(shí)例。
對應(yīng)代碼如下所示。
1. //創(chuàng)建和啟動(dòng)調(diào)度器scheduler 2. val (sched, ts) = SparkContext.createTaskScheduler(this, master) 3. _schedulerBackend = sched 4. _taskScheduler = ts
其中,TaskScheduler是低層次的任務(wù)調(diào)度器,負(fù)責(zé)任務(wù)的調(diào)度。通過該接口提供可插拔的任務(wù)調(diào)度器。每個(gè)TaskScheduler負(fù)責(zé)調(diào)度一個(gè)SparkContext實(shí)例中的任務(wù),負(fù)責(zé)調(diào)度上層DAG調(diào)度器中每個(gè)Stage提交的任務(wù)集(TaskSet),并將這些任務(wù)提交到集群中運(yùn)行,在任務(wù)提交執(zhí)行時(shí),可以使用失敗重試機(jī)制設(shè)置失敗重試的次數(shù)。上述對應(yīng)高層的DAG調(diào)度器的實(shí)例構(gòu)建參見下一步。
(3)new DAGScheduler:創(chuàng)建高層Stage調(diào)度的DAG調(diào)度器實(shí)例。
對應(yīng)代碼如下。
1. _dagScheduler = new DAGScheduler(this)
DAGScheduler是高層調(diào)度模塊,負(fù)責(zé)作業(yè)(Job)的Stage拆分,以及最終將Stage對應(yīng)的任務(wù)集提交到低層次的任務(wù)調(diào)度器上。
下面基于這些主要流程,針對SparkMasterRegex單例對象中給出的各種集群部署模式進(jìn)行解析。對應(yīng)不同集群模式,這些流程中構(gòu)建了包括TaskScheduler與SchedulerBackend的不同的具體子類,所構(gòu)建的相關(guān)實(shí)例具體見表6-6。
表6-6 各種情況下TaskScheduler與SchedulerBackend的不同的具體子類

與TaskScheduler和SchedulerBackend不同的是,在不同集群模式中,應(yīng)用程序的高層調(diào)度器DAGScheduler的實(shí)例是相同的,即對應(yīng)在Spark on YARN與Mesos等集群管理器中,應(yīng)用程序內(nèi)部的高層Stage調(diào)度是相同的。
- Excel 2007函數(shù)與公式自學(xué)寶典
- 商戰(zhàn)數(shù)據(jù)挖掘:你需要了解的數(shù)據(jù)科學(xué)與分析思維
- Photoshop CS4經(jīng)典380例
- Apache Hive Essentials
- 基于ARM 32位高速嵌入式微控制器
- 傳感器與物聯(lián)網(wǎng)技術(shù)
- 網(wǎng)絡(luò)化分布式系統(tǒng)預(yù)測控制
- 步步圖解自動(dòng)化綜合技能
- 內(nèi)模控制及其應(yīng)用
- Microsoft System Center Confi guration Manager
- 智能生產(chǎn)線的重構(gòu)方法
- 人工智能:語言智能處理
- 筆記本電腦維修之電路分析基礎(chǔ)
- ADuC系列ARM器件應(yīng)用技術(shù)
- 51單片機(jī)應(yīng)用程序開發(fā)與實(shí)踐