官术网_书友最值得收藏!

3.8 通過WordCount實戰解析Spark RDD內部機制

本節通過Spark WordCount動手實踐,編寫單詞計數代碼;在wordcount.scala的基礎上,從數據流動的視角深入分析Spark RDD的數據處理過程。

3.8.1 Spark WordCount動手實踐

本節進行Spark WordCount動手實踐。首先建立一個文本文件helloSpark.txt,將文本文件放到文件目錄data/wordcount/中。helloSpark.txt的文本內容如下。

1.  Hello Spark Hello Scala
2.  Hello Hadoop
3.  Hello Flink
4.  Spark is Awesome

在IDEA中編寫wordcount.scala的代碼如下。

1.   package com.dt.spark.sparksql
2.  import org.apache.spark.SparkConf
3.  import org.apache.spark.SparkContext
4.  import org.apache.spark.rdd.RDD
5.  /**
6.    * 使用Scala開發本地測試的Spark WordCount程序
7.    * @author DT大數據夢工廠
8.    * 新浪微博:http://weibo.com/ilovepains/
9.    */
10. object WordCount {
11.   def main(args: Array[String]){
12.     /**
13.       * 第1步:創建Spark的配置對象SparkConf,設置Spark程序運行時的配置信息,
          * 例如,通過setMaster設置程序要鏈接的Spark集群的Master的URL,如果設置
          * 為local,則代表Spark程序在本地運行,特別適合于機器配置非常差(如只有1GB
          * 的內存)的初學者
14.       */
15.
16.     val conf = new SparkConf() //創建SparkConf對象
17.     conf.setAppName("Wow,My First Spark App!")
                                 //設置應用程序的名稱,在程序運行的監控界面可以看到名稱
18.     conf.setMaster("local") //此時程序在本地運行,不需要安裝Spark集群
19.
20.     /**
21.       * 第2步:創建SparkContext對象
22.       * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、
          * R等都必須有一個SparkContext
23.       * SparkContext    核心作用:初始化 Spark   應用程序,運行所需要的核心組件,包括
          * DAGScheduler、TaskScheduler、SchedulerBackend,同時還會負責Spark程
          * 序往Master注冊程序等,SparkContext是整個Spark應用程序中至關重要的一個對象
24.       */
25.     val sc = new SparkContext(conf)
                                 //創建SparkContext對象,通過傳入SparkConf實例來定
                                 //制Spark運行的具體參數和配置信息
26.
27.     /**
28.       * 第 3  步:根據具體的數據來源(如  HDFS、HBase、Local FS、DB、S3      等)通過
          * SparkContext來創建RDD
29.       * RDD的創建有3種方式:根據外部的數據來源(如HDFS)、根據Scala集合、由其他
          * 的RDD操作
30.       * 數據會被RDD劃分成為一系列的Partitions,分配到每個Partition的數據屬于
          * 一個Task的處理范疇
31.       */
32.
33.     val lines = sc.textFile("data/wordcount/helloSpark.txt", 1)
                                               //讀取本地文件并設置為一個Partition
34.
35.     /**
36.       * 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等
          * 高階函數等的編程,進行具體的數據計算
37.       *  第4.1步:將每一行的字符串拆分成單個單詞
38.       */
39.     val words = lines.flatMap { line => line.split(" ")}
                                          //對每一行的字符串進行單詞拆分,并把所有行的拆
                                          //分結果通過flat合并成為一個大的單詞集合
40.     /**
41.       * 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等
          * 高階函數等的編程,進行具體的數據計算
42.       *  第4.2步:在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word, 1)
43.       */
44.     val pairs = words.map { word => (word, 1) }
45.
46.     /**
47.       * 第4步:對初始的RDD進行Transformation級別的處理,如通過map、filter等
          * 高階函數等的編程,進行具體的數據計算
48.       *  第4.3步:在每個單詞實例計數為1基礎之上統計每個單詞在文件中出現的總次數
49.       */
50.     val wordCountsOdered = pairs.reduceByKey(_+_).map(pair => (pair._2,
        pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
        //對相同的Key,進行Value的累計(包括Local和Reducer級別,同時Reduce)
51.     wordCountsOdered.collect.foreach(wordNumberPair => println
        (wordNumberPair._1 + " : " + wordNumberPair._2))
52.     sc.stop()
53.
54.   }
55. }

在IDEA中運行程序,wordcount.scala的運行結果如下:

1.   ......
2.   17/05/21 21:19:07 INFO DAGScheduler: Job 0 finished: collect at
     WordCount.scala:60, took 0.957991 s
3.  Hello : 4
4.  Spark : 2
5.  Awesome : 1
6.  Flink : 1
7.  is : 1
8.  Scala : 1
9.  Hadoop : 1
10. ......

3.8.2 解析RDD生成的內部機制

下面詳細解析一下wordcount.scala的運行原理。

(1)從數據流動視角解密WordCount,使用Spark作單詞計數統計,搞清楚數據到底是怎么流動的。

(2)從RDD依賴關系的視角解密WordCount。Spark中的一切操作都是RDD,后面的RDD對前面的RDD有依賴關系。

(3)DAG與血統Lineage的思考。

在wordcount.scala的基礎上,我們從數據流動的視角分析數據到底是怎么處理的。我們繪制一張WordCount數據處理過程圖,由于圖片較大,為了方便閱讀,將原圖分成兩張圖,如圖3-11和圖3-12所示。

圖3-11 WordCount圖1

圖3-12 WordCount圖2

數據在生產環境中默認在HDFS中進行分布式存儲,如果在分布式集群中,我們的機器會分成不同的節點對數據進行處理,這里我們在本地測試,重點關注數據是怎么流動的。處理的第一步是獲取數據,讀取數據會生成HadoopRDD。

在WordCount.scala中,單擊sc.textFile進入Spakr框架,SparkContext.scala的textFile的源碼如下。

1.  def textFile(
2.      path: String,
3.      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
4.    assertNotStopped()
5.    hadoopFile(path, classOf[TextInputFormat],            classOf[LongWritable],
      classOf[Text],
6.      minPartitions).map(pair => pair._2.toString).setName(path)
7.  }

下面看一下hadoopFile的源碼,通過new()函數創建一個HadoopRDD,HadoopRDD從Hdfs上讀取分布式數據,并且以數據分片的方式存在于集群中。所謂的數據分片,就是把我們要處理的數據分成不同的部分,例如,在集群中有4個節點,粗略的劃分可以認為將數據分成4個部分,4條語句就分成4個部分。例如,Hello Spark在第一臺機器上,Hello Hadoop在第二臺機器上,Hello Flink在第三臺機器上,Spark is Awesome在第四臺機器上。HadoopRDD幫助我們從磁盤上讀取數據,計算的時候會分布式地放入內存中,Spark運行在Hadoop上,要借助Hadoop來讀取數據。

Spark的特點包括:分布式、基于內存(部分基于磁盤)、可迭代;默認分片策略Block多大,分片就多大。但這種說法不完全準確,因為分片記錄可能跨兩個Block,所以一個分片不會嚴格地等于Block的大小。例如,HDFS的Block大小是128MB的話,分片可能多幾個字節或少幾個字節。分片不一定小于128MB,因為如果最后一條記錄跨兩個Block,分片會把最后一條記錄放在前一個分片中。這里,HadoopRDD用了4個數據分片,設想為128M左右。

hadoopFile的源碼如下。

1.   def hadoopFile[K, V](
2.        path: String,
3.        inputFormatClass: Class[_ <: InputFormat[K, V]],
4.        keyClass: Class[K],
5.        valueClass: Class[V],
6.        minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
7.      assertNotStopped()
8.
9.      //加載hdfs-site.xml配置文件
10.     //詳情參閱Spark-11227
11.     FileSystem.getLocal(hadoopConfiguration)
12.
13.     //Hadoop配置文件大約有10 KB,相當大,所以進行廣播
14.     val confBroadcast = broadcast(new SerializableConfiguration
        (hadoopConfiguration))
15.     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.
        setInputPaths(jobConf, path)
16.     new HadoopRDD(
17.       this,
18.       confBroadcast,
19.       Some(setInputPathsFunc),
20.       inputFormatClass,
21.       keyClass,
22.       valueClass,
23.       minPartitions).setName(path)
24.   }

SparkContext.scala的textFile源碼中,調用hadoopFile方法后進行了map轉換操作,map對讀取的每一行數據進行轉換,讀入的數據是一個Tuple,Key值為索引,Value值為每行數據的內容,生成MapPartitionsRDD。這里,map(pair => pair._2.toString)是基于HadoopRDD產生的Partition去掉的行Key產生的Value,第二個元素是讀取的每行數據內容。MapPartitionsRDD是Spark框架產生的,運行中可能產生一個RDD,也可能產生兩個RDD。例如,textFile中Spark框架就產生了兩個RDD,即HadoopRDD和MapPartitionsRDD。下面是map的源碼。

1.  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
2.    val cleanF = sc.clean(f)
3.    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map
      (cleanF))
4.  }

