官术网_书友最值得收藏!

第3章

Spark程序開發

致虛極,守靜篤。萬物并作,吾以觀復。

——《道德經》第十六章

這世間,一切原本都是空虛而寧靜的,萬物也因而能夠在其中生長。因此,要追尋萬物的本質,必須恢復其最原始的虛靜狀態,只有致虛和守靜做到極篤的境地,萬物才能蓬勃生長,往復循環。

作為程序員,怎么提倡超越都不為過,但落地到具體問題,我們需要有比較實際的措施。從簡單程序開始,以致虛和守靜的心態,清空自己在大數據方向不勞而獲的幻想,逐步成長為業內有影響力的角色。對于大部分程序員而言,本章內容略顯基礎,首先通過Spark交互Shell來介紹Spark API,編寫簡單的Spark程序,然后展示如何構建Spark開發環境,以及編寫簡單的Spark案例程序,并提交應用程序。

3.1 使用Spark Shell編寫程序

要學習Spark程序開發,建議首先通過spark-shell交互式學習,加深對Spark程序開發的理解。spark-shell提供了一種學習API的簡單方式,以及一個能夠交互式分析數據的強大工具,在Scala語言環境下(Scala運行于Java虛擬機,因此能有效使用現有的Java庫)或Python語言環境下均可使用。

3.1.1 啟動Spark Shell

在spark-shell中,已經創建了一個名為sc的SparkContext對象,如在4個CPU核上運行bin/spark-shell,命令如下:

./bin/spark-shell --master local[4]

如果指定Jar包路徑,命令如下:

./bin/spark-shell --master local[4] --jars testcode.jar

其中,--master用來設置context將要連接并使用的資源主節點,master的值是Standalone模式的Spark集群地址、Mesos或YARN集群的URL,或者是一個local地址;使用--jars可以添加Jar包的路徑,使用逗號分隔可以添加多個包。進一步說明,spark-shell的本質是在后臺調用了spark-submit腳本來啟動應用程序。

3.1.2 加載text文件

Spark創建sc之后,可以加載本地文件創建RDD,我們以加載Spark自帶的本地文件README.md文件進行測試,返回一個MapPartitionsRDD文件。

          scala>val textFile= sc.textFile("file:///$SPARK_HOME/README.md")
          textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at
          <console>:21

