
Flink Tutorial (17) - Flink Table & SQL (Examples and SQL Operators)

杨林伟 · Published 2022-03-08 14:50:09

Table of Contents
  • 01 Introduction
  • 02 Flink Table & SQL Examples
    • 2.1 Example 1 (SQL aggregation on a DataStream)
    • 2.2 Example 2 (Table & SQL aggregation on a DataStream)
    • 2.3 Example 3 (SQL with tumbling windows)
    • 2.4 Example 4 (SQL consuming from Kafka)
  • 03 Common Flink SQL Operators
    • 3.1 SELECT
    • 3.2 WHERE
    • 3.3 DISTINCT
    • 3.4 GROUP BY
    • 3.5 UNION and UNION ALL
    • 3.6 JOIN
    • 3.7 Group Windows
      • 3.7.1 Tumble (tumbling) windows
      • 3.7.2 Hop (sliding) windows
      • 3.7.3 Session windows
  • 04 Closing

01 Introduction

In earlier posts we covered the Flink Table API and SQL; interested readers can refer to:

  • Flink Tutorial (01) - Flink Knowledge Map
  • Flink Tutorial (02) - Getting Started with Flink
  • Flink Tutorial (03) - Setting Up the Flink Environment
  • Flink Tutorial (04) - First Flink Examples
  • Flink Tutorial (05) - A Brief Look at Flink's Internals
  • Flink Tutorial (06) - Flink Unified Batch/Stream API (Source examples)
  • Flink Tutorial (07) - Flink Unified Batch/Stream API (Transformation examples)
  • Flink Tutorial (08) - Flink Unified Batch/Stream API (Sink examples)
  • Flink Tutorial (09) - Flink Unified Batch/Stream API (Connector examples)
  • Flink Tutorial (10) - Flink Unified Batch/Stream API (Miscellaneous)
  • Flink Tutorial (11) - Flink Advanced API (Windows)
  • Flink Tutorial (12) - Flink Advanced API (Time and Watermarks)
  • Flink Tutorial (13) - Flink Advanced API (State Management)
  • Flink Tutorial (14) - Flink Advanced API (Fault Tolerance)
  • Flink Tutorial (15) - Flink Advanced API (Parallelism)
  • Flink Tutorial (16) - Flink Table & SQL

This post focuses on a number of Flink Table & SQL examples.

02 Flink Table & SQL Examples

2.1 Example 1 (SQL aggregation on a DataStream)

Requirement: register a DataStream as a Table and as a View, then run a SQL query over them.

The code is as follows:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Example 1: register a DataStream as a Table and a View and run a SQL query.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:07 PM
 */
public class Demo1 {

    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));

        //3. Register tables
        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        // register DataStream as a view
        tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount"));

        //4. Run the query; embedding tableA into the SQL string registers it under a generated name
        System.out.println(tableA);
        // union the two tables
        Table resultTable = tEnv.sqlQuery(
                "SELECT * FROM " + tableA + " WHERE amount > 2 " +
                        "UNION ALL " +
                        "SELECT * FROM OrderB WHERE amount < 2"
        );

        //5. Emit the result
        DataStream<Order> resultDS = tEnv.toAppendStream(resultTable, Order.class);
        resultDS.print();

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}

Run result: (screenshot omitted)
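Given the sample data, the union keeps the two orderA rows with amount > 2 and the single orderB row with amount < 2, so the printed stream should contain roughly the following three rows (ordering varies with parallelism):

Order(user=1, product=beer, amount=3)
Order(user=1, product=diaper, amount=4)
Order(user=4, product=beer, amount=1)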

2.2 Example 2 (Table & SQL aggregation on a DataStream)

Requirement: count words in a DataStream using both the SQL and the Table API approaches.

Example code (SQL approach):

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Count words in a DataStream using both the SQL and the Table API approaches.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:13 PM
 */
public class Demo02 {

    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );

        //3. Register the view
        tEnv.createTemporaryView("WordCount", input, $("word"), $("frequency"));

        //4. Run the query
        Table resultTable = tEnv.sqlQuery("SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word");

        //5. Emit the result
        //toAppendStream doesn't support consuming the update changes produced by a GroupAggregate node,
        //so a retract stream is used instead
        //DataStream<WC> resultDS = tEnv.toAppendStream(resultTable, WC.class);
        DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);

        resultDS.print();

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
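A note on the output: because the GROUP BY aggregation keeps updating its result, toRetractStream wraps every record in a Tuple2<Boolean, WC>, where false retracts a previously emitted row. Assuming parallelism 1, the printed stream should look roughly like:

(true,WC(word=Hello, frequency=1))
(true,WC(word=World, frequency=1))
(false,WC(word=Hello, frequency=1))
(true,WC(word=Hello, frequency=2))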

Run result: (screenshot omitted)

Example code (Table API approach):

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Count words in a DataStream using both the SQL and the Table API approaches.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:15 PM
 */
public class Demo02Table {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );

        //3. Register the table (the POJO fields word/frequency are derived automatically)
        Table table = tEnv.fromDataStream(input);

        //4. Run the query; the final filter keeps only words whose total count equals 2
        Table resultTable = table
                .groupBy($("word"))
                .select($("word"), $("frequency").sum().as("frequency"))
                .filter($("frequency").isEqual(2));

        //5. Emit the result
        DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);

        resultDS.print();

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
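Because of the frequency = 2 filter, nothing passes the filter until the second "Hello" arrives, so (assuming parallelism 1) the print should show roughly a single record:

(true,WC(word=Hello, frequency=2))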

Run result: (screenshot omitted)

2.3 Example 3 (SQL with tumbling windows)

Requirement: use Flink SQL to compute, for each user, the total order count, the maximum order amount, and the minimum order amount over 5-second windows; that is, every 5 seconds produce these statistics per user for the most recent 5 seconds.

This requirement can be handled with a time-based tumbling window, familiar from stream processing.

Coding steps:

  1. Create the environment
  2. Simulate a real-time stream with a custom source function
  3. Configure event time and watermarks
  4. Register the table
  5. Run the query (SQL style or Table style)
  6. Emit the result
  7. Trigger execution

SQL implementation:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

/**
 * SQL implementation
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:19 PM
 */
public class Demo3SQL {

    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source: emit one random order per second
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        //3. Transformation: assign event timestamps and watermarks (up to 2s of out-of-orderness)
        DataStream<Order> watermarkDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, timestamp) -> event.getCreateTime())
                );

        //4. Register the table; .rowtime() marks createTime as the event-time attribute
        tEnv.createTemporaryView("t_order", watermarkDS,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());

        //5. Run the SQL: per user, per 5-second tumbling window
        String sql = "select " +
                "userId," +
                "count(*) as totalCount," +
                "max(money) as maxMoney," +
                "min(money) as minMoney " +
                "from t_order " +
                "group by userId," +
                "tumble(createTime, interval '5' second)";

        Table resultTable = tEnv.sqlQuery(sql);

        //6. Sink
        //Convert the SQL result back into a DataStream and print it:
        //toAppendStream  → appends new rows to the result DataStream
        //toRetractStream → emits (true, row) for inserts/updates and (false, row) for retractions
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        env.execute();

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}
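As a side note, if the window boundaries should appear in the output, Flink 1.12's group-window auxiliary functions TUMBLE_START and TUMBLE_END can be added to the projection. A minimal sketch against the same t_order table:

select
    userId,
    tumble_start(createTime, interval '5' second) as winStart,
    tumble_end(createTime, interval '5' second) as winEnd,
    count(*) as totalCount,
    max(money) as maxMoney,
    min(money) as minMoney
from t_order
group by userId, tumble(createTime, interval '5' second)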

Table API implementation:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Table API implementation
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:26 PM
 */
public class Demo3Table {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source: emit one random order per second
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        //3. Transformation: assign event timestamps and watermarks (up to 2s of out-of-orderness)
        DataStream<Order> watermarkDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, timestamp) -> event.getCreateTime())
                );

        //4. Register the table
        tEnv.createTemporaryView("t_order", watermarkDS,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());

        // Print the table schema
        tEnv.from("t_order").printSchema();

        //5. Table API query: 5-second tumbling window per user
        //(the expression-based syntax below replaces the older string-based variant:
        // .window(Tumble.over("5.second").on("createTime").as("tumbleWindow")))
        Table resultTable = tEnv.from("t_order")
                .window(Tumble.over(lit(5).second())
                        .on($("createTime"))
                        .as("tumbleWindow"))
                .groupBy($("tumbleWindow"), $("userId"))
                .select(
                        $("userId"),
                        $("userId").count().as("totalCount"),
                        $("money").max().as("maxMoney"),
                        $("money").min().as("minMoney"));

        //6. Convert the result back into a DataStream and print it
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        //7. Execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }

}
2.4 Example 4 (SQL consuming from Kafka)

Requirement: consume data from Kafka, keep only the records whose status is success, and write them back to Kafka. Test data for the input topic:

{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
Kafka preparation (create the input/output topics, then start a console producer and consumer for testing):

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning

Code implementation (reference documentation):

  • https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
  • https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Consume data from Kafka, keep only the records whose status is success, and write them back to Kafka.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:30 PM
 */
public class Demo4 {

    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source and sink tables backed by Kafka topics
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'input_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'properties.group.id' = 'testGroup',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json'\n" +
                        ")"
        );
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'output_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'format' = 'json',\n" +
                        "  'sink.partitioner' = 'round-robin'\n" +
                        ")"
        );

        //3. Query: keep only the success records
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";

        Table resultTable = tEnv.sqlQuery(sql);

        //4. Print the filtered stream for debugging
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        //5. Write the result to the Kafka sink (submitted as a separate statement)
        tEnv.executeSql("insert into output_kafka select * from " + resultTable);

        //6. Execute the streaming job that backs the print()
        env.execute();
    }
}
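Incidentally, the toRetractStream detour above exists only for local debugging; under the same table definitions, the filter-and-forward can be submitted as a single statement (a minimal sketch):

tEnv.executeSql(
        "INSERT INTO output_kafka " +
        "SELECT user_id, page_id, status FROM input_kafka WHERE status = 'success'");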
03 Common Flink SQL Operators

3.1 SELECT

SELECT: selects data from a DataSet/DataStream and projects specific columns.

Examples:

  • SELECT * FROM Table; // select all columns
  • SELECT name, age FROM Table; // select only the name and age columns

SELECT also supports functions and aliases; for example, in the WordCount case above:

SELECT word, COUNT(word) AS cnt FROM table GROUP BY word;
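The Table API equivalent of this aggregation (reusing the environment and the WordCount view registered in Example 2) would look roughly like:

Table result = tEnv.from("WordCount")
        .groupBy($("word"))
        .select($("word"), $("word").count().as("cnt"));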
3.2 WHERE

WHERE: filters data from a dataset/stream. It is used together with SELECT to split a relation horizontally, i.e., to select the records that satisfy a condition.

Examples:

  • SELECT name, age FROM Table WHERE name LIKE '%小明%';
  • SELECT * FROM Table WHERE age = 20;

Since WHERE filters the original data, Flink SQL supports the usual comparison operators in WHERE conditions: =, <, <=, >, >=, and <>, and conditions can be combined with AND, OR, and NOT, as the example below shows.
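A combined-condition sketch using the illustrative table above:

SELECT name, age FROM Table WHERE age >= 18 AND name <> '小明';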
