官术网_书友最值得收藏!

1.2 Spark初體驗

本節通過Spark的基本使用,讓讀者對Spark能有初步的認識,以引導讀者逐步深入學習。

1.2.1 運行spark-shell

圖1-3中顯示了很多信息,這里進行一些說明。

安裝完Spark 2.1.0后,如果沒有明確指定log4j的配置,那么Spark會使用core模塊的org/apache/spark/目錄下的log4j-defaults.properties作為log4j的默認配置。log4j-defaults.properties指定的Spark日志級別為WARN。用戶可以到Spark安裝目錄的conf文件夾下,從log4j.properties.template復制一份log4j.properties文件,并在其中增加自己想要的配置。

除了指定log4j.properties文件外,還可以在spark-shell命令行中通過sc.setLog-Level(newLevel)語句指定日志級別。

SparkContext的Web UI的地址是:http://192.168.0.106:4040。192.168.0.106是作者本地安裝Spark的機器的IP地址,4040是SparkContext的Web UI的默認監聽端口。

指定的部署模式(即master)為local[*]。當前應用(Application)的ID為local-1497084620457。

可以在spark-shell命令行通過sc使用SparkContext,通過spark使用SparkSession。sc和spark實際分別是SparkContext和SparkSession在Spark REPL中的變量名,具體細節將在1.2.3節分析。

由于Spark Core的默認日志級別是WARN,所以看到的信息不是很多。現在我們將Spark安裝目錄的conf文件夾下的log4j.properties.template通過如下命令復制出一份:

    cp log4j.properties.template log4j.properties

并將log4j.properties中的log4j.logger.org.apache.spark.repl.Main=WARN修改為log4j. logger.org.apache.spark.repl.Main=INFO,然后我們再次運行spark-shell,將打印出更豐富的信息,如圖1-4所示。

圖1-4 Spark啟動過程打印的部分信息

從圖1-4展示的啟動日志中我們可以看到SecurityManager、SparkEnv、BlockManager-MasterEndpoint、DiskBlockManager、MemoryStore、SparkUI、Executor、NettyBlock-TransferService、BlockManager、BlockManagerMaster等信息。它們是做什么的?剛剛接觸Spark的讀者只需要知道這些信息即可,具體內容將在后邊進行詳細的介紹。

1.2.2 執行word count

這一節,我們通過word count這個耳熟能詳的例子來感受下Spark任務的執行過程。啟動spark-shell后,會打開Scala命令行,然后按照以下步驟輸入腳本。

1)輸入val lines = sc.textFile("../README.md", 2),以Spark安裝目錄下的README. md文件內容作為word count例子的數據源,執行結果如圖1-5所示。

圖1-5 步驟1執行結果

圖1-5告訴我們,lines的實際類型是MapPartitionsRDD這里特別說明一點,在生成MapPartitionsRDD之前還實例化了HadoopRDD,作為MapPartitionsRDD的上游RDD。

2)textFile方法對文本文件是逐行讀取的,我們需要輸入val words = lines.flatMap(line=> line.split(" ")),將每行文本按照空格分隔以得到每個單詞,執行結果如圖1-6所示。

圖1-6 步驟2執行結果

圖1-6告訴我們,lines在經過flatMap方法的轉換后,得到的words的實際類型也是MapPartitionsRDD。

3)對于得到的每個單詞,通過輸入val ones = words.map(w => (w,1)),將每個單詞的計數初始化為1,執行結果如圖1-7所示。

圖1-7 步驟3執行結果

圖1-7告訴我們,words在經過map方法的轉換后,得到的ones的實際類型也是MapPartitionsRDD。

4)輸入val counts = ones.reduceByKey(_ + _),對單詞進行計數值的聚合,執行結果如圖1-8所示。

圖1-8 步驟4執行結果

圖1-8告訴我們,ones在經過reduceByKey方法的轉換后,得到的counts的實際類型是ShuffledRDD。

5)輸入counts.foreach(println),將每個單詞的計數值打印出來,作業的執行過程如圖1-9和圖1-10所示。作業的輸出結果如圖1-11所示。

圖1-9 步驟5執行過程第一部分

圖1-10 步驟5執行過程第二部分

圖1-11 步驟5輸出結果

圖1-9和圖1-10展示了很多作業提交、執行的信息,這里挑選關鍵的內容進行介紹。

SparkContext為提交的Job生成的ID是0。

word count例子一共產生了4個RDD這里沒有算入最上游的HadoopRDD。,被劃分為ResultStage和ShuffleMapStage。ShuffleMapStage的ID為0,嘗試號為0。ResultStage的ID為1,嘗試號也為0。在Spark中,如果Stage沒有執行完成,就會進行多次重試。Stage無論是首次執行還是重試,都被視為是一次Stage嘗試(stage attempt),每次嘗試都有一個唯一的嘗試號(attempt number)。

由于Job有兩個分區,所以ShuffleMapStage和ResultStage都有兩個Task被提交。每個Task也會有多次嘗試,因而也有屬于Task的嘗試號。從圖中看出,Shuffle-MapStage中的兩個Task和ResultStage中的兩個Task的嘗試號也都是0。

