您当前的位置: 首页 >  flink

杨林伟

暂无认证

  • 8浏览

    0关注

    3337博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink教程(04)- Flink入门案例

杨林伟 发布时间:2022-03-07 14:01:40 ,浏览量:8

文章目录
  • 01 引言
  • 02 开发前准备
    • 2.1 API
    • 2.2 编程模型
  • 03 入门案例
    • 3.1 项目搭建
    • 3.2 代码实现
      • 3.2.1 基于DataSet
      • 3.2.2 基于DataStream
      • 3.2.3 Lambda版
      • 3.2.3 在Yarn上运行(待验证)
  • 05 文末

01 引言

在前面的博客,我们已经可以基本把Flink的环境搭建起来了,有兴趣的同学可以参阅下:

  • 《Flink教程(01)- Flink知识图谱》
  • 《Flink教程(02)- Flink入门》
  • 《Flink教程(03)- Flink环境搭建》

Flink的环境既然有了,那么本文开始讲解Flink的入门案例。

02 开发前准备

在写入门案例之前,需要知道一些概念,即:API 和编程模型。

2.1 API

在 《Flink教程(02)- Flink入门》讲述过Flink是由以下组件栈组成的:

  • 物理部署层
  • RuntimeTime核心层
  • API & Libraires
  • 扩展库

Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便,越往下越底层,使用起来难度越大。

在这里插入图片描述在这里插入图片描述

注意:在Flink1.12时支持流批一体,DataSetAPI已经不推荐使用了,所以会优先使用DataStream流式API,既支持无界数据处理/流处理,也支持有界数据处理/批处理!

参阅文献:

  • https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/
  • https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC
  • https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
2.2 编程模型

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

Flink 应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示: 在这里插入图片描述

03 入门案例

现在提一个需求:使用Flink实现WordCount

3.1 项目搭建

