官术网_书友最值得收藏!

9.2 Spark中checkpoint原理和源碼詳解

本節(jié)對Spark中checkpoint原理及Spark中checkpoint源碼進(jìn)行詳解。

9.2.1 Spark中checkpoint原理詳解

checkpoint到底是什么?

(1)Spark在生產(chǎn)環(huán)境下經(jīng)常會面臨Tranformations的RDD非常多(例如,一個Job中包含10 000個RDD)或者具體Tranformation產(chǎn)生的RDD本身計(jì)算特別復(fù)雜和耗時(例如,計(jì)算時常超過1h),此時我們必須考慮對計(jì)算結(jié)果數(shù)據(jù)的持久化。

(2)Spark擅長多步驟迭代,同時擅長基于Job的復(fù)用,這時如果能夠?qū)υ?jīng)計(jì)算的過程產(chǎn)生的數(shù)據(jù)進(jìn)行復(fù)用,就可以極大地提升效率。

(3)如果采用persist把數(shù)據(jù)放在內(nèi)存中,雖然是最快速的,但是也是最不可靠的。如果放在磁盤上,也不是完全可靠的。例如,磁盤會損壞,管理員可能清空磁盤等。

(4)checkpoint的產(chǎn)生就是為了相對更加可靠地持久化數(shù)據(jù),checkpoint可以指定把數(shù)據(jù)放在本地并且是多副本的方式,但是在正常的生產(chǎn)情況下是放在HDFS,這就自然地借助HDFS高容錯、高可靠的特征完成了最大化的、可靠的持久化數(shù)據(jù)的方式。

(5)為確保RDD復(fù)用計(jì)算的可靠性,checkpoint把數(shù)據(jù)持久化到HDFS中,保證數(shù)據(jù)最大程度的安全性。

(6)checkpoint就是針對整個RDD計(jì)算鏈條中特別需要數(shù)據(jù)持久化的環(huán)節(jié)(后面會反復(fù)使用當(dāng)前環(huán)節(jié)的RDD)開始基于HDFS等的數(shù)據(jù)持久化復(fù)用策略,通過對RDD啟動checkpoint機(jī)制來實(shí)現(xiàn)容錯和高可用。

9.2.2 Spark中checkpoint源碼詳解

1.checkpoint的運(yùn)行原理和源碼實(shí)現(xiàn)徹底詳解

RDD進(jìn)行計(jì)算前須先看一下是否有checkpoint,如果有checkpoint,就不需要再進(jìn)行計(jì)算了。

RDD.scala的iterator方法的源碼如下。

1.      final def iterator(split: Partition, context: TaskContext):
        Iterator[T] = {
2.    if (storageLevel != StorageLevel.NONE) {
3.      getOrCompute(split, context)
4.    } else {
5.      computeOrReadCheckpoint(split, context)
6.    }
7.  }

進(jìn)入RDD.scala的getOrCompute方法,源碼如下。

