官术网_书友最值得收藏!

4.3 RDD操作

RDD提供了一個抽象的分布式數(shù)據(jù)架構,我們不必擔心底層數(shù)據(jù)的分布式特性,而應用邏輯可以表達為一系列轉換處理。

通常應用邏輯是以一系列轉換(Transformation)和執(zhí)行(Action)來表達的,前者在RDD之間指定處理的相互依賴關系,后者指定輸出的形式。

其中:

轉換:是指該操作從已經存在的數(shù)據(jù)集上創(chuàng)建一個新的數(shù)據(jù)集,是數(shù)據(jù)集的邏輯操作,并沒有真正計算。

執(zhí)行:是指該方法提交一個與前一個Action之間的所有Transformation組成的Job進行計算,Spark會根據(jù)Action將作業(yè)切分成多個Job。

比如,Map操作傳遞數(shù)據(jù)集中的每一個元素經過一個函數(shù),形成一個新的RDD轉換結果,而Reduce操作通過一些函數(shù)對RDD的所有元素進行操作,并返回最終結果給Driver程序。

在默認情況下,Spark所有的轉換操作都是惰性(Lazy)的,每個被轉換得到的RDD不會立即計算出結果,只是記下該轉換操作應用的一些基礎數(shù)據(jù)集,可以有多個轉換結果。轉換只有在遇到一個Action時才會執(zhí)行,如圖4-2所示。

圖4-2 Spark轉換和執(zhí)行

這種設計使得Spark以更高的效率運行。例如,可以通過將要在Reduce操作中使用的Map轉換來創(chuàng)建一個數(shù)據(jù)集,并且只返回Reduce的結果給驅動程序,而不是整個Map所得的數(shù)據(jù)集。

每當一個Job計算完成,其內部的所有RDD都會被清除,如果在下一個Job中有用到其他Job中的RDD,會引發(fā)該RDD的再次計算,為避免這種情況,我們可以使用Persist(默認是Cache)方法“持久化”一個RDD到內存中。在這種情況下,Spark將會在集群中保留這個RDD,以便其他Job可以更快地訪問,另外,Spark也支持持久化RDD到磁盤中,或者復制RDD到各個節(jié)點。

下面,通過幾行簡單的程序,進一步說明RDD的基礎知識。

          val lines=sc.textFile("data.txt")
          val lineLengths=lines.map(s=>s.length)
          val totalLength=lineLengths.reduce((a,b)=>a+b)

第一行讀取外部文件data.txt返回一個基礎的MappedRDD,該MappedRDD并不加載到內存中或被執(zhí)行操作,lines只是記錄轉換操作結果的指針。

第二行定義了lineLengths作為一個Map轉換的結果,由于惰性機制的存在,lineLengths的值不會立即計算。

最后,運行Reduce,該操作為一個Action。Spark將計算打散成多個任務以便在不同的機器上分別運行,每臺機器并行運行Map,并將結果進行Reduce操作,返回結果值Driver程序。

如果需要繼續(xù)使用lineLengths,可以添加緩存Persist或Cache,該持久化會在執(zhí)行Reduce之前,第一次計算成功之后,將lineLengths保存在內存中。

4.3.1 轉換操作

轉換操作是RDD的核心之一,通過轉換操作實現(xiàn)不同的RDD結果,作為下一次RDD計算的數(shù)據(jù)輸入,轉換操作不會觸發(fā)Job的提交,僅僅是標記對RDD的操作,形成DAG圖,以供Action觸發(fā)Job提交后調用。

常用的轉換操作包括:基礎轉換操作和鍵-值轉換操作。

1.基礎轉換操作

表4-2列出了目前支持的基礎轉換操作,具體內容請參見RDD的API官方文檔,以獲得更多的細節(jié)。

表4-2 基礎轉換操作

(續(xù))

2.鍵-值轉換操作

