您当前的位置: 首页 >  sql

杨林伟

暂无认证

  • 3浏览

    0关注

    3337博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink教程(25)- Flink高级特性(FlinkSQL整合Hive)

杨林伟 发布时间:2022-03-09 09:24:14 ,浏览量:3

文章目录
  • 01 引言
  • 02 FlinkSQL 整合Hive
    • 2.1 介绍
    • 2.2 集成Hive的基本方式
    • 2.3 准备工作
    • 2.4 SQL CLI
    • 2.5 代码演示
  • 03 文末

01 引言

在前面的博客,我们学习了FlinkFile Sink了,有兴趣的同学可以参阅下:

  • 《Flink教程(01)- Flink知识图谱》
  • 《Flink教程(02)- Flink入门》
  • 《Flink教程(03)- Flink环境搭建》
  • 《Flink教程(04)- Flink入门案例》
  • 《Flink教程(05)- Flink原理简单分析》
  • 《Flink教程(06)- Flink批流一体API(Source示例)》
  • 《Flink教程(07)- Flink批流一体API(Transformation示例)》
  • 《Flink教程(08)- Flink批流一体API(Sink示例)》
  • 《Flink教程(09)- Flink批流一体API(Connectors示例)》
  • 《Flink教程(10)- Flink批流一体API(其它)》
  • 《Flink教程(11)- Flink高级API(Window)》
  • 《Flink教程(12)- Flink高级API(Time与Watermaker)》
  • 《Flink教程(13)- Flink高级API(状态管理)》
  • 《Flink教程(14)- Flink高级API(容错机制)》
  • 《Flink教程(15)- Flink高级API(并行度)》
  • 《Flink教程(16)- Flink Table与SQL》
  • 《Flink教程(17)- Flink Table与SQL(案例与SQL算子)》
  • 《Flink教程(18)- Flink阶段总结》
  • 《Flink教程(19)- Flink高级特性(BroadcastState)》
  • 《Flink教程(20)- Flink高级特性(双流Join)》
  • 《Flink教程(21)- Flink高级特性(End-to-End Exactly-Once)》
  • 《Flink教程(22)- Flink高级特性(异步IO)》
  • 《Flink教程(23)- Flink高级特性(Streaming File Sink)》
  • 《Flink教程(24)- Flink高级特性(File Sink)》

本文主要讲解Flink SQL 整合Hive。

02 FlinkSQL 整合Hive 2.1 介绍

参考:

  • https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/
  • https://zhuanlan.zhihu.com/p/338506408

使用Hive构建数据仓库已经成为了比较普遍的一种解决方案。目前,一些比较常见的大数据处理引擎,都无一例外兼容Hive。Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在Flink1.10版本中,标志着对 Blink的整合宣告完成,对 Hive 的集成也达到了生产级别的要求。值得注意的是,不同版本的Flink对于Hive的集成有所差异,接下来将以最新的Flink1.12版本为例,实现Flink集成Hive。

2.2 集成Hive的基本方式

Flink 与 Hive 的集成主要体现在以下两个方面:

  • 持久化元数据:Flink利用 Hive 的 MetaStore 作为持久化的 Catalog,我们可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,我们可以使用HiveCatalog将其 Kafka的数据源表存储在 Hive Metastore 中,这样该表的元数据信息会被持久化到Hive的MetaStore对应的元数据库中,在后续的 SQL 查询中,我们可以重复使用它们。
  • 利用 Flink 来读写 Hive 的表:Flink打通了与Hive的集成,如同使用SparkSQL或者Impala操作Hive中的数据一样,我们可以使用Flink直接读写Hive中的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive表。不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

2.3 准备工作

1.添加hadoop_classpath

vim /etc/profile

增加如下配置

export HADOOP_CLASSPATH=`hadoop classpath`

刷新配置

source /etc/profile

2.下载jar并上传至flink/lib目录 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/ 在这里插入图片描述 3.修改hive配置

vim /export/server/hive/conf/hive-site.xml

        hive.metastore.uris
        thrift://node3:9083

4.启动hive元数据服务

nohup /export/server/hive/bin/hive --service metastore &
2.4 SQL CLI

1.修改flinksql配置

vim /export/server/flink/conf/sql-client-defaults.yaml 

增加如下配置

catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /export/server/hive/conf
     default-database: default

2.启动flink集群

/export/server/flink/bin/start-cluster.sh

3.启动flink-sql客户端

/export/server/flink/bin/sql-client.sh embedded

4.执行sql:

show catalogs;
use catalog myhive;
show tables;
select * from person;
2.5 代码演示
/**
 * Flink SQL 整合hive
 *
 * @author : YangLinWei
 * @createTime: 2022/3/9 9:22 上午
 */
public class HiveDemo {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "./conf";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        //注册catalog
        tableEnv.registerCatalog("myhive", hive);
        //使用注册的catalog
        tableEnv.useCatalog("myhive");

        //向Hive表中写入数据
        String insertSQL = "insert into person select * from person";
        TableResult result = tableEnv.executeSql(insertSQL);

        System.out.println(result.getJobClient().get().getJobStatus());
    }
}
03 文末

本文主要讲解FlinkSQL整合Hive,谢谢大家的阅读,本文完!

关注
打赏
1662376985
查看更多评论
立即登录/注册

微信扫码登录

0.2385s