我們看一下WordCount業務代碼,對讀取的每行數據進行flatMap轉換。這里,flatMap對RDD中的每一個Partition的每一行數據內容進行單詞切分,如有4個Partition分別進行單詞切分,將“Hello Spark”切分成單詞“Hello”和“Spark”,對每一個Partition中的每一行進行單詞切分并合并成一個大的單詞實例的集合。flatMap轉換生成的仍然是MapPartitionsRDD:

RDD.scala的flatMap的源碼如下。

1.  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
2.    val cleanF = sc.clean(f)
3.    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap
      (cleanF))
4.  }

繼續WordCount業務代碼,words.map { word => (word, 1) }通過map轉換將單詞切分以后單詞計數為1。例如,將單詞“Hello”和“Spark”變成(Hello,1),(Spark,1)。這里生成了MapPartitionsRDD。

RDD.scala的map的源碼如下。

1.  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
2.    val cleanF = sc.clean(f)
3.    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map
      (cleanF))
4.  }

繼續WordCount業務代碼,計數之后進行一個關鍵的reduceByKey操作,對全局的數據進行計數統計。reduceByKey對相同的Key進行Value的累計(包括Local和Reducer級別,同時Reduce)。reduceByKey在MapPartitionsRDD之后,在Local reduce級別本地進行了統計,這里也是MapPartitionsRDD。例如,在本地將(Hello,1),(Spark,1),(Hello,1),(Scala,1)匯聚成(Hello,2),(Spark,1),(Scala,1)。Shuffle之前的Local Reduce操作主要負責本地局部統計,并且把統計以后的結果按照分區策略放到不同的file。舉一個簡單的例子,如果下一個階段Stage是3個并行度,每個Partition進行local reduce以后,將自己的數據分成3種類型,最簡單的方式是根據HashCode按3取模。