HadoopRDD則用于讀取文件內容。

圖1-11展示了單詞計數的輸出結果和最后打印的任務結束的日志信息。

筆者在本節介紹的word count例子是以SparkContext的API來實現的,讀者也可以選擇在spark-shell中通過運用SparkSession的API來實現。本書在第10章將介紹Spark源碼自帶的用SparkSession的API來實現的word count的Java應用程序。

1.2.3 剖析spark-shell

通過在spark-shell中執行word count的過程,讓讀者了解到可以使用spark-shell提交Spark作業。現在讀者應該很想知道spark-shell究竟做了什么呢?

1. 腳本分析

在Spark安裝目錄的bin文件夾下可以找到spark-shell,其中有代碼清單1-1所示的一段腳本。

代碼清單1-1 spark-shell腳本

    function main() {
      if $cygwin; then
        stty -icanon min 1-echo > /dev/null 2>&1
        export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
        "${SPARK_HOME}"/bin/spark-submit  --class  org.apache.spark.repl.Main  --name
            "Spark shell" "$@"
        stty icanon echo > /dev/null 2>&1
      else
        export SPARK_SUBMIT_OPTS
        "${SPARK_HOME}"/bin/spark-submit  --class  org.apache.spark.repl.Main  --name
            "Spark shell" "$@"
      fi
    }

我們看到腳本spark-shell里執行了spark-submit腳本,那么打開spark-submit腳本,發現代碼清單1-2中所示的腳本。

代碼清單1-2 spark-submit腳本

    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi
    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

可以看到spark-submit中又執行了腳本spark-class。打開腳本spark-class,首先發現以下一段腳本:

    # Find the java binary
    if [ -n "${JAVA_HOME}" ]; then
      RUNNER="${JAVA_HOME}/bin/java"
    else
      if [ "$(command -v java)" ]; then
        RUNNER="java"
      else
        echo "JAVA_HOME is not set" >&2
        exit 1
      fi
    fi

上面的腳本是為了找到Java命令。在spark-class腳本中還會找到以下內容:

    build_command() {
      "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
      printf "%d\0" $?
    }
    CMD=()
    while IFS= read -d '' -r ARG; do
      CMD+=("$ARG")
    done < <(build_command "$@")

根據代碼清單1-2,腳本spark-submit在執行spark-class腳本時,給它增加了參數SparkSubmit。所以讀到這里,應該知道Spark啟動了以SparkSubmit為主類的JVM進程。

2. 遠程監控

為便于在本地對Spark進程進行遠程監控,在spark-shell腳本中找到以下配置:

    SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

并追加以下jmx配置:

    -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10207
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote. ssl=false

如果Spark安裝在其他機器,那么在本地打開Java VisualVM后需要添加遠程主機,如圖1-12所示。

圖1-12 添加遠程主機

右擊已添加的遠程主機,添加JMX連接,如圖1-13所示。

圖1-13 添加JMX連接

如果Spark安裝在本地,那么打開Java VisualVM后就會在應用程序窗口中看到org. apache.spark.deploy.SparkSubmit進程,只需雙擊即可。

選擇右側的“線程”選項卡,選擇main線程,然后點擊“線程Dump”按鈕,如圖1-14所示。

圖1-14 查看Spark線程

從線程Dump的內容中找到線程main的信息,如代碼清單1-3所示。

代碼清單1-3 main線程的Dump信息

    "main" #1 prio=5 os_prio=31 tid=0x00007fa012802000 nid=0x1303 runnable [0x0000000-
        10d11c000]
      java.lang.Thread.State: RUNNABLE
      at java.io.FileInputStream.read0(Native Method)
      at java.io.FileInputStream.read(FileInputStream.java:207)
      at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:169)
      - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream)
      at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:137)
      at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:246)
      at jline.internal.InputStreamReader.read(InputStreamReader.java:261)
      - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream)
      at jline.internal.InputStreamReader.read(InputStreamReader.java:198)
      - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream)
      at jline.console.ConsoleReader.readCharacter(ConsoleReader.java:2145)
      at jline.console.ConsoleReader.readLine(ConsoleReader.java:2349)
      at jline.console.ConsoleReader.readLine(ConsoleReader.java:2269)
      at scala.tools.nsc.interpreter.jline.InteractiveReader.readOneLine(JLineReader.scala:57)
      at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.
          apply(InteractiveReader.scala:37)
      at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.
          apply(InteractiveReader.scala:37)
      at scala.tools.nsc.interpreter.InteractiveReader$.restartSysCalls(InteractiveR
          eader.scala:44)
      at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(Interactive-
          Reader.scala:37)
      at scala.tools.nsc.interpreter.jline.InteractiveReader.readLine(JLineReader.scala:28)
      at scala.tools.nsc.interpreter.ILoop.readOneLine(ILoop.scala:404)
      at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:413)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.
          scala:923)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
    at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClass
        Loader.scala:97)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
    at org.apache.spark.repl.Main$.doMain(Main.scala:68)
    at org.apache.spark.repl.Main$.main(Main.scala:51)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
        java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.
        java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$r
        unMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

