FlinkSQL--Flink整合hive
- 创业
- 2025-08-12 22:45:02

1、整合 # 1、将依赖包上传到flink的lib目录下 flink-sql-connector-hive-3.1.2_2.12-1.15.2.jar # 2、重启flink集群 yarn application -list yarn application -kill application_1699579932721_0003 yarn-session.sh -d # 3、重新进入sql命令行 sql-client.sh 2、Hive catalog
catalog(元数据) ---> database ---> table ---> 数据 --- > 列
-- 1、开启hive的元数据服务 nohup hive --service metastore & -- 2、创建hive catalog CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/usr/local/soft/hive-3.1.2/conf' ); -- 查看所有的catalog -- default_catalog: 默认的元数据,将元数据保存在内存中 show catalogs; --3、切换catalog use catalog myhive; --4、在flink中就可以使用hive中已经创建好的表 select * from student; -- 可以从catalog开始定位一张表 select * from myhive.`default`.student; -- 将flink的表结构保存到hive catalog中 -- hive中可以看到flink创建的流表,但是在hive中不能查询flink的流表 create database flink; use flink; -- 创建flink动态表 CREATE TABLE students_kafka ( `offset` BIGINT METADATA VIRTUAL, -- 偏移量 `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用 sid STRING, name STRING, age INT, sex STRING, clazz STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'students', -- 数据的topic 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表 'properties.group.id' = 'testGroup', -- 消费者组 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset 'format' = 'csv' -- 读取数据的格式 ); 3、Hive functions在Flink中的使用hive中的函数:
-- 加载hive函数 LOAD MODULE hive WITH ('hive-version' = '3.1.2'); -- 使用hive的函数 select split('java,spark',',');FlinkSQL--Flink整合hive由讯客互联创业栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“FlinkSQL--Flink整合hive”