官术网_书友最值得收藏!

第3章

Spark程序開(kāi)發(fā)

致虛極,守靜篤。萬(wàn)物并作,吾以觀復(fù)。

——《道德經(jīng)》第十六章

這世間,一切原本都是空虛而寧?kù)o的,萬(wàn)物也因而能夠在其中生長(zhǎng)。因此,要追尋萬(wàn)物的本質(zhì),必須恢復(fù)其最原始的虛靜狀態(tài),只有致虛和守靜做到極篤的境地,萬(wàn)物才能蓬勃生長(zhǎng),往復(fù)循環(huán)。

作為程序員,怎么提倡超越都不為過(guò),但落地到具體問(wèn)題,我們需要有比較實(shí)際的措施。從簡(jiǎn)單程序開(kāi)始,以致虛和守靜的心態(tài),清空自己在大數(shù)據(jù)方向不勞而獲的幻想,逐步成長(zhǎng)為業(yè)內(nèi)有影響力的角色。對(duì)于大部分程序員而言,本章內(nèi)容略顯基礎(chǔ),首先通過(guò)Spark交互Shell來(lái)介紹Spark API,編寫簡(jiǎn)單的Spark程序,然后展示如何構(gòu)建Spark開(kāi)發(fā)環(huán)境,以及編寫簡(jiǎn)單的Spark案例程序,并提交應(yīng)用程序。

3.1 使用Spark Shell編寫程序

要學(xué)習(xí)Spark程序開(kāi)發(fā),建議首先通過(guò)spark-shell交互式學(xué)習(xí),加深對(duì)Spark程序開(kāi)發(fā)的理解。spark-shell提供了一種學(xué)習(xí)API的簡(jiǎn)單方式,以及一個(gè)能夠交互式分析數(shù)據(jù)的強(qiáng)大工具,在Scala語(yǔ)言環(huán)境下(Scala運(yùn)行于Java虛擬機(jī),因此能有效使用現(xiàn)有的Java庫(kù))或Python語(yǔ)言環(huán)境下均可使用。

3.1.1 啟動(dòng)Spark Shell

在spark-shell中,已經(jīng)創(chuàng)建了一個(gè)名為sc的SparkContext對(duì)象,如在4個(gè)CPU核上運(yùn)行bin/spark-shell,命令如下:

./bin/spark-shell --master local[4]

如果指定Jar包路徑,命令如下:

./bin/spark-shell --master local[4] --jars testcode.jar

其中,--master用來(lái)設(shè)置context將要連接并使用的資源主節(jié)點(diǎn),master的值是Standalone模式的Spark集群地址、Mesos或YARN集群的URL,或者是一個(gè)local地址;使用--jars可以添加Jar包的路徑,使用逗號(hào)分隔可以添加多個(gè)包。進(jìn)一步說(shuō)明,spark-shell的本質(zhì)是在后臺(tái)調(diào)用了spark-submit腳本來(lái)啟動(dòng)應(yīng)用程序。

3.1.2 加載text文件

Spark創(chuàng)建sc之后,可以加載本地文件創(chuàng)建RDD,我們以加載Spark自帶的本地文件README.md文件進(jìn)行測(cè)試,返回一個(gè)MapPartitionsRDD文件。

          scala>val textFile= sc.textFile("file:///$SPARK_HOME/README.md")
          textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at
          <console>:21

