- Spark內核設計的藝術:架構設計與實現
- 耿嘉安
- 2360字
- 2019-01-04 04:03:48
1.2 Spark初體驗
本節通過Spark的基本使用,讓讀者對Spark能有初步的認識,以引導讀者逐步深入學習。
1.2.1 運行spark-shell
圖1-3中顯示了很多信息,這里進行一些說明。
安裝完Spark 2.1.0后,如果沒有明確指定log4j的配置,那么Spark會使用core模塊的org/apache/spark/目錄下的log4j-defaults.properties作為log4j的默認配置。log4j-defaults.properties指定的Spark日志級別為WARN。用戶可以到Spark安裝目錄的conf文件夾下,從log4j.properties.template復制一份log4j.properties文件,并在其中增加自己想要的配置。
除了指定log4j.properties文件外,還可以在spark-shell命令行中通過sc.setLog-Level(newLevel)語句指定日志級別。
SparkContext的Web UI的地址是:http://192.168.0.106:4040。192.168.0.106是作者本地安裝Spark的機器的IP地址,4040是SparkContext的Web UI的默認監聽端口。
指定的部署模式(即master)為local[*]。當前應用(Application)的ID為local-1497084620457。
可以在spark-shell命令行通過sc使用SparkContext,通過spark使用SparkSession。sc和spark實際分別是SparkContext和SparkSession在Spark REPL中的變量名,具體細節將在1.2.3節分析。
由于Spark Core的默認日志級別是WARN,所以看到的信息不是很多。現在我們將Spark安裝目錄的conf文件夾下的log4j.properties.template通過如下命令復制出一份:
cp log4j.properties.template log4j.properties
并將log4j.properties中的log4j.logger.org.apache.spark.repl.Main=WARN修改為log4j. logger.org.apache.spark.repl.Main=INFO,然后我們再次運行spark-shell,將打印出更豐富的信息,如圖1-4所示。

圖1-4 Spark啟動過程打印的部分信息
從圖1-4展示的啟動日志中我們可以看到SecurityManager、SparkEnv、BlockManager-MasterEndpoint、DiskBlockManager、MemoryStore、SparkUI、Executor、NettyBlock-TransferService、BlockManager、BlockManagerMaster等信息。它們是做什么的?剛剛接觸Spark的讀者只需要知道這些信息即可,具體內容將在后邊進行詳細的介紹。
1.2.2 執行word count
這一節,我們通過word count這個耳熟能詳的例子來感受下Spark任務的執行過程。啟動spark-shell后,會打開Scala命令行,然后按照以下步驟輸入腳本。
1)輸入val lines = sc.textFile("../README.md", 2),以Spark安裝目錄下的README. md文件內容作為word count例子的數據源,執行結果如圖1-5所示。

圖1-5 步驟1執行結果
圖1-5告訴我們,lines的實際類型是MapPartitionsRDD。
2)textFile方法對文本文件是逐行讀取的,我們需要輸入val words = lines.flatMap(line=> line.split(" ")),將每行文本按照空格分隔以得到每個單詞,執行結果如圖1-6所示。

圖1-6 步驟2執行結果
圖1-6告訴我們,lines在經過flatMap方法的轉換后,得到的words的實際類型也是MapPartitionsRDD。
3)對于得到的每個單詞,通過輸入val ones = words.map(w => (w,1)),將每個單詞的計數初始化為1,執行結果如圖1-7所示。

圖1-7 步驟3執行結果
圖1-7告訴我們,words在經過map方法的轉換后,得到的ones的實際類型也是MapPartitionsRDD。
4)輸入val counts = ones.reduceByKey(_ + _),對單詞進行計數值的聚合,執行結果如圖1-8所示。

圖1-8 步驟4執行結果
圖1-8告訴我們,ones在經過reduceByKey方法的轉換后,得到的counts的實際類型是ShuffledRDD。
5)輸入counts.foreach(println),將每個單詞的計數值打印出來,作業的執行過程如圖1-9和圖1-10所示。作業的輸出結果如圖1-11所示。

圖1-9 步驟5執行過程第一部分

圖1-10 步驟5執行過程第二部分