盡管大多數(shù)Spark操作都基于包含各種類型對象的RDD,但是一小部分特殊的卻只能在鍵-值對形式的RDD上執(zhí)行。其中,最普遍的就是分布式“洗牌”(shuffle)操作,比如通過鍵進行分組或聚合元素。

例如,使用reduceByKey操作對文件中每行出現(xiàn)的文字次數(shù)進行計數(shù),各種語言的示例如下。

在Scala中,只要在程序中導入org.apache.spark.SparkContext,就能使用Spark的隱式轉換,這些操作就可用于包含二元組對象的RDD(Scala中的內建元組,可通過(a,b)創(chuàng)建),鍵-值對操作可用PairRDDFunction類,如果導入了轉換,該類將自動封裝元組RDD。

            val lines = sc.textFile("data.txt")
            val pairs = lines.map(s => (s, 1))
            val counts = pairs.reduceByKey((a, b) => a + b)

基于counts,可以使用counts.sortByKey()按字母表順序對這些鍵-值對排序,然后使用counts.collect(),以對象數(shù)組的形式向Driver返回結果。

順便說一句,進行分組的groupByKey不進行本地合并,而進行聚合的reduceByKey會在本地對每個分區(qū)的數(shù)據(jù)合并后再做Shuffle,效率比groupByKey高得多。下面通過幾行基于Scala的代碼對鍵-值轉換操作進行說明。

            // 初始化List
            scala>val data = List(("a",1),("b",1),("c",1),("a",2),("b",2),("c",2))
            data: List[(String, Int)] = List((a,1), (b,1), (c,1), (a,2), (b,2), (c,2))
            // 并行數(shù)組創(chuàng)建RDD
            scala>val rdd =sc.parallelize(data)
            rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0]
            // 按照key進行reduceByKey操作
          scala>val rbk = rdd.reduceByKey(_+_).collect
          rbk: Array[(String, Int)] = Array((a,3), (b,3), (c,3))
          // 按照key進行groupByKey操作
          scala>val gbk = rdd.groupByKey().collect
          gbk:  Array[(String,  Iterable[Int])]  =  Array((a,CompactBuffer(1,  2)),  (b,
          CompactBuffer(1, 2)), (c,CompactBuffer(1, 2)))
          // 按照key進行sortByKey操作
          scala>val sbk = rdd.sortByKey().collect
          sbk: Array[(String, Int)] = Array((a,1), (a,2), (b,1), (b,2), (c,1), (c,2))

表4-3列出了常用的健-值轉換。

表4-3 常用的鍵-值轉換

4.3.2 執(zhí)行操作

Spark將提交的Action與前一個Action之間的所有Transformation組成的Job進行計算,并根據(jù)Action將作業(yè)切分成多個Job,指定Transformation的輸出結果。

1.常用執(zhí)行操作

這里以加載Spark自帶的本地文件README.md文件進行測試,返回一個MappedRDD文件,進行Filter轉換操作和Count執(zhí)行。

          // 讀取README.md數(shù)據(jù),并轉化為RDD
          scala>val data = sc.textFile("file:///$SPARK_HOME/README.md")
          data: org.apache.spark.rdd.RDD[String] = file:///$SPARK_HOME/README.md MappedRDD[1]
            // 執(zhí)行f ilter操作,提取帶有"Spark"的子集
            scala>val datafilter = data.filter(line =>line.contains("Spark"))
            datafilter: org.apache.spark.rdd.RDD[String] = FilteredRDD[2]
            // 執(zhí)行Action操作,輸出結果
            scala>val datacount = datafilter.count()
            datacount: Long = 21

如果想了解更多,請參考表4-4中列出的常用的執(zhí)行操作。

表4-4 常用的執(zhí)行操作

通過常用執(zhí)行操作,Spark可以實現(xiàn)大部分MapReduce流式計算的任務,提升了計算效率,對Transformation操作進行結果值輸出。

2.存儲執(zhí)行操作

常用存儲操作主要包含的執(zhí)行如表4-5所示。