1.  private[spark] def getOrCompute(partition: Partition, context: TaskContext):
    Iterator[T] = {
2.      val blockId = RDDBlockId(id, partition.index)
3.      var readCachedBlock = true
4.      //這種方法被Executors調(diào)用,所以我們需要調(diào)用SparkEnv.get代替sc.env
5.      SparkEnv.get.blockManager.getOrElseUpdate(blockId,               storageLevel,
        elementClassTag, () => {
6.        readCachedBlock = false
7.        computeOrReadCheckpoint(partition, context)
8.      }) match {

getOrCompute方法的getOrElseUpdate方法傳入的第四個參數(shù)是匿名函數(shù),調(diào)用computeOrReadCheckpoint(partition, context)檢查checkpoint中是否有數(shù)據(jù)。

RDD.scala的computeOrReadCheckpoint的源碼如下。

1.        private[spark]      def   computeOrReadCheckpoint(split:  Partition,
          context: TaskContext): Iterator[T] =
2.  {
3.    if (isCheckpointedAndMaterialized) {
4.      firstParent[T].iterator(split, context)
5.    } else {
6.      compute(split, context)
7.    }
8.  }

computeOrReadCheckpoint方法中的isCheckpointedAndMaterialized是一個布爾值,判斷這個RDD是否checkpointed和被物化,Spark 2.0 checkpoint中有兩種方式:reliably或者locally。computeOrReadCheckpoint作為isCheckpointed語義的別名返回值。

isCheckpointedAndMaterialized方法的源碼如下。

1.   private[spark] def isCheckpointedAndMaterialized: Boolean =
2.  checkpointData.exists(_.isCheckpointed)

回到RDD.scala的computeOrReadCheckpoint,如果已經(jīng)持久化及物化isCheckpointed-AndMaterialized,就調(diào)用firstParent[T]的iterator。如果沒有持久化,則進(jìn)行compute。

2.checkpoint原理機(jī)制

(1)通過調(diào)用SparkContext.setCheckpointDir方法指定進(jìn)行checkpoint操作的RDD把數(shù)據(jù)放在哪里,在生產(chǎn)集群中是放在HDFS上的,同時為了提高效率,在進(jìn)行checkpoint的使用時,可以指定很多目錄。

SparkContext為即將計(jì)算的RDD設(shè)置checkpoint保存的目錄。如果在集群中運(yùn)行,必須是HDFS的目錄路徑。

SparkContext.scala的setCheckpointDir的源碼如下。

1.     def setCheckpointDir(directory: String) {
2.
3.  /**
      *如果在集群上運(yùn)行,如目錄是本地的,則記錄一個警告。否則,driver可能會試圖從它自己
      *的本地文件系統(tǒng)重建RDD的checkpoint檢測點(diǎn),因?yàn)閏heckpoint檢查點(diǎn)文件不正確。
      *實(shí)際上是在Executor機(jī)器上
     */
4.     if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
5.       logWarning("Spark is not running in local mode, therefore the
         checkpoint directory " +
6.         s"must not be on the local filesystem. Directory '$directory' " +
7.         "appears to be on the local filesystem.")
8.     }
9.
10.    checkpointDir = Option(directory).map { dir =>
11.      val path = new Path(dir, UUID.randomUUID().toString)
12.      val fs = path.getFileSystem(hadoopConfiguration)
13.      fs.mkdirs(path)
14.      fs.getFileStatus(path).getPath.toString
15.    }
16.  }

RDD.scala的checkpoint方法標(biāo)記RDD的檢查點(diǎn)checkpoint。它將保存到SparkContext# setCheckpointDir的目錄檢查點(diǎn)內(nèi)的文件中,所有引用它的父RDDs將被移除。須在任何作業(yè)之前調(diào)用此函數(shù)。建議RDD在內(nèi)存中緩存,否則保存在文件中時需要重新計(jì)算。

RDD.scala的checkpoint的源碼如下。

1.  def checkpoint(): Unit = RDDCheckpointData.synchronized {
2.      //注意:我們在這里使用全局鎖,原因是下游的復(fù)雜性:子RDD分區(qū)指向正確的父分區(qū)。未
        //來我們應(yīng)該重新考慮這個問題
3.      if (context.checkpointDir.isEmpty) {
4.        throw new SparkException("Checkpoint directory has not been set in
          the SparkContext")
5.      } else if (checkpointData.isEmpty) {
6.        checkpointData = Some(new ReliableRDDCheckpointData(this))
7.      }
8.    }

其中的checkpointData是RDDCheckpointData。

1.  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

RDDCheckpointData標(biāo)識某個RDD要進(jìn)行checkpoint。如果某個RDD要進(jìn)行checkpoint,那在Spark框架內(nèi)部就會生成RDDCheckpointData。

1.   private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient
     private val rdd: RDD[T])
2.    extends Serializable {
3.
4.    import CheckpointState._
5.
6.    //相關(guān)的RDD檢查狀態(tài)
7.    protected var cpState = Initialized
8.
9.    //RDD包含檢查點(diǎn)數(shù)據(jù)
10.   private var cpRDD: Option[CheckpointRDD[T]] = None
11.
12.   //待辦事宜:確定需要在下面的方法中使用全局鎖嗎
13.
14.   /**
15.     *返回RDD的checkpoint數(shù)據(jù)是否已經(jīng)持久化
16.     */
17.   def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
18.     cpState == Checkpointed
19.   }
20.
21.   /**
22.     *物化RDD和持久化其內(nèi)容
23.     *RDD的第一個行動完成以后立即觸發(fā)調(diào)用
24.     */
25.   final def checkpoint(): Unit = {
26.    //防止多個線程同時對相同RDDCheckpointing,這RDDCheckpointData狀態(tài)自動翻轉(zhuǎn)
27.     RDDCheckpointData.synchronized {
28.       if (cpState == Initialized) {
29.         cpState = CheckpointingInProgress
30.       } else {
31.         return
32.       }
33.     }
34.
35.     val newRDD = doCheckpoint()
36.
37.     //更新我們的狀態(tài)和截?cái)郣DD的血統(tǒng)
38.     RDDCheckpointData.synchronized {
39.       cpRDD = Some(newRDD)
40.       cpState = Checkpointed
41.       rdd.markCheckpointed()
42.     }
43.   }
44.
45.   /**
46.     *物化RDD和持久化其內(nèi)容
47.     *
48.     *子類應(yīng)重寫此方法,以定義自定義檢查點(diǎn)行為
49.     * @return the Checkpoint RDD 在進(jìn)程中創(chuàng)建
50.     */
51.   protected def doCheckpoint(): CheckpointRDD[T]
52.   /**
        *返回包含我們的檢查點(diǎn)數(shù)據(jù)。如果checkpoint的狀態(tài)是Checkpointed,才定義
        */
53.
54.   def checkpointRDD: Option[CheckpointRDD[T]] = RDDCheckpointData.
      synchronized { cpRDD }
55.   /**
        *返回checkpoint RDD的分區(qū),僅用于測試
        */
56.
57.   def getPartitions: Array[Partition] = RDDCheckpointData.synchronized {
58.     cpRDD.map(_.partitions).getOrElse { Array.empty }
59.   }
60.
61. }
62. /**
      *同步檢查點(diǎn)操作的全局鎖
      */
