- Spark核心技術(shù)與高級應(yīng)用
- 于俊等
- 169字
- 2019-01-01 01:24:35
第3章
Spark程序開發(fā)
致虛極,守靜篤。萬物并作,吾以觀復。
——《道德經(jīng)》第十六章
這世間,一切原本都是空虛而寧靜的,萬物也因而能夠在其中生長。因此,要追尋萬物的本質(zhì),必須恢復其最原始的虛靜狀態(tài),只有致虛和守靜做到極篤的境地,萬物才能蓬勃生長,往復循環(huán)。
作為程序員,怎么提倡超越都不為過,但落地到具體問題,我們需要有比較實際的措施。從簡單程序開始,以致虛和守靜的心態(tài),清空自己在大數(shù)據(jù)方向不勞而獲的幻想,逐步成長為業(yè)內(nèi)有影響力的角色。對于大部分程序員而言,本章內(nèi)容略顯基礎(chǔ),首先通過Spark交互Shell來介紹Spark API,編寫簡單的Spark程序,然后展示如何構(gòu)建Spark開發(fā)環(huán)境,以及編寫簡單的Spark案例程序,并提交應(yīng)用程序。
3.1 使用Spark Shell編寫程序
要學習Spark程序開發(fā),建議首先通過spark-shell交互式學習,加深對Spark程序開發(fā)的理解。spark-shell提供了一種學習API的簡單方式,以及一個能夠交互式分析數(shù)據(jù)的強大工具,在Scala語言環(huán)境下(Scala運行于Java虛擬機,因此能有效使用現(xiàn)有的Java庫)或Python語言環(huán)境下均可使用。
3.1.1 啟動Spark Shell
在spark-shell中,已經(jīng)創(chuàng)建了一個名為sc的SparkContext對象,如在4個CPU核上運行bin/spark-shell,命令如下:
./bin/spark-shell --master local[4]
如果指定Jar包路徑,命令如下:
./bin/spark-shell --master local[4] --jars testcode.jar
其中,--master用來設(shè)置context將要連接并使用的資源主節(jié)點,master的值是Standalone模式的Spark集群地址、Mesos或YARN集群的URL,或者是一個local地址;使用--jars可以添加Jar包的路徑,使用逗號分隔可以添加多個包。進一步說明,spark-shell的本質(zhì)是在后臺調(diào)用了spark-submit腳本來啟動應(yīng)用程序。
3.1.2 加載text文件
Spark創(chuàng)建sc之后,可以加載本地文件創(chuàng)建RDD,我們以加載Spark自帶的本地文件README.md文件進行測試,返回一個MapPartitionsRDD文件。
scala>val textFile= sc.textFile("file:///$SPARK_HOME/README.md") textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
需要說明的是,加載HDFS文件和本地文件都是使用textFile,區(qū)別是添加前綴(hdfs://和f ile://)進行標識,從本地文件讀取文件直接返回MapPartitionsRDD,而從HDFS讀取的文件先轉(zhuǎn)成HadoopRDD,然后隱式轉(zhuǎn)換成MapPartitionsRDD。
上面所說的MapPartitionsRDD和HadoopRDD都是基于Spark的彈性分布式數(shù)據(jù)集(RDD)。
3.1.3 簡單RDD操作
對于RDD,可以執(zhí)行Transformation返回新RDD,也可以執(zhí)行Action得到返回結(jié)果。下面從f irst和count命令開始Action之旅,示例代碼如下:
// 獲取RDD文件textFile的第一項 scala>textFile.f irst() res0: String = # Apache Spark // 獲取RDD文件textFile所有項的計數(shù) scala>textFile.count() res1: Long = 98
接下來通過Transformation操作,使用f ilter命令返回一個新的RDD,即抽取文件全部條目的一個子集,返回一個新的FilteredRDD,示例代碼如下:
// 抽取含有"Spark"的子集
scala>valtext Filter = textFile.filter(line >line.contains("Spark")) textFilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:23
我們可以鏈接多個Transformation和Action進行操作。示例代碼如下:
scala>textFile.filter(line =>line.contains("Spark")).count() res2: Long = 18
3.1.4 簡單RDD操作應(yīng)用
通過簡單RDD操作進行組合,來實現(xiàn)找出文本中每行最多單詞數(shù),詞頻統(tǒng)計等。
1.找出文本中每行最多單詞數(shù)
基于RDD的Transformation和Action可以用作更復雜的計算,假設(shè)想要找到文本中每行最多單詞數(shù),可以參考如下代碼:
scala>textFile.map(line =>line.split(" ").size).reduce((a, b) => if (a > b) a else b) res3: Int = 14
在上面這段代碼中,首先將textFile每一行文本中的句子使用split(" ")進行分詞,并統(tǒng)計分詞后的單詞數(shù)。創(chuàng)建一個基于單詞數(shù)的新RDD,然后針對該RDD執(zhí)行Reduce操作使用(a, b) => if (a > b) a else b函數(shù)進行比較,返回最大值。
2.詞頻統(tǒng)計
從MapReduce開始,詞頻統(tǒng)計已經(jīng)成為大數(shù)據(jù)處理最流行的入門程序,類似MapReduce,Spark也能很容易地實現(xiàn)MapReduce,示例程序如下:
// 結(jié)合f latMap、map和reduceByKey來計算文件中每個單詞的詞頻 scala>val wordCount= textFile.flatMap(line =>line.split(" ")).map(word => (word,1)). reduceByKey((a, b) => a + b) wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:23 // 使用collect聚合單詞計數(shù)結(jié)果 scala>wordCount.collect() res4: Array[(String, Int)] = Array((package,1), (this,1) ,...
這里,結(jié)合f latMap、Map和reduceByKey來計算文件中每個單詞的詞頻,并返回(string、int)類型的鍵值對ShuffledRDD(由于reduceByKey執(zhí)行時需要進行Shuffle操作,返回的是一個Shuffle形式的RDD,ShuffledRDD),最后使用Collect聚合單詞計數(shù)結(jié)果。
如果想讓Scala的函數(shù)文本更簡潔,可以使用占位符“_”,占位符可以看作表達式里需要被“填入”的“空白”,這個“空白”在每次函數(shù)被調(diào)用時,由函數(shù)的參數(shù)填入。
當每個參數(shù)在函數(shù)文本中最多出現(xiàn)一次的情況下,可以使用_+_擴展成帶兩個參數(shù)的函數(shù)文本;多個下劃線指代多個參數(shù),而不是單個參數(shù)的重復使用。第一個下劃線代表第一個參數(shù),第二個下劃線代表第二個參數(shù),依此類推。
下面通過占位符對詞頻統(tǒng)計進行優(yōu)化。
scala>val wordCount=textFile.flatMap(_.split(" ")).map(_,1) .reduceByKey(_+_)
Spark默認是不進行排序的,如果以排序的方法進行輸出,需要進行key和value互換,然后采取sortByKey的方式,可以指定降序(false)和升序(true)。這樣就完成了數(shù)據(jù)統(tǒng)計和排序,具體代碼如下:
scala>val wordCount= inFile.flatMap(_.split(" ")).map(_, 1).reduceByKey(_+_). map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
上面的代碼通過第一個x=>(x._2,x._1)實現(xiàn)key和value互換,然后通過sortByKey(false)實現(xiàn)降序排列,通過第二個x=>(x._2,x._1)再次實現(xiàn)key和value互換,最終實現(xiàn)排序功能。
3.1.5 RDD緩存
Spark也支持將數(shù)據(jù)集存進一個集群的內(nèi)存緩存中,當數(shù)據(jù)被反復訪問時,如在查詢一個小而“熱”數(shù)據(jù)集,或運行像PageRank的迭代算法時,是非常有用的。舉一個簡單的例子,緩存變量textFilter(即包含字符串“Spark”的數(shù)據(jù)集),并針對緩存計算。
scala>textFilter.cache() res5: textFilter.type = MapPartitionsRDD[2] at filter at <console>:23 scala>textFilter.count() res6: Long = 18
通過cache緩存數(shù)據(jù)可以用于非常大的數(shù)據(jù)集,支持跨越幾十或幾百個節(jié)點。
- 數(shù)據(jù)可視化:從小白到數(shù)據(jù)工程師的成長之路
- Voice Application Development for Android
- 文本數(shù)據(jù)挖掘:基于R語言
- Learning JavaScriptMVC
- 深入淺出數(shù)字孿生
- Hadoop大數(shù)據(jù)實戰(zhàn)權(quán)威指南(第2版)
- The Game Jam Survival Guide
- SQL應(yīng)用及誤區(qū)分析
- Visual FoxPro數(shù)據(jù)庫技術(shù)基礎(chǔ)
- 數(shù)據(jù)中心經(jīng)營之道
- Learning Ansible
- Practical Convolutional Neural Networks
- GameMaker Game Programming with GML
- 一本書講透數(shù)據(jù)治理:戰(zhàn)略、方法、工具與實踐
- 大數(shù)據(jù)理論與工程實踐