PairRDDFunctions.scala的reduceByKey的源碼如下。

1.  def  reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
2.    reduceByKey(defaultPartitioner(self), func)
3.  }

至此,前面所有的操作都是一個Stage,一個Stage意味著什么:完全基于內存操作。父Stage:Stage內部的操作是基于內存迭代的,也可以進行Cache,這樣速度快很多。不同于Hadoop的Map Redcue,Hadoop Map Redcue每次都要經過磁盤。

reduceByKey在Local reduce本地匯聚以后生成的MapPartitionsRDD仍屬于父Stage;然后reduceByKey展開真正的Shuffle操作,Shuffle是Spark甚至整個分布式系統的性能瓶頸,Shuffle產生ShuffleRDD,ShuffledRDD就變成另一個Stage,為什么是變成另外一個Stage?因為要網絡傳輸,網絡傳輸不能在內存中進行迭代。

從WordCount業務代碼pairs.reduceByKey(_+_)中看一下PairRDDFunctions.scala的reduceByKey的源碼。

1.   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K,
     V)] = self.withScope {
2.    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
3.  }

reduceByKey內部調用了combineByKeyWithClassTag方法。下面看一下PairRDDFunctions. scala的combineByKeyWithClassTag的源碼。

1.   def combineByKeyWithClassTag[C](
2.        createCombiner: V => C,
3.        mergeValue: (C, V) => C,
4.        mergeCombiners: (C, C) => C,
5.        partitioner: Partitioner,
6.        mapSideCombine: Boolean = true,
7.        serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K,
          C)] = self.withScope {
8.      require(mergeCombiners != null, "mergeCombiners must be defined")
        //required as of Spark 0.9.0
9.      if (keyClass.isArray) {
10.       if (mapSideCombine) {
11.         throw new SparkException("Cannot use map-side combining with array
            keys.")
12.       }
13.       if (partitioner.isInstanceOf[HashPartitioner]) {
14.         throw new SparkException("HashPartitioner cannot partition array
            keys.")
15.       }
16.     }
17.     val aggregator = new Aggregator[K, V, C](
18.       self.context.clean(createCombiner),
19.       self.context.clean(mergeValue),
20.       self.context.clean(mergeCombiners))
21.     if (self.partitioner == Some(partitioner)) {
22.       self.mapPartitions(iter => {
23.         val context = TaskContext.get()
24.         new InterruptibleIterator(context, aggregator.combineValuesByKey
            (iter, context))
25.       }, preservesPartitioning = true)
26.     } else {
27.       new ShuffledRDD[K, V, C](self, partitioner)
28.         .setSerializer(serializer)
29.         .setAggregator(aggregator)
30.         .setMapSideCombine(mapSideCombine)
31.     }
32.   }

在combineByKeyWithClassTag方法中就用new()函數創建了ShuffledRDD。

前面假設有4臺機器并行計算,每臺機器在自己的內存中進行迭代計算,現在產生Shuffle,數據就要進行分類,MapPartitionsRDD數據根據Hash已經分好類,我們就抓取MapPartitionsRDD中的數據。我們從第一臺機器中獲取的內容為(Hello,2),從第二臺機器中獲取的內容為(Hello,1),從第三臺機器中獲取的內容為(Hello,1),把所有的Hello都抓過來。同樣,我們把其他的數據(Hadoop,1),(Flink,1)……都抓過來。

這就是Shuffle的過程,根據數據的分類拿到自己需要的數據。注意,MapPartitionsRDD屬于第一個Stage,是父Stage,內部基于內存進行迭代,不需要操作都要讀寫磁盤,所以速度非常快;從計算算子的角度講,reduceByKey發生在哪里?reduceByKey發生的計算過程包括兩個RDD:一個是MapPartitionsRDD;一個是ShuffledRDD。ShuffledRDD要產生網絡通信。

reduceByKey之后,我們將結果收集起來,進行全局級別的reduce,產生reduceByKey的最后結果,如將(Hello,2),(Hello,1),(Hello,1)在內部變成(Hello,4),其他數據也類似統計。這里reduceByKey之后,如果通過Collect將數據收集起來,就會產生MapPartitionsRDD。從Collect的角度講,MapPartitionsRDD的作用是將結果收集起來發送給Driver;從saveAsTextFile輸出到Hdfs的角度講,例如輸出(Hello,4),其中Hello是key,4是Value嗎?不是!這里(Hello,4)就是value,這就需要設計一個key出來。

下面是RDD.scala的saveAsTextFile方法。

1.    def saveAsTextFile(path: String): Unit = withScope {
2.     //https://issues.apache.org/jira/browse/SPARK-2075
3.     //NullWritable在Hadoop 1.+版本中是Comparable,所以編譯器無法發現隱式排
       //序,將使用默認的‘空’。然而,在Hadoop 2.+中是Comparable[NullWritable],
       //編譯器將調用隱式的“排序”方法來創建一個排序的NullWritable。這就是為什么對
       //于Hadoop 1.+版本和Hadoop 2.+版本的saveAsTextFile,編譯器會生成不同的匿
       //名類。因此,這里提供了一個顯式排序的“null”來確保編譯器為saveAsTextFile生
       //成相同的字節碼
4.
5.
6.     val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
7.     val textClassTag = implicitly[ClassTag[Text]]
8.     val r = this.mapPartitions { iter =>
9.       val text = new Text()
10.      iter.map { x =>
11.        text.set(x.toString)
12.        (NullWritable.get(), text)
13.      }
14.    }
15.    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
16.      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
17.  }

RDD.scala的saveAsTextFile方法中的iter.map {x=>text.set(x.toString) (NullWritable.get(), text)},這里,key轉換成Null,value就是內容本身(Hello,4)。saveAsHadoopFile中的TextOutputFormat要求輸出的是key-value的格式,而我們處理的是內容。回顧一下,之前我們在textFile讀入數據的時候,讀入split分片將key去掉了,計算的是value。因此,輸出時,須將丟失的key重新弄進來,這里key對我們沒有意義,但key對Spark框架有意義,只有value對我們有意義。第一次計算的時候我們把key丟棄了,所以最后往HDFS寫結果的時候需要生成key,這符合對稱法則和能量守恒形式。

總結:

第一個Stage有哪些RDD?HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD。

第二個Stage有哪些RDD?ShuffledRDD、MapPartitionsRDD。

主站蜘蛛池模板: 五大连池市| 边坝县| 乌什县| 二手房| 长海县| 三亚市| 武威市| 博乐市| 泰宁县| 许昌市| 建始县| 陆川县| 石棉县| 醴陵市| 大石桥市| 柳江县| 孝昌县| 白玉县| 闽清县| 萨迦县| 广州市| 兴和县| 喜德县| 集贤县| 林周县| 同江市| 六枝特区| 清丰县| 毕节市| 正镶白旗| 两当县| 扬中市| 泸州市| 甘德县| 大埔区| 滕州市| 叙永县| 隆德县| 唐山市| 贺州市| 松潘县|