63.
64. private[spark] object RDDCheckpointData

(2)在進(jìn)行RDD的checkpoint的時候,其所依賴的所有的RDD都會從計(jì)算鏈條中清空掉。

(3)作為最佳實(shí)踐,一般在進(jìn)行checkpoint方法調(diào)用前都要進(jìn)行persist把當(dāng)前RDD的數(shù)據(jù)持久化到內(nèi)存或者磁盤上,這是因?yàn)閏heckpoint是Lazy級別,必須有Job的執(zhí)行,且在Job執(zhí)行完成后,才會從后往前回溯哪個RDD進(jìn)行了checkpoint標(biāo)記,然后對標(biāo)記過的RDD新啟動一個Job執(zhí)行具體的checkpoint過程。

(4)checkpoint改變了RDD的Lineage。

(5)當(dāng)調(diào)用checkpoint方法要對RDD進(jìn)行checkpoint操作,此時框架會自動生成RDDCheckpointData,當(dāng)RDD上運(yùn)行過一個Job后,就會立即觸發(fā)RDDCheckpointData中的checkpoint方法,在其內(nèi)部會調(diào)用doCheckpoint,實(shí)際上在生產(chǎn)時會調(diào)用ReliableRDDCheckpointData的doCheckpoint,在生產(chǎn)過程中會導(dǎo)致ReliableCheckpointRDD的writeRDDToCheckpointDirectory的調(diào)用,而在writeRDDToCheckpointDirectory方法內(nèi)部,會觸發(fā)runJob來執(zhí)行把當(dāng)前的RDD中的數(shù)據(jù)寫到checkpoint的目錄中,同時會產(chǎn)生ReliableCheckpointRDD實(shí)例。

RDDCheckpointData.scala的checkpoint方法進(jìn)行真正的checkpoint:在RDDCheckpointData. synchronized同步塊中先判斷cpState的狀態(tài),然后調(diào)用doCheckpoint()。

RDDCheckpointData.scala的checkpoint方法的源碼如下。

1.     final def checkpoint(): Unit = {
2.    //防止多個線程同時對相同RDDcheckpointing,這RDDCheckpointData狀態(tài)自動翻轉(zhuǎn)
3.  RDDCheckpointData.synchronized {
4.        if (cpState == Initialized) {
5.          cpState = CheckpointingInProgress
6.        } else {
7.          return
8.        }
9.      }
10.
11.     val newRDD = doCheckpoint()
12.
13.     //更新我們的狀態(tài)和截?cái)郣DD的血統(tǒng)
14.     RDDCheckpointData.synchronized {
15.       cpRDD = Some(newRDD)
16.       cpState = Checkpointed
17.       rdd.markCheckpointed()
18.     }
19.   }

