- Spark大數(shù)據(jù)商業(yè)實(shí)戰(zhàn)三部曲:內(nèi)核解密|商業(yè)案例|性能調(diào)優(yōu)
- 王家林
- 3849字
- 2019-12-12 17:30:04
9.2 Spark中checkpoint原理和源碼詳解
本節(jié)對Spark中checkpoint原理及Spark中checkpoint源碼進(jìn)行詳解。
9.2.1 Spark中checkpoint原理詳解
checkpoint到底是什么?
(1)Spark在生產(chǎn)環(huán)境下經(jīng)常會面臨Tranformations的RDD非常多(例如,一個Job中包含10 000個RDD)或者具體Tranformation產(chǎn)生的RDD本身計(jì)算特別復(fù)雜和耗時(例如,計(jì)算時常超過1h),此時我們必須考慮對計(jì)算結(jié)果數(shù)據(jù)的持久化。
(2)Spark擅長多步驟迭代,同時擅長基于Job的復(fù)用,這時如果能夠?qū)υ?jīng)計(jì)算的過程產(chǎn)生的數(shù)據(jù)進(jìn)行復(fù)用,就可以極大地提升效率。
(3)如果采用persist把數(shù)據(jù)放在內(nèi)存中,雖然是最快速的,但是也是最不可靠的。如果放在磁盤上,也不是完全可靠的。例如,磁盤會損壞,管理員可能清空磁盤等。
(4)checkpoint的產(chǎn)生就是為了相對更加可靠地持久化數(shù)據(jù),checkpoint可以指定把數(shù)據(jù)放在本地并且是多副本的方式,但是在正常的生產(chǎn)情況下是放在HDFS,這就自然地借助HDFS高容錯、高可靠的特征完成了最大化的、可靠的持久化數(shù)據(jù)的方式。
(5)為確保RDD復(fù)用計(jì)算的可靠性,checkpoint把數(shù)據(jù)持久化到HDFS中,保證數(shù)據(jù)最大程度的安全性。
(6)checkpoint就是針對整個RDD計(jì)算鏈條中特別需要數(shù)據(jù)持久化的環(huán)節(jié)(后面會反復(fù)使用當(dāng)前環(huán)節(jié)的RDD)開始基于HDFS等的數(shù)據(jù)持久化復(fù)用策略,通過對RDD啟動checkpoint機(jī)制來實(shí)現(xiàn)容錯和高可用。
9.2.2 Spark中checkpoint源碼詳解
1.checkpoint的運(yùn)行原理和源碼實(shí)現(xiàn)徹底詳解
RDD進(jìn)行計(jì)算前須先看一下是否有checkpoint,如果有checkpoint,就不需要再進(jìn)行計(jì)算了。
RDD.scala的iterator方法的源碼如下。
1. final def iterator(split: Partition, context: TaskContext): Iterator[T] = { 2. if (storageLevel != StorageLevel.NONE) { 3. getOrCompute(split, context) 4. } else { 5. computeOrReadCheckpoint(split, context) 6. } 7. }
進(jìn)入RDD.scala的getOrCompute方法,源碼如下。
1. private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = { 2. val blockId = RDDBlockId(id, partition.index) 3. var readCachedBlock = true 4. //這種方法被Executors調(diào)用,所以我們需要調(diào)用SparkEnv.get代替sc.env 5. SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { 6. readCachedBlock = false 7. computeOrReadCheckpoint(partition, context) 8. }) match {
getOrCompute方法的getOrElseUpdate方法傳入的第四個參數(shù)是匿名函數(shù),調(diào)用computeOrReadCheckpoint(partition, context)檢查checkpoint中是否有數(shù)據(jù)。
RDD.scala的computeOrReadCheckpoint的源碼如下。
1. private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = 2. { 3. if (isCheckpointedAndMaterialized) { 4. firstParent[T].iterator(split, context) 5. } else { 6. compute(split, context) 7. } 8. }
computeOrReadCheckpoint方法中的isCheckpointedAndMaterialized是一個布爾值,判斷這個RDD是否checkpointed和被物化,Spark 2.0 checkpoint中有兩種方式:reliably或者locally。computeOrReadCheckpoint作為isCheckpointed語義的別名返回值。
isCheckpointedAndMaterialized方法的源碼如下。
1. private[spark] def isCheckpointedAndMaterialized: Boolean = 2. checkpointData.exists(_.isCheckpointed)
回到RDD.scala的computeOrReadCheckpoint,如果已經(jīng)持久化及物化isCheckpointed-AndMaterialized,就調(diào)用firstParent[T]的iterator。如果沒有持久化,則進(jìn)行compute。
2.checkpoint原理機(jī)制
(1)通過調(diào)用SparkContext.setCheckpointDir方法指定進(jìn)行checkpoint操作的RDD把數(shù)據(jù)放在哪里,在生產(chǎn)集群中是放在HDFS上的,同時為了提高效率,在進(jìn)行checkpoint的使用時,可以指定很多目錄。
SparkContext為即將計(jì)算的RDD設(shè)置checkpoint保存的目錄。如果在集群中運(yùn)行,必須是HDFS的目錄路徑。
SparkContext.scala的setCheckpointDir的源碼如下。
1. def setCheckpointDir(directory: String) { 2. 3. /** *如果在集群上運(yùn)行,如目錄是本地的,則記錄一個警告。否則,driver可能會試圖從它自己 *的本地文件系統(tǒng)重建RDD的checkpoint檢測點(diǎn),因?yàn)閏heckpoint檢查點(diǎn)文件不正確。 *實(shí)際上是在Executor機(jī)器上 */ 4. if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) { 5. logWarning("Spark is not running in local mode, therefore the checkpoint directory " + 6. s"must not be on the local filesystem. Directory '$directory' " + 7. "appears to be on the local filesystem.") 8. } 9. 10. checkpointDir = Option(directory).map { dir => 11. val path = new Path(dir, UUID.randomUUID().toString) 12. val fs = path.getFileSystem(hadoopConfiguration) 13. fs.mkdirs(path) 14. fs.getFileStatus(path).getPath.toString 15. } 16. }
RDD.scala的checkpoint方法標(biāo)記RDD的檢查點(diǎn)checkpoint。它將保存到SparkContext# setCheckpointDir的目錄檢查點(diǎn)內(nèi)的文件中,所有引用它的父RDDs將被移除。須在任何作業(yè)之前調(diào)用此函數(shù)。建議RDD在內(nèi)存中緩存,否則保存在文件中時需要重新計(jì)算。
RDD.scala的checkpoint的源碼如下。
1. def checkpoint(): Unit = RDDCheckpointData.synchronized { 2. //注意:我們在這里使用全局鎖,原因是下游的復(fù)雜性:子RDD分區(qū)指向正確的父分區(qū)。未 //來我們應(yīng)該重新考慮這個問題 3. if (context.checkpointDir.isEmpty) { 4. throw new SparkException("Checkpoint directory has not been set in the SparkContext") 5. } else if (checkpointData.isEmpty) { 6. checkpointData = Some(new ReliableRDDCheckpointData(this)) 7. } 8. }
其中的checkpointData是RDDCheckpointData。
1. private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
RDDCheckpointData標(biāo)識某個RDD要進(jìn)行checkpoint。如果某個RDD要進(jìn)行checkpoint,那在Spark框架內(nèi)部就會生成RDDCheckpointData。
1. private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T]) 2. extends Serializable { 3. 4. import CheckpointState._ 5. 6. //相關(guān)的RDD檢查狀態(tài) 7. protected var cpState = Initialized 8. 9. //RDD包含檢查點(diǎn)數(shù)據(jù) 10. private var cpRDD: Option[CheckpointRDD[T]] = None 11. 12. //待辦事宜:確定需要在下面的方法中使用全局鎖嗎 13. 14. /** 15. *返回RDD的checkpoint數(shù)據(jù)是否已經(jīng)持久化 16. */ 17. def isCheckpointed: Boolean = RDDCheckpointData.synchronized { 18. cpState == Checkpointed 19. } 20. 21. /** 22. *物化RDD和持久化其內(nèi)容 23. *RDD的第一個行動完成以后立即觸發(fā)調(diào)用 24. */ 25. final def checkpoint(): Unit = { 26. //防止多個線程同時對相同RDDCheckpointing,這RDDCheckpointData狀態(tài)自動翻轉(zhuǎn) 27. RDDCheckpointData.synchronized { 28. if (cpState == Initialized) { 29. cpState = CheckpointingInProgress 30. } else { 31. return 32. } 33. } 34. 35. val newRDD = doCheckpoint() 36. 37. //更新我們的狀態(tài)和截?cái)郣DD的血統(tǒng) 38. RDDCheckpointData.synchronized { 39. cpRDD = Some(newRDD) 40. cpState = Checkpointed 41. rdd.markCheckpointed() 42. } 43. } 44. 45. /** 46. *物化RDD和持久化其內(nèi)容 47. * 48. *子類應(yīng)重寫此方法,以定義自定義檢查點(diǎn)行為 49. * @return the Checkpoint RDD 在進(jìn)程中創(chuàng)建 50. */ 51. protected def doCheckpoint(): CheckpointRDD[T] 52. /** *返回包含我們的檢查點(diǎn)數(shù)據(jù)。如果checkpoint的狀態(tài)是Checkpointed,才定義 */ 53. 54. def checkpointRDD: Option[CheckpointRDD[T]] = RDDCheckpointData. synchronized { cpRDD } 55. /** *返回checkpoint RDD的分區(qū),僅用于測試 */ 56. 57. def getPartitions: Array[Partition] = RDDCheckpointData.synchronized { 58. cpRDD.map(_.partitions).getOrElse { Array.empty } 59. } 60. 61. } 62. /** *同步檢查點(diǎn)操作的全局鎖 */ 63. 64. private[spark] object RDDCheckpointData
(2)在進(jìn)行RDD的checkpoint的時候,其所依賴的所有的RDD都會從計(jì)算鏈條中清空掉。
(3)作為最佳實(shí)踐,一般在進(jìn)行checkpoint方法調(diào)用前都要進(jìn)行persist把當(dāng)前RDD的數(shù)據(jù)持久化到內(nèi)存或者磁盤上,這是因?yàn)閏heckpoint是Lazy級別,必須有Job的執(zhí)行,且在Job執(zhí)行完成后,才會從后往前回溯哪個RDD進(jìn)行了checkpoint標(biāo)記,然后對標(biāo)記過的RDD新啟動一個Job執(zhí)行具體的checkpoint過程。
(4)checkpoint改變了RDD的Lineage。
(5)當(dāng)調(diào)用checkpoint方法要對RDD進(jìn)行checkpoint操作,此時框架會自動生成RDDCheckpointData,當(dāng)RDD上運(yùn)行過一個Job后,就會立即觸發(fā)RDDCheckpointData中的checkpoint方法,在其內(nèi)部會調(diào)用doCheckpoint,實(shí)際上在生產(chǎn)時會調(diào)用ReliableRDDCheckpointData的doCheckpoint,在生產(chǎn)過程中會導(dǎo)致ReliableCheckpointRDD的writeRDDToCheckpointDirectory的調(diào)用,而在writeRDDToCheckpointDirectory方法內(nèi)部,會觸發(fā)runJob來執(zhí)行把當(dāng)前的RDD中的數(shù)據(jù)寫到checkpoint的目錄中,同時會產(chǎn)生ReliableCheckpointRDD實(shí)例。
RDDCheckpointData.scala的checkpoint方法進(jìn)行真正的checkpoint:在RDDCheckpointData. synchronized同步塊中先判斷cpState的狀態(tài),然后調(diào)用doCheckpoint()。
RDDCheckpointData.scala的checkpoint方法的源碼如下。
1. final def checkpoint(): Unit = { 2. //防止多個線程同時對相同RDDcheckpointing,這RDDCheckpointData狀態(tài)自動翻轉(zhuǎn) 3. RDDCheckpointData.synchronized { 4. if (cpState == Initialized) { 5. cpState = CheckpointingInProgress 6. } else { 7. return 8. } 9. } 10. 11. val newRDD = doCheckpoint() 12. 13. //更新我們的狀態(tài)和截?cái)郣DD的血統(tǒng) 14. RDDCheckpointData.synchronized { 15. cpRDD = Some(newRDD) 16. cpState = Checkpointed 17. rdd.markCheckpointed() 18. } 19. }
其中的doCheckpoint方法是RDDCheckpointData.scala中的方法,這里沒有具體的實(shí)現(xiàn)。
1. protected def doCheckpoint(): CheckpointRDD[T]
RDDCheckpointData的子類包括LocalRDDCheckpointData、ReliableRDDCheckpointData。ReliableRDDCheckpointData子類中doCheckpoint方法具體的實(shí)現(xiàn),在方法中進(jìn)行writeRDDToCheckpointDirectory的調(diào)用。
ReliableRDDCheckpointData.scala的doCheckpoint的源碼如下。
1. protected override def doCheckpoint(): CheckpointRDD[T] = { 2. val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir) 3. 4. //如果引用超出范圍,則可選地清理檢查點(diǎn)文件 5. if (rdd.conf.getBoolean("spark.cleaner.referenceTracking. cleanCheckpoints", false)) { 6. rdd.context.cleaner.foreach { cleaner => 7. cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id) 8. } 9. } 10. 11. logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}") 12. newRDD 13. } 14. 15. }
writeRDDToCheckpointDirectory將RDD的數(shù)據(jù)寫入到checkpoint的文件中,返回一個ReliableCheckpointRDD。
首先找到sparkContext,賦值給sc變量。
基于checkpointDir創(chuàng)建checkpointDirPath。
fs獲取文件系統(tǒng)的內(nèi)容。
然后是廣播sc.broadcast,將路徑信息廣播給所有的Executor。
接下來是sc.runJob,觸發(fā)runJob執(zhí)行,把當(dāng)前的RDD中的數(shù)據(jù)寫到checkpoint的目錄中。
最后返回ReliableCheckpointRDD。無論是對哪個RDD進(jìn)行checkpoint,最終都會產(chǎn)生ReliableCheckpointRDD,以checkpointDirPath.toString中的數(shù)據(jù)為數(shù)據(jù)來源;以originalRDD.partitioner的分區(qū)器partitioner作為partitioner;這里的originalRDD就是要進(jìn)行checkpoint的RDD。
writeRDDToCheckpointDirectory的源碼如下。
1. def writeRDDToCheckpointDirectory[T: ClassTag]( 2. originalRDD: RDD[T], 3. checkpointDir: String, 4. blockSize: Int = -1): ReliableCheckpointRDD[T] = { 5. 6. val sc = originalRDD.sparkContext 7. 8. //為檢查點(diǎn)創(chuàng)建輸出路徑 9. val checkpointDirPath = new Path(checkpointDir) 10. val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration) 11. if (!fs.mkdirs(checkpointDirPath)) { 12. throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath") 13. } 14. 15. //保存文件,并重新加載它作為一個RDD 16. val broadcastedConf = sc.broadcast( 17. new SerializableConfiguration(sc.hadoopConfiguration)) 18. //待辦事項(xiàng):這是代價昂貴的,因?yàn)樗忠淮斡?jì)算RDD是不必要的(SPARK-8582) sc.runJob(originalRDD, 19. writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) 20. 21. if (originalRDD.partitioner.nonEmpty) { 22. writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath) 23. } 24. 25. val newRDD = new ReliableCheckpointRDD[T]( 26. sc, checkpointDirPath.toString, originalRDD.partitioner) 27. if (newRDD.partitions.length != originalRDD.partitions.length) { 28. throw new SparkException( 29. s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " + 30. s"number of partitions from original RDD $originalRDD (${originalRDD.partitions.length})") 31. } 32. newRDD 33. }
ReliableCheckpointRDD是讀取以前寫入可靠存儲系統(tǒng)檢查點(diǎn)文件數(shù)據(jù)的RDD。其中的partitioner是構(gòu)建ReliableCheckpointRDD的時候傳進(jìn)來的。其中的getPartitions是構(gòu)建一個一個的分片。其中,getPreferredLocations獲取數(shù)據(jù)本地性,fs.getFileBlockLocations獲取文件的位置信息。compute方法通過ReliableCheckpointRDD.readCheckpointFile讀取數(shù)據(jù)。
ReliableCheckpointRDD.scala的源碼如下。
1. private[spark] class ReliableCheckpointRDD[T: ClassTag]( 2. sc: SparkContext, 3. val checkpointPath: String, 4. _partitioner: Option[Partitioner] = None 5. ) extends CheckpointRDD[T](sc) { 6. 7. @transient private val hadoopConf = sc.hadoopConfiguration 8. @transient private val cpath = new Path(checkpointPath) 9. @transient private val fs = cpath.getFileSystem(hadoopConf) 10. private val broadcastedConf = sc.broadcast(new SerializableConfiguration (hadoopConf)) 11. //如果檢查點(diǎn)目錄不存在,則快速失敗 12. require(fs.exists(cpath), s"Checkpoint directory does not exist: $checkpointPath") 13. /** *返回checkpoint的路徑,RDD從中讀取數(shù)據(jù) */ 14. 15. override val getCheckpointFile: Option[String] = Some(checkpointPath) 16. override val partitioner: Option[Partitioner] = { 17. _partitioner.orElse { 18. ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath) 19. } 20. } 21. /** *返回檢查點(diǎn)目錄中的文件所描述的分區(qū) *由于原來的RDD可能屬于一個之前的應(yīng)用,沒辦法知道之前的分區(qū)數(shù)。此方法假定在應(yīng)用 *生命周期,原始集檢查點(diǎn)文件完全保存在可靠的存儲里面 */ 22. 23. protected override def getPartitions: Array[Partition] = { 24. //如果路徑不存在,listStatus就拋出異常 25. val inputFiles = fs.listStatus(cpath) 26. .map(_.getPath) 27. .filter(_.getName.startsWith("part-")) 28. .sortBy(_.getName.stripPrefix("part-").toInt) 29. //如果輸入文件無效,則快速失敗 30. inputFiles.zipWithIndex.foreach { case (path, i) => 31. if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) { 32. throw new SparkException(s"Invalid checkpoint file: $path") 33. } 34. } 35. Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i)) 36. } 37. /** *返回與給定分區(qū)關(guān)聯(lián)的檢查點(diǎn)文件的位置 38. */ 39. protected override def getPreferredLocations(split: Partition): Seq[String] = { 40. val status = fs.getFileStatus( 41. new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName (split.index))) 42. val locations = fs.getFileBlockLocations(status, 0, status.getLen) 43. locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost") 44. } 45. 46. /** *讀取與給定分區(qū)關(guān)聯(lián)的檢查點(diǎn)文件的內(nèi)容 47. */ 48. override def compute(split: Partition, context: TaskContext): I terator[T] = { 49. val file = new Path(checkpointPath, ReliableCheckpointRDD. checkpointFileName(split.index)) 50. ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context) 51. } 52. 53. } 54. .......
下面看一下ReliableCheckpointRDD.scala中compute方法中的ReliableCheckpointRDD. readCheckpointFile。readCheckpointFile讀取指定檢查點(diǎn)文件checkpoint的內(nèi)容。readCheckpointFile方法通過deserializeStream反序列化fileInputStream文件輸入流,然后將deserializeStream變成一個Iterator。
Spark 2.1.1版本的ReliableCheckpointRDD.scala的readCheckpointFile的源碼如下。
1. def readCheckpointFile[T]( 2. path: Path, 3. broadcastedConf: Broadcast[SerializableConfiguration], 4. context: TaskContext): Iterator[T] = { 5. val env = SparkEnv.get 6. val fs = path.getFileSystem(broadcastedConf.value.value) 7. val bufferSize = env.conf.getInt("spark.buffer.size", 65536) 8. val fileInputStream = fs.open(path, bufferSize) 9. val serializer = env.serializer.newInstance() 10. val deserializeStream = serializer.deserializeStream(fileInputStream) 11. 12. //注冊一個任務(wù)完成回調(diào)以,關(guān)閉輸入流 13. context.addTaskCompletionListener(context => deserializeStream.close()) 14. 15. deserializeStream.asIterator.asInstanceOf[Iterator[T]] 16. } 17. 18. }
Spark 2.2.0版本的ReliableCheckpointRDD.scala的readCheckpointFile的源碼與Spark 2.1.1版本相比具有如下特點(diǎn):上段代碼中第8行整體替換,新增fileInputStream變量中對CHECKPOINT_COMPRESS壓縮配置的判斷。如果CHECKPOINT壓縮配置為true,則對fileStream文件流進(jìn)行壓縮。
1. ...... 2. val fileInputStream = { 3. val fileStream = fs.open(path, bufferSize) 4. if (env.conf.get(CHECKPOINT_COMPRESS)) { 5. CompressionCodec.createCodec(env.conf).compressedInputStream (fileStream) 6. } else { 7. fileStream 8. } 9. } 10. ......
ReliableRDDCheckpointData.scala的cleanCheckpoint方法,清理RDD數(shù)據(jù)相關(guān)的checkpoint文件。
1. def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = { 2. checkpointPath(sc, rddId).foreach { path => 3. path.getFileSystem(sc.hadoopConfiguration).delete(path, true) 4. } 5. }
在生產(chǎn)環(huán)境中不使用LocalCheckpointRDD。LocalCheckpointRDD的getPartitions直接從toArray級別中調(diào)用new()函數(shù)創(chuàng)建CheckpointRDDPartition。LocalCheckpointRDD的compute方法直接報(bào)異常。
LocalCheckpointRDD的源碼如下。
1. private[spark] class LocalCheckpointRDD[T: ClassTag]( 2. sc: SparkContext, 3. rddId: Int, 4. numPartitions: Int) 5. extends CheckpointRDD[T](sc) { 6. ...... 7. protected override def getPartitions: Array[Partition] = { 8. (0 until numPartitions).toArray.map { i => new CheckpointRDDPartition(i) } 9. } 10. ....... 11. override def compute(partition: Partition, context: TaskContext): Iterator[T] = { 12. throw new SparkException( 13. s"Checkpoint block ${RDDBlockId(rddId, partition.index)} not found! Either the executor " + 14. s"that originally checkpointed this partition is no longer alive, or the original RDD is " + 15. s"unpersisted. If this problem persists, you may consider using 'rdd.checkpoint()' " + 16. s"instead, which is slower than local checkpointing but more fault- tolerant.") 17. } 18. 19. }
checkpoint運(yùn)行流程圖如圖9-2所示。

圖9-2 Checkpoint運(yùn)行流程圖
通過SparkContext設(shè)置Checkpoint數(shù)據(jù)保存的目錄,RDD調(diào)用checkpoint方法,生產(chǎn)RDDCheckpointData,當(dāng)RDD上運(yùn)行一個Job后,就會立即觸發(fā)RDDCheckpointData中的checkpoint方法,在其內(nèi)部會調(diào)用doCheckpoint;然后調(diào)用ReliableRDDCheckpointData的doCheckpoint;ReliableCheckpointRDD的writeRDDToCheckpointDirectory的調(diào)用;在writeRDDToCheckpointDirectory方法內(nèi)部會觸發(fā)runJob,來執(zhí)行把當(dāng)前的RDD中的數(shù)據(jù)寫到Checkpoint的目錄中,同時會產(chǎn)生ReliableCheckpointRDD實(shí)例。
checkpoint保存在HDFS中,具有多個副本;persist保存在內(nèi)存中或者磁盤中。在Job作業(yè)調(diào)度的時候,checkpoint沿著finalRDD的“血統(tǒng)”關(guān)系lineage從后往前回溯向上查找,查找哪些RDD曾標(biāo)記為要進(jìn)行checkpoint,標(biāo)記為checkpointInProgress;一旦進(jìn)行checkpoint,RDD所有父RDD就被清空。
- 基于C語言的程序設(shè)計(jì)
- Mastering Mesos
- 樂高機(jī)器人:WeDo編程與搭建指南
- AWS:Security Best Practices on AWS
- IoT Penetration Testing Cookbook
- 現(xiàn)代傳感技術(shù)
- Hybrid Cloud for Architects
- Apache Superset Quick Start Guide
- 深度學(xué)習(xí)與目標(biāo)檢測
- INSTANT Munin Plugin Starter
- Silverlight 2完美征程
- 中文版AutoCAD 2013高手速成
- 大數(shù)據(jù)案例精析
- 大數(shù)據(jù)導(dǎo)論
- Web璀璨:Silverlight應(yīng)用技術(shù)完全指南