官术网_书友最值得收藏!

10.2 Spark中Accumulator原理和源碼詳解

本節講解Spark中Accumulator原理及對Spark中Accumulator源碼進行詳解。

10.2.1 Spark中Accumulator原理詳解

Spark的Broadcast和Accumulator很重要,在實際的企業級開發環境中一般會使用Broadcast和Accumulator。Broadcast、Accumulator和RDD是Spark中并列的三大基礎數據結構。大家談Spark的時候,首先談RDD。RDD是一個并行的數據,關注在JVM中怎么處理數據。很多時候可能忽略了Broadcast和Accumulator,這兩個變量都是全局級別的。例如,集群中有1000臺機器,那Broadcast和Accumulator可以在1000臺機器中共享。在分布式的基礎上,如果有共享的數據結構,那是非常有用的。

分布式大數據系統中,進行編程的時候首先考慮數據結構。

 RDD:分布式私有數據結構。RDD本身是一個并行化的、本地化的數據結構,運行時在一個個線程中運行,RDD是私有的運行數據和私有的運行過程,但在一個Stage里面是一樣的,一個線程一個時刻只處理一個數據分片,另一個線程一個時刻只處理另一個數據片。在設計業務邏輯的時候,我們通常考慮這個分片如何去處理。

 Broadcast:分布式全局只讀數據結構。

 Accumulator:分布式全局只寫的數據結構。我們不會在線程池中讀取Accumulator,但在Driver上可以讀取Accumulator。

在生產環境下,我們一定會自定義Accumulator。

(1)自定義時可以讓Accumulator非常復雜,基本上可以是任意類型的Java和Scala對象。

(2)自定義Accumulator時,可以實現一些“技術福利”。例如,在Accumulator變化的時候可以把數據同步到MySQL中。我們在進行流處理的時候,數據不斷地流進來,如要查詢用戶點擊量的趨勢圖,計算點擊量以后須實時反饋到生產環境的server上。一個非常簡單的實現方式是:每次發現累加的時候,就更新一下數據庫,這是一個非常強大的同步機制和同步效果。

10.2.2 Spark中Accumulator源碼詳解

Accumulator是一個簡單的value值[Accumulable],相同類型的元素合并時結果可以累加,通過added到關聯和交換操作,可以有效地支持并行,可以用來實現計數(如MapReduce)或求和。Spark原生支持數值類型的累加器,也可以自定義開發實現新類型的支持。

累加器由一個初始值V通過調用[SparkContext#accumulator SparkContext.accumulator]創建。在群集上運行的任務可以使用“+=”運算符寫入,但是不能讀取它的值。只有Driver程序使用[#value]方法可以讀取累加器的值。例如:

1.   scala> val accum = sc.accumulator(0)
2.  accum: org.apache.spark.Accumulator[Int] = 0
3.  scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
4.  ...
5.  10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
6.
7.  scala> accum.value
8.  res2: Int = 10

Accumulator.scala的源碼如下。

1.    @deprecated("use AccumulatorV2", "2.0.0")
2.  class Accumulator[T] private[spark] (
3.      // SI-8813: 必須顯式地定義private val,否則Scala 2.11不編譯
4.      @transient private val initialValue: T,
5.      param: AccumulatorParam[T],
6.      name: Option[String] = None,
7.      countFailedValues: Boolean = false)
8.    extends Accumulable[T, T](initialValue, param, name, countFailedValues)
9.  ......

Accumulator是一個類,繼承自Accumulable。Accumulator已經被標識為過時的(deprecated),在Spark 2.0版本中可以使用AccumulatorV2。

1.   abstract class AccumulatorV2[IN, OUT] extends Serializable {
2.    private[spark] var metadata: AccumulatorMetadata = _
3.    private[this] var atDriverSide = true
4.  ......

可以通過繼承創建自己的類型AccumulatorV2。AccumulatorV2抽象類有幾種方法必須覆蓋:reset用于將累加器重置為零,add用于將另一個值添加到累加器中,merge用于將另一個相同類型的累加器合并到該累加器中。例如,假設有一個MyVector代表數學向量的類,代碼如下:

1.   class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
2.
3.    private val myVector: MyVector = MyVector.createZeroVector
4.
5.    def reset(): Unit = {
6.      myVector.reset()
7.    }
8.
9.    def add(v: MyVector): Unit = {
10.     myVector.add(v)
11.   }
12.   ...
13. }
14.
15. //創建一個這種類型的累加器
16. val myVectorAcc = new VectorAccumulatorV2
17. //然后,把它注冊到Spark上下文中
18. sc.register(myVectorAcc, "MyVectorAcc1")

當自定義自己的AccumulatorV2類型時,生成的類型可能與添加的元素的類型不同。累加器更新僅在Action動作內執行,Spark保證每個任務對累加器的更新只能應用一次,即重新啟動的任務將不會更新該值。在transformations轉換中,如果重新執行任務或作業階段,則每個任務的更新可能會被多次執行。Accumulators不會改變Spark的Lazy評估模型。如果它們在RDD的操作中更新,則只有在RDD作為操作的一部分進行計算時,才會更新其值。因此,累加器更新不能保證在Lazy變換中執行時執行map()。

以下代碼中,accum仍然為0 ,因為沒有action算子觸發map操作。

1.   val accum = sc.longAccumulator
2.  data.map { x => accum.add(x); x }
主站蜘蛛池模板: 宁远县| 武夷山市| 高安市| 报价| 太谷县| 莱阳市| 长岛县| 古田县| 葫芦岛市| 怀远县| 平度市| 苏尼特右旗| 威信县| 新津县| 沅陵县| 宜昌市| 阳信县| 连江县| 兴安盟| 南木林县| 罗山县| 麻阳| 监利县| 乐亭县| 平塘县| 肇东市| 峨眉山市| 余姚市| 洞口县| 阳春市| 连云港市| 定边县| 九台市| 嵊泗县| 万盛区| 西青区| 孝感市| 得荣县| 乌拉特后旗| 五大连池市| 开封县|