其中的doCheckpoint方法是RDDCheckpointData.scala中的方法,這里沒有具體的實(shí)現(xiàn)。

1.  protected def doCheckpoint(): CheckpointRDD[T]

RDDCheckpointData的子類包括LocalRDDCheckpointData、ReliableRDDCheckpointData。ReliableRDDCheckpointData子類中doCheckpoint方法具體的實(shí)現(xiàn),在方法中進(jìn)行writeRDDToCheckpointDirectory的調(diào)用。

ReliableRDDCheckpointData.scala的doCheckpoint的源碼如下。

1.  protected override def doCheckpoint(): CheckpointRDD[T] = {
2.  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd,
    cpDir)
3.
4.      //如果引用超出范圍,則可選地清理檢查點(diǎn)文件
5.      if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.
        cleanCheckpoints", false)) {
6.        rdd.context.cleaner.foreach { cleaner =>
7.          cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
8.        }
9.      }
10.
11.     logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is
        RDD ${newRDD.id}")
12.     newRDD
13.   }
14.
15. }

writeRDDToCheckpointDirectory將RDD的數(shù)據(jù)寫入到checkpoint的文件中,返回一個ReliableCheckpointRDD。

 首先找到sparkContext,賦值給sc變量。

 基于checkpointDir創(chuàng)建checkpointDirPath。

 fs獲取文件系統(tǒng)的內(nèi)容。

 然后是廣播sc.broadcast,將路徑信息廣播給所有的Executor。

 接下來是sc.runJob,觸發(fā)runJob執(zhí)行,把當(dāng)前的RDD中的數(shù)據(jù)寫到checkpoint的目錄中。

 最后返回ReliableCheckpointRDD。無論是對哪個RDD進(jìn)行checkpoint,最終都會產(chǎn)生ReliableCheckpointRDD,以checkpointDirPath.toString中的數(shù)據(jù)為數(shù)據(jù)來源;以originalRDD.partitioner的分區(qū)器partitioner作為partitioner;這里的originalRDD就是要進(jìn)行checkpoint的RDD。

writeRDDToCheckpointDirectory的源碼如下。

