- 01 引言
- 02 开发前准备
- 2.1 API
- 2.2 编程模型
- 03 入门案例
- 3.1 项目搭建
- 3.2 代码实现
- 3.2.1 基于DataSet
- 3.2.2 基于DataStream
- 3.2.3 Lambda版
- 3.2.3 在Yarn上运行(待验证)
- 05 文末
在前面的博客,我们已经可以基本把Flink
的环境搭建起来了,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
Flink
的环境既然有了,那么本文开始讲解Flink
的入门案例。
在写入门案例之前,需要知道一些概念,即:API
和编程模型。
在 《Flink教程(02)- Flink入门》讲述过Flink
是由以下组件栈组成的:
- 物理部署层
- RuntimeTime核心层
- API & Libraires
- 扩展库
Flink提供了多个层次的API
供开发者使用,越往上抽象程度越高,使用起来越方便,越往下越底层,使用起来难度越大。


注意:在Flink1.12
时支持流批一体,DataSetAPI
已经不推荐使用了,所以会优先使用DataStream
流式API
,既支持无界数据处理/流处理,也支持有界数据处理/批处理!
参阅文献:
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/
- https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
Flink
应用程序结构主要包含三部分,Source/Transformation/Sink
,如下图所示:
现在提一个需求:使用Flink
实现WordCount
。
首先pom
文件添加依赖(为了方便以后的项目演示,这里的pom
文件依赖了将来要讲解的库):
4.0.0
com.ylw
flink-demo
1.0.0
Flink demo
Yang Lin Wei
https://yanglinwei.blog.csdn.net/
aliyun
http://maven.aliyun.com/nexus/content/groups/public/
apache
https://repository.apache.org/content/repositories/snapshots/
cloudera
https://repository.cloudera.com/artifactory/cloudera-repos/
UTF-8
UTF-8
1.8
1.8
1.8
2.12
1.12.0
org.apache.flink
flink-clients_2.12
${flink.version}
org.apache.flink
flink-scala_2.12
${flink.version}
org.apache.flink
flink-java
${flink.version}
org.apache.flink
flink-streaming-scala_2.12
${flink.version}
org.apache.flink
flink-streaming-java_2.12
${flink.version}
org.apache.flink
flink-table-api-scala-bridge_2.12
${flink.version}
org.apache.flink
flink-table-api-java-bridge_2.12
${flink.version}
org.apache.flink
flink-table-planner_2.12
${flink.version}
org.apache.flink
flink-table-planner-blink_2.12
${flink.version}
org.apache.flink
flink-table-common
${flink.version}
org.apache.flink
flink-connector-kafka_2.12
${flink.version}
org.apache.flink
flink-sql-connector-kafka_2.12
${flink.version}
org.apache.flink
flink-connector-jdbc_2.12
${flink.version}
org.apache.flink
flink-csv
${flink.version}
org.apache.flink
flink-json
${flink.version}
org.apache.bahir
flink-connector-redis_2.11
1.0
flink-streaming-java_2.11
org.apache.flink
flink-runtime_2.11
org.apache.flink
flink-core
org.apache.flink
flink-java
org.apache.flink
org.apache.flink
flink-connector-hive_2.12
${flink.version}
org.apache.hive
hive-metastore
2.1.0
org.apache.hive
hive-exec
2.1.0
org.apache.flink
flink-shaded-hadoop-2-uber
2.7.5-10.0
org.apache.hbase
hbase-client
2.1.0
mysql
mysql-connector-java
5.1.38
io.vertx
vertx-core
3.9.0
io.vertx
vertx-jdbc-client
3.9.0
io.vertx
vertx-redis-client
3.9.0
org.slf4j
slf4j-log4j12
1.7.7
runtime
log4j
log4j
1.2.17
runtime
com.alibaba
fastjson
1.2.44
org.projectlombok
lombok
1.18.2
provided
src/main/java
org.apache.maven.plugins
maven-compiler-plugin
3.5.1
1.8
1.8
org.apache.maven.plugins
maven-surefire-plugin
2.18.1
false
true
**/*Test.*
**/*Suite.*
org.apache.maven.plugins
maven-shade-plugin
2.3
package
shade
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
然后配置log4j.properties
文件:
log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
3.2 代码实现
这里我们按照官方的入门案例来讲解:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
开发步骤:
- step1:准备环境-env
- step2:准备数据-source
- step3:处理数据-transformation
- step4:输出结果-sink
- step5:触发执行-execute
创建环境可以使用如下3种方式:
getExecutionEnvironment() //推荐使用
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
3.2.1 基于DataSet
/**
* 需求:使用Flink完成WordCount-DataSet
*
* @author : YangLinWei
* @createTime: 2022/3/7 9:47 上午
*
* 编码步骤:
* 1.准备环境-env
* 2.准备数据-source
* 3.处理数据-transformation
* 4.输出结果-sink
* 5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
*/
public class WordCount1 {
/**
* 老版本的批处理API如下,但已经不推荐使用了
*/
public static void main(String[] args) throws Exception {
//1.准备环境-env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.准备数据-source
DataSet lineDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");
//3.处理数据-transformation
//3.1每一行数据按照空格切分成一个个的单词组成一个集合
/*
public interface FlatMapFunction extends Function, Serializable {
void flatMap(T value, Collector out) throws Exception;
}
*/
DataSet wordsDS = lineDS.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
//value就是一行行的数据
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);//将切割处理的一个个的单词收集起来并返回
}
}
});
//3.2对集合中的每个单词记为1
/*
public interface MapFunction extends Function, Serializable {
O map(T value) throws Exception;
}
*/
DataSet wordAndOnesDS = wordsDS.map(new MapFunction() {
@Override
public Tuple2 map(String value) throws Exception {
//value就是进来一个个的单词
return Tuple2.of(value, 1);
}
});
//3.3对数据按照单词(key)进行分组
//0表示按照tuple中的索引为0的字段,也就是key(单词)进行分组
UnsortedGrouping groupedDS = wordAndOnesDS.groupBy(0);
//3.4对各个组内的数据按照数量(value)进行聚合就是求sum
//1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加!
DataSet aggResult = groupedDS.sum(1);
//3.5排序
DataSet result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);
//4.输出结果-sink
result.print();
//5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
//env.execute();//'execute()', 'count()', 'collect()', or 'print()'.
}
}
运行结果:
/**
* 需求:使用Flink完成WordCount-DataStream
*
* @author : YangLinWei
* @createTime: 2022/3/7 10:02 上午
*
* 编码步骤:
* 1.准备环境-env
* 2.准备数据-source
* 3.处理数据-transformation
* 4.输出结果-sink
* 5.触发执行-execute
*/
public class WordCount2 {
/**
* 新版本的流批统一API,既支持流处理也支持批处理
*/
public static void main(String[] args) throws Exception {
//1.准备环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//2.准备数据-source
DataStream linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");
//3.处理数据-transformation
//3.1每一行数据按照空格切分成一个个的单词组成一个集合
/*
public interface FlatMapFunction extends Function, Serializable {
void flatMap(T value, Collector out) throws Exception;
}
*/
DataStream wordsDS = linesDS.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
//value就是一行行的数据
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);//将切割处理的一个个的单词收集起来并返回
}
}
});
//3.2对集合中的每个单词记为1
/*
public interface MapFunction extends Function, Serializable {
O map(T value) throws Exception;
}
*/
DataStream wordAndOnesDS = wordsDS.map(new MapFunction() {
@Override
public Tuple2 map(String value) throws Exception {
//value就是进来一个个的单词
return Tuple2.of(value, 1);
}
});
//3.3对数据按照单词(key)进行分组
//0表示按照tuple中的索引为0的字段,也就是key(单词)进行分组
//KeyedStream groupedDS = wordAndOnesDS.keyBy(0);
KeyedStream groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
//3.4对各个组内的数据按照数量(value)进行聚合就是求sum
//1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加!
DataStream result = groupedDS.sum(1);
//4.输出结果-sink
result.print();
//5.触发执行-execute
env.execute();//DataStream需要调用execute
}
}
运行结果:
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/java_lambdas.html#java-lambda-expressions
/**
* 需求:使用Flink完成WordCount-DataStream--使用lambda表达式
*
* @author : YangLinWei
* @createTime: 2022/3/7 10:05 上午
*
* 编码步骤:
* 1.准备环境-env
* 2.准备数据-source
* 3.处理数据-transformation
* 4.输出结果-sink
* 5.触发执行-execute
*/
public class WordCount3 {
public static void main(String[] args) throws Exception {
//1.准备环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//2.准备数据-source
DataStream linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");
//3.处理数据-transformation
//3.1每一行数据按照空格切分成一个个的单词组成一个集合
/*
public interface FlatMapFunction extends Function, Serializable {
void flatMap(T value, Collector out) throws Exception;
}
*/
//lambda表达式的语法:
// (参数)->{方法体/函数体}
//lambda表达式就是一个函数,函数的本质就是对象
DataStream wordsDS = linesDS.flatMap(
(String value, Collector out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
).returns(Types.STRING);
//3.2对集合中的每个单词记为1
/*
public interface MapFunction extends Function, Serializable {
O map(T value) throws Exception;
}
*/
/*DataStream wordAndOnesDS = wordsDS.map(
(String value) -> Tuple2.of(value, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT));*/
DataStream wordAndOnesDS = wordsDS.map(
(String value) -> Tuple2.of(value, 1)
, TypeInformation.of(new TypeHint() {
})
);
//3.3对数据按照单词(key)进行分组
//0表示按照tuple中的索引为0的字段,也就是key(单词)进行分组
//KeyedStream groupedDS = wordAndOnesDS.keyBy(0);
//KeyedStream groupedDS = wordAndOnesDS.keyBy((KeySelector) t -> t.f0);
KeyedStream groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
//3.4对各个组内的数据按照数量(value)进行聚合就是求sum
//1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加!
DataStream result = groupedDS.sum(1);
//4.输出结果-sink
result.print();
//5.触发执行-execute
env.execute();
}
}
输出结果:
step1:首先运行hadoop
cd /usr/local/Cellar/hadoop/3.3.1/sbin
./start-all.sh
step2:注意写入HDFS
存在权限问题,进行如下设置:
hadoop fs -chmod -R 777 /
step3:代码如下(注意代码添加了:System.setProperty("HADOOP_USER_NAME", "root")
):
/**
* 需求:使用Flink完成WordCount-DataStream--使用lambda表达式--修改代码使适合在Yarn上运行
*
* @author : YangLinWei
* @createTime: 2022/3/7 10:11 上午
*
* 编码步骤:
* 1.准备环境-env
* 2.准备数据-source
* 3.处理数据-transformation
* 4.输出结果-sink
* 5.触发执行-execute//批处理不需要调用!流处理需要
*/
public class WordCount4 {
public static void main(String[] args) throws Exception {
//获取参数
ParameterTool params = ParameterTool.fromArgs(args);
String output = null;
if (params.has("output")) {
output = params.get("output");
} else {
output = "hdfs://127.0.0.1:8020/wordcount/output_" + System.currentTimeMillis();
}
//1.准备环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//2.准备数据-source
DataStream linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");
//3.处理数据-transformation
DataStream result = linesDS
.flatMap(
(String value, Collector out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
).returns(Types.STRING)
.map(
(String value) -> Tuple2.of(value, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT))
//.keyBy(0);
.keyBy((KeySelector) t -> t.f0)
.sum(1);
//4.输出结果-sink
result.print();
//如果执行报hdfs权限相关错误,可以执行 hadoop fs -chmod -R 777 /
System.setProperty("HADOOP_USER_NAME", "root");//设置用户名
//result.writeAsText("hdfs://node1:8020/wordcount/output_"+System.currentTimeMillis()).setParallelism(1);
result.writeAsText(output).setParallelism(1);
//5.触发执行-execute
env.execute();
}
}
step4:打包
step5:打包成功后jar包:
step5:提交至服务器指定的路径(参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html):
- 注意:本系统是mac系统,所有环境都在本机,所以就不上传了。
step6:Flink命令执行jar:
flink run -Dexecution.runtime-mode=BATCH \
-m yarn-cluster \
-yjm 1024 \
-ytm 1024 \
-c com.ylw.WordCount4 \
项目路径/flink-demo/target/original-flink-demo-1.0.0.jar\
--output hdfs://127.0.0.1:8020/wordcount/output_xx
在Web页面可以观察到提交的程序:
- http://node1:8088/cluster
- http://node1:50070/explorer.html#/
或者在Standalone
模式下使用web
界面提交
本文主要讲解了Flink
的入门例子,主要基于DataSet
和DataStream
来讲解,谢谢大家的阅读,本文完!