官术网_书友最值得收藏!

3.3 RDD依賴關系

RDD依賴關系為成兩種:窄依賴(Narrow Dependency)、寬依賴(Shuffle Dependency)。窄依賴表示每個父RDD中的Partition最多被子RDD的一個Partition所使用;寬依賴表示一個父RDD的Partition都會被多個子RDD的Partition所使用。

3.3.1 窄依賴解析

RDD的窄依賴(Narrow Dependency)是RDD中最常見的依賴關系,用來表示每一個父RDD中的Partition最多被子RDD的一個Partition所使用,如圖3-1窄依賴關系圖所示,父RDD有2~3個Partition,每一個分區都只對應子RDD的一個Partition(join with inputs co-partitioned:對數據進行基于相同Key的數值相加)。

圖3-1 窄依賴關系圖

窄依賴分為兩類:第一類是一對一的依賴關系,在Spark中用OneToOneDependency來表示父RDD與子RDD的依賴關系是一對一的依賴關系,如map、filter、join with inputs co-partitioned;第二類是范圍依賴關系,在Spark中用RangeDependency表示,表示父RDD與子RDD的一對一的范圍內依賴關系,如union。

OneToOneDependency依賴關系的Dependency.scala的源碼如下。

1.   class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T]
     (rdd) {
2.    override def getParents(partitionId: Int): List[Int] = List(partitionId)
3.  }

OneToOneDependency的getParents重寫方法引入了參數partitionId,而在具體的方法中也使用了這個參數,這表明子RDD在使用getParents方法的時候,查詢的是相同partitionId的內容。也就是說,子RDD僅僅依賴父RDD中相同partitionID的Partition。

Spark窄依賴中第二種依賴關系是RangeDependency。Dependency.scala的RangeDependency的源碼如下。

1.   class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
2.    extends NarrowDependency[T](rdd) {
3.
4.    override def getParents(partitionId: Int): List[Int] = {
5.      if (partitionId >= outStart && partitionId < outStart + length) {
6.        List(partitionId - outStart + inStart)
7.      } else {
8.        Nil
9.      }
10.   }
11. }

RangeDependency和OneToOneDependency最大的區別是實現方法中出現了outStart、length、instart,子RDD在通過getParents方法查詢對應的Partition時,會根據這個partitionId減去插入時的開始ID,再加上它在父RDD中的位置ID,換而言之,就是將父RDD中的Partition,根據partitionId的順序依次插入到子RDD中。

分析完Spark中的源碼,下邊通過兩個例子來講解從實例角度去看RDD窄依賴輸出的結果。

對于OneToOneDependency,采用map操作進行實驗,實驗代碼和結果如下所示。

1.  def main (args: Array[String]) {
2.  val num1 = Array(100,80,70)
3.  val rddnum1 = sc.parallelize(num1)
4.  val mapRdd = rddnum1.map(_*2)
5.  mapRdd.collect().foreach(println)
6.  }

結果為200 160 140。

對于RangeDependency,采用union操作進行實驗,實驗代碼和結果如下所示。

1.  def main (args: Array[String]) {
2.   //創建數組1
3.  val data1= Array("spark","scala","hadoop")
4.    //創建數組2
5.  val data2=Array("SPARK","SCALA","HADOOP")
6.   //將數組1的數據形成RDD1
7.  val rdd1 = sc.parallelize(data1)
8.    //將數組2的數據形成RDD2
9.  val rdd2=sc.parallelize(data2)
10.   //把RDD1與RDD2聯合
11. val unionRdd = rdd1.union(rdd2)
12.   //將結果收集并輸出
13. unionRdd.collect().foreach(println)
14. }

結果為spark scala hadoop SPARK SCALA HADOOP。

3.3.2 寬依賴解析

RDD的寬依賴(Shuffle Dependency)是一種會導致計算時產生Shuffle操作的RDD操作,用來表示一個父RDD的Partition都會被多個子RDD的Partition使用,如圖3-2寬依賴關系圖中groupByKey算子操作所示,父RDD有3個Partition,每個Partition中的數據會被子RDD中的兩個Partition使用。

圖3-2 寬依賴關系圖

寬依賴的源碼位于Dependency.scala文件的ShuffleDependency方法中,newShuffleId()產生了新的shuffleId,表明寬依賴過程需要涉及shuffle操作,后續的代碼表示寬依賴進行時的shuffle操作需要向shuffleManager注冊信息。

Dependency.scala的ShuffleDependency的源碼如下。

1.   @DeveloperApi
2.  class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
3.      @transient private val _rdd: RDD[_ <: Product2[K, V]],
4.      val partitioner: Partitioner,
5.      val serializer: Serializer = SparkEnv.get.serializer,
6.      val keyOrdering: Option[Ordering[K]] = None,
7.      val aggregator: Option[Aggregator[K, V, C]] = None,
8.      val mapSideCombine: Boolean = false)
9.    extends Dependency[Product2[K, V]] {
10.
11.   override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2
      [K, V]]]
12.
13.   private[spark] val keyClassName: String = reflect.classTag[K].
      runtimeClass.getName
14.   private[spark] val valueClassName: String = reflect.classTag[V].
      runtimeClass.getName
15.   //如果在PairRDDFunctions方法中使用combineBykeyWithClassTag, combiner
16.   //類標簽可能是空的
17.   private[spark] val combinerClassName: Option[String] =
18.     Option(reflect.classTag[C]).map(_.runtimeClass.getName)
19.
20.   val shuffleId: Int = _rdd.context.newShuffleId()
21.
22.   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.
      registerShuffle(
23.     shuffleId, _rdd.partitions.length, this)
24.
25.   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
26. }

Spark中寬依賴關系非常常見,其中較經典的操作為GroupByKey(將輸入的key-value類型的數據進行分組,對相同key的value值進行合并,生成一個tuple2,如圖3-3所示),具體代碼和操作結果如下所示。輸入5個tuple2類型的數據,通過運行產生3個tuple2數據。

1.  def main (args: Array[String]) {
2.    //設置輸入的Tuple2數組
3.  val data = Array(Tuple2("spark",100),Tuple2("spark",95),
4.  Tuple2("hadoop",99),Tuple2("hadoop",80),Tuple2("scala",75))
5.    //將數組內容轉化為RDD
6.  val rdd = sc.parallelize(data)
7.    //對RDD進行groupByKey操作
8.  val rddGrouped=rdd.groupByKey()
9.    //輸出結果
10. rddGrouped.collect.foreach(println)
11. }

操作結果如圖3-3所示。

圖3-3 GroupByKey結果

主站蜘蛛池模板: 贵港市| 安义县| 江门市| 北海市| 郧西县| 黄浦区| 凤山市| 饶河县| 寿光市| 雷山县| 兖州市| 西安市| 丹东市| 佛冈县| 修武县| 垦利县| 荥阳市| 阳谷县| 德昌县| 团风县| 长沙县| 鄂州市| 开封县| 克什克腾旗| 浦江县| 高碑店市| 蓬安县| 宕昌县| 怀仁县| 陵水| 巫溪县| 洪江市| 炉霍县| 房山区| 汉寿县| 舞阳县| 柳江县| 沙雅县| 年辖:市辖区| 绥化市| 上犹县|