首先pom文件添加依赖(为了方便以后的项目演示,这里的pom文件依赖了将来要讲解的库):



    4.0.0

    com.ylw
    flink-demo
    1.0.0

    Flink demo

    
        
            Yang Lin Wei
            https://yanglinwei.blog.csdn.net/
        
    

    
    
        
            aliyun
            http://maven.aliyun.com/nexus/content/groups/public/
        
        
            apache
            https://repository.apache.org/content/repositories/snapshots/
        
        
            cloudera
            https://repository.cloudera.com/artifactory/cloudera-repos/
        
    

    
        UTF-8
        UTF-8
        1.8
        1.8
        1.8
        2.12
        1.12.0
    
    
        
            org.apache.flink
            flink-clients_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-scala_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-java
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-scala_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-scala-bridge_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java-bridge_2.12
            ${flink.version}
        
        
        
            org.apache.flink
            flink-table-planner_2.12
            ${flink.version}
        
        
        
            org.apache.flink
            flink-table-planner-blink_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-common
            ${flink.version}
        

        

        
        
            org.apache.flink
            flink-connector-kafka_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-sql-connector-kafka_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-jdbc_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-csv
            ${flink.version}
        
        
            org.apache.flink
            flink-json
            ${flink.version}
        

        
        
        
        


        
            org.apache.bahir
            flink-connector-redis_2.11
            1.0
            
                
                    flink-streaming-java_2.11
                    org.apache.flink
                
                
                    flink-runtime_2.11
                    org.apache.flink
                
                
                    flink-core
                    org.apache.flink
                
                
                    flink-java
                    org.apache.flink
                
            
        

        
            org.apache.flink
            flink-connector-hive_2.12
            ${flink.version}
        
        
            org.apache.hive
            hive-metastore
            2.1.0
        
        
            org.apache.hive
            hive-exec
            2.1.0
        

        
            org.apache.flink
            flink-shaded-hadoop-2-uber
            2.7.5-10.0
        

        
            org.apache.hbase
            hbase-client
            2.1.0
        

        
            mysql
            mysql-connector-java
            5.1.38
            
        

        
        
            io.vertx
            vertx-core
            3.9.0
        
        
            io.vertx
            vertx-jdbc-client
            3.9.0
        
        
            io.vertx
            vertx-redis-client
            3.9.0
        

        
        
            org.slf4j
            slf4j-log4j12
            1.7.7
            runtime
        
        
            log4j
            log4j
            1.2.17
            runtime
        

        
            com.alibaba
            fastjson
            1.2.44
        

        
            org.projectlombok
            lombok
            1.18.2
            provided
        

        
        
        
        

    

    
        src/main/java
        
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.5.1
                
                    1.8
                    1.8
                    
                
            
            
                org.apache.maven.plugins
                maven-surefire-plugin
                2.18.1
                
                    false
                    true
                    
                        **/*Test.*
                        **/*Suite.*
                    
                
            
            
            
                org.apache.maven.plugins
                maven-shade-plugin
                2.3
                
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    *:*
                                    
                                        
                                        META-INF/*.SF
                                        META-INF/*.DSA
                                        META-INF/*.RSA
                                    
                                
                            
                            
                                
                                    
                                    
                                
                            
                        
                    
                
            
        
    

然后配置log4j.properties文件:

log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
3.2 代码实现

这里我们按照官方的入门案例来讲解:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

开发步骤:

  • step1:准备环境-env
  • step2:准备数据-source
  • step3:处理数据-transformation
  • step4:输出结果-sink
  • step5:触发执行-execute

创建环境可以使用如下3种方式:

getExecutionEnvironment() //推荐使用
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
3.2.1 基于DataSet
/**
 * 需求:使用Flink完成WordCount-DataSet
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 9:47 上午
 * 

* 编码步骤: * 1.准备环境-env * 2.准备数据-source * 3.处理数据-transformation * 4.输出结果-sink * 5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute */ public class WordCount1 { /** * 老版本的批处理API如下,但已经不推荐使用了 */ public static void main(String[] args) throws Exception { //1.准备环境-env ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.准备数据-source DataSet lineDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw"); //3.处理数据-transformation //3.1每一行数据按照空格切分成一个个的单词组成一个集合 /* public interface FlatMapFunction extends Function, Serializable { void flatMap(T value, Collector out) throws Exception; } */ DataSet wordsDS = lineDS.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { //value就是一行行的数据 String[] words = value.split(" "); for (String word : words) { out.collect(word);//将切割处理的一个个的单词收集起来并返回 } } }); //3.2对集合中的每个单词记为1 /* public interface MapFunction extends Function, Serializable { O map(T value) throws Exception; } */ DataSet wordAndOnesDS = wordsDS.map(new MapFunction() { @Override public Tuple2 map(String value) throws Exception { //value就是进来一个个的单词 return Tuple2.of(value, 1); } }); //3.3对数据按照单词(key)进行分组 //0表示按照tuple中的索引为0的字段,也就是key(单词)进行分组 UnsortedGrouping groupedDS = wordAndOnesDS.groupBy(0); //3.4对各个组内的数据按照数量(value)进行聚合就是求sum //1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加! DataSet aggResult = groupedDS.sum(1); //3.5排序 DataSet result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1); //4.输出结果-sink result.print(); //5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute //env.execute();//'execute()', 'count()', 'collect()', or 'print()'. } }

运行结果: 在这里插入图片描述

3.2.2 基于DataStream
/**
 * 需求:使用Flink完成WordCount-DataStream
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 10:02 上午
 * 

* 编码步骤: * 1.准备环境-env * 2.准备数据-source * 3.处理数据-transformation * 4.输出结果-sink * 5.触发执行-execute */ public class WordCount2 { /** * 新版本的流批统一API,既支持流处理也支持批处理 */ public static void main(String[] args) throws Exception { //1.准备环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //env.setRuntimeMode(RuntimeExecutionMode.STREAMING); //env.setRuntimeMode(RuntimeExecutionMode.BATCH); //2.准备数据-source DataStream linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw"); //3.处理数据-transformation //3.1每一行数据按照空格切分成一个个的单词组成一个集合 /* public interface FlatMapFunction extends Function, Serializable { void flatMap(T value, Collector out) throws Exception; } */ DataStream wordsDS = linesDS.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { //value就是一行行的数据 String[] words = value.split(" "); for (String word : words) { out.collect(word);//将切割处理的一个个的单词收集起来并返回 } } }); //3.2对集合中的每个单词记为1 /* public interface MapFunction extends Function, Serializable { O map(T value) throws Exception; } */ DataStream wordAndOnesDS = wordsDS.map(new MapFunction() { @Override public Tuple2 map(String value) throws Exception { //value就是进来一个个的单词 return Tuple2.of(value, 1); } }); //3.3对数据按照单词(key)进行分组 //0表示按照tuple中的索引为0的字段,也就是key(单词)进行分组 //KeyedStream groupedDS = wordAndOnesDS.keyBy(0); KeyedStream groupedDS = wordAndOnesDS.keyBy(t -> t.f0); //3.4对各个组内的数据按照数量(value)进行聚合就是求sum //1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加! DataStream result = groupedDS.sum(1); //4.输出结果-sink result.print(); //5.触发执行-execute env.execute();//DataStream需要调用execute } }

运行结果: 在这里插入图片描述

3.2.3 Lambda版

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/java_lambdas.html#java-lambda-expressions

/**
 * 需求:使用Flink完成WordCount-DataStream--使用lambda表达式
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 10:05 上午
 * 

* 编码步骤: * 1.准备环境-env * 2.准备数据-source * 3.处理数据-transformation * 4.输出结果-sink * 5.触发执行-execute */ public class WordCount3 { public static void main(String[] args) throws Exception { //1.准备环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //env.setRuntimeMode(RuntimeExecutionMode.STREAMING); //env.setRuntimeMode(RuntimeExecutionMode.BATCH); //2.准备数据-source DataStream linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw"); //3.处理数据-transformation //3.1每一行数据按照空格切分成一个个的单词组成一个集合 /* public interface FlatMapFunction extends Function, Serializable { void flatMap(T value, Collector out) throws Exception; } */ //lambda表达式的语法: // (参数)->{方法体/函数体} //lambda表达式就是一个函数,函数的本质就是对象 DataStream wordsDS = linesDS.flatMap( (String value, Collector out) -> Arrays.stream(value.split(" ")).forEach(out::collect) ).returns(Types.STRING); //3.2对集合中的每个单词记为1 /* public interface MapFunction extends Function, Serializable { O map(T value) throws Exception; } */ /*DataStream wordAndOnesDS = wordsDS.map( (String value) -> Tuple2.of(value, 1) ).returns(Types.TUPLE(Types.STRING, Types.INT));*/ DataStream wordAndOnesDS = wordsDS.map( (String value) -> Tuple2.of(value, 1) , TypeInformation.of(new TypeHint() { }) ); //3.3对数据按照单词(key)进行分组 //0表示按照tuple中的索引为0的字段,也就是key(单词)进行分组 //KeyedStream groupedDS = wordAndOnesDS.keyBy(0); //KeyedStream groupedDS = wordAndOnesDS.keyBy((KeySelector) t -> t.f0); KeyedStream groupedDS = wordAndOnesDS.keyBy(t -> t.f0); //3.4对各个组内的数据按照数量(value)进行聚合就是求sum //1表示按照tuple中的索引为1的字段也就是按照数量进行聚合累加! DataStream result = groupedDS.sum(1); //4.输出结果-sink result.print(); //5.触发执行-execute env.execute(); } }

输出结果: 在这里插入图片描述

3.2.3 在Yarn上运行(待验证)

step1:首先运行hadoop

cd /usr/local/Cellar/hadoop/3.3.1/sbin
./start-all.sh

在这里插入图片描述

step2:注意写入HDFS存在权限问题,进行如下设置:

hadoop fs -chmod -R 777  /

step3:代码如下(注意代码添加了:System.setProperty("HADOOP_USER_NAME", "root")):

/**
 * 需求:使用Flink完成WordCount-DataStream--使用lambda表达式--修改代码使适合在Yarn上运行
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 10:11 上午
 * 

* 编码步骤: * 1.准备环境-env * 2.准备数据-source * 3.处理数据-transformation * 4.输出结果-sink * 5.触发执行-execute//批处理不需要调用!流处理需要 */ public class WordCount4 { public static void main(String[] args) throws Exception { //获取参数 ParameterTool params = ParameterTool.fromArgs(args); String output = null; if (params.has("output")) { output = params.get("output"); } else { output = "hdfs://127.0.0.1:8020/wordcount/output_" + System.currentTimeMillis(); } //1.准备环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //env.setRuntimeMode(RuntimeExecutionMode.STREAMING); //env.setRuntimeMode(RuntimeExecutionMode.BATCH); //2.准备数据-source DataStream linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw"); //3.处理数据-transformation DataStream result = linesDS .flatMap( (String value, Collector out) -> Arrays.stream(value.split(" ")).forEach(out::collect) ).returns(Types.STRING) .map( (String value) -> Tuple2.of(value, 1) ).returns(Types.TUPLE(Types.STRING, Types.INT)) //.keyBy(0); .keyBy((KeySelector) t -> t.f0) .sum(1); //4.输出结果-sink result.print(); //如果执行报hdfs权限相关错误,可以执行 hadoop fs -chmod -R 777 / System.setProperty("HADOOP_USER_NAME", "root");//设置用户名 //result.writeAsText("hdfs://node1:8020/wordcount/output_"+System.currentTimeMillis()).setParallelism(1); result.writeAsText(output).setParallelism(1); //5.触发执行-execute env.execute(); } }

step4:打包 在这里插入图片描述

step5:打包成功后jar包: 在这里插入图片描述

step5:提交至服务器指定的路径(参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html):

  • 注意:本系统是mac系统,所有环境都在本机,所以就不上传了。

step6:Flink命令执行jar:

flink run -Dexecution.runtime-mode=BATCH \
-m yarn-cluster \
-yjm 1024 \
-ytm 1024 \
-c com.ylw.WordCount4 \
项目路径/flink-demo/target/original-flink-demo-1.0.0.jar\
--output hdfs://127.0.0.1:8020/wordcount/output_xx

在Web页面可以观察到提交的程序:

  • http://node1:8088/cluster
  • http://node1:50070/explorer.html#/

或者在Standalone模式下使用web界面提交

05 文末

本文主要讲解了Flink的入门例子,主要基于DataSetDataStream来讲解,谢谢大家的阅读,本文完!

关注
打赏
1662376985
查看更多评论
立即登录/注册

微信扫码登录

0.3136s