需要說明的是,加載HDFS文件和本地文件都是使用textFile,區別是添加前綴(hdfs://和f ile://)進行標識,從本地文件讀取文件直接返回MapPartitionsRDD,而從HDFS讀取的文件先轉成HadoopRDD,然后隱式轉換成MapPartitionsRDD。

上面所說的MapPartitionsRDD和HadoopRDD都是基于Spark的彈性分布式數據集(RDD)。

3.1.3 簡單RDD操作

對于RDD,可以執行Transformation返回新RDD,也可以執行Action得到返回結果。下面從f irst和count命令開始Action之旅,示例代碼如下:

          // 獲取RDD文件textFile的第一項
          scala>textFile.f irst()
          res0: String = # Apache Spark
          // 獲取RDD文件textFile所有項的計數
          scala>textFile.count()
          res1: Long = 98

接下來通過Transformation操作,使用f ilter命令返回一個新的RDD,即抽取文件全部條目的一個子集,返回一個新的FilteredRDD,示例代碼如下:

// 抽取含有"Spark"的子集
            scala>valtext Filter = textFile.filter(line >line.contains("Spark"))
            textFilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at
            <console>:23

我們可以鏈接多個Transformation和Action進行操作。示例代碼如下:

            scala>textFile.filter(line =>line.contains("Spark")).count()
            res2: Long = 18

3.1.4 簡單RDD操作應用

通過簡單RDD操作進行組合,來實現找出文本中每行最多單詞數,詞頻統計等。

1.找出文本中每行最多單詞數

基于RDD的Transformation和Action可以用作更復雜的計算,假設想要找到文本中每行最多單詞數,可以參考如下代碼:

            scala>textFile.map(line =>line.split(" ").size).reduce((a, b) => if (a > b) a else b)
            res3: Int = 14

在上面這段代碼中,首先將textFile每一行文本中的句子使用split(" ")進行分詞,并統計分詞后的單詞數。創建一個基于單詞數的新RDD,然后針對該RDD執行Reduce操作使用(a, b) => if (a > b) a else b函數進行比較,返回最大值。

2.詞頻統計

從MapReduce開始,詞頻統計已經成為大數據處理最流行的入門程序,類似MapReduce,Spark也能很容易地實現MapReduce,示例程序如下:

            // 結合f latMap、map和reduceByKey來計算文件中每個單詞的詞頻
            scala>val wordCount= textFile.flatMap(line =>line.split(" ")).map(word => (word,1)).
            reduceByKey((a, b) => a + b)
            wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey
            at <console>:23
            // 使用collect聚合單詞計數結果
            scala>wordCount.collect()
            res4: Array[(String, Int)] = Array((package,1), (this,1) ,...

這里,結合f latMap、Map和reduceByKey來計算文件中每個單詞的詞頻,并返回(string、int)類型的鍵值對ShuffledRDD(由于reduceByKey執行時需要進行Shuffle操作,返回的是一個Shuffle形式的RDD,ShuffledRDD),最后使用Collect聚合單詞計數結果。

如果想讓Scala的函數文本更簡潔,可以使用占位符“_”,占位符可以看作表達式里需要被“填入”的“空白”,這個“空白”在每次函數被調用時,由函數的參數填入。

當每個參數在函數文本中最多出現一次的情況下,可以使用_+_擴展成帶兩個參數的函數文本;多個下劃線指代多個參數,而不是單個參數的重復使用。第一個下劃線代表第一個參數,第二個下劃線代表第二個參數,依此類推。

下面通過占位符對詞頻統計進行優化。

          scala>val wordCount=textFile.flatMap(_.split(" ")).map(_,1)
          .reduceByKey(_+_)

Spark默認是不進行排序的,如果以排序的方法進行輸出,需要進行key和value互換,然后采取sortByKey的方式,可以指定降序(false)和升序(true)。這樣就完成了數據統計和排序,具體代碼如下:

          scala>val wordCount= inFile.flatMap(_.split(" ")).map(_, 1).reduceByKey(_+_).
          map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))

上面的代碼通過第一個x=>(x._2,x._1)實現key和value互換,然后通過sortByKey(false)實現降序排列,通過第二個x=>(x._2,x._1)再次實現key和value互換,最終實現排序功能。

3.1.5 RDD緩存

Spark也支持將數據集存進一個集群的內存緩存中,當數據被反復訪問時,如在查詢一個小而“熱”數據集,或運行像PageRank的迭代算法時,是非常有用的。舉一個簡單的例子,緩存變量textFilter(即包含字符串“Spark”的數據集),并針對緩存計算。

          scala>textFilter.cache()
          res5: textFilter.type = MapPartitionsRDD[2] at filter at <console>:23
          scala>textFilter.count()
          res6: Long = 18

通過cache緩存數據可以用于非常大的數據集,支持跨越幾十或幾百個節點。

主站蜘蛛池模板: 大石桥市| 永丰县| 兴义市| 昌吉市| 德惠市| 广饶县| 黔西| 塔城市| 娄底市| 安庆市| 娄底市| 昌图县| 石狮市| 新晃| 永仁县| 东莞市| 滨海县| 乐都县| 龙胜| 广东省| 北海市| 嘉禾县| 洪泽县| 嘉定区| 财经| 鹤山市| 蓬莱市| 敦化市| 梁山县| 石渠县| 伊宁县| 茶陵县| 新泰市| 九龙县| 闽侯县| 泉州市| 兰考县| 长沙市| 德化县| 芦山县| 古浪县|