官术网_书友最值得收藏!

9.1 Spark中Cache原理和源碼詳解

本節對Spark中Cache原理及Spark中Cache源碼進行詳解。

9.1.1 Spark中Cache原理詳解

Spark中Cache機制原理:首先,RDD是通過iterator進行計算的。

(1)CacheManager會通過BlockManager從Local或者Remote獲取數據直接通過RDD的compute進行計算,有可能需要考慮checkpoint。

(2)通過BlockManager首先從本地獲取數據,如果得不到數據,就會從遠程獲取數據。

(3)首先查看當前的RDD是否進行了checkpoint,如果進行了的話,就直接讀取checkpoint的數據,否則必須進行計算;因為此時RDD需要緩存,所以計算如果需要,則通過BlockManager再次進行持久化。

(4)如果持久化的時候只是緩存到磁盤中,就直接使用BlockManager的doPut方法寫入磁盤(需要考慮Replication)。

(5)如果指定內存作緩存,優先保存到內存中,此時會使用MemoryStore.unrollSafely方法來嘗試安全地將數據保存在內存中,如果內存不夠,會使用一個方法來整理一部分內存空間,然后基于整理出來的內存空間放入我們想緩存的最新數據。

(6)直接通過RDD的compute進行計算,有可能需要考慮checkpoint。

Spark中,Cache原理示意圖如圖9-1所示。

9.1.2 Spark中Cache源碼詳解

CacheManager管理是緩存,而緩存可以是基于內存的緩存,也可以是基于磁盤的緩存。CacheManager需要通過BlockManager來操作數據。

Task發生計算時要調用RDD的compute進行計算。下面看一下MapPartitionsRDD的 compute方法。

圖9-1 Cache原理示意圖

MapPartitionsRDD的源碼如下。

1.   private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
2.      var prev: RDD[T],
3.      f: (TaskContext, Int, Iterator[T]) => Iterator[U],  //(TaskContext,
        partition index, iterator)
4.      preservesPartitioning: Boolean = false)
5.    extends RDD[U](prev) {
6.
7.    override val partitioner = if (preservesPartitioning) firstParent[T].
      partitioner else None
8.
9.    override def getPartitions: Array[Partition] = firstParent[T].
      partitions
10.
11.   override def compute(split: Partition, context: TaskContext):
      Iterator[U] =
12.     f(context, split.index, firstParent[T].iterator(split, context))
13.
14.   override def clearDependencies() {
15.     super.clearDependencies()
16.     prev = null
17.   }
18. }

compute真正計算的時候通過iterator計算,MapPartitionsRDD的iterator依賴父RDD計算。iterator是RDD內部的方法,如有緩存,將從緩存中讀取數據,否則進行計算。這不是被用戶直接調用,但可用于實現自定義子RDD。

RDD.scala的iterator方法如下。

1.  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
2.    if (storageLevel != StorageLevel.NONE) {
3.      getOrCompute(split, context)
4.    } else {
5.      computeOrReadCheckpoint(split, context)
6.    }
7.  }

RDD.scala的iterator方法中判斷storageLevel != StorageLevel.NONE,說明數據可能存放在內存、磁盤中,調用getOrCompute(split, context)方法。如果之前計算過一次,再次計算可以找CacheManager要數據。

RDD.scala的getOrCompute的源碼如下。

1.   private[spark] def getOrCompute(partition: Partition, context:
     TaskContext): Iterator[T] = {
2.      val blockId = RDDBlockId(id, partition.index)
3.      var readCachedBlock = true
4.      //這種方法被Executors調用,所以我們需要調用SparkEnv.get代替sc.env
5.      SparkEnv.get.blockManager.getOrElseUpdate(blockId,               storageLevel,
        elementClassTag, () => {
6.        readCachedBlock = false
7.        computeOrReadCheckpoint(partition, context)
8.      }) match {
9.        case Left(blockResult) =>
10.         if (readCachedBlock) {
11.           val existingMetrics = context.taskMetrics().inputMetrics
12.           existingMetrics.incBytesRead(blockResult.bytes)
13.           new InterruptibleIterator[T](context, blockResult.data.
              asInstanceOf[Iterator[T]]) {
14.             override def next(): T = {
15.               existingMetrics.incRecordsRead(1)
16.               delegate.next()
17.             }
18.           }
19.         } else {
20.           new InterruptibleIterator(context, blockResult.data.asInstanceOf
              [Iterator[T]])
21.         }
22.       case Right(iter) =>
23.         new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
24.     }
25.   }

