- Spark核心技術與高級應用
- 于俊等
- 844字
- 2019-01-01 01:24:39
4.3 RDD操作
RDD提供了一個抽象的分布式數(shù)據(jù)架構,我們不必擔心底層數(shù)據(jù)的分布式特性,而應用邏輯可以表達為一系列轉換處理。
通常應用邏輯是以一系列轉換(Transformation)和執(zhí)行(Action)來表達的,前者在RDD之間指定處理的相互依賴關系,后者指定輸出的形式。
其中:
□轉換:是指該操作從已經存在的數(shù)據(jù)集上創(chuàng)建一個新的數(shù)據(jù)集,是數(shù)據(jù)集的邏輯操作,并沒有真正計算。
□執(zhí)行:是指該方法提交一個與前一個Action之間的所有Transformation組成的Job進行計算,Spark會根據(jù)Action將作業(yè)切分成多個Job。
比如,Map操作傳遞數(shù)據(jù)集中的每一個元素經過一個函數(shù),形成一個新的RDD轉換結果,而Reduce操作通過一些函數(shù)對RDD的所有元素進行操作,并返回最終結果給Driver程序。
在默認情況下,Spark所有的轉換操作都是惰性(Lazy)的,每個被轉換得到的RDD不會立即計算出結果,只是記下該轉換操作應用的一些基礎數(shù)據(jù)集,可以有多個轉換結果。轉換只有在遇到一個Action時才會執(zhí)行,如圖4-2所示。

圖4-2 Spark轉換和執(zhí)行
這種設計使得Spark以更高的效率運行。例如,可以通過將要在Reduce操作中使用的Map轉換來創(chuàng)建一個數(shù)據(jù)集,并且只返回Reduce的結果給驅動程序,而不是整個Map所得的數(shù)據(jù)集。
每當一個Job計算完成,其內部的所有RDD都會被清除,如果在下一個Job中有用到其他Job中的RDD,會引發(fā)該RDD的再次計算,為避免這種情況,我們可以使用Persist(默認是Cache)方法“持久化”一個RDD到內存中。在這種情況下,Spark將會在集群中保留這個RDD,以便其他Job可以更快地訪問,另外,Spark也支持持久化RDD到磁盤中,或者復制RDD到各個節(jié)點。
下面,通過幾行簡單的程序,進一步說明RDD的基礎知識。
val lines=sc.textFile("data.txt") val lineLengths=lines.map(s=>s.length) val totalLength=lineLengths.reduce((a,b)=>a+b)
第一行讀取外部文件data.txt返回一個基礎的MappedRDD,該MappedRDD并不加載到內存中或被執(zhí)行操作,lines只是記錄轉換操作結果的指針。
第二行定義了lineLengths作為一個Map轉換的結果,由于惰性機制的存在,lineLengths的值不會立即計算。
最后,運行Reduce,該操作為一個Action。Spark將計算打散成多個任務以便在不同的機器上分別運行,每臺機器并行運行Map,并將結果進行Reduce操作,返回結果值Driver程序。
如果需要繼續(xù)使用lineLengths,可以添加緩存Persist或Cache,該持久化會在執(zhí)行Reduce之前,第一次計算成功之后,將lineLengths保存在內存中。
4.3.1 轉換操作
轉換操作是RDD的核心之一,通過轉換操作實現(xiàn)不同的RDD結果,作為下一次RDD計算的數(shù)據(jù)輸入,轉換操作不會觸發(fā)Job的提交,僅僅是標記對RDD的操作,形成DAG圖,以供Action觸發(fā)Job提交后調用。
常用的轉換操作包括:基礎轉換操作和鍵-值轉換操作。
1.基礎轉換操作
表4-2列出了目前支持的基礎轉換操作,具體內容請參見RDD的API官方文檔,以獲得更多的細節(jié)。
表4-2 基礎轉換操作

(續(xù))