表4-5 常用存儲操作包含的執(zhí)行

存儲執(zhí)行操作將結果進行保存,以文本、序列化文件、對象文件的方式輸出到存儲設備進行持久化。

4.3.3 控制操作

控制操作主要包括故障恢復、數(shù)據(jù)持久性,以及移除數(shù)據(jù)。其中,緩存操作Cache/Pesist是惰性的,在進行執(zhí)行操作時才會執(zhí)行,而Unpesist是即時的,會立即釋放內存。checkpoint會直接將RDD持久化到磁盤或HDFS等路徑,不同于Cache/Persist的是,被checkpoint的RDD不會因作業(yè)的結束而被消除,會一直存在,并可以被后續(xù)的作業(yè)直接讀取并加載。

1. RDD故障恢復

在一個典型的分布式系統(tǒng)中,容錯機制主要是采取檢查點(checkpoint)機制和數(shù)據(jù)備份機制。故障恢復是由主動檢查,以及不同機器之間的數(shù)據(jù)復制實現(xiàn)的。由于進行故障恢復需要跨集群網絡來復制大量數(shù)據(jù),這無疑是相當昂貴的。因此,在Spark中則采取了不同的方法進行故障恢復。

作為一個大型的分布式集群,Spark針對工作負載會做出兩種假設:

□處理時間是有限的;

□保持數(shù)據(jù)持久性是外部數(shù)據(jù)源的職責,主要是讓處理過程中的數(shù)據(jù)保持穩(wěn)定。

基于假設,Spark在執(zhí)行期間發(fā)生數(shù)據(jù)丟失時會選擇折中方案,它會重新執(zhí)行之前的步驟來恢復丟失的數(shù)據(jù),但并不是說丟棄之前所有已經完成的工作,而重新開始再來一遍。

假如其中一個RDD壞掉,RDD中有記錄之前的依賴關系,且依賴關系中記錄算子和分區(qū)。此時,僅僅需要再執(zhí)行一遍父RDD的相應分區(qū)。

但是,跨寬依賴的再執(zhí)行能夠涉及多個父RDD,從而引發(fā)全部的再執(zhí)行。為了規(guī)避這一點,Spark會保持Map階段中間數(shù)據(jù)輸出的持久,在機器發(fā)生故障的情況下,再執(zhí)行只需要回溯Mapper持續(xù)輸出的相應分區(qū),來獲取中間數(shù)據(jù)。

Spark還提供了數(shù)據(jù)檢查點和記錄日志,用于持久化中間RDD,這樣再執(zhí)行就不必追溯到最開始的階段。通過比較恢復延遲和檢查點開銷進行權衡,Spark會自動化地選擇相應的策略進行故障恢復。

2. RDD持久化

Spark的持久化,是指在不同轉換操作之間,將過程數(shù)據(jù)緩存在內存中,實現(xiàn)快速重用,或者故障快速恢復。持久化主要分為兩類,主動持久化和自動持久化。

主動持久化,主要目標是RDD重用,從而實現(xiàn)快速處理,是Spark構建迭代算法的關鍵。例如,持久化一個RDD,每一個節(jié)點都將把它的計算分塊結果保存在內存中,并在該數(shù)據(jù)集(或者衍生數(shù)據(jù)集)進行的后續(xù)Action中重用,使得后續(xù)Action執(zhí)行變得更加迅速(通常快10倍)。

可以使用persist()方法標記一個持久化的RDD,一旦被一個執(zhí)行(action)觸發(fā)計算,它將會被保留在計算節(jié)點的內存中并重用。如果RDD的任一分區(qū)丟失,通過使用原先創(chuàng)建的轉換操作,它將會被自動重算,不需要全部重算,而只計算丟失的部分。

此外,每一個RDD都可以用不同的保存級別進行保存,從而允許持久化數(shù)據(jù)集在硬盤或內存作為序列化的Java對象(節(jié)省空間),甚至跨節(jié)點復制

