- Spark核心技術(shù)與高級(jí)應(yīng)用
- 于俊等
- 153字
- 2019-01-01 01:24:36
第3章
Spark程序開(kāi)發(fā)
致虛極,守靜篤。萬(wàn)物并作,吾以觀復(fù)。
——《道德經(jīng)》第十六章
這世間,一切原本都是空虛而寧?kù)o的,萬(wàn)物也因而能夠在其中生長(zhǎng)。因此,要追尋萬(wàn)物的本質(zhì),必須恢復(fù)其最原始的虛靜狀態(tài),只有致虛和守靜做到極篤的境地,萬(wàn)物才能蓬勃生長(zhǎng),往復(fù)循環(huán)。
作為程序員,怎么提倡超越都不為過(guò),但落地到具體問(wèn)題,我們需要有比較實(shí)際的措施。從簡(jiǎn)單程序開(kāi)始,以致虛和守靜的心態(tài),清空自己在大數(shù)據(jù)方向不勞而獲的幻想,逐步成長(zhǎng)為業(yè)內(nèi)有影響力的角色。對(duì)于大部分程序員而言,本章內(nèi)容略顯基礎(chǔ),首先通過(guò)Spark交互Shell來(lái)介紹Spark API,編寫簡(jiǎn)單的Spark程序,然后展示如何構(gòu)建Spark開(kāi)發(fā)環(huán)境,以及編寫簡(jiǎn)單的Spark案例程序,并提交應(yīng)用程序。
3.1 使用Spark Shell編寫程序
要學(xué)習(xí)Spark程序開(kāi)發(fā),建議首先通過(guò)spark-shell交互式學(xué)習(xí),加深對(duì)Spark程序開(kāi)發(fā)的理解。spark-shell提供了一種學(xué)習(xí)API的簡(jiǎn)單方式,以及一個(gè)能夠交互式分析數(shù)據(jù)的強(qiáng)大工具,在Scala語(yǔ)言環(huán)境下(Scala運(yùn)行于Java虛擬機(jī),因此能有效使用現(xiàn)有的Java庫(kù))或Python語(yǔ)言環(huán)境下均可使用。
3.1.1 啟動(dòng)Spark Shell
在spark-shell中,已經(jīng)創(chuàng)建了一個(gè)名為sc的SparkContext對(duì)象,如在4個(gè)CPU核上運(yùn)行bin/spark-shell,命令如下:
./bin/spark-shell --master local[4]
如果指定Jar包路徑,命令如下:
./bin/spark-shell --master local[4] --jars testcode.jar
其中,--master用來(lái)設(shè)置context將要連接并使用的資源主節(jié)點(diǎn),master的值是Standalone模式的Spark集群地址、Mesos或YARN集群的URL,或者是一個(gè)local地址;使用--jars可以添加Jar包的路徑,使用逗號(hào)分隔可以添加多個(gè)包。進(jìn)一步說(shuō)明,spark-shell的本質(zhì)是在后臺(tái)調(diào)用了spark-submit腳本來(lái)啟動(dòng)應(yīng)用程序。
3.1.2 加載text文件
Spark創(chuàng)建sc之后,可以加載本地文件創(chuàng)建RDD,我們以加載Spark自帶的本地文件README.md文件進(jìn)行測(cè)試,返回一個(gè)MapPartitionsRDD文件。
scala>val textFile= sc.textFile("file:///$SPARK_HOME/README.md") textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
需要說(shuō)明的是,加載HDFS文件和本地文件都是使用textFile,區(qū)別是添加前綴(hdfs://和f ile://)進(jìn)行標(biāo)識(shí),從本地文件讀取文件直接返回MapPartitionsRDD,而從HDFS讀取的文件先轉(zhuǎn)成HadoopRDD,然后隱式轉(zhuǎn)換成MapPartitionsRDD。
上面所說(shuō)的MapPartitionsRDD和HadoopRDD都是基于Spark的彈性分布式數(shù)據(jù)集(RDD)。
3.1.3 簡(jiǎn)單RDD操作
對(duì)于RDD,可以執(zhí)行Transformation返回新RDD,也可以執(zhí)行Action得到返回結(jié)果。下面從f irst和count命令開(kāi)始Action之旅,示例代碼如下:
// 獲取RDD文件textFile的第一項(xiàng) scala>textFile.f irst() res0: String = # Apache Spark // 獲取RDD文件textFile所有項(xiàng)的計(jì)數(shù) scala>textFile.count() res1: Long = 98
接下來(lái)通過(guò)Transformation操作,使用f ilter命令返回一個(gè)新的RDD,即抽取文件全部條目的一個(gè)子集,返回一個(gè)新的FilteredRDD,示例代碼如下:
// 抽取含有"Spark"的子集
scala>valtext Filter = textFile.filter(line >line.contains("Spark")) textFilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:23
我們可以鏈接多個(gè)Transformation和Action進(jìn)行操作。示例代碼如下:
scala>textFile.filter(line =>line.contains("Spark")).count() res2: Long = 18
3.1.4 簡(jiǎn)單RDD操作應(yīng)用
通過(guò)簡(jiǎn)單RDD操作進(jìn)行組合,來(lái)實(shí)現(xiàn)找出文本中每行最多單詞數(shù),詞頻統(tǒng)計(jì)等。
1.找出文本中每行最多單詞數(shù)
基于RDD的Transformation和Action可以用作更復(fù)雜的計(jì)算,假設(shè)想要找到文本中每行最多單詞數(shù),可以參考如下代碼:
scala>textFile.map(line =>line.split(" ").size).reduce((a, b) => if (a > b) a else b) res3: Int = 14
在上面這段代碼中,首先將textFile每一行文本中的句子使用split(" ")進(jìn)行分詞,并統(tǒng)計(jì)分詞后的單詞數(shù)。創(chuàng)建一個(gè)基于單詞數(shù)的新RDD,然后針對(duì)該RDD執(zhí)行Reduce操作使用(a, b) => if (a > b) a else b函數(shù)進(jìn)行比較,返回最大值。
2.詞頻統(tǒng)計(jì)
從MapReduce開(kāi)始,詞頻統(tǒng)計(jì)已經(jīng)成為大數(shù)據(jù)處理最流行的入門程序,類似MapReduce,Spark也能很容易地實(shí)現(xiàn)MapReduce,示例程序如下:
// 結(jié)合f latMap、map和reduceByKey來(lái)計(jì)算文件中每個(gè)單詞的詞頻 scala>val wordCount= textFile.flatMap(line =>line.split(" ")).map(word => (word,1)). reduceByKey((a, b) => a + b) wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:23 // 使用collect聚合單詞計(jì)數(shù)結(jié)果 scala>wordCount.collect() res4: Array[(String, Int)] = Array((package,1), (this,1) ,...
這里,結(jié)合f latMap、Map和reduceByKey來(lái)計(jì)算文件中每個(gè)單詞的詞頻,并返回(string、int)類型的鍵值對(duì)ShuffledRDD(由于reduceByKey執(zhí)行時(shí)需要進(jìn)行Shuffle操作,返回的是一個(gè)Shuffle形式的RDD,ShuffledRDD),最后使用Collect聚合單詞計(jì)數(shù)結(jié)果。
如果想讓Scala的函數(shù)文本更簡(jiǎn)潔,可以使用占位符“_”,占位符可以看作表達(dá)式里需要被“填入”的“空白”,這個(gè)“空白”在每次函數(shù)被調(diào)用時(shí),由函數(shù)的參數(shù)填入。
當(dāng)每個(gè)參數(shù)在函數(shù)文本中最多出現(xiàn)一次的情況下,可以使用_+_擴(kuò)展成帶兩個(gè)參數(shù)的函數(shù)文本;多個(gè)下劃線指代多個(gè)參數(shù),而不是單個(gè)參數(shù)的重復(fù)使用。第一個(gè)下劃線代表第一個(gè)參數(shù),第二個(gè)下劃線代表第二個(gè)參數(shù),依此類推。
下面通過(guò)占位符對(duì)詞頻統(tǒng)計(jì)進(jìn)行優(yōu)化。
scala>val wordCount=textFile.flatMap(_.split(" ")).map(_,1) .reduceByKey(_+_)
Spark默認(rèn)是不進(jìn)行排序的,如果以排序的方法進(jìn)行輸出,需要進(jìn)行key和value互換,然后采取sortByKey的方式,可以指定降序(false)和升序(true)。這樣就完成了數(shù)據(jù)統(tǒng)計(jì)和排序,具體代碼如下:
scala>val wordCount= inFile.flatMap(_.split(" ")).map(_, 1).reduceByKey(_+_). map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
上面的代碼通過(guò)第一個(gè)x=>(x._2,x._1)實(shí)現(xiàn)key和value互換,然后通過(guò)sortByKey(false)實(shí)現(xiàn)降序排列,通過(guò)第二個(gè)x=>(x._2,x._1)再次實(shí)現(xiàn)key和value互換,最終實(shí)現(xiàn)排序功能。
3.1.5 RDD緩存
Spark也支持將數(shù)據(jù)集存進(jìn)一個(gè)集群的內(nèi)存緩存中,當(dāng)數(shù)據(jù)被反復(fù)訪問(wèn)時(shí),如在查詢一個(gè)小而“熱”數(shù)據(jù)集,或運(yùn)行像PageRank的迭代算法時(shí),是非常有用的。舉一個(gè)簡(jiǎn)單的例子,緩存變量textFilter(即包含字符串“Spark”的數(shù)據(jù)集),并針對(duì)緩存計(jì)算。
scala>textFilter.cache() res5: textFilter.type = MapPartitionsRDD[2] at filter at <console>:23 scala>textFilter.count() res6: Long = 18
通過(guò)cache緩存數(shù)據(jù)可以用于非常大的數(shù)據(jù)集,支持跨越幾十或幾百個(gè)節(jié)點(diǎn)。
- 文本數(shù)據(jù)挖掘:基于R語(yǔ)言
- Python數(shù)據(jù)分析、挖掘與可視化從入門到精通
- Learning JavaScriptMVC
- 大話Oracle Grid:云時(shí)代的RAC
- 大數(shù)據(jù)營(yíng)銷:如何讓營(yíng)銷更具吸引力
- 數(shù)據(jù)庫(kù)原理與設(shè)計(jì)(第2版)
- Oracle PL/SQL實(shí)例精解(原書第5版)
- INSTANT Apple iBooks How-to
- 數(shù)字IC設(shè)計(jì)入門(微課視頻版)
- 從實(shí)踐中學(xué)習(xí)sqlmap數(shù)據(jù)庫(kù)注入測(cè)試
- MySQL數(shù)據(jù)庫(kù)技術(shù)與應(yīng)用
- 信息融合中估計(jì)算法的性能評(píng)估
- 大數(shù)據(jù)分析:R基礎(chǔ)及應(yīng)用
- 企業(yè)大數(shù)據(jù)處理:Spark、Druid、Flume與Kafka應(yīng)用實(shí)踐
- 一本書讀懂大數(shù)據(jù)