從main線程的棧信息中可以看出程序的調用順序:SparkSubmit.main→repl.Main→Iloop.process。

3. 源碼分析

我們根據上面的線索,直接閱讀Iloop的process方法的源碼Iloop是Scala語言自身的類庫中的用于實現交互式shell的實現類,提供對REPL(Read-eval-print-loop)的實現。,如代碼清單1-4所示。

代碼清單1-4 process的實現

    def process(settings: Settings): Boolean = savingContextLoader {
      this.settings = settings
      createInterpreter()
      // sets in to some kind of reader depending on environmental cues
      in =  in0.fold(chooseReader(settings))(r  =>  SimpleReader(r,  out,  interactive  =
          true))
      globalFuture = future {
        intp.initializeSynchronous()
        loopPostInit()
        !intp.reporter.hasErrors
      }
      loadFiles(settings)
        printWelcome()
      try loop() match {
        case LineResults.EOF => out print Properties.shellInterruptedString
        case _                   =>
      }
      catch AbstractOrMissingHandler()
      finally closeInterpreter()
      true
    }

根據代碼清單1-4, Iloop的process方法調用了loadFiles方法。Spark中的SparkILoop繼承了Iloop并重寫了loadFiles方法,其實現如下:

    override def loadFiles(settings: Settings): Unit = {
      initializeSpark()
      super.loadFiles(settings)
    }

根據上面展示的代碼,loadFiles方法調用了SparkILoop的initializeSpark方法,initialize Spark的實現如代碼清單1-5所示。

代碼清單1-5 initializeSpark的實現

    def initializeSpark() {
      intp.beQuietDuring {
        processLine("""
          @transient val spark = if (org.apache.spark.repl.Main.sparkSession ! = null) {
              org.apache.spark.repl.Main.sparkSession
            } else {
              org.apache.spark.repl.Main.createSparkSession()
            }
          @transient val sc = {
            val _sc = spark.sparkContext
            if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
              val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
              if (proxyUrl ! = null) {
                println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_
                  sc.applicationId}")
              } else {
                println(s"Spark Context Web UI is available at Spark Master Public URL")
                }
              } else {
                _sc.uiWebUrl.foreach {
                  webUrl => println(s"Spark context Web UI available at ${webUrl}")
                }
              }
              println("Spark context available as 'sc' " +
                s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
              println("Spark session available as 'spark'.")
              _sc
            }
            """)
          processLine("import org.apache.spark.SparkContext._")
          processLine("import spark.implicits._")
          processLine("import spark.sql")
          processLine("import org.apache.spark.sql.functions._")
          replayCommandStack = Nil // remove above commands from session history.
        }
      }

我們看到,initializeSpark向交互式shell發送了一大串代碼,Scala的交互式shell將調用org.apache.spark.repl.Main的createSparkSession方法(見代碼清單1-6),創建Spark-Session。我們看到常量spark將持有SparkSession的引用,并且sc持有SparkSession內部初始化好的SparkContext。所以我們才能夠在spark-shell的交互式shell中使用sc和spark。

代碼清單1-6 createSparkSession的實現

    def createSparkSession(): SparkSession = {
      val execUri = System.getenv("SPARK_EXECUTOR_URI")
      conf.setIfMissing("spark.app.name", "Spark shell")
      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
      if (execUri ! = null) {
        conf.set("spark.executor.uri", execUri)
      }
      if (System.getenv("SPARK_HOME") ! = null) {
        conf.setSparkHome(System.getenv("SPARK_HOME"))
      }
      val builder = SparkSession.builder.config(conf)
      if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") {
        if (SparkSession.hiveClassesArePresent) {
          sparkSession = builder.enableHiveSupport().getOrCreate()
          logInfo("Created Spark session with Hive support")
        } else {
          builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
          sparkSession = builder.getOrCreate()
          logInfo("Created Spark session")
        }
      } else {
        sparkSession = builder.getOrCreate()
        logInfo("Created Spark session")
      }
      sparkContext = sparkSession.sparkContext
      sparkSession
    }

根據代碼清單1-6所示,createSparkSession方法通過SparkSession的API創建Spark-Session實例。本書將有關SparkSession等API的內容放在第10章進行講解,初次接觸Spark的讀者現在只需要了解即可。

主站蜘蛛池模板: 吉首市| 福鼎市| 呼和浩特市| 罗源县| 凌源市| 宁波市| 鄂托克前旗| 海门市| 宁都县| 陕西省| 丰镇市| 五常市| 新建县| 将乐县| 望奎县| 肥西县| 友谊县| 天峻县| 泽库县| 沈丘县| 永州市| 邳州市| 中卫市| 宁明县| 微山县| 安达市| 独山县| 新乡市| 前郭尔| 全州县| 池州市| 宜宾市| 阳原县| 会东县| 黑河市| 新乡县| 新龙县| 义马市| 天等县| 西吉县| 历史|