Spark: Checkpoint(Spark-Core)记录

之前在学习flink的过程中记录过flink的checkpoint机制,今天主要来记录一下spark的checkpoint机制,结合源码来分析一下整体的流程。

checkpoint

checkpoint作为一种容错机制在很多场景下会使用到。例如之前记录的flink中,flink使用checkpoint机制来保证对计算状态的容错,当发生异常任务失败时,能够使任务根据上一次的checkpoint结果进行恢复。同时checkpoint也参与了flink内部仅一次语义的实现。

从概念层面来说:checkpoint就是将系统内存当前的状态作为数据存储到一个可靠的文件系统中。

从实现层面来讲每一个系统还是差异较大的。flink基于abs算法实现了轻量级的checkpoint,这其中包括异步快照、barrier等概念,可以说在flink中在保证了checkpoint准确的同时,又减小了对系统性能的影响。

在spark中较比flink 在checkpoint方面确实有一些不足,例如在checkpoint过程中同步,checkpoint过程中如果没有预先cache那么会重新计算一遍当前job等问题。

spark checkpoint使用

调用checkpoint方式比较简单,在想进行checkpoint的RDD后直接调用即可。

1
2
3
4
5
6
7
8
9
10
11
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("CheckPoint test").setMaster("local[*]");
JavaSparkContext context = new JavaSparkContext(sparkConf);
//进行checkpoint需要先设置checkpoint路径,也就是你要存的那个文件系统如 HDFS
context.setCheckpointDir("target/checkpoint/");
//加载数据,设定五个分区
JavaRDD<String> lineRDD = context.textFile("src/main/resources/sparkresource/access.log", 5);
//开始进行checkpoint
lineRDD.checkpoint();
lineRDD.saveAsTextFile("target/result/");
context.stop();

路径target/checkpoint/下目录结构,因为在source数据时设定五个partition,所以生成checkpoint文件时也是五个。

1
2
3
4
5
6
7
8
9
10
11
12
f53129a7-c29a-4250-a477-c02abd042383
rdd-3
.part-00000.crc
.part-00001.crc
.part-00002.crc
.part-00003.crc
.part-00004.crc
part-00000
part-00001
part-00002
part-00003
part-00004

checkpoint流程

接下来根据下边画出的图,并结合上边得样例代码,一步一步分析spark得checkpoint流程。

4PQauJ63qLjA58V

  • 在checkpoint开始之初,需要我们在spark代码中显示调用checkpoint()函数,需要注意一点,该函数要在job的action操作之前,也就是说如果你在saveAsTextFile()后去调用,那么将不会产生checkpoint。而后在job执行完成后,将会在代码中调用doCheckpoint(),这个会向前递归调用所有的依赖的RDD, 看看需不需要 checkpoint,将初始化RDD的 checkpointData 转变为 ReliableRDDCheckpointData,状态变为Initialized.
1
2
3
4
5
6
7
8
9
10
11
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
//对给定的RDD运行一个操作作业,并在结果到达时将所有结果传递给resultHandler函数。
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
//在所有的job执行完成后在进行checkpoint
rdd.doCheckpoint()
}
  • 然后调用 checkpointData.get.checkpoint(), 为 CheckpointingInProgress, 里面调用具体实现类的 ReliableRDDCheckpointData 的 doCheckpoint 方法。
1
2
3
4
protected override def doCheckpoint(): CheckpointRDD[T] = {
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
......
}
  • ReliableRDDCheckpointData.doCheckpoint -> writeRDDToCheckpointDirectory, 注意这里会把 job 再运行一次, 如果已经cache 了,就可以直接使用缓存中的 RDD 了, 就不需要重头计算一遍了, 这时候直接把RDD, 输出到 hdfs, 每个分区一个文件, 会先写到一个临时文件, 如果全部输出完,进行 rename , 如果输出失败,就回滚delete。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def writeRDDToCheckpointDirectory[T: ClassTag](): ReliableCheckpointRDD[T] = {
......
// 创建检查点的输出路径
val checkpointDirPath = new Path(checkpointDir)
// 重新执行任务计算rdd,如果进行了cache那么将会减少所需时间。
sc.runJob(originalRDD,
// 调用该方法将数据写出到文件
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
if (originalRDD.partitioner.nonEmpty) {
//将Partitioner写入给定的RDD检查点目录, 这也就是为什么我们的检查点目录下会有两套文件。
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}
......
}
  • 标记状态为 Checkpointed,调用markCheckpointed()方法中清除所有的依赖,怎么清除依赖的呢?就是把RDD变量的强引用,设置为 null,垃圾回收了,会触发 ContextCleaner 里面监听清除实际 BlockManager 缓存中的数据。
1
2
3
4
5
private[spark] def markCheckpointed(): Unit = {
clearDependencies() //内部将所有的dependencies置为null
partitions_ = null
deps = null
}

至此spark checkpoint过程的写操作流程基本分析完成。

那么如何根据生成的checkpoint文件进行spark的数据恢复呢?

在做完checkpoint后,获取原来RDD的依赖以及partitions数据都将从CheckpointRDD中获取。也就是说获取原来rdd中每个partition数据以及partitioner等对象,都将转移到CheckPointRDD中。

在CheckPointRDD的一个具体实现ReliableRDDCheckpintRDD中的compute方法中可以看到,将会从hdfs的checkpoint目录中恢复之前写入的partition数据。而partitioner对象(如果有)也会从之前写入hdfs的paritioner对象恢复。

总结

今天学习并记录了Spark Core的checkpoint流程,大体来说就是一个将内存数据存储进文件系统的过程,而spark在这一过程中针对rdd,partition等都有一些特殊的处理,使得其checkpoint流程能够安全流畅的进行下去,保证在发生异常时,能够根据checkpoint数据进行恢复。而在Spark Streaming中与Core中的checkpoint又有不同,后边有机会再详细记录。

over~