文章目录
原文地址
事件驱动应用
处理函数(Process Functions)
简介
- 事件驱动应用
- 处理函数(Process Functions)
- 简介
- 示例
- `open()` 方法
- `processElement()` 方法
- `onTimer()` 方法
- 性能考虑
- 旁路输出(Side Outputs)
- 简介
- 示例
- 结语
ProcessFunction
将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction
十分相似, 但是增加了 Timer。
如果你已经体验了 流式分析训练 的动手实践, 你应该记得,它是采用 TumblingEventTimeWindow
来计算每个小时内每个司机的小费总和, 像下面的示例这样:
// 计算每个司机每小时的小费总和
DataStream hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTips());
使用 KeyedProcessFunction
去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码:
// 计算每个司机每小时的小费总和
DataStream hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));
在这个代码片段中,一个名为 PseudoWindow
的 KeyedProcessFunction
被应用于 KeyedStream, 其结果是一个 DataStream
(与使用 Flink 内置时间窗口的实现生成的流相同)。
PseudoWindow
的总体轮廓示意如下:
// 在时长跨度为一小时的窗口中计算每个司机的小费总和。
// 司机ID作为 key。
public static class PseudoWindow extends
KeyedProcessFunction {
private final long durationMsec;
public PseudoWindow(Time duration) {
this.durationMsec = duration.toMilliseconds();
}
@Override
// 在初始化期间调用一次。
public void open(Configuration conf) {
. . .
}
@Override
// 每个票价事件(TaxiFare-Event)输入(到达)时调用,以处理输入的票价事件。
public void processElement(
TaxiFare fare,
Context ctx,
Collector out) throws Exception {
. . .
}
@Override
// 当当前水印(watermark)表明窗口现在需要完成的时候调用。
public void onTimer(long timestamp,
OnTimerContext context,
Collector out) throws Exception {
. . .
}
}
注意事项:
- 有几种类型的 ProcessFunctions – 不仅包括
KeyedProcessFunction
,还包括CoProcessFunctions
、BroadcastProcessFunctions
等. KeyedProcessFunction
是一种RichFunction
。作为RichFunction
,它可以访问使用 Managed Keyed State 所需的open
和getRuntimeContext
方法。- 有两个回调方法须要实现:
processElement
和onTimer
。每个输入事件都会调用processElement
方法; 当计时器触发时调用onTimer
。它们可以是基于事件时间(event time)的 timer,也可以是基于处理时间(processing time)的 timer。 除此之外,processElement
和onTimer
都提供了一个上下文对象,该对象可用于与TimerService
交互。 这两个回调还传递了一个可用于发出结果的Collector
。
open()
方法
// 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。
// 每个司机都有一个单独的MapState对象。
private transient MapState sumOfTips;
@Override
public void open(Configuration conf) {
MapStateDescriptor sumDesc =
new MapStateDescriptor("sumOfTips", Long.class, Float.class);
sumOfTips = getRuntimeContext().getMapState(sumDesc);
}
由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。 这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。 实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。 此实现通过使用 MapState
来支持处理这一点,该 MapState
将每个窗口的结束时间戳映射到该窗口的小费总和。
processElement()
方法
public void processElement(
TaxiFare fare,
Context ctx,
Collector out) throws Exception {
long eventTime = fare.getEventTime();
TimerService timerService = ctx.timerService();
if (eventTime
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?