在有緩存的情況下,緩存可能基于內存,也可能基于磁盤,getOrCompute獲取緩存;如沒有緩存,則需重新計算RDD。為何需要重新計算?如果數據放在內存中,假設緩存了100萬個數據分片,下一個步驟計算的時候需要內存,因為需要進行計算的內存空間占用比之前緩存的數據占用內存空間重要,假設須騰出10000個數據分片所在的空間,因此從BlockManager中將內存中的緩存數據drop到磁盤上,如果不是內存和磁盤的存儲級別,那10000個數據分片的緩存數據就可能丟失,99萬個數據分片可以復用,而這10000個數據分片須重新進行計算。

Cache在工作的時候會最大化地保留數據,但是數據不一定絕對完整,因為當前的計算如果需要內存空間,那么Cache在內存中的數據必須讓出空間,此時如何在RDD持久化的時候同時指定可以把數據放在Disk上,那么部分Cache的數據就可以從內存轉入磁盤,否則數據就會丟失。

getOrCompute方法返回的是Iterator。進行Cache以后,BlockManager對其進行管理,通過blockId可以獲得曾經緩存的數據。具體CacheManager在獲得緩存數據的時候會通過BlockManager來抓到數據。

getOrElseUpdate方法中,如果block存在,檢索給定的塊block;如果不存在,則調用提供makeIterator方法計算塊block,對塊block進行持久化,并返回block的值。

BlockManager.scala的getOrElseUpdate的源碼如下。

1.     def getOrElseUpdate[T](
2.       blockId: BlockId,
3.       level: StorageLevel,
4.       classTag: ClassTag[T],
5.       makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
6.     //嘗試從本地或遠程存儲讀取塊。如果它存在,那么我們就不需要通過本地get或put路
       //徑獲取
7.     get[T](blockId)(classTag) match {
8.       case Some(block) =>
9.         return Left(block)
10.      case _ =>
11.        //需要計算塊
12.    }
13.    //需要計算blockInitially,在塊上我們沒有鎖
14.    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock =
       true) match {
15.      case None =>
16.        //doput()方法沒有返回,所以塊已存在或者已成功存儲。因此,我們現在在塊上持有
           //讀取鎖
17.        val blockResult = getLocalValues(blockId).getOrElse {
18.     //在doPut()和get()方法調用的時候,我們持有讀取鎖,塊不應被驅逐,這樣,get()
        //方法沒返回塊,表示發生一些內部錯誤
19.          releaseLock(blockId)
20.          throw new SparkException(s"get() failed for block $blockId even
             though we held a lock")
21.        }
22.   //我們已經持有調用doPut()方法在塊上的讀取鎖,getLocalValues()再一次獲取鎖,
      //所以我們需要調用releaseLock(),這樣獲取鎖的數量是1(因為調用者只release()一次)
23.        releaseLock(blockId)
24.        Left(blockResult)
25.      case Some(iter) =>
26.   //輸入失敗,可能是因為數據太大而不能存儲在內存中,不能溢出到磁盤上。因此,我們需
      //要將輸入迭代器傳遞給調用者,他們可以決定如何處理這些值(例如,不緩存它們)
27.       Right(iter)
28.    }
29.  }

BlockManager.scala的getOrElseUpdate中根據blockId調用了get[T](blockId)方法,get方法從block塊管理器(本地或遠程)獲取一個塊block。如果塊在本地存儲且沒獲取鎖,則先獲取塊block的讀取鎖。如果該塊是從遠程塊管理器獲取的,當data迭代器被完全消費以后,那么讀取鎖將自動釋放。get的時候,如果本地有數據,從本地獲取數據返回;如果沒有數據,則從遠程節點獲取數據。

BlockManager.scala的get方法的源碼如下:

1.  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
2.    val local = getLocalValues(blockId)
3.    if (local.isDefined) {
4.      logInfo(s"Found block $blockId locally")
5.      return local
6.     }
7.     val remote = getRemoteValues[T](blockId)
8.     if (remote.isDefined) {
9.       logInfo(s"Found block $blockId remotely")
10.      return remote
11.    }
12.    None
13.  }

