- 01 引言
- 02 Flink中的有状态计算
- 03 有状态和无状态计算
- 3.1 无状态计算
- 3.1.1 无状态计算特点
- 3.1.2 无状态计算例子(消费延迟计算)
- 3.2 有状态计算
- 3.2.1 有状态计算特点
- 3.2.2 有状态计算例子(访问量统计)
- 3.2.3 有状态计算的场景
- 04 状态的分类
- 4.1 Managed State & Raw State
- 4.2 Keyed State & Operator State
- 4.2.1 Keyed State
- 4.2.2 Operator State
- 05 State存储结构
- 5.1 State API
- 06 State 案例
- 6.1 Keyed State案例
- 6.2 Operator State案例
- 07 文末
在前面的博客,我们已经对Flink
批流一体API
的使用有了一定的了解了,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
- 《Flink教程(09)- Flink批流一体API(Connectors示例)》
- 《Flink教程(10)- Flink批流一体API(其它)》
- 《Flink教程(11)- Flink高级API(Window)》
- 《Flink教程(12)- Flink高级API(Time与Watermaker)》
在前面的教程,我们已经学习了Flink
的四大基石里面的Time
了,如下图,本文讲解下State 状态
:
Flink中已经对需要进行有状态计算的API
做了封装,底层已经维护好了状态!
不需要像SparkStreaming
那样还得自己写updateStateByKey
,也就是说我们今天学习的State
只需要掌握原理,实际开发中一般都是使用Flink
底层维护好的状态或第三方维护好的状态(如Flink
整合Kafka
的offset
维护底层就是使用的State
,但是人家已经写好了的)
/**
* 有状态计算
*
* @author : YangLinWei
* @createTime: 2022/3/7 11:31 下午
*/
public class SourceDemo031 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2.source
DataStream linesDS = env.socketTextStream("node1", 9999);
//3.处理数据-transformation
//3.1每一行数据按照空格切分成一个个的单词组成一个集合
DataStream wordsDS = linesDS.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
//value就是一行行的数据
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);//将切割处理的一个个的单词收集起来并返回
}
}
});
//3.2对集合中的每个单词记为1
DataStream wordAndOnesDS = wordsDS.map(new MapFunction() {
@Override
public Tuple2 map(String value) throws Exception {
//value就是进来一个个的单词
return Tuple2.of(value, 1);
}
});
//3.3对数据按照单词(key)进行分组
//KeyedStream groupedDS = wordAndOnesDS.keyBy(0);
KeyedStream groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
//3.4对各个组内的数据按照数量(value)进行聚合就是求sum
DataStream result = groupedDS.sum(1);
//4.输出结果-sink
result.print();
//5.触发执行-execute
env.execute();
}
}
执行 netcat,然后在终端输入 hello world,执行程序会输出什么?
- 答案很明显,
(hello, 1)
和(word,1)
那么问题来了,如果再次在终端输入 hello world,程序会输入什么?
- 答案其实也很明显,
(hello, 2)
和(world, 2)
。
因为 Flink
知道之前已经处理过一次 hello world
,这就是 state
发挥作用了,这里是被称为 keyed state
存储了之前需要统计的数据,所以Flink
知道hello
和world
分别出现过一次。
无状态计算的特点:
- 不需要考虑历史数据
- 相同的输入得到相同的输出就是无状态计算,如
map/flatMap/filter
…
上图是一个无状态计算的例子,消费延迟计算:假设现在有一个消息队列,消息队列中有一个生产者持续往消费队列写入消息,多个消费者分别从消息队列中读取消息。
从图上可以看出,生产者已经写入 16 条消息,Offset
停留在 15 ;有 3 个消费者,有的消费快,而有的消费慢。消费快的已经消费了 13 条数据,消费者慢的才消费了 7、8 条数据。
如何实时统计每个消费者落后多少条数据,如图给出了输入输出的示例。可以了解到输入的时间点有一个时间戳,生产者将消息写到了某个时间点的位置,每个消费者同一时间点分别读到了什么位置。刚才也提到了生产者写入了 15 条,消费者分别读取了 10、7、12 条。那么问题来了,怎么将生产者、消费者的进度转换为右侧示意图信息呢?
consumer 0
落后了 5 条,consumer 1
落后了 8 条,consumer 2
落后了 3 条,根据 Flink
的原理,此处需进行Map
操作。Map
首先把消息读取进来,然后分别相减,即可知道每个 consumer
分别落后了几条。Map
一直往下发,则会得出最终结果。
大家会发现,在这种模式的计算中,无论这条输入进来多少次,输出的结果都是一样的,因为单条输入中已经包含了所需的所有信息。消费落后等于生产者减去消费者。生产者的消费在单条数据中可以得到,消费者的数据也可以在单条数据中得到,所以相同输入可以得到相同输出,这就是一个无状态的计算。
3.2 有状态计算 3.2.1 有状态计算特点特点:
- 需要考虑历史数据
- 相同的输入得到不同的输出/不一定得到相同的输出,就是有状态计算,如:
sum/reduce
以访问日志统计量的例子进行说明,比如当前拿到一个 Nginx
访问日志,一条日志表示一个请求,记录该请求从哪里来,访问的哪个地址,需要实时统计每个地址总共被访问了多少次,也即每个 API 被调用了多少次。
可以看到下面简化的输入和输出,输入第一条是在某个时间点请求 GET 了 /api/a
;第二条日志记录了某个时间点 Post /api/b
;第三条是在某个时间点 GET
了一个 /api/a
,总共有 3 个 Nginx
日志。
从这 3 条 Nginx
日志可以看出,第一条进来输出 /api/a
被访问了一次,第二条进来输出 /api/b
被访问了一次,紧接着又进来一条访问api/a
,所以 api/a
被访问了 2 次。
不同的是,两条/api/a
的Nginx
日志进来的数据是一样的,但输出的时候结果可能不同,第一次输出 count=1
,第二次输出count=2
,说明相同输入可能得到不同输出。
输出的结果取决于当前请求的 API
地址之前累计被访问过多少次。第一条过来累计是 0 次,count
= 1,第二条过来API
的访问已经有一次了,所以/api/a
访问累计次数 count
=2。单条数据其实仅包含当前这次访问的信息,而不包含所有的信息。
要得到这个结果,还需要依赖 API 累计访问的量,即状态。
这个计算模式是将数据输入算子中,用来进行各种复杂的计算并输出数据。这个过程中算子会去访问之前存储在里面的状态。另外一方面,它还会把现在的数据对状态的影响实时更新,如果输入 200 条数据,最后输出就是 200 条结果。
3.2.3 有状态计算的场景有状态计算用到了以下4个场景:
- 去重:比如上游的系统数据可能会有重复,落到下游系统时希望把重复的数据都去掉。去重需要先了解哪些数据来过,哪些数据还没有来,也就是把所有的主键都记录下来,当一条数据到来后,能够看到在主键当中是否存在。
- 窗口计算:比如统计每分钟
Nginx
日志API
被访问了多少次。窗口是一分钟计算一次,在窗口触发前,如08:00 ~ 08:01
这个窗口,前59
秒的数据来了需要先放入内存,即需要把这个窗口之内的数据先保留下来,等到8:01
时一分钟后,再将整个窗口内触发的数据输出。未触发的窗口数据也是一种状态。 - 机器学习/深度学习:如训练的模型以及当前模型的参数也是一种状态,机器学习可能每次都用有一个数据集,需要在数据集上进行学习,对模型进行一个反馈。
- 访问历史数据:比如与昨天的数据进行对比,需要访问一些历史数据。如果每次从外部去读,对资源的消耗可能比较大,所以也希望把这些历史数据也放入状态中做对比。
从Flink
是否接管角度可以分为:
- ManagedState(托管状态)
- RawState(原始状态)
Flink Runtime
管理(自动存储,自动恢复、内存管理上的优化)用户自己管理(需要自己序列化 )状态数据结构已知的数据结构(value,list,map
…)字节数组(byte[]
)推荐使用场景大多数情况下均可使用自定义Operator
时可使用
两者的区别如下:
- 从状态管理方式的方式来说,
Managed State
由Flink Runtime
管理,自动存储,自动恢复,在内存管理上有优化;而Raw State
需要用户自己管理,需要自己序列化,Flink
不知道State
中存入的数据是什么结构,只有用户自己知道,需要最终序列化为可存储的数据结构。 - 从状态数据结构来说,
Managed State
支持已知的数据结构,如Value
、List
、Map
等。而Raw State
只支持字节数组 ,所有状态都要转换为二进制字节数组才可以。 - 从推荐使用场景来说,
Managed State
大多数情况下均可使用,而Raw State
是当Managed State
不够用时,比如需要自定义Operator
时,才会使用Raw State
。
在实际生产中,都只推荐使用ManagedState
。
Managed State 分为两种:Keyed State
和Operator State
(Raw State
都是Operator State
)
KeyedStream
上的算子中可以用于所有算子(常用于source
,例如FlinkKafakaConsumer
)对应State
每个key
对应一个State
(一个Operator
实例处理多个key
,访问相应的多个State
)一个Operator
实例对应一个State
并发改变并发改变,State随着Key在实例间迁移并发改变时有多重重新分配方式可选(均匀分配、合并后每个得到全量)访问方式通过RuntimeContext
访问(Rich Function
)实现CheckpointedFunction
或ListCheckpointed
接口支持结构支持的数据结构(ValueState
、ListState
、ReducingState
、AggregatingState
、MapState
)支持的数据结构(ListState
)
4.2.1 Keyed State
在
Flink Stream
模型中,Datastream
经过 keyBy
的操作可以变为KeyedStream
。
Keyed State
是基于KeyedStream
上的状态。这个状态是跟特定的key
绑定的,对KeyedStream
流上的每一个key
,都对应一个state
,如stream.keyBy(…)
KeyBy
之后的State
,可以理解为分区过的State
,每个并行keyed Operator
的每个实例的每个key
都有一个Keyed State
,即就是一个唯一的状态,由于每个
key
属于一个keyed Operator
的并行实例,因此我们将其简单的理解为。
这里的
fromElements
会调用FromElementsFunction
的类,其中就使用了类型为list state
的operator state
Operator State
又称为 non-keyed state
,与Key
无关的State
,每一个 operator state
都仅与一个operator
的实例绑定。
Operator State
可以用于所有算子,但一般常用于 Source
。
前面说过有状态计算其实就是需要考虑历史数据,而历史数据需要搞个地方存储起来,Flink
为了方便不同分类的State
的存储和管理,提供了如下的API
/数据结构来存储State
!
Keyed State
通过 RuntimeContext
访问,这需要 Operator
是一个RichFunction
。
保存Keyed state
的数据结构:
ValueState
:即类型为T
的单值状态。这个状态与对应的key
绑定,是最简单的状态了。它可以通过update
方法更新状态值,通过value()
方法获取状态值,如求按用户id
统计用户交易总额ListState
:即key
上的状态值为一个列表。可以通过add
方法往列表中附加值;也可以通过get()
方法返回一个Iterable
来遍历状态值,如统计按用户id
统计用户经常登录的Ip
ReducingState
:这种状态通过用户传入的reduceFunction
,每次调用add
方法添加值的时候,会调用reduceFunction
,最后合并到一个单一的状态值MapState
:即状态值为一个map
。用户通过put
或putAll
方法添加元素 需要注意的是,以上所述的State
对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄Operator State
:需要自己实现CheckpointedFunction
或ListCheckpointed
接口。保存Operator state
的数据结构:(ListState
,BroadcastState
)
举例来说,Flink
中的FlinkKafkaConsumer
,就使用了operator state
,它会在每个connector
实例中,保存该实例中消费topic
的所有(partition, offset
)映射。
参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state
下图就 word count
的 sum
所使用的StreamGroupedReduce
类为例讲解了如何在代码中使用 keyed state
: 需求:使用
KeyState
中的ValueState
获取数据中的最大值(实际中直接使用maxBy
即可)
编码步骤:
//-1.定义一个状态用来存放最大值
private transient ValueState maxValueState;
//-2.创建一个状态描述符对象
ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
//-3.根据状态描述符获取State
maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);
//-4.使用State
Long historyValue = maxValueState.value();
//判断当前值和历史值谁大
if (historyValue == null || currentValue > historyValue)
//-5.更新状态
maxValueState.update(currentValue);
示例代码:
/**
* 使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
*
* @author : YangLinWei
* @createTime: 2022/3/8 12:13 上午
*/
public class KeyedState {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//方便观察
//2.Source
DataStreamSource tupleDS = env.fromElements(
Tuple2.of("北京", 1L),
Tuple2.of("上海", 2L),
Tuple2.of("北京", 6L),
Tuple2.of("上海", 8L),
Tuple2.of("北京", 3L),
Tuple2.of("上海", 4L)
);
//3.Transformation
//使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
//实现方式1:直接使用maxBy--开发中使用该方式即可
//min只会求出最小的那个字段,其他的字段不管
//minBy会求出最小的那个字段和对应的其他的字段
//max只会求出最大的那个字段,其他的字段不管
//maxBy会求出最大的那个字段和对应的其他的字段
SingleOutputStreamOperator result = tupleDS.keyBy(t -> t.f0)
.maxBy(1);
//实现方式2:使用KeyState中的ValueState---学习测试时使用,或者后续项目中/实际开发中遇到复杂的Flink没有实现的逻辑,才用该方式!
SingleOutputStreamOperator result2 = tupleDS.keyBy(t -> t.f0)
.map(new RichMapFunction() {
//-1.定义状态用来存储最大值
private ValueState maxValueState = null;
@Override
public void open(Configuration parameters) throws Exception {
//-2.定义状态描述符:描述状态的名称和里面的数据类型
ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
//-3.根据状态描述符初始化状态
maxValueState = getRuntimeContext().getState(descriptor);
}
@Override
public Tuple3 map(Tuple2 value) throws Exception {
//-4.使用State,取出State中的最大值/历史最大值
Long historyMaxValue = maxValueState.value();
Long currentValue = value.f1;
if (historyMaxValue == null || currentValue > historyMaxValue) {
//5-更新状态,把当前的作为新的最大值存到状态中
maxValueState.update(currentValue);
return Tuple3.of(value.f0, currentValue, currentValue);
} else {
return Tuple3.of(value.f0, currentValue, historyMaxValue);
}
}
});
//4.Sink
//result.print();
result2.print();
//5.execute
env.execute();
}
}
运行结果:
参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
下图对 word count
示例中的FromElementsFunction
类进行详解并分享如何在代码中使用operator state
: 需求:使用
ListState
存储offset
模拟Kafka
的offset
维护
编码步骤:
//-1.声明一个OperatorState来记录offset
private ListState offsetState = null;
private Long offset = 0L;
//-2.创建状态描述器
ListStateDescriptor descriptor = new ListStateDescriptor("offsetState", Long.class);
//-3.根据状态描述器获取State
offsetState = context.getOperatorStateStore().getListState(descriptor);
//-4.获取State中的值
Iterator iterator = offsetState.get().iterator();
if (iterator.hasNext()) {//迭代器中有值
offset = iterator.next();//取出的值就是offset
}
offset += 1L;
ctx.collect("subTaskId:" + getRuntimeContext().getIndexOfThisSubtask() + ",当前的offset为:" + offset);
if (offset % 5 == 0) {//每隔5条消息,模拟一个异常
//-5.保存State到Checkpoint中
offsetState.clear();//清理内存中存储的offset到Checkpoint中
//-6.将offset存入State中
offsetState.add(offset);
示例代码:
/**
* OperatorState
*
* @author : YangLinWei
* @createTime: 2022/3/8 12:17 上午
*/
public class OperatorState {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//先直接使用下面的代码设置Checkpoint时间间隔和磁盘路径以及代码遇到异常后的重启策略,下午会学
env.enableCheckpointing(1000);//每隔1s执行一次Checkpoint
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));
//2.Source
DataStreamSource sourceData = env.addSource(new MyKafkaSource());
//3.Transformation
//4.Sink
sourceData.print();
//5.execute
env.execute();
}
/**
* MyKafkaSource就是模拟的FlinkKafkaConsumer并维护offset
*/
public static class MyKafkaSource extends RichParallelSourceFunction implements CheckpointedFunction {
//-1.声明一个OperatorState来记录offset
private ListState offsetState = null;
private Long offset = 0L;
private boolean flag = true;
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
//-2.创建状态描述器
ListStateDescriptor descriptor = new ListStateDescriptor("offsetState", Long.class);
//-3.根据状态描述器初始化状态
offsetState = context.getOperatorStateStore().getListState(descriptor);
}
@Override
public void run(SourceContext ctx) throws Exception {
//-4.获取并使用State中的值
Iterator iterator = offsetState.get().iterator();
if (iterator.hasNext()) {
offset = iterator.next();
}
while (flag) {
offset += 1;
int id = getRuntimeContext().getIndexOfThisSubtask();
ctx.collect("分区:" + id + "消费到的offset位置为:" + offset);//1 2 3 4 5 6
//Thread.sleep(1000);
TimeUnit.SECONDS.sleep(2);
if (offset % 5 == 0) {
System.out.println("程序遇到异常了.....");
throw new Exception("程序遇到异常了.....");
}
}
}
@Override
public void cancel() {
flag = false;
}
/**
* 下面的snapshotState方法会按照固定的时间间隔将State信息存储到Checkpoint/磁盘中,也就是在磁盘做快照!
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//-5.保存State到Checkpoint中
offsetState.clear();//清理内存中存储的offset到Checkpoint中
//-6.将offset存入State中
offsetState.add(offset);
}
}
}
运行结果:
本文主要讲解了Flink
高级API
里面的状态管理,谢谢大家的阅读,本文完!