- Spark核心技術與高級應用
- 于俊等
- 196字
- 2019-01-01 01:24:35
第3章
Spark程序開發
致虛極,守靜篤。萬物并作,吾以觀復。
——《道德經》第十六章
這世間,一切原本都是空虛而寧靜的,萬物也因而能夠在其中生長。因此,要追尋萬物的本質,必須恢復其最原始的虛靜狀態,只有致虛和守靜做到極篤的境地,萬物才能蓬勃生長,往復循環。
作為程序員,怎么提倡超越都不為過,但落地到具體問題,我們需要有比較實際的措施。從簡單程序開始,以致虛和守靜的心態,清空自己在大數據方向不勞而獲的幻想,逐步成長為業內有影響力的角色。對于大部分程序員而言,本章內容略顯基礎,首先通過Spark交互Shell來介紹Spark API,編寫簡單的Spark程序,然后展示如何構建Spark開發環境,以及編寫簡單的Spark案例程序,并提交應用程序。
3.1 使用Spark Shell編寫程序
要學習Spark程序開發,建議首先通過spark-shell交互式學習,加深對Spark程序開發的理解。spark-shell提供了一種學習API的簡單方式,以及一個能夠交互式分析數據的強大工具,在Scala語言環境下(Scala運行于Java虛擬機,因此能有效使用現有的Java庫)或Python語言環境下均可使用。
3.1.1 啟動Spark Shell
在spark-shell中,已經創建了一個名為sc的SparkContext對象,如在4個CPU核上運行bin/spark-shell,命令如下:
./bin/spark-shell --master local[4]
如果指定Jar包路徑,命令如下:
./bin/spark-shell --master local[4] --jars testcode.jar
其中,--master用來設置context將要連接并使用的資源主節點,master的值是Standalone模式的Spark集群地址、Mesos或YARN集群的URL,或者是一個local地址;使用--jars可以添加Jar包的路徑,使用逗號分隔可以添加多個包。進一步說明,spark-shell的本質是在后臺調用了spark-submit腳本來啟動應用程序。
3.1.2 加載text文件
Spark創建sc之后,可以加載本地文件創建RDD,我們以加載Spark自帶的本地文件README.md文件進行測試,返回一個MapPartitionsRDD文件。
scala>val textFile= sc.textFile("file:///$SPARK_HOME/README.md") textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
需要說明的是,加載HDFS文件和本地文件都是使用textFile,區別是添加前綴(hdfs://和f ile://)進行標識,從本地文件讀取文件直接返回MapPartitionsRDD,而從HDFS讀取的文件先轉成HadoopRDD,然后隱式轉換成MapPartitionsRDD。
上面所說的MapPartitionsRDD和HadoopRDD都是基于Spark的彈性分布式數據集(RDD)。
3.1.3 簡單RDD操作
對于RDD,可以執行Transformation返回新RDD,也可以執行Action得到返回結果。下面從f irst和count命令開始Action之旅,示例代碼如下:
// 獲取RDD文件textFile的第一項 scala>textFile.f irst() res0: String = # Apache Spark // 獲取RDD文件textFile所有項的計數 scala>textFile.count() res1: Long = 98
接下來通過Transformation操作,使用f ilter命令返回一個新的RDD,即抽取文件全部條目的一個子集,返回一個新的FilteredRDD,示例代碼如下:
// 抽取含有"Spark"的子集
scala>valtext Filter = textFile.filter(line >line.contains("Spark")) textFilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:23
我們可以鏈接多個Transformation和Action進行操作。示例代碼如下:
scala>textFile.filter(line =>line.contains("Spark")).count() res2: Long = 18
3.1.4 簡單RDD操作應用
通過簡單RDD操作進行組合,來實現找出文本中每行最多單詞數,詞頻統計等。
1.找出文本中每行最多單詞數
基于RDD的Transformation和Action可以用作更復雜的計算,假設想要找到文本中每行最多單詞數,可以參考如下代碼:
scala>textFile.map(line =>line.split(" ").size).reduce((a, b) => if (a > b) a else b) res3: Int = 14
在上面這段代碼中,首先將textFile每一行文本中的句子使用split(" ")進行分詞,并統計分詞后的單詞數。創建一個基于單詞數的新RDD,然后針對該RDD執行Reduce操作使用(a, b) => if (a > b) a else b函數進行比較,返回最大值。
2.詞頻統計
從MapReduce開始,詞頻統計已經成為大數據處理最流行的入門程序,類似MapReduce,Spark也能很容易地實現MapReduce,示例程序如下:
// 結合f latMap、map和reduceByKey來計算文件中每個單詞的詞頻 scala>val wordCount= textFile.flatMap(line =>line.split(" ")).map(word => (word,1)). reduceByKey((a, b) => a + b) wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:23 // 使用collect聚合單詞計數結果 scala>wordCount.collect() res4: Array[(String, Int)] = Array((package,1), (this,1) ,...
這里,結合f latMap、Map和reduceByKey來計算文件中每個單詞的詞頻,并返回(string、int)類型的鍵值對ShuffledRDD(由于reduceByKey執行時需要進行Shuffle操作,返回的是一個Shuffle形式的RDD,ShuffledRDD),最后使用Collect聚合單詞計數結果。
如果想讓Scala的函數文本更簡潔,可以使用占位符“_”,占位符可以看作表達式里需要被“填入”的“空白”,這個“空白”在每次函數被調用時,由函數的參數填入。
當每個參數在函數文本中最多出現一次的情況下,可以使用_+_擴展成帶兩個參數的函數文本;多個下劃線指代多個參數,而不是單個參數的重復使用。第一個下劃線代表第一個參數,第二個下劃線代表第二個參數,依此類推。
下面通過占位符對詞頻統計進行優化。
scala>val wordCount=textFile.flatMap(_.split(" ")).map(_,1) .reduceByKey(_+_)
Spark默認是不進行排序的,如果以排序的方法進行輸出,需要進行key和value互換,然后采取sortByKey的方式,可以指定降序(false)和升序(true)。這樣就完成了數據統計和排序,具體代碼如下:
scala>val wordCount= inFile.flatMap(_.split(" ")).map(_, 1).reduceByKey(_+_). map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
上面的代碼通過第一個x=>(x._2,x._1)實現key和value互換,然后通過sortByKey(false)實現降序排列,通過第二個x=>(x._2,x._1)再次實現key和value互換,最終實現排序功能。
3.1.5 RDD緩存
Spark也支持將數據集存進一個集群的內存緩存中,當數據被反復訪問時,如在查詢一個小而“熱”數據集,或運行像PageRank的迭代算法時,是非常有用的。舉一個簡單的例子,緩存變量textFilter(即包含字符串“Spark”的數據集),并針對緩存計算。
scala>textFilter.cache() res5: textFilter.type = MapPartitionsRDD[2] at filter at <console>:23 scala>textFilter.count() res6: Long = 18
通過cache緩存數據可以用于非常大的數據集,支持跨越幾十或幾百個節點。