BlockManager的get方法從Local的角度講,如果數據在本地,get方法調用getLocalValues獲取數據。如果數據在內存中(level.useMemory且memoryStore包含了blockId),則從memoryStore中獲取數據;如果數據在磁盤中(level.useDisk且diskStore包含了blockId),則從diskStore中獲取數據。這說明數據在本地緩存,可以在內存中,也可以在磁盤上。

BlockManager的get方法從remote的角度講,get方法中將調用getRemoteValues方法。

BlockManager.Scala的getRemoteValues的源碼如下。

1.    private def getRemoteValues[T: ClassTag](blockId: BlockId): Option
      [BlockResult] = {
2.    val ct = implicitly[ClassTag[T]]
3.    getRemoteBytes(blockId).map { data =>
4.      val values =
5.        serializerManager.dataDeserializeStream(blockId, data.toInputStream
          (dispose = true))(ct)
6.      new BlockResult(values, DataReadMethod.Network, data.size)
7.    }
8.  }

getRemoteValues方法中調用getRemoteBytes方法,通過blockTransferService.fetchBlockSync從遠程節點獲取數據。

BlockManager.Scala的getRemoteBytes的源碼如下。

1.   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
2.     logDebug(s"Getting remote block $blockId")
3.     require(blockId != null, "BlockId is null")
4.     var runningFailureCount = 0
5.     var totalFailureCount = 0
6.     val locations = getLocations(blockId)
7.     val maxFetchFailures = locations.size
8.     var locationIterator = locations.iterator
9.     while (locationIterator.hasNext) {
10.      val loc = locationIterator.next()
11.      logDebug(s"Getting remote block $blockId from $loc")
12.      val data = try {
13.        blockTransferService.fetchBlockSync(
14.          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
15.      } catch {
16.        case NonFatal(e) =>
17.          runningFailureCount += 1
18.          totalFailureCount += 1
19.
20.          if (totalFailureCount >= maxFetchFailures) {
21.            //放棄嘗試的位置。要么我們已經嘗試了所有的原始位置,或者我們已經從master
               //節點刷新了位置列表,并且仍然在刷新列表中嘗試位置后命中失敗logWarning
               //(s"Failed to fetch block after $totalFailureCount fetch failures."+
               //s"Most recent failure cause:", e)
22.
23.            return None
24.          }
25.
26.          logWarning(s"Failed to fetch remote block $blockId " +
27.            s"from $loc (failed attempt $runningFailureCount)", e)
28.
29.    //如果有大量的Executors,那么位置列表可以包含一個舊的條目造成大量重試,可能花
       //費大量的時間。在一定數量的獲取失敗之后,為去掉這些舊的條目,我們刷新塊位置
30.          if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
31.            locationIterator = getLocations(blockId).iterator
32.            logDebug(s"Refreshed locations from the driver " +
33.              s"after ${runningFailureCount} fetch failures.")
34.            runningFailureCount = 0
35.          }
36.
37.          //此位置失敗,所以我們嘗試從不同的位置獲取,這里返回一個null
38.
39.      }
40.
41.      if (data != null) {
42.        return Some(new ChunkedByteBuffer(data))
43.      }
44.      logDebug(s"The value of block $blockId is null")
45.    }
46.    logDebug(s"Block $blockId not found")
47.    None
48.  }