圖1-11 步驟5輸出結果
圖1-9和圖1-10展示了很多作業提交、執行的信息,這里挑選關鍵的內容進行介紹。
SparkContext為提交的Job生成的ID是0。
word count例子一共產生了4個RDD
,被劃分為ResultStage和ShuffleMapStage。ShuffleMapStage的ID為0,嘗試號為0。ResultStage的ID為1,嘗試號也為0。在Spark中,如果Stage沒有執行完成,就會進行多次重試。Stage無論是首次執行還是重試,都被視為是一次Stage嘗試(stage attempt),每次嘗試都有一個唯一的嘗試號(attempt number)。
由于Job有兩個分區,所以ShuffleMapStage和ResultStage都有兩個Task被提交。每個Task也會有多次嘗試,因而也有屬于Task的嘗試號。從圖中看出,Shuffle-MapStage中的兩個Task和ResultStage中的兩個Task的嘗試號也都是0。
HadoopRDD則用于讀取文件內容。
圖1-11展示了單詞計數的輸出結果和最后打印的任務結束的日志信息。
筆者在本節介紹的word count例子是以SparkContext的API來實現的,讀者也可以選擇在spark-shell中通過運用SparkSession的API來實現。本書在第10章將介紹Spark源碼自帶的用SparkSession的API來實現的word count的Java應用程序。
1.2.3 剖析spark-shell
通過在spark-shell中執行word count的過程,讓讀者了解到可以使用spark-shell提交Spark作業。現在讀者應該很想知道spark-shell究竟做了什么呢?
1. 腳本分析
在Spark安裝目錄的bin文件夾下可以找到spark-shell,其中有代碼清單1-1所示的一段腳本。
代碼清單1-1 spark-shell腳本
function main() { if $cygwin; then stty -icanon min 1-echo > /dev/null 2>&1 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix" "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" stty icanon echo > /dev/null 2>&1 else export SPARK_SUBMIT_OPTS "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" fi }
我們看到腳本spark-shell里執行了spark-submit腳本,那么打開spark-submit腳本,發現代碼清單1-2中所示的腳本。
代碼清單1-2 spark-submit腳本
if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home fi # disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0 exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
可以看到spark-submit中又執行了腳本spark-class。打開腳本spark-class,首先發現以下一段腳本:
# Find the java binary if [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java" else if [ "$(command -v java)" ]; then RUNNER="java" else echo "JAVA_HOME is not set" >&2 exit 1 fi fi
上面的腳本是為了找到Java命令。在spark-class腳本中還會找到以下內容:
build_command() { "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" printf "%d\0" $? } CMD=() while IFS= read -d '' -r ARG; do CMD+=("$ARG") done < <(build_command "$@")
根據代碼清單1-2,腳本spark-submit在執行spark-class腳本時,給它增加了參數SparkSubmit。所以讀到這里,應該知道Spark啟動了以SparkSubmit為主類的JVM進程。
2. 遠程監控
為便于在本地對Spark進程進行遠程監控,在spark-shell腳本中找到以下配置:
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"
并追加以下jmx配置:
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote. ssl=false
如果Spark安裝在其他機器,那么在本地打開Java VisualVM后需要添加遠程主機,如圖1-12所示。

圖1-12 添加遠程主機
右擊已添加的遠程主機,添加JMX連接,如圖1-13所示。

圖1-13 添加JMX連接
如果Spark安裝在本地,那么打開Java VisualVM后就會在應用程序窗口中看到org. apache.spark.deploy.SparkSubmit進程,只需雙擊即可。
選擇右側的“線程”選項卡,選擇main線程,然后點擊“線程Dump”按鈕,如圖1-14所示。

圖1-14 查看Spark線程
從線程Dump的內容中找到線程main的信息,如代碼清單1-3所示。
代碼清單1-3 main線程的Dump信息
"main" #1 prio=5 os_prio=31 tid=0x00007fa012802000 nid=0x1303 runnable [0x0000000- 10d11c000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.read0(Native Method) at java.io.FileInputStream.read(FileInputStream.java:207) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:169) - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:137) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:246) at jline.internal.InputStreamReader.read(InputStreamReader.java:261) - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream) at jline.internal.InputStreamReader.read(InputStreamReader.java:198) - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream) at jline.console.ConsoleReader.readCharacter(ConsoleReader.java:2145) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2349) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2269) at scala.tools.nsc.interpreter.jline.InteractiveReader.readOneLine(JLineReader.scala:57) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2. apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2. apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$.restartSysCalls(InteractiveR eader.scala:44) at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(Interactive- Reader.scala:37) at scala.tools.nsc.interpreter.jline.InteractiveReader.readLine(JLineReader.scala:28) at scala.tools.nsc.interpreter.ILoop.readOneLine(ILoop.scala:404) at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:413) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop. scala:923) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClass Loader.scala:97) at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909) at org.apache.spark.repl.Main$.doMain(Main.scala:68) at org.apache.spark.repl.Main$.main(Main.scala:51) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl. java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$r unMain(SparkSubmit.scala:738) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
從main線程的棧信息中可以看出程序的調用順序:SparkSubmit.main→repl.Main→Iloop.process。
3. 源碼分析
我們根據上面的線索,直接閱讀Iloop的process方法的源碼,如代碼清單1-4所示。
代碼清單1-4 process的實現
def process(settings: Settings): Boolean = savingContextLoader { this.settings = settings createInterpreter() // sets in to some kind of reader depending on environmental cues in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true)) globalFuture = future { intp.initializeSynchronous() loopPostInit() !intp.reporter.hasErrors } loadFiles(settings) printWelcome() try loop() match { case LineResults.EOF => out print Properties.shellInterruptedString case _ => } catch AbstractOrMissingHandler() finally closeInterpreter() true }
根據代碼清單1-4, Iloop的process方法調用了loadFiles方法。Spark中的SparkILoop繼承了Iloop并重寫了loadFiles方法,其實現如下:
override def loadFiles(settings: Settings): Unit = { initializeSpark() super.loadFiles(settings) }
根據上面展示的代碼,loadFiles方法調用了SparkILoop的initializeSpark方法,initialize Spark的實現如代碼清單1-5所示。
代碼清單1-5 initializeSpark的實現
def initializeSpark() { intp.beQuietDuring { processLine(""" @transient val spark = if (org.apache.spark.repl.Main.sparkSession ! = null) { org.apache.spark.repl.Main.sparkSession } else { org.apache.spark.repl.Main.createSparkSession() } @transient val sc = { val _sc = spark.sparkContext if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) if (proxyUrl ! = null) { println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_ sc.applicationId}") } else { println(s"Spark Context Web UI is available at Spark Master Public URL") } } else { _sc.uiWebUrl.foreach { webUrl => println(s"Spark context Web UI available at ${webUrl}") } } println("Spark context available as 'sc' " + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") println("Spark session available as 'spark'.") _sc } """) processLine("import org.apache.spark.SparkContext._") processLine("import spark.implicits._") processLine("import spark.sql") processLine("import org.apache.spark.sql.functions._") replayCommandStack = Nil // remove above commands from session history. } }
我們看到,initializeSpark向交互式shell發送了一大串代碼,Scala的交互式shell將調用org.apache.spark.repl.Main的createSparkSession方法(見代碼清單1-6),創建Spark-Session。我們看到常量spark將持有SparkSession的引用,并且sc持有SparkSession內部初始化好的SparkContext。所以我們才能夠在spark-shell的交互式shell中使用sc和spark。
代碼清單1-6 createSparkSession的實現
def createSparkSession(): SparkSession = { val execUri = System.getenv("SPARK_EXECUTOR_URI") conf.setIfMissing("spark.app.name", "Spark shell") conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()) if (execUri ! = null) { conf.set("spark.executor.uri", execUri) } if (System.getenv("SPARK_HOME") ! = null) { conf.setSparkHome(System.getenv("SPARK_HOME")) } val builder = SparkSession.builder.config(conf) if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") { if (SparkSession.hiveClassesArePresent) { sparkSession = builder.enableHiveSupport().getOrCreate() logInfo("Created Spark session with Hive support") } else { builder.config(CATALOG_IMPLEMENTATION.key, "in-memory") sparkSession = builder.getOrCreate() logInfo("Created Spark session") } } else { sparkSession = builder.getOrCreate() logInfo("Created Spark session") } sparkContext = sparkSession.sparkContext sparkSession }
根據代碼清單1-6所示,createSparkSession方法通過SparkSession的API創建Spark-Session實例。本書將有關SparkSession等API的內容放在第10章進行講解,初次接觸Spark的讀者現在只需要了解即可。
- The Complete Rust Programming Reference Guide
- Learning Microsoft Windows Server 2012 Dynamic Access Control
- Python機器學習:數據分析與評分卡建模(微課版)
- GraphQL學習指南
- Python數據可視化:基于Bokeh的可視化繪圖
- 深入淺出Electron:原理、工程與實踐
- arc42 by Example
- Manga Studio Ex 5 Cookbook
- 樂學Web編程:網站制作不神秘
- Developing Middleware in Java EE 8
- 跟小海龜學Python
- 軟件架構:Python語言實現
- 小學生C++創意編程(視頻教學版)
- 網站構建技術
- HTML+CSS+JavaScript網頁制作:從入門到精通(第4版)