持久化的等級選擇,是通過將一個StorageLevel對象傳遞給persist()方法進行確定的,cache()方法調用persist()的默認級別MEMORY_ONLY。表4-6是持久化的等級。

表4-6 持久化的等級

相對于MEMORY_ONLY_SER,OFF_HEAP減小了垃圾回收的開銷,同時也允許Executor變得更小且可共享內存儲備,Executor的崩潰不會導致內存中的緩存丟失。在這種模式下,Tachyon中的內存是不可丟棄的。

自動持久化,是指不需要用戶調用persist(),Spark自動地保存一些Shuffle操作(如reduceByKey)的中間結果。這樣做是為了避免在Shuffle過程中一個節(jié)點崩潰時重新計算所有的輸入。

持久化時,一旦設置了就不能改變,想要改變就要先去持久化。推薦用戶在重用RDD結果時調用Persist,這樣會使持久化變得可控。

Persist持久化RDD,修改了RDD的meta info中的StorageLevel。而檢查點在持久化的同時切斷Lineage,修改了RDD的meta info中的Lineage。二者均返回經過修改的RDD對象自身,而非新的RDD對象,也均屬于Lazy操作。

3. 選擇存儲等級

Spark的不同存儲級別,旨在滿足內存使用和CPU效率權衡上的不同需求,建議通過以下步驟進行選擇:

□如果你的RDD可以很好地與默認的存儲級別(MEMORY_ONLY)契合,那么就不需要做任何修改。這已經是CPU使用效率最高的選項,它使RDD的操作盡可能快。

□如果不能與MEMORY_ONLY很好地契合,建議使用MEMORY_ONLY_SER并選擇一個快速序列化的庫,使對象在有較高空間使用率的情況下,依然可以較快地被訪問。

□盡可能不要存儲數(shù)據(jù)到硬盤上,除非計算數(shù)據(jù)集的函數(shù),計算量特別大,或者它們過濾了大量的數(shù)據(jù)。否則,重新計算一個分區(qū)的速度與從硬盤中讀取的效率差不多。

□如果想擁有快速故障恢復能力,可使用復制存儲級別(例如,用Spark來響應Web應用的請求)。所有的存儲級別都有通過重新計算丟失數(shù)據(jù)恢復錯誤的容錯機制,但是復制存儲級別可以讓你在RDD上持續(xù)地運行任務,而不需要等待丟失的分區(qū)被重新計算。

□如果想要定義自己的存儲級別(如復制因子為3而不是2),可以使用StorageLevel單例對象的apply()方法。

4. 移除數(shù)據(jù)

RDD可以隨意在RAM中進行緩存,因此它提供了更快速的數(shù)據(jù)訪問。目前,緩存的粒度為RDD級別,只能緩存全部的RDD。

Spark自動監(jiān)視每個節(jié)點上使用的緩存,在集群中沒有足夠的內存時,Spark會根據(jù)緩存情況確定一個LRU(Least Recently Used,最近最少使用算法)的數(shù)據(jù)分區(qū)進行刪除。

如果想手動刪除RDD,而不想等待它從緩存中消失,可以使用RDD的unpersist()方法移除數(shù)據(jù),unpersist()方法是立即生效的。

主站蜘蛛池模板: 乌兰浩特市| 长丰县| 安西县| 镇康县| 巩留县| 东宁县| 济南市| 获嘉县| 玉山县| 嘉祥县| 无为县| 浦东新区| 峡江县| 淳安县| 综艺| 荃湾区| 中宁县| 昭苏县| 临湘市| 高要市| 读书| 赫章县| 大足县| 屏南县| 梁山县| 仁布县| 葫芦岛市| 布拖县| 泰顺县| 瓮安县| 浮山县| 韶关市| 澎湖县| 余姚市| 南昌县| 泰来县| 清远市| 东乡族自治县| 麻城市| 万源市| 灵武市|