BlockManager的get方法,如果本地有數據,則從本地獲取數據返回;如果遠程有數據,則從遠程獲取數據返回;如果都沒有數據,就返回None。get方法的返回類型是Option[BlockResult],Option的結果分為兩種情況:①如果有內容,則返回Some[BlockResult;②如果沒有內容,則返回None。這是Option的基礎語法。

Option.scala的源碼如下。

1.    sealed abstract class Option[+A] extends Product with Serializable {
2.    self =>
3.  .....
4.  final case class Some[+A](x: A) extends Option[A] {
5.    def isEmpty = false
6.    def get = x
7.  }
8.
9.  .......
10. case object None extends Option[Nothing] {
11.   def isEmpty = true
12.   def get = throw new NoSuchElementException("None.get")
13. }

回到BlockManager的getOrElseUpdate方法,從get方法返回的結果進行模式匹配,如果有數據,則對Some(block)返回Left(block),這是獲取到block的情況;如果沒數據,則是None,須計算block。

回到RDD.scala的getOrCompute方法,在getOrCompute方法中調用SparkEnv.get. blockManager.getOrElseUpdate方法時,傳入blockId、storageLevel、elementClassTag,其中第四個參數是一個匿名函數,在匿名函數中調用了computeOrReadCheckpoint(partition, context)。然后在getOrElseUpdate方法中,根據blockId獲取數據,如果獲取到緩存數據,就返回;如果沒有數據,就調用doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true)進行計算,doPutIterator其中第二個參數makeIterator就是getOrElseUpdate方法中傳入的匿名函數,在匿名函數中獲取到Iterator數據。

RDD.getOrCompute中computeOrReadCheckpoint方法,如果RDD進行了checkpoint,則從父RDD的iterator中直接獲取數據;或者沒有Checkpoint物化,則重新計算RDD的數據。

RDD.scala的computeOrReadCheckpoint的源碼如下。

1.   private[spark] def computeOrReadCheckpoint(split: Partition, context:
     TaskContext): Iterator[T] =
2.  {
3.    if (isCheckpointedAndMaterialized) {
4.      firstParent[T].iterator(split, context)
5.    } else {
6.      compute(split, context)
7.    }
8.  }

BlockManager.scala的getOrElseUpdate方法中如果根據blockID沒有獲取到本地數據,則調用doPutIterator將通過BlockManager再次進行持久化。

BlockManager.scala的getOrElseUpdate方法的源碼如下。

1.   def getOrElseUpdate[T](
2.        blockId: BlockId,
3.        level: StorageLevel,
4.        classTag: ClassTag[T],
5.        makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
6.      //嘗試從本地或遠程存儲讀取塊。如果它存在,那么我們就不需要通過本地GET或PUT路
        //徑獲取
7.      get[T](blockId)(classTag) match {
8.        case Some(block) =>
9.          return Left(block)
10.       case _ =>
11.         //Need to compute the block.
12.     }
13.     //起初我們不鎖這個塊
14.     doPutIterator(blockId, makeIterator, level, classTag, keepReadLock =
        true) match {
15. .......

BlockManager.scala的getOrElseUpdate方法中調用了doPutIterator。doPutIterator將makeIterator從父RDD的checkpoint讀取的數據或者重新計算的數據存放到內存中,如果內存不夠,就溢出到磁盤中持久化。

Spark 2.1.1版本的BlockManager.scala的doPutIterator方法的源碼如下。

1.  private def doPutIterator[T](
2.    blockId: BlockId,
3.    iterator: () => Iterator[T],
4.    level: StorageLevel,
5.    classTag: ClassTag[T],
6.    tellMaster: Boolean = true,
7.    keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]]={
8.  doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock
    = keepReadLock) { info =>
9.   val startTimeMs = System.currentTimeMillis
10.  var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator
     [T]] = None
11.  //塊的大小為字節
12.  var size = 0L
13.  if (level.useMemory) {
14.    //首先把它放在內存中,即使useDisk設置為true;如果內存存儲不能保存,我們
       //稍后會把它放在磁盤上
15.    if (level.deserialized) {
16.      memoryStore.putIteratorAsValues(blockId, iterator(), classTag)
         match {
17.        case Right(s) =>
18.          size = s
19.        case Left(iter) =>
20.          //沒有足夠的空間來展開塊;如果適用,可以溢出到磁盤
21.          if (level.useDisk) {
22.            logWarning(s"Persisting block $blockId to disk instead.")
23.            diskStore.put(blockId) { fileOutputStream =>
24.              serializerManager.dataSerializeStream(blockId,
                 fileOutputStream, iter)(classTag)
25.            }
26.            size = diskStore.getSize(blockId)
27.          } else {
28.            iteratorFromFailedMemoryStorePut = Some(iter)
29.          }
30.      }
31.    } else { //!level.deserialized
32.      memoryStore.putIteratorAsBytes(blockId, iterator(), classTag,
         level.memoryMode) match {
33.        case Right(s) =>
34.          size = s
35.        case Left(partiallySerializedValues) =>
36.          //沒有足夠的空間來展開塊;如果適用,可以溢出到磁盤
37.          if (level.useDisk) {
38.            logWarning(s"Persisting block $blockId to disk instead.")
39.            diskStore.put(blockId) { fileOutputStream =>
40.              partiallySerializedValues.finishWritingToStream
                 (fileOutputStream)
41.            }
42.            size = diskStore.getSize(blockId)
43.          } else {
44.            iteratorFromFailedMemoryStorePut = Some
               (partiallySerializedValues. valuesIterator)
45.          }
46.      }
47.    }
48.
49.  } else if (level.useDisk) {
50.    diskStore.put(blockId) { fileOutputStream =>
51.      serializerManager.dataSerializeStream(blockId,          fileOutputStream,
         iterator())(classTag)
52.    }
53.    size = diskStore.getSize(blockId)
54.  }
55.
56.  val putBlockStatus = getCurrentBlockStatus(blockId, info)
57.  val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
58.  if (blockWasSuccessfullyStored) {
59.    //現在塊位于內存或磁盤存儲中,通知master
60.        info.size = size
61.        if (tellMaster && info.tellMaster) {
62.          reportBlockStatus(blockId, putBlockStatus)
63.        }
64.        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
65.        logDebug("Put block %s locally took %s".format(blockId, Utils.
           getUsedTimeMs(startTimeMs)))
66.        if (level.replication > 1) {
67.          val remoteStartTime = System.currentTimeMillis
68.          val bytesToReplicate = doGetLocalBytes(blockId, info)
69.          //[SPARK-16550] 使用默認的序列化時擦除      classTag  類型,當反序列化類時
             //NettyBlockRpcServer崩潰。待辦事項(EKL)刪除遠程節點類裝載器的問題
             //已經修復val remoteClassTag = if (!serializerManager.canUseKryo
             //(classTag)) {
70.            scala.reflect.classTag[Any]
71.          } else {
72.            classTag
73.          }
74.          try {
75.            replicate(blockId, bytesToReplicate, level, remoteClassTag)
76.          } finally {
77.            bytesToReplicate.unmap()
78.          }
79.          logDebug("Put block %s remotely took %s"
80.            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
81.        }
82.      }
83.     assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.
        isEmpty)
84.      iteratorFromFailedMemoryStorePut
85.    }
86.  }