需要說(shuō)明的是,加載HDFS文件和本地文件都是使用textFile,區(qū)別是添加前綴(hdfs://和f ile://)進(jìn)行標(biāo)識(shí),從本地文件讀取文件直接返回MapPartitionsRDD,而從HDFS讀取的文件先轉(zhuǎn)成HadoopRDD,然后隱式轉(zhuǎn)換成MapPartitionsRDD。

上面所說(shuō)的MapPartitionsRDD和HadoopRDD都是基于Spark的彈性分布式數(shù)據(jù)集(RDD)。

3.1.3 簡(jiǎn)單RDD操作

對(duì)于RDD,可以執(zhí)行Transformation返回新RDD,也可以執(zhí)行Action得到返回結(jié)果。下面從f irst和count命令開(kāi)始Action之旅,示例代碼如下:

          // 獲取RDD文件textFile的第一項(xiàng)
          scala>textFile.f irst()
          res0: String = # Apache Spark
          // 獲取RDD文件textFile所有項(xiàng)的計(jì)數(shù)
          scala>textFile.count()
          res1: Long = 98

接下來(lái)通過(guò)Transformation操作,使用f ilter命令返回一個(gè)新的RDD,即抽取文件全部條目的一個(gè)子集,返回一個(gè)新的FilteredRDD,示例代碼如下:

// 抽取含有"Spark"的子集
            scala>valtext Filter = textFile.filter(line >line.contains("Spark"))
            textFilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at
            <console>:23

我們可以鏈接多個(gè)Transformation和Action進(jìn)行操作。示例代碼如下:

            scala>textFile.filter(line =>line.contains("Spark")).count()
            res2: Long = 18

3.1.4 簡(jiǎn)單RDD操作應(yīng)用

通過(guò)簡(jiǎn)單RDD操作進(jìn)行組合,來(lái)實(shí)現(xiàn)找出文本中每行最多單詞數(shù),詞頻統(tǒng)計(jì)等。

1.找出文本中每行最多單詞數(shù)

基于RDD的Transformation和Action可以用作更復(fù)雜的計(jì)算,假設(shè)想要找到文本中每行最多單詞數(shù),可以參考如下代碼:

            scala>textFile.map(line =>line.split(" ").size).reduce((a, b) => if (a > b) a else b)
            res3: Int = 14

在上面這段代碼中,首先將textFile每一行文本中的句子使用split(" ")進(jìn)行分詞,并統(tǒng)計(jì)分詞后的單詞數(shù)。創(chuàng)建一個(gè)基于單詞數(shù)的新RDD,然后針對(duì)該RDD執(zhí)行Reduce操作使用(a, b) => if (a > b) a else b函數(shù)進(jìn)行比較,返回最大值。

2.詞頻統(tǒng)計(jì)

從MapReduce開(kāi)始,詞頻統(tǒng)計(jì)已經(jīng)成為大數(shù)據(jù)處理最流行的入門程序,類似MapReduce,Spark也能很容易地實(shí)現(xiàn)MapReduce,示例程序如下:

            // 結(jié)合f latMap、map和reduceByKey來(lái)計(jì)算文件中每個(gè)單詞的詞頻
            scala>val wordCount= textFile.flatMap(line =>line.split(" ")).map(word => (word,1)).
            reduceByKey((a, b) => a + b)
            wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey
            at <console>:23
            // 使用collect聚合單詞計(jì)數(shù)結(jié)果
            scala>wordCount.collect()
            res4: Array[(String, Int)] = Array((package,1), (this,1) ,...

這里,結(jié)合f latMap、Map和reduceByKey來(lái)計(jì)算文件中每個(gè)單詞的詞頻,并返回(string、int)類型的鍵值對(duì)ShuffledRDD(由于reduceByKey執(zhí)行時(shí)需要進(jìn)行Shuffle操作,返回的是一個(gè)Shuffle形式的RDD,ShuffledRDD),最后使用Collect聚合單詞計(jì)數(shù)結(jié)果。

如果想讓Scala的函數(shù)文本更簡(jiǎn)潔,可以使用占位符“_”,占位符可以看作表達(dá)式里需要被“填入”的“空白”,這個(gè)“空白”在每次函數(shù)被調(diào)用時(shí),由函數(shù)的參數(shù)填入。

當(dāng)每個(gè)參數(shù)在函數(shù)文本中最多出現(xiàn)一次的情況下,可以使用_+_擴(kuò)展成帶兩個(gè)參數(shù)的函數(shù)文本;多個(gè)下劃線指代多個(gè)參數(shù),而不是單個(gè)參數(shù)的重復(fù)使用。第一個(gè)下劃線代表第一個(gè)參數(shù),第二個(gè)下劃線代表第二個(gè)參數(shù),依此類推。

下面通過(guò)占位符對(duì)詞頻統(tǒng)計(jì)進(jìn)行優(yōu)化。

          scala>val wordCount=textFile.flatMap(_.split(" ")).map(_,1)
          .reduceByKey(_+_)

Spark默認(rèn)是不進(jìn)行排序的,如果以排序的方法進(jìn)行輸出,需要進(jìn)行key和value互換,然后采取sortByKey的方式,可以指定降序(false)和升序(true)。這樣就完成了數(shù)據(jù)統(tǒng)計(jì)和排序,具體代碼如下:

          scala>val wordCount= inFile.flatMap(_.split(" ")).map(_, 1).reduceByKey(_+_).
          map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))

上面的代碼通過(guò)第一個(gè)x=>(x._2,x._1)實(shí)現(xiàn)key和value互換,然后通過(guò)sortByKey(false)實(shí)現(xiàn)降序排列,通過(guò)第二個(gè)x=>(x._2,x._1)再次實(shí)現(xiàn)key和value互換,最終實(shí)現(xiàn)排序功能。

3.1.5 RDD緩存

Spark也支持將數(shù)據(jù)集存進(jìn)一個(gè)集群的內(nèi)存緩存中,當(dāng)數(shù)據(jù)被反復(fù)訪問(wèn)時(shí),如在查詢一個(gè)小而“熱”數(shù)據(jù)集,或運(yùn)行像PageRank的迭代算法時(shí),是非常有用的。舉一個(gè)簡(jiǎn)單的例子,緩存變量textFilter(即包含字符串“Spark”的數(shù)據(jù)集),并針對(duì)緩存計(jì)算。

          scala>textFilter.cache()
          res5: textFilter.type = MapPartitionsRDD[2] at filter at <console>:23
          scala>textFilter.count()
          res6: Long = 18

通過(guò)cache緩存數(shù)據(jù)可以用于非常大的數(shù)據(jù)集,支持跨越幾十或幾百個(gè)節(jié)點(diǎn)。

主站蜘蛛池模板: 鞍山市| 师宗县| 永川市| 新昌县| 湖北省| 加查县| 宣威市| 滨州市| 始兴县| 淄博市| 河池市| 甘孜县| 龙口市| 黔南| 芜湖县| 苏州市| 合水县| 雅安市| 黑河市| 伽师县| 开化县| 民和| 贞丰县| 栖霞市| 汽车| 双江| 揭阳市| 梁山县| 姚安县| 肥西县| 资中县| 芜湖市| 罗田县| 盐山县| 海口市| 云霄县| 军事| 上虞市| 儋州市| 柳林县| 武平县|