您当前的位置: 首页 >  flink

顧棟

暂无认证

  • 4浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【Flink学习】入门教程之Event-driven Applications

顧棟 发布时间:2022-01-29 11:42:54 ,浏览量:4

文章目录
  • 事件驱动应用
    • 处理函数(Process Functions)
      • 简介
      • 示例
        • `open()` 方法
        • `processElement()` 方法
        • `onTimer()` 方法
      • 性能考虑
    • 旁路输出(Side Outputs)
      • 简介
      • 示例
    • 结语
原文地址

事件驱动应用 处理函数(Process Functions) 简介

ProcessFunction 将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction 十分相似, 但是增加了 Timer。

示例

如果你已经体验了 流式分析训练 的动手实践, 你应该记得,它是采用 TumblingEventTimeWindow 来计算每个小时内每个司机的小费总和, 像下面的示例这样:

// 计算每个司机每小时的小费总和
DataStream hourlyTips = fares
        .keyBy((TaxiFare fare) -> fare.driverId)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .process(new AddTips());

使用 KeyedProcessFunction 去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码:

// 计算每个司机每小时的小费总和
DataStream hourlyTips = fares
        .keyBy((TaxiFare fare) -> fare.driverId)
        .process(new PseudoWindow(Time.hours(1)));

在这个代码片段中,一个名为 PseudoWindowKeyedProcessFunction 被应用于 KeyedStream, 其结果是一个 DataStream (与使用 Flink 内置时间窗口的实现生成的流相同)。

PseudoWindow 的总体轮廓示意如下:

// 在时长跨度为一小时的窗口中计算每个司机的小费总和。
// 司机ID作为 key。
public static class PseudoWindow extends 
        KeyedProcessFunction {

    private final long durationMsec;

    public PseudoWindow(Time duration) {
        this.durationMsec = duration.toMilliseconds();
    }

    @Override
    // 在初始化期间调用一次。
    public void open(Configuration conf) {
        . . .
    }

    @Override
    // 每个票价事件(TaxiFare-Event)输入(到达)时调用,以处理输入的票价事件。
    public void processElement(
            TaxiFare fare,
            Context ctx,
            Collector out) throws Exception {

        . . .
    }

    @Override
    // 当当前水印(watermark)表明窗口现在需要完成的时候调用。
    public void onTimer(long timestamp, 
            OnTimerContext context, 
            Collector out) throws Exception {

        . . .
    }
}

注意事项:

  • 有几种类型的 ProcessFunctions – 不仅包括 KeyedProcessFunction,还包括 CoProcessFunctionsBroadcastProcessFunctions 等.
  • KeyedProcessFunction 是一种 RichFunction。作为 RichFunction,它可以访问使用 Managed Keyed State 所需的 opengetRuntimeContext 方法。
  • 有两个回调方法须要实现: processElementonTimer。每个输入事件都会调用 processElement 方法; 当计时器触发时调用 onTimer。它们可以是基于事件时间(event time)的 timer,也可以是基于处理时间(processing time)的 timer。 除此之外,processElementonTimer 都提供了一个上下文对象,该对象可用于与 TimerService 交互。 这两个回调还传递了一个可用于发出结果的 Collector
open() 方法
// 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。
// 每个司机都有一个单独的MapState对象。
private transient MapState sumOfTips;

@Override
public void open(Configuration conf) {

    MapStateDescriptor sumDesc =
            new MapStateDescriptor("sumOfTips", Long.class, Float.class);
    sumOfTips = getRuntimeContext().getMapState(sumDesc);
}

由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。 这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。 实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。 此实现通过使用 MapState 来支持处理这一点,该 MapState 将每个窗口的结束时间戳映射到该窗口的小费总和。

processElement() 方法
public void processElement(
        TaxiFare fare,
        Context ctx,
        Collector out) throws Exception {

    long eventTime = fare.getEventTime();
    TimerService timerService = ctx.timerService();

    if (eventTime             
关注
打赏
1663402667
查看更多评论
0.0480s