Spark 2.2.0版本的BlockManager.scala的doPutIterator方法的源碼與Spark 2.1.1版本相比具有如下特點。

 上段代碼中第23、39、50行fileOutputStream的名稱更新為channel。

 上段代碼中第23、39、50行之后新增加一行代碼val out = Channels.newOutputStream (channel)。

 上段代碼中第24、51行serializerManager.dataSerializeStream的第2個參數調整為out。

 上段代碼中第40行fileOutputStream參數調整為out。

 上段代碼中第77行bytesToReplicate.unmap()方法調整為bytesToReplicate.dispose()。

1.   ......
2.     diskStore.put(blockId) { channel =>
3.                    val out = Channels.newOutputStream(channel)
4.                   serializerManager.dataSerializeStream(blockId, out, iter)
                     (classTag)
5.                  }
6.  .......
7.  diskStore.put(blockId) { channel =>
8.                    val out = Channels.newOutputStream(channel)
9.                    partiallySerializedValues.finishWritingToStream(out)
10. .......
11.         diskStore.put(blockId) { channel =>
12.           val out = Channels.newOutputStream(channel)
13.          serializerManager.dataSerializeStream(blockId, out, iterator())
             (classTag)
14.         }
15. ......
16.             bytesToReplicate.dispose()
17. ........
主站蜘蛛池模板: 双城市| 朔州市| 青岛市| 卢湾区| 民勤县| 柳河县| 华池县| 深水埗区| 五华县| 元阳县| 木兰县| 托克托县| 清流县| 琼海市| 错那县| 洛南县| 山东省| 柘荣县| 会昌县| 金乡县| 双城市| 宕昌县| 崇州市| 高唐县| 嘉义市| 涟源市| 合肥市| 延边| 靖远县| 齐齐哈尔市| 泰安市| 会泽县| 罗田县| 竹溪县| 新昌县| 宁河县| 富民县| 阿巴嘎旗| 郑州市| 松潘县| 广德县|