2.鍵-值轉換操作
盡管大多數(shù)Spark操作都基于包含各種類型對象的RDD,但是一小部分特殊的卻只能在鍵-值對形式的RDD上執(zhí)行。其中,最普遍的就是分布式“洗牌”(shuffle)操作,比如通過鍵進行分組或聚合元素。
例如,使用reduceByKey操作對文件中每行出現(xiàn)的文字次數(shù)進行計數(shù),各種語言的示例如下。
在Scala中,只要在程序中導入org.apache.spark.SparkContext,就能使用Spark的隱式轉換,這些操作就可用于包含二元組對象的RDD(Scala中的內建元組,可通過(a,b)創(chuàng)建),鍵-值對操作可用PairRDDFunction類,如果導入了轉換,該類將自動封裝元組RDD。
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b)
基于counts,可以使用counts.sortByKey()按字母表順序對這些鍵-值對排序,然后使用counts.collect(),以對象數(shù)組的形式向Driver返回結果。
順便說一句,進行分組的groupByKey不進行本地合并,而進行聚合的reduceByKey會在本地對每個分區(qū)的數(shù)據(jù)合并后再做Shuffle,效率比groupByKey高得多。下面通過幾行基于Scala的代碼對鍵-值轉換操作進行說明。
// 初始化List scala>val data = List(("a",1),("b",1),("c",1),("a",2),("b",2),("c",2)) data: List[(String, Int)] = List((a,1), (b,1), (c,1), (a,2), (b,2), (c,2)) // 并行數(shù)組創(chuàng)建RDD scala>val rdd =sc.parallelize(data) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] // 按照key進行reduceByKey操作
scala>val rbk = rdd.reduceByKey(_+_).collect rbk: Array[(String, Int)] = Array((a,3), (b,3), (c,3)) // 按照key進行groupByKey操作 scala>val gbk = rdd.groupByKey().collect gbk: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b, CompactBuffer(1, 2)), (c,CompactBuffer(1, 2))) // 按照key進行sortByKey操作 scala>val sbk = rdd.sortByKey().collect sbk: Array[(String, Int)] = Array((a,1), (a,2), (b,1), (b,2), (c,1), (c,2))
表4-3列出了常用的健-值轉換。
表4-3 常用的鍵-值轉換

4.3.2 執(zhí)行操作
Spark將提交的Action與前一個Action之間的所有Transformation組成的Job進行計算,并根據(jù)Action將作業(yè)切分成多個Job,指定Transformation的輸出結果。
1.常用執(zhí)行操作
這里以加載Spark自帶的本地文件README.md文件進行測試,返回一個MappedRDD文件,進行Filter轉換操作和Count執(zhí)行。
// 讀取README.md數(shù)據(jù),并轉化為RDD scala>val data = sc.textFile("file:///$SPARK_HOME/README.md") data: org.apache.spark.rdd.RDD[String] = file:///$SPARK_HOME/README.md MappedRDD[1]
// 執(zhí)行f ilter操作,提取帶有"Spark"的子集 scala>val datafilter = data.filter(line =>line.contains("Spark")) datafilter: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] // 執(zhí)行Action操作,輸出結果 scala>val datacount = datafilter.count() datacount: Long = 21
如果想了解更多,請參考表4-4中列出的常用的執(zhí)行操作。
表4-4 常用的執(zhí)行操作

通過常用執(zhí)行操作,Spark可以實現(xiàn)大部分MapReduce流式計算的任務,提升了計算效率,對Transformation操作進行結果值輸出。
2.存儲執(zhí)行操作
常用存儲操作主要包含的執(zhí)行如表4-5所示。
表4-5 常用存儲操作包含的執(zhí)行

