文章目录
- 01 引言
- 02 Checkpoint
- 2.1 Checkpoint VS State
- 2.2 Checkpoint 执行流程
- 2.2.1 简单流程
- 2.2.2 复杂流程
- 2.3 State状态后端/State存储介质
- 2.3.1 MemStateBackend
- 2.3.2 FastStateBackend
- 2.3.3 RocksDBStateBackend
- 2.4 Checkpoint配置方式
- 2.4.1 全局配置
- 2.4.2 代码配置
- 2.5 示例代码
- 03 状态恢复和重启策略
- 3.1 自动重启策略和恢复
- 3.1.1 重启策略配置方式
- 3.1.2 重启策略分类
- 3.1.2.1 默认重启策略
- 3.1.2.2 无重启策略
- 3.1.2.3 固定延迟重启策略
- 3.1.2.4 失败率重启策略
- 3.1.2.5 示例代码
- 3.2 手动重启并恢复
- 04 Savepoint
- 4.1 Savepoint介绍
- 4.2 Savepoint VS Checkpoint
- 4.3 Savepoint 案例演示
- 05 文末
01 引言
在前面的博客,我们学习了Flink的一些高级API,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
- 《Flink教程(09)- Flink批流一体API(Connectors示例)》
- 《Flink教程(10)- Flink批流一体API(其它)》
- 《Flink教程(11)- Flink高级API(Window)》
- 《Flink教程(12)- Flink高级API(Time与Watermaker)》
- 《Flink教程(13)- Flink高级API(状态管理)》
在前面的教程,我们已经学习了Flink的四大基石里面的State了,如下图,本文讲解下Checkpoint容错机制:
02 Checkpoint
2.1 Checkpoint VS State
State:
- 维护/存储的是某一个
Operator的运行的状态/历史值,是维护在内存中; - 一般指一个具体的
Operator的状态(operator的状态表示一些算子在运行的过程中会产生的一些历史结果,如前面的maxBy底层会维护当前的最大值,也就是会维护一个keyedOperator,这个State里面存放就是maxBy这个Operator中的最大值); State数据默认保存在Java的堆内存中/TaskManage节点的内存中;State可以被记录,在失败的情况下数据还可以恢复。
Checkpoint:
- 某一时刻,
Flink中所有的Operator的当前State的全局快照,一般存在磁盘上; - 表示了一个
Flink Job在一个特定时刻的一份全局状态快照,即包含了所有Operator的状态; - 可以理解为
Checkpoint是把State数据定时持久化存储了; - 比如
KafkaConsumer算子中维护的Offset状态,当任务重新恢复的时候可以从Checkpoint中获取。
注意:
Flink中的Checkpoint底层使用了Chandy-Lamport algorithm分布式快照算法可以保证数据的在分布式环境下的一致性!- Chandy-Lamport algorithm算法的作者也是
ZK中Paxos一致性算法的作者
Flink中使用Chandy-Lamport algorithm分布式快照算法取得了成功,后续Spark的StructuredStreaming也借鉴了该算法。
2.2 Checkpoint 执行流程
2.2.1 简单流程
流程描述:
- step1 :
Flink的JobManager创建CheckpointCoordinator - step2 :
Coordinator向所有的SourceOperator发送Barrier栅栏(理解为执行Checkpoint的信号) - step3 :
SourceOperator接收到Barrier之后,暂停当前的操作(暂停的时间很短,因为后续的写快照是异步的),并制作State快照, 然后将自己的快照保存到指定的介质中(如HDFS), 一切ok之后向Coordinator汇报并将Barrier发送给下游的其他Operator - step4 : 其他的如
TransformationOperator接收到Barrier,重复第2步,最后将Barrier发送给Sink - step5 :
Sink接收到Barrier之后重复第2步 - step6 :
Coordinator接收到所有的Operator的执行ok的汇报结果,认为本次快照执行成功
注意:
- 在往介质(如
HDFS)中写入快照数据的时候是异步的(为了提高效率) - 分布式快照执行时的数据一致性由
Chandy-Lamport algorithm分布式快照算法保证
2.2.2 复杂流程
下图左侧是Checkpoint Coordinator,是整个Checkpoint的发起者,中间是由两个 source,一个 sink组成的Flink作业,最右侧的是持久化存储,在大部分用户场景中对应 HDFS。
step1 :Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint。
step2 :source 节点向下游广播 barrier,这个barrier就是实现 Chandy-Lamport分布式快照算法的核心,下游的task只有收到所有input的 barrier 才会执行相应的Checkpoint。
step3 :当 task完成state 备份后,会将备份数据的地址(state handle)通知给Checkpoint coordinator
step4 :下游的 sink节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,(栅栏对齐),这里还展示了RocksDB incremental Checkpoint(增量Checkpoint)的流程,首先RocksDB会全量刷数据到磁盘上(红色大三角表示),然后Flink框架会从中选择没有上传的文件进行持久化备份(紫色小三角)
step5 :同样的,sink节点在完成自己的 Checkpoint之后,会将 state handle 返回通知 Coordinator
step6 :最后,当 Checkpoint coordinator 收集齐所有task的state handle,就认为这一次的Checkpoint全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。
2.3 State状态后端/State存储介质
注意:前面学习了Checkpoint其实就是Flink中某一时刻,所有的Operator的全局快照,那么快照应该要有一个地方进行存储,而这个存储的地方叫做状态后端,Flink中的State状态后端有很多种,如下:
| 功能 | MemStateBackend | FastStateBackend | RocksDBStateBackend | |||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 构造方法 | MemoryStateBackend(int maxStateSize,boolean asynchronousSnapshots) | FsStateBackend(URI checkpointDataUri,boolean asynchronousSnapshots) | RocksDBStateBackend(URLI checkpointDataUri,boolean enableIncrementalCheckpointing) | |||||||||||||||||||
| 存储方式 | State(TaskManager内存)、Checkpoint(JobManager内存) | State(TaskManager内存),Checkpoint(外部文件系统、本地或HDFS) | State(TaskManager上的KV数据库,实际使用内存+磁盘),Checkpoint(外部文件系统,本地或HDFS) | |||||||||||||||||||
| 容量限制 | 单个State maxStateSize(默认为5M),maxStateSize t.f0);
//3.3聚合
DataStream aggResult = groupedDS.sum(1);
DataStream result = (SingleOutputStreamOperator) aggResult.map(new RichMapFunction() {
@Override
public String map(Tuple2 value) throws Exception {
return value.f0 + ":::" + value.f1;
}
});
//4.sink
result.print();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092");
FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer("flink_kafka", new SimpleStringSchema(), props);
result.addSink(kafkaSink);
//5.execute
env.execute();
// /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
}
}
03 状态恢复和重启策略3.1 自动重启策略和恢复3.1.1 重启策略配置方式如果在配置文件中,可以在
还可以在代码中针对该任务进行配置,示例如下:
3.1.2 重启策略分类3.1.2.1 默认重启策略如果配置了 3.1.2.2 无重启策略Job直接失败,不会尝试进行重启 配置文件的方式:
代码配置的方式:
3.1.2.3 固定延迟重启策略重启策略可以配置
也可以在程序中设置:
3.1.2.4 失败率重启策略失败率重启策略可以在
失败率重启策略也可以在程序中设置:
3.1.2.5 示例代码
3.2 手动重启并恢复1.把程序打包
3.访问webUI
4.使用FlinkWebUI提交
04 Savepoint4.1 Savepoint介绍保存点(Savepoint):类似于以前玩游戏的时候,遇到难关了/遇到boss了,赶紧手动存个档,然后接着玩,如果失败了,赶紧从上次的存档中恢复,然后接着玩。 在实际开发中,可能会遇到这样的情况:如要对集群进行停机维护/扩容… 那么这时候需要执行一次 当维护/扩容完毕之后,可以从上一次 4.2 Savepoint VS Checkpoint
4.3 Savepoint 案例演示
05 文末本文主要讲解了 关注
打赏
立即登录/注册
微信扫码登录 |
