您当前的位置: 首页 >  flink

杨林伟

暂无认证

  • 4浏览

    0关注

    3337博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink教程(20)- Flink高级特性(双流Join)

杨林伟 发布时间:2022-03-08 23:21:55 ,浏览量:4

文章目录
  • 01 引言
  • 02 双流join介绍
  • 03 Window Join
    • 3.1 Tumbling Window Join
    • 3.2 Sliding Window Join
    • 3.3 Session Window Join
  • 04 Interval Join
  • 05 案例讲解
    • 5.1 案例1
    • 5.2 案例2
  • 06 文末

01 引言

在前面的博客,我们学习了FlinkBroadcastState了,有兴趣的同学可以参阅下:

  • 《Flink教程(01)- Flink知识图谱》
  • 《Flink教程(02)- Flink入门》
  • 《Flink教程(03)- Flink环境搭建》
  • 《Flink教程(04)- Flink入门案例》
  • 《Flink教程(05)- Flink原理简单分析》
  • 《Flink教程(06)- Flink批流一体API(Source示例)》
  • 《Flink教程(07)- Flink批流一体API(Transformation示例)》
  • 《Flink教程(08)- Flink批流一体API(Sink示例)》
  • 《Flink教程(09)- Flink批流一体API(Connectors示例)》
  • 《Flink教程(10)- Flink批流一体API(其它)》
  • 《Flink教程(11)- Flink高级API(Window)》
  • 《Flink教程(12)- Flink高级API(Time与Watermaker)》
  • 《Flink教程(13)- Flink高级API(状态管理)》
  • 《Flink教程(14)- Flink高级API(容错机制)》
  • 《Flink教程(15)- Flink高级API(并行度)》
  • 《Flink教程(16)- Flink Table与SQL》
  • 《Flink教程(17)- Flink Table与SQL(案例与SQL算子)》
  • 《Flink教程(18)- Flink阶段总结》
  • 《Flink教程(19)- Flink高级特性(BroadcastState)》

本文主要讲解Flink的高级特性其中之一的双流Join

02 双流join介绍

在这里插入图片描述 Join大体分类只有两种:Window Join和Interval Join。

Window Join又可以根据Window的类型细分出3种:

  • Tumbling Window Join
  • Sliding Window Join
  • Session Widnow Join

Windows类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作;

interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理;目前Stream join的结果是数据的笛卡尔积;

03 Window Join 3.1 Tumbling Window Join

执行翻滚窗口联接时,具有公共键和公共翻滚窗口的所有元素将作为成对组合联接,并传递给JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射!

如图所示,我们定义了一个大小为2毫秒的翻滚窗口,结果窗口的形式为[0,1]、[2,3]、。。。。该图显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。注意,在翻滚窗口[6,7]中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦结合的元素。 在这里插入图片描述

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 ...
DataStream orangeStream = ...DataStream greenStream = ...
orangeStream.join(greenStream)
    .where()
    .equalTo()
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
3.2 Sliding Window Join

在执行滑动窗口联接时,具有公共键和公共滑动窗口的所有元素将作为成对组合联接,并传递给JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!请注意,某些元素可能会连接到一个滑动窗口中,但不会连接到另一个滑动窗口中!

在本例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,从而产生滑动窗口[-1,0],[0,1],[1,2],[2,3]…。x轴下方的连接元素是传递给每个滑动窗口的JoinFunction的元素。在这里,您还可以看到,例如,在窗口[2,3]中,橙色②与绿色③连接,但在窗口[1,2]中没有与任何对象连接。 在这里插入图片描述

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream orangeStream = ...DataStream greenStream = ...
orangeStream.join(greenStream)
    .where()
    .equalTo()
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply (new JoinFunction (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
3.3 Session Window Join

在执行会话窗口联接时,具有相同键(当“组合”时满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出!

在这里,我们定义了一个会话窗口连接,其中每个会话被至少1ms的间隔分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三个会话中,绿色流中没有元素,所以⑧和⑨没有连接!

在这里插入图片描述

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 ...
DataStream orangeStream = ...DataStream greenStream = ...
orangeStream.join(greenStream)
    .where()
    .equalTo()
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply (new JoinFunction (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
04 Interval Join

前面学习的Window Join必须要在一个Window中进行JOIN,那如果没有Window如何处理呢?

  • interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。
b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] 
or 
a.timestamp + lowerBound  item.getGoodsId())
                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                .between(Time.seconds(-1), Time.seconds(0))
                .upperBoundExclusive()
                .process(new ProcessJoinFunction() {
                    @Override
                    public void processElement(OrderItem left, Goods right, Context ctx, Collector out) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(left.getCount()));
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));

                        out.collect(factOrderItem);
                    }
                });

        factOrderItemDS.print();

        env.execute("Interval JOIN");
    }

    //商品类
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //订单明细类
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //关联结果
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //构建一个商品Stream源(这个好比就是维表)
    public static class GoodsSource11 extends RichSourceFunction {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    //构建订单明细Stream源
    public static class OrderItemSource extends RichSourceFunction {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    //构建水印分配器(此处为了简单),直接使用系统时间了
    public static class GoodsWatermark implements WatermarkStrategy {

        @Override
        public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    public static class OrderItemWatermark implements WatermarkStrategy {
        @Override
        public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}
06 文末

本文主要讲解了Flink双流join的高级特性,谢谢大家的阅读本文完!

关注
打赏
1662376985
查看更多评论
立即登录/注册

微信扫码登录

0.2880s