- Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優
- 王家林
- 1295字
- 2019-12-12 17:29:53
3.3 RDD依賴關系
RDD依賴關系為成兩種:窄依賴(Narrow Dependency)、寬依賴(Shuffle Dependency)。窄依賴表示每個父RDD中的Partition最多被子RDD的一個Partition所使用;寬依賴表示一個父RDD的Partition都會被多個子RDD的Partition所使用。
3.3.1 窄依賴解析
RDD的窄依賴(Narrow Dependency)是RDD中最常見的依賴關系,用來表示每一個父RDD中的Partition最多被子RDD的一個Partition所使用,如圖3-1窄依賴關系圖所示,父RDD有2~3個Partition,每一個分區都只對應子RDD的一個Partition(join with inputs co-partitioned:對數據進行基于相同Key的數值相加)。

圖3-1 窄依賴關系圖
窄依賴分為兩類:第一類是一對一的依賴關系,在Spark中用OneToOneDependency來表示父RDD與子RDD的依賴關系是一對一的依賴關系,如map、filter、join with inputs co-partitioned;第二類是范圍依賴關系,在Spark中用RangeDependency表示,表示父RDD與子RDD的一對一的范圍內依賴關系,如union。
OneToOneDependency依賴關系的Dependency.scala的源碼如下。
1. class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T] (rdd) { 2. override def getParents(partitionId: Int): List[Int] = List(partitionId) 3. }
OneToOneDependency的getParents重寫方法引入了參數partitionId,而在具體的方法中也使用了這個參數,這表明子RDD在使用getParents方法的時候,查詢的是相同partitionId的內容。也就是說,子RDD僅僅依賴父RDD中相同partitionID的Partition。
Spark窄依賴中第二種依賴關系是RangeDependency。Dependency.scala的RangeDependency的源碼如下。
1. class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) 2. extends NarrowDependency[T](rdd) { 3. 4. override def getParents(partitionId: Int): List[Int] = { 5. if (partitionId >= outStart && partitionId < outStart + length) { 6. List(partitionId - outStart + inStart) 7. } else { 8. Nil 9. } 10. } 11. }
RangeDependency和OneToOneDependency最大的區別是實現方法中出現了outStart、length、instart,子RDD在通過getParents方法查詢對應的Partition時,會根據這個partitionId減去插入時的開始ID,再加上它在父RDD中的位置ID,換而言之,就是將父RDD中的Partition,根據partitionId的順序依次插入到子RDD中。
分析完Spark中的源碼,下邊通過兩個例子來講解從實例角度去看RDD窄依賴輸出的結果。
對于OneToOneDependency,采用map操作進行實驗,實驗代碼和結果如下所示。
1. def main (args: Array[String]) { 2. val num1 = Array(100,80,70) 3. val rddnum1 = sc.parallelize(num1) 4. val mapRdd = rddnum1.map(_*2) 5. mapRdd.collect().foreach(println) 6. }
結果為200 160 140。
對于RangeDependency,采用union操作進行實驗,實驗代碼和結果如下所示。
1. def main (args: Array[String]) { 2. //創建數組1 3. val data1= Array("spark","scala","hadoop") 4. //創建數組2 5. val data2=Array("SPARK","SCALA","HADOOP") 6. //將數組1的數據形成RDD1 7. val rdd1 = sc.parallelize(data1) 8. //將數組2的數據形成RDD2 9. val rdd2=sc.parallelize(data2) 10. //把RDD1與RDD2聯合 11. val unionRdd = rdd1.union(rdd2) 12. //將結果收集并輸出 13. unionRdd.collect().foreach(println) 14. }
結果為spark scala hadoop SPARK SCALA HADOOP。
3.3.2 寬依賴解析
RDD的寬依賴(Shuffle Dependency)是一種會導致計算時產生Shuffle操作的RDD操作,用來表示一個父RDD的Partition都會被多個子RDD的Partition使用,如圖3-2寬依賴關系圖中groupByKey算子操作所示,父RDD有3個Partition,每個Partition中的數據會被子RDD中的兩個Partition使用。

圖3-2 寬依賴關系圖
寬依賴的源碼位于Dependency.scala文件的ShuffleDependency方法中,newShuffleId()產生了新的shuffleId,表明寬依賴過程需要涉及shuffle操作,后續的代碼表示寬依賴進行時的shuffle操作需要向shuffleManager注冊信息。
Dependency.scala的ShuffleDependency的源碼如下。
1. @DeveloperApi 2. class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( 3. @transient private val _rdd: RDD[_ <: Product2[K, V]], 4. val partitioner: Partitioner, 5. val serializer: Serializer = SparkEnv.get.serializer, 6. val keyOrdering: Option[Ordering[K]] = None, 7. val aggregator: Option[Aggregator[K, V, C]] = None, 8. val mapSideCombine: Boolean = false) 9. extends Dependency[Product2[K, V]] { 10. 11. override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2 [K, V]]] 12. 13. private[spark] val keyClassName: String = reflect.classTag[K]. runtimeClass.getName 14. private[spark] val valueClassName: String = reflect.classTag[V]. runtimeClass.getName 15. //如果在PairRDDFunctions方法中使用combineBykeyWithClassTag, combiner 16. //類標簽可能是空的 17. private[spark] val combinerClassName: Option[String] = 18. Option(reflect.classTag[C]).map(_.runtimeClass.getName) 19. 20. val shuffleId: Int = _rdd.context.newShuffleId() 21. 22. val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager. registerShuffle( 23. shuffleId, _rdd.partitions.length, this) 24. 25. _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) 26. }
Spark中寬依賴關系非常常見,其中較經典的操作為GroupByKey(將輸入的key-value類型的數據進行分組,對相同key的value值進行合并,生成一個tuple2,如圖3-3所示),具體代碼和操作結果如下所示。輸入5個tuple2類型的數據,通過運行產生3個tuple2數據。
1. def main (args: Array[String]) { 2. //設置輸入的Tuple2數組 3. val data = Array(Tuple2("spark",100),Tuple2("spark",95), 4. Tuple2("hadoop",99),Tuple2("hadoop",80),Tuple2("scala",75)) 5. //將數組內容轉化為RDD 6. val rdd = sc.parallelize(data) 7. //對RDD進行groupByKey操作 8. val rddGrouped=rdd.groupByKey() 9. //輸出結果 10. rddGrouped.collect.foreach(println) 11. }
操作結果如圖3-3所示。

圖3-3 GroupByKey結果
- 課課通計算機原理
- Big Data Analytics with Hadoop 3
- Dreamweaver CS3+Flash CS3+Fireworks CS3創意網站構建實例詳解
- Microsoft Power BI Quick Start Guide
- 我的J2EE成功之路
- Expert AWS Development
- 工業機器人現場編程(FANUC)
- Implementing Oracle API Platform Cloud Service
- 完全掌握AutoCAD 2008中文版:機械篇
- Excel 2007技巧大全
- 氣動系統裝調與PLC控制
- 軟件工程及實踐
- 中國戰略性新興產業研究與發展·增材制造
- Wireshark Revealed:Essential Skills for IT Professionals
- 歐姆龍PLC應用系統設計實例精解