1.   def writeRDDToCheckpointDirectory[T: ClassTag](
2.       originalRDD: RDD[T],
3.       checkpointDir: String,
4.       blockSize: Int = -1): ReliableCheckpointRDD[T] = {
5.
6.     val sc = originalRDD.sparkContext
7.
8.     //為檢查點(diǎn)創(chuàng)建輸出路徑
9.     val checkpointDirPath = new Path(checkpointDir)
10.    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
11.    if (!fs.mkdirs(checkpointDirPath)) {
12.      throw new SparkException(s"Failed to create checkpoint path
         $checkpointDirPath")
13.    }
14.
15.    //保存文件,并重新加載它作為一個RDD
16.    val broadcastedConf = sc.broadcast(
17.      new SerializableConfiguration(sc.hadoopConfiguration))
18.    //待辦事項(xiàng):這是代價昂貴的,因?yàn)樗忠淮斡?jì)算RDD是不必要的(SPARK-8582)
       sc.runJob(originalRDD,
19.      writePartitionToCheckpointFile[T](checkpointDirPath.toString,
         broadcastedConf) _)
20.
21.    if (originalRDD.partitioner.nonEmpty) {
22.      writePartitionerToCheckpointDir(sc,          originalRDD.partitioner.get,
         checkpointDirPath)
23.    }
24.
25.    val newRDD = new ReliableCheckpointRDD[T](
26.      sc, checkpointDirPath.toString, originalRDD.partitioner)
27.    if (newRDD.partitions.length != originalRDD.partitions.length) {
28.      throw new SparkException(
29.        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has
           different " +
30.          s"number of partitions from original RDD $originalRDD
             (${originalRDD.partitions.length})")
31.    }
32.    newRDD
33.  }

ReliableCheckpointRDD是讀取以前寫入可靠存儲系統(tǒng)檢查點(diǎn)文件數(shù)據(jù)的RDD。其中的partitioner是構(gòu)建ReliableCheckpointRDD的時候傳進(jìn)來的。其中的getPartitions是構(gòu)建一個一個的分片。其中,getPreferredLocations獲取數(shù)據(jù)本地性,fs.getFileBlockLocations獲取文件的位置信息。compute方法通過ReliableCheckpointRDD.readCheckpointFile讀取數(shù)據(jù)。

ReliableCheckpointRDD.scala的源碼如下。

1.    private[spark] class ReliableCheckpointRDD[T: ClassTag](
2.      sc: SparkContext,
3.      val checkpointPath: String,
4.      _partitioner: Option[Partitioner] = None
5.    ) extends CheckpointRDD[T](sc) {
6.
7.    @transient private val hadoopConf = sc.hadoopConfiguration
8.    @transient private val cpath = new Path(checkpointPath)
9.    @transient private val fs = cpath.getFileSystem(hadoopConf)
10.   private val broadcastedConf = sc.broadcast(new SerializableConfiguration
      (hadoopConf))
11.   //如果檢查點(diǎn)目錄不存在,則快速失敗
12.   require(fs.exists(cpath),        s"Checkpoint     directory    does   not   exist:
      $checkpointPath")
13.   /**
        *返回checkpoint的路徑,RDD從中讀取數(shù)據(jù)
        */
14.
15.   override val getCheckpointFile: Option[String] = Some(checkpointPath)
16.   override val partitioner: Option[Partitioner] = {
17.     _partitioner.orElse {
18.       ReliableCheckpointRDD.readCheckpointedPartitionerFile(context,
          checkpointPath)
19.     }
20.   }
21.  /**
       *返回檢查點(diǎn)目錄中的文件所描述的分區(qū)
       *由于原來的RDD可能屬于一個之前的應(yīng)用,沒辦法知道之前的分區(qū)數(shù)。此方法假定在應(yīng)用
       *生命周期,原始集檢查點(diǎn)文件完全保存在可靠的存儲里面
       */
22.
23.   protected override def getPartitions: Array[Partition] = {
24.     //如果路徑不存在,listStatus就拋出異常
25.     val inputFiles = fs.listStatus(cpath)
26.       .map(_.getPath)
27.       .filter(_.getName.startsWith("part-"))
28.       .sortBy(_.getName.stripPrefix("part-").toInt)
29.     //如果輸入文件無效,則快速失敗
30.     inputFiles.zipWithIndex.foreach { case (path, i) =>
31.       if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) {
32.         throw new SparkException(s"Invalid checkpoint file: $path")
33.       }
34.     }
35.     Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
36.   }
37.  /**
       *返回與給定分區(qū)關(guān)聯(lián)的檢查點(diǎn)文件的位置
38.    */
39.   protected override def getPreferredLocations(split: Partition):
      Seq[String] = {
40.     val status = fs.getFileStatus(
41.       new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName
         (split.index)))
42.     val locations = fs.getFileBlockLocations(status, 0, status.getLen)
43.     locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
44.   }
45.
46.   /**
       *讀取與給定分區(qū)關(guān)聯(lián)的檢查點(diǎn)文件的內(nèi)容
47.    */
48.   override def compute(split: Partition, context: TaskContext): I
      terator[T] = {
49.     val file = new Path(checkpointPath, ReliableCheckpointRDD.
        checkpointFileName(split.index))
50.     ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
51.   }
52.
53. }
54. .......

下面看一下ReliableCheckpointRDD.scala中compute方法中的ReliableCheckpointRDD. readCheckpointFile。readCheckpointFile讀取指定檢查點(diǎn)文件checkpoint的內(nèi)容。readCheckpointFile方法通過deserializeStream反序列化fileInputStream文件輸入流,然后將deserializeStream變成一個Iterator。

Spark 2.1.1版本的ReliableCheckpointRDD.scala的readCheckpointFile的源碼如下。

1.   def readCheckpointFile[T](
2.        path: Path,
3.        broadcastedConf: Broadcast[SerializableConfiguration],
4.        context: TaskContext): Iterator[T] = {
5.      val env = SparkEnv.get
6.      val fs = path.getFileSystem(broadcastedConf.value.value)
7.      val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
8.      val fileInputStream = fs.open(path, bufferSize)
9.      val serializer = env.serializer.newInstance()
10.     val deserializeStream = serializer.deserializeStream(fileInputStream)
11.
12.     //注冊一個任務(wù)完成回調(diào)以,關(guān)閉輸入流
13.     context.addTaskCompletionListener(context => deserializeStream.close())
14.
15.     deserializeStream.asIterator.asInstanceOf[Iterator[T]]
16.   }
17.
18. }

