- 01 Introduction
- 02 Flink Table & SQL Examples
- 2.1 Example 1 (DataStream SQL aggregation)
- 2.2 Example 2 (DataStream Table & SQL aggregation)
- 2.3 Example 3 (SQL with tumbling windows)
- 2.4 Example 4 (consuming Kafka with SQL)
- 03 Common Flink SQL Operators
- 3.1 SELECT
- 3.2 WHERE
- 3.3 DISTINCT
- 3.4 GROUP BY
- 3.5 UNION and UNION ALL
- 3.6 JOIN
- 3.7 Group Window
- 3.7.1 Tumble Window (tumbling window)
- 3.7.2 Hop Window (sliding window)
- 3.7.3 Session Window (session window)
- 04 Closing
01 Introduction
In the previous posts we covered Flink's Table API and SQL. Interested readers can refer to:
- 《Flink Tutorial (01) - Flink Knowledge Map》
- 《Flink Tutorial (02) - Getting Started with Flink》
- 《Flink Tutorial (03) - Setting Up the Flink Environment》
- 《Flink Tutorial (04) - Flink Starter Examples》
- 《Flink Tutorial (05) - A Brief Look at Flink's Architecture》
- 《Flink Tutorial (06) - Flink Unified Batch/Stream API (Source examples)》
- 《Flink Tutorial (07) - Flink Unified Batch/Stream API (Transformation examples)》
- 《Flink Tutorial (08) - Flink Unified Batch/Stream API (Sink examples)》
- 《Flink Tutorial (09) - Flink Unified Batch/Stream API (Connector examples)》
- 《Flink Tutorial (10) - Flink Unified Batch/Stream API (miscellaneous)》
- 《Flink Tutorial (11) - Flink Advanced API (Window)》
- 《Flink Tutorial (12) - Flink Advanced API (Time and Watermark)》
- 《Flink Tutorial (13) - Flink Advanced API (state management)》
- 《Flink Tutorial (14) - Flink Advanced API (fault tolerance)》
- 《Flink Tutorial (15) - Flink Advanced API (parallelism)》
- 《Flink Tutorial (16) - Flink Table & SQL》
This post walks through some Flink Table & SQL examples.
02 Flink Table & SQL Examples
2.1 Example 1 (DataStream SQL aggregation)
Requirement: register a DataStream as a Table and a View, then run SQL aggregations over them.
The code is as follows:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Example 1: register a DataStream as a Table and a View and run SQL aggregations.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:07 PM
 */
public class Demo1 {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));
        //3. Register tables
        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        // register DataStream as Table
        tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount"));
        //4. Execute the query
        System.out.println(tableA);
        // union the two tables
        Table resultTable = tEnv.sqlQuery(
                "SELECT * FROM " + tableA + " WHERE amount > 2 " +
                        "UNION ALL " +
                        "SELECT * FROM OrderB WHERE amount < 2"
        );
        //5. Output the result
        DataStream<Order> resultDS = tEnv.toAppendStream(resultTable, Order.class);
        resultDS.print();
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}
Run result:
2.2 Example 2 (DataStream Table & SQL aggregation)
Requirement: count the words in a DataStream in two ways, with SQL and with the Table API.
Sample code (SQL style):
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Count the words in a DataStream, using both the SQL and the Table API styles.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:13 PM
 */
public class Demo02 {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );
        //3. Register the table
        tEnv.createTemporaryView("WordCount", input, $("word"), $("frequency"));
        //4. Execute the query
        Table resultTable = tEnv.sqlQuery("SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word");
        //5. Output the result
        //toAppendStream doesn't support consuming the update changes produced by a GroupAggregate node
        //DataStream<WC> resultDS = tEnv.toAppendStream(resultTable, WC.class);
        DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
        resultDS.print();
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
Run result:
Sample code (Table style):
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Count the words in a DataStream: the Table API counterpart of Demo02.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:15 PM
 */
public class Demo02Table {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );
        //3. Register the table
        Table table = tEnv.fromDataStream(input);
        //4. Execute the query
        Table resultTable = table
                .groupBy($("word"))
                .select($("word"), $("frequency").sum().as("frequency"))
                .filter($("frequency").isEqual(2));
        //5. Output the result
        DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
        resultDS.print();
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}
2.3 Example 3 (SQL with tumbling windows)
Requirement: use Flink SQL to compute, every 5 seconds, each user's total order count, maximum order amount, and minimum order amount over the most recent 5 seconds.
This requirement maps directly onto a time-based tumbling window, just as in the DataStream Window API.
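The bucketing behind a tumbling window is plain arithmetic: an event with timestamp t (epoch millis) lands in the window starting at t - (t mod windowSize). A minimal sketch of that idea (class and method names are ours, not Flink's internals):

```java
public class TumbleWindowSketch {

    /**
     * Start (epoch millis) of the tumbling window containing the given timestamp.
     * Mirrors how a fixed-size tumbling window buckets events.
     */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long size = 5000L; // 5-second windows, matching the requirement above
        // An event at t = 12345 ms falls into the window [10000, 15000)
        System.out.println(windowStart(12345L, size));
        // An event at t = 4999 ms falls into the very first window [0, 5000)
        System.out.println(windowStart(4999L, size));
    }
}
```

Because every event's window is derived from its own event time, late events (bounded by the watermark) still land in the correct 5-second bucket.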
Coding steps:
- Create the environment
- Simulate a real-time stream with a custom source function
- Set the event time and watermark
- Register the table
- Execute the query (either SQL style or Table style)
- Output the results
- Trigger execution
SQL-style implementation:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;

/**
 * SQL-style implementation.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:19 PM
 */
public class Demo3SQL {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source: emit one random order per second
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            private Boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        //3. Transformation: assign event time with a 2-second out-of-orderness watermark
        DataStream<Order> watermarkDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, timestamp) -> event.getCreateTime())
                );
        //4. Register the table
        tEnv.createTemporaryView("t_order", watermarkDS,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());
        //5. Execute the SQL
        String sql = "select " +
                "userId," +
                "count(*) as totalCount," +
                "max(money) as maxMoney," +
                "min(money) as minMoney " +
                "from t_order " +
                "group by userId," +
                "tumble(createTime, interval '5' second)";
        Table resultTable = tEnv.sqlQuery(sql);
        //6. Sink
        //Convert the SQL result back into a DataStream and print it
        //toAppendStream → appends new rows to the result DataStream
        //toRetractStream → emits (true = add/update, false = retract) pairs on top of earlier results
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}
Table-style implementation:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Table-style implementation.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:26 PM
 */
public class Demo3Table {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source: emit one random order per second
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            private Boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        //3. Transformation: assign event time with a 2-second out-of-orderness watermark
        DataStream<Order> watermarkDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, timestamp) -> event.getCreateTime())
                );
        //4. Register the table
        tEnv.createTemporaryView("t_order", watermarkDS,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());
        //Print the table schema
        tEnv.from("t_order").printSchema();
        //5. Table API query
        Table resultTable = tEnv.from("t_order")
                //.window(Tumble.over("5.second").on("createTime").as("tumbleWindow"))
                .window(Tumble.over(lit(5).second())
                        .on($("createTime"))
                        .as("tumbleWindow"))
                .groupBy($("tumbleWindow"), $("userId"))
                .select(
                        $("userId"),
                        $("userId").count().as("totalCount"),
                        $("money").max().as("maxMoney"),
                        $("money").min().as("minMoney"));
        //6. Convert the result back into a DataStream and print it
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();
        //7. Execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}
2.4 Example 4 (consuming Kafka with SQL)
Requirement: consume data from Kafka, keep only the records whose status is success, and write them back to Kafka.
Test data:
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
Prepare the Kafka topics and console producer/consumer:
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
Code implementation (reference documentation):
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Consume data from Kafka, keep only records whose status is success, and write them back to Kafka.
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:30 PM
 */
public class Demo4 {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. Source table backed by the input_kafka topic
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        " `user_id` BIGINT,\n" +
                        " `page_id` BIGINT,\n" +
                        " `status` STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'kafka',\n" +
                        " 'topic' = 'input_kafka',\n" +
                        " 'properties.bootstrap.servers' = 'node1:9092',\n" +
                        " 'properties.group.id' = 'testGroup',\n" +
                        " 'scan.startup.mode' = 'latest-offset',\n" +
                        " 'format' = 'json'\n" +
                        ")"
        );
        //3. Sink table backed by the output_kafka topic
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        " `user_id` BIGINT,\n" +
                        " `page_id` BIGINT,\n" +
                        " `status` STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'kafka',\n" +
                        " 'topic' = 'output_kafka',\n" +
                        " 'properties.bootstrap.servers' = 'node1:9092',\n" +
                        " 'format' = 'json',\n" +
                        " 'sink.partitioner' = 'round-robin'\n" +
                        ")"
        );
        //4. Filter out the successful records
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";
        Table resultTable = tEnv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();
        //5. Write the filtered rows back to Kafka
        tEnv.executeSql("insert into output_kafka select * from " + resultTable);
        //6. Execute
        env.execute();
    }
}
03 Common Flink SQL Operators
3.1 SELECT
SELECT: selects data from a DataSet/DataStream, i.e. projects certain columns.
Example:
SELECT * FROM Table;          -- select all columns of the table
SELECT name, age FROM Table;  -- select only the name and age columns
A SELECT statement may also use functions and aliases, for example in the WordCount above:
SELECT word, COUNT(word) FROM table GROUP BY word;
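With an alias spelled out explicitly (an illustrative variant of the query above, not taken from the original post; column names follow the WordCount table):

```sql
SELECT word AS w, COUNT(word) AS cnt
FROM WordCount
GROUP BY word;
```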
3.2 WHERE
WHERE: filters records from the dataset/stream. Used together with SELECT, it splits the relation horizontally, i.e. keeps only the records matching a condition.
Example:
SELECT name, age FROM Table WHERE name LIKE '%小明%';
SELECT * FROM Table WHERE age = 20;
WHERE filters against the original data, and in WHERE conditions Flink SQL likewise supports the usual comparison operators such as =, <, >, <=, >=, and <>.
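For instance (a sketch against the same hypothetical Table as above):

```sql
SELECT name, age FROM Table WHERE age >= 18 AND age <> 30;
```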