存儲執(zhí)行操作將結果進行保存,以文本、序列化文件、對象文件的方式輸出到存儲設備進行持久化。
4.3.3 控制操作
控制操作主要包括故障恢復、數(shù)據(jù)持久性,以及移除數(shù)據(jù)。其中,緩存操作Cache/Pesist是惰性的,在進行執(zhí)行操作時才會執(zhí)行,而Unpesist是即時的,會立即釋放內存。checkpoint會直接將RDD持久化到磁盤或HDFS等路徑,不同于Cache/Persist的是,被checkpoint的RDD不會因作業(yè)的結束而被消除,會一直存在,并可以被后續(xù)的作業(yè)直接讀取并加載。
1. RDD故障恢復
在一個典型的分布式系統(tǒng)中,容錯機制主要是采取檢查點(checkpoint)機制和數(shù)據(jù)備份機制。故障恢復是由主動檢查,以及不同機器之間的數(shù)據(jù)復制實現(xiàn)的。由于進行故障恢復需要跨集群網絡來復制大量數(shù)據(jù),這無疑是相當昂貴的。因此,在Spark中則采取了不同的方法進行故障恢復。
作為一個大型的分布式集群,Spark針對工作負載會做出兩種假設:
□處理時間是有限的;
□保持數(shù)據(jù)持久性是外部數(shù)據(jù)源的職責,主要是讓處理過程中的數(shù)據(jù)保持穩(wěn)定。
基于假設,Spark在執(zhí)行期間發(fā)生數(shù)據(jù)丟失時會選擇折中方案,它會重新執(zhí)行之前的步驟來恢復丟失的數(shù)據(jù),但并不是說丟棄之前所有已經完成的工作,而重新開始再來一遍。
假如其中一個RDD壞掉,RDD中有記錄之前的依賴關系,且依賴關系中記錄算子和分區(qū)。此時,僅僅需要再執(zhí)行一遍父RDD的相應分區(qū)。
但是,跨寬依賴的再執(zhí)行能夠涉及多個父RDD,從而引發(fā)全部的再執(zhí)行。為了規(guī)避這一點,Spark會保持Map階段中間數(shù)據(jù)輸出的持久,在機器發(fā)生故障的情況下,再執(zhí)行只需要回溯Mapper持續(xù)輸出的相應分區(qū),來獲取中間數(shù)據(jù)。
Spark還提供了數(shù)據(jù)檢查點和記錄日志,用于持久化中間RDD,這樣再執(zhí)行就不必追溯到最開始的階段。通過比較恢復延遲和檢查點開銷進行權衡,Spark會自動化地選擇相應的策略進行故障恢復。
2. RDD持久化
Spark的持久化,是指在不同轉換操作之間,將過程數(shù)據(jù)緩存在內存中,實現(xiàn)快速重用,或者故障快速恢復。持久化主要分為兩類,主動持久化和自動持久化。
主動持久化,主要目標是RDD重用,從而實現(xiàn)快速處理,是Spark構建迭代算法的關鍵。例如,持久化一個RDD,每一個節(jié)點都將把它的計算分塊結果保存在內存中,并在該數(shù)據(jù)集(或者衍生數(shù)據(jù)集)進行的后續(xù)Action中重用,使得后續(xù)Action執(zhí)行變得更加迅速(通常快10倍)。
可以使用persist()方法標記一個持久化的RDD,一旦被一個執(zhí)行(action)觸發(fā)計算,它將會被保留在計算節(jié)點的內存中并重用。如果RDD的任一分區(qū)丟失,通過使用原先創(chuàng)建的轉換操作,它將會被自動重算,不需要全部重算,而只計算丟失的部分。
此外,每一個RDD都可以用不同的保存級別進行保存,從而允許持久化數(shù)據(jù)集在硬盤或內存作為序列化的Java對象(節(jié)省空間),甚至跨節(jié)點復制。
持久化的等級選擇,是通過將一個StorageLevel對象傳遞給persist()方法進行確定的,cache()方法調用persist()的默認級別MEMORY_ONLY。表4-6是持久化的等級。
表4-6 持久化的等級

相對于MEMORY_ONLY_SER,OFF_HEAP減小了垃圾回收的開銷,同時也允許Executor變得更小且可共享內存儲備,Executor的崩潰不會導致內存中的緩存丟失。在這種模式下,Tachyon中的內存是不可丟棄的。
自動持久化,是指不需要用戶調用persist(),Spark自動地保存一些Shuffle操作(如reduceByKey)的中間結果。這樣做是為了避免在Shuffle過程中一個節(jié)點崩潰時重新計算所有的輸入。
持久化時,一旦設置了就不能改變,想要改變就要先去持久化。推薦用戶在重用RDD結果時調用Persist,這樣會使持久化變得可控。
Persist持久化RDD,修改了RDD的meta info中的StorageLevel。而檢查點在持久化的同時切斷Lineage,修改了RDD的meta info中的Lineage。二者均返回經過修改的RDD對象自身,而非新的RDD對象,也均屬于Lazy操作。
3. 選擇存儲等級
Spark的不同存儲級別,旨在滿足內存使用和CPU效率權衡上的不同需求,建議通過以下步驟進行選擇:
□如果你的RDD可以很好地與默認的存儲級別(MEMORY_ONLY)契合,那么就不需要做任何修改。這已經是CPU使用效率最高的選項,它使RDD的操作盡可能快。
□如果不能與MEMORY_ONLY很好地契合,建議使用MEMORY_ONLY_SER并選擇一個快速序列化的庫,使對象在有較高空間使用率的情況下,依然可以較快地被訪問。
□盡可能不要存儲數(shù)據(jù)到硬盤上,除非計算數(shù)據(jù)集的函數(shù),計算量特別大,或者它們過濾了大量的數(shù)據(jù)。否則,重新計算一個分區(qū)的速度與從硬盤中讀取的效率差不多。
□如果想擁有快速故障恢復能力,可使用復制存儲級別(例如,用Spark來響應Web應用的請求)。所有的存儲級別都有通過重新計算丟失數(shù)據(jù)恢復錯誤的容錯機制,但是復制存儲級別可以讓你在RDD上持續(xù)地運行任務,而不需要等待丟失的分區(qū)被重新計算。
□如果想要定義自己的存儲級別(如復制因子為3而不是2),可以使用StorageLevel單例對象的apply()方法。
4. 移除數(shù)據(jù)
RDD可以隨意在RAM中進行緩存,因此它提供了更快速的數(shù)據(jù)訪問。目前,緩存的粒度為RDD級別,只能緩存全部的RDD。
Spark自動監(jiān)視每個節(jié)點上使用的緩存,在集群中沒有足夠的內存時,Spark會根據(jù)緩存情況確定一個LRU(Least Recently Used,最近最少使用算法)的數(shù)據(jù)分區(qū)進行刪除。
如果想手動刪除RDD,而不想等待它從緩存中消失,可以使用RDD的unpersist()方法移除數(shù)據(jù),unpersist()方法是立即生效的。
- Java Data Science Cookbook
- 復雜性思考:復雜性科學和計算模型(原書第2版)
- 從0到1:數(shù)據(jù)分析師養(yǎng)成寶典
- 深入淺出MySQL:數(shù)據(jù)庫開發(fā)、優(yōu)化與管理維護(第2版)
- 區(qū)塊鏈:看得見的信任
- 數(shù)亦有道:Python數(shù)據(jù)科學指南
- Flutter Projects
- 科研統(tǒng)計思維與方法:SPSS實戰(zhàn)
- INSTANT Android Fragmentation Management How-to
- 數(shù)據(jù)庫技術及應用
- 新手學會計(2013-2014實戰(zhàn)升級版)
- SQL Server深入詳解
- 區(qū)域云計算和大數(shù)據(jù)產業(yè)發(fā)展:浙江樣板
- 深入理解InfluxDB:時序數(shù)據(jù)庫詳解與實踐
- Doris實時數(shù)倉實戰(zhàn)