Spark 2.2.0版本的ReliableCheckpointRDD.scala的readCheckpointFile的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第8行整體替換,新增fileInputStream變量中對CHECKPOINT_COMPRESS壓縮配置的判斷。如果CHECKPOINT壓縮配置為true,則對fileStream文件流進(jìn)行壓縮。

1.   ......
2.     val fileInputStream = {
3.        val fileStream = fs.open(path, bufferSize)
4.        if (env.conf.get(CHECKPOINT_COMPRESS)) {
5.          CompressionCodec.createCodec(env.conf).compressedInputStream
           (fileStream)
6.        } else {
7.          fileStream
8.        }
9.      }
10. ......

ReliableRDDCheckpointData.scala的cleanCheckpoint方法,清理RDD數(shù)據(jù)相關(guān)的checkpoint文件。

1.   def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
2.    checkpointPath(sc, rddId).foreach { path =>
3.      path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
4.    }
5.  }

在生產(chǎn)環(huán)境中不使用LocalCheckpointRDD。LocalCheckpointRDD的getPartitions直接從toArray級別中調(diào)用new()函數(shù)創(chuàng)建CheckpointRDDPartition。LocalCheckpointRDD的compute方法直接報(bào)異常。

LocalCheckpointRDD的源碼如下。

1.    private[spark] class LocalCheckpointRDD[T: ClassTag](
2.      sc: SparkContext,
3.      rddId: Int,
4.      numPartitions: Int)
5.    extends CheckpointRDD[T](sc) {
6.  ......
7.  protected override def getPartitions: Array[Partition] = {
8.      (0 until numPartitions).toArray.map { i => new CheckpointRDDPartition(i) }
9.    }
10. .......
11.   override def compute(partition: Partition, context: TaskContext):
      Iterator[T] = {
12.     throw new SparkException(
13.       s"Checkpoint block ${RDDBlockId(rddId, partition.index)} not found!
          Either the executor " +
14.       s"that originally checkpointed this partition is no longer alive,
          or the original RDD is " +
15.       s"unpersisted. If this problem persists, you may consider using
          'rdd.checkpoint()' " +
16.       s"instead, which is slower than local checkpointing but more fault-
         tolerant.")
17.   }
18.
19. }

checkpoint運(yùn)行流程圖如圖9-2所示。

圖9-2 Checkpoint運(yùn)行流程圖

通過SparkContext設(shè)置Checkpoint數(shù)據(jù)保存的目錄,RDD調(diào)用checkpoint方法,生產(chǎn)RDDCheckpointData,當(dāng)RDD上運(yùn)行一個Job后,就會立即觸發(fā)RDDCheckpointData中的checkpoint方法,在其內(nèi)部會調(diào)用doCheckpoint;然后調(diào)用ReliableRDDCheckpointData的doCheckpoint;ReliableCheckpointRDD的writeRDDToCheckpointDirectory的調(diào)用;在writeRDDToCheckpointDirectory方法內(nèi)部會觸發(fā)runJob,來執(zhí)行把當(dāng)前的RDD中的數(shù)據(jù)寫到Checkpoint的目錄中,同時會產(chǎn)生ReliableCheckpointRDD實(shí)例。

checkpoint保存在HDFS中,具有多個副本;persist保存在內(nèi)存中或者磁盤中。在Job作業(yè)調(diào)度的時候,checkpoint沿著finalRDD的“血統(tǒng)”關(guān)系lineage從后往前回溯向上查找,查找哪些RDD曾標(biāo)記為要進(jìn)行checkpoint,標(biāo)記為checkpointInProgress;一旦進(jìn)行checkpoint,RDD所有父RDD就被清空。

主站蜘蛛池模板: 卫辉市| 江津市| 大余县| 莲花县| 门源| 乌兰察布市| 米易县| 安宁市| 博湖县| 酉阳| 苍梧县| 鄂州市| 高邮市| 芷江| 大理市| 武城县| 西峡县| 齐齐哈尔市| 古浪县| 南阳市| 定兴县| 龙山县| 潞城市| 康定县| 南宫市| 远安县| 墨玉县| 汾西县| 嘉兴市| 石首市| 鄯善县| 云梦县| 嵩明县| 吴忠市| 陵水| 怀仁县| 东乡县| 东山县| 丽水市| 深圳市| 垦利县|