
Submitting to Hive Iceberg with the Flink SQL Client


Contents: environment preparation; preparation before starting the SQL Client; starting the SQL Client; init.sql; insert.sql

Environment preparation

Component                Version
Flink client             1.14.4-2.12
Hadoop cluster           3.1.4
Hive client              3.1.2
Iceberg Flink runtime    iceberg-flink-runtime-1.14-0.13.2.jar
Iceberg Hive dependency  iceberg-hive-runtime-0.13.2.jar

Preparation before starting the SQL Client
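The two Iceberg runtime jars above have to sit in a directory that is later passed to yarn-session.sh (-t) and sql-client.sh (-l). A minimal sketch of staging them, assuming the jars were already downloaded into the current working directory (the directory path matches the commands used below):

# create the plugin directory referenced by -t / -l below
mkdir -p /home/hadoop/flink/sqlplugins
# stage the Iceberg runtime jars for Flink and Hive
cp iceberg-flink-runtime-1.14-0.13.2.jar /home/hadoop/flink/sqlplugins/
cp iceberg-hive-runtime-0.13.2.jar /home/hadoop/flink/sqlplugins/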

The SQL Client can submit jobs in two ways: per-job mode and session mode. Session mode requires starting a session first, as follows:

/home/hadoop/flink/bin/yarn-session.sh \
  -t /home/hadoop/flink/sqlplugins \
  -s 2 -jm 5120 -tm 5120 \
  -qu default -nm iceberg_test1 -d
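On success, yarn-session.sh prints the YARN application id of the new session. A hedged sketch of locating the session and stopping it later; the application id below is a placeholder, not a value from this deployment:

# find the session by the name given with -nm
yarn application -list | grep iceberg_test1
# re-attach to the session and stop it once the jobs are done
echo "stop" | /home/hadoop/flink/bin/yarn-session.sh -id application_1650000000000_0001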

Per-job mode requires adding the following parameter to the flink-conf.yaml of the Flink client:

execution.target: yarn-per-job

Note:

The flink-conf.yaml file also contains the following settings:

classloader.resolve-order: parent-first
classloader.check-leaked-classloader: false
# Kerberos-related settings
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /bigdata/apps/test/core.keytab
security.kerberos.login.principal: hadoop
security.kerberos.login.contexts: Client

Starting the SQL Client

# yarn session mode (appId is a placeholder for the session identifier)
/home/hadoop/flink/bin/sql-client.sh embedded \
  -s appId \
  -l /home/hadoop/flink/sqlplugins \
  -i /home/hadoop/flink/script/init.sql \
  -f /home/hadoop/flink/script/insert.sql

# yarn per-job mode
/home/hadoop/flink/bin/sql-client.sh embedded \
  -l /home/hadoop/flink/sqlplugins \
  -i /home/hadoop/flink/script/init.sql \
  -f /home/hadoop/flink/script/insert.sql

init.sql

SET 'sql-client.verbose' = 'true';
SET 'execution.checkpointing.interval' = '60s';

CREATE CATALOG ice_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://hdp02.bonc:9083',
    'warehouse' = 'hdfs://beh001/tmp/',
    'hive-conf-dir' = '/home/hadoop/flink/confdir',
    'hadoop-conf-dir' = '/home/hadoop/flink/confdir'
);

CREATE DATABASE IF NOT EXISTS ice_catalog.ice_db;

CREATE TABLE IF NOT EXISTS ice_catalog.ice_db.ice_tb (
    deal_date string,
    chnl_id string,
    chnl_name string,
    region_code string,
    city_code string,
    chnl_third_class string,
    chnl_second_class string,
    chnl_first_class string,
    chnl_area_class string,
    chnl_eff_flag string,
    oper_id string,
    oper_name string,
    self_term_code string,
    air_term_code string,
    oper_eff_flag string,
    item_cls_type string,
    item_cls_desc string,
    item_grp_type string,
    item_grp_desc string,
    user_chnl_id string,
    user_chnl_name string,
    user_region_code string,
    user_city_code string,
    item_value1 decimal(14,2),
    item_value2 decimal(14,2),
    PRIMARY KEY (chnl_id, oper_id) NOT ENFORCED
) WITH (
    'write.upsert.enabled' = 'true',
    'write.metadata.previous-versions-max' = '10',
    'write.metadata.delete-after-commit.enabled' = 'true',
    'commit.manifest.min-count-to-merge' = '1',
    'engine.hive.enabled' = 'true',
    'table.dynamic-table-options.enabled' = 'true',
    'format-version' = '2'
);

CREATE TABLE csvSource (
    deal_date string COMMENT 'deal date',
    chnl_id string COMMENT 'channel ID',
    chnl_name string COMMENT 'channel name',
    region_code string COMMENT 'home city code',
    city_code string COMMENT 'home county code',
    chnl_third_class string COMMENT 'channel third-level type',
    chnl_second_class string COMMENT 'channel second-level type',
    chnl_first_class string COMMENT 'channel first-level type',
    chnl_area_class string COMMENT 'channel region attribute',
    chnl_eff_flag string COMMENT 'channel validity flag',
    oper_id string COMMENT 'operator ID',
    oper_name string COMMENT 'operator name',
    self_term_code string COMMENT 'self-service terminal flag',
    air_term_code string COMMENT 'over-the-air recharge flag',
    oper_eff_flag string COMMENT 'operator validity flag',
    item_cls_type string COMMENT 'metric category code',
    item_cls_desc string COMMENT 'metric category name',
    item_grp_type string COMMENT 'metric sub-item code',
    item_grp_desc string COMMENT 'metric sub-item name',
    user_chnl_id string COMMENT 'user channel ID',
    user_chnl_name string COMMENT 'user channel name',
    user_region_code string COMMENT 'user home city code',
    user_city_code string COMMENT 'user home county code',
    item_value1 decimal(14,2) COMMENT 'metric value 1',
    item_value2 decimal(14,2) COMMENT 'metric value 2'
) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://beh001/tmp/originData/csvSource.txt',
    'format' = 'csv',
    'csv.field-delimiter' = ','
);

insert.sql

INSERT INTO ice_catalog.ice_db.ice_tb
SELECT
    deal_date,
    chnl_id,
    chnl_name,
    region_code,
    city_code,
    chnl_third_class,
    chnl_second_class,
    chnl_first_class,
    chnl_area_class,
    chnl_eff_flag,
    oper_id,
    oper_name,
    self_term_code,
    air_term_code,
    oper_eff_flag,
    item_cls_type,
    item_cls_desc,
    item_grp_type,
    item_grp_desc,
    user_chnl_id,
    user_chnl_name,
    user_region_code,
    user_city_code,
    item_value1,
    item_value2
FROM csvSource;
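Once the insert job has completed at least one checkpoint (Iceberg commits on checkpoints), the table can be read back from the same SQL Client to verify the upsert result. A minimal sketch; switching the session to batch mode keeps the read from running as an unbounded streaming job:

-- one-off batch read instead of a continuous streaming scan
SET 'execution.runtime-mode' = 'batch';
-- row count after the upsert; duplicate (chnl_id, oper_id) keys should have collapsed
SELECT COUNT(*) FROM ice_catalog.ice_db.ice_tb;
-- spot-check a handful of rows
SELECT chnl_id, oper_id, item_value1, item_value2
FROM ice_catalog.ice_db.ice_tb
LIMIT 10;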
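Because the table was created with 'engine.hive.enabled' = 'true', it is also registered in the Hive metastore and can be read from Hive once iceberg-hive-runtime-0.13.2.jar is on Hive's classpath. A hedged sketch from beeline; the jar path here is an assumption, and whether row-level deletes produced by upserts are applied on read depends on the Hive/Iceberg versions in use:

-- make the Iceberg storage handler available to this Hive session (jar path is an assumption)
ADD JAR /home/hadoop/hive/lib/iceberg-hive-runtime-0.13.2.jar;
-- read the same table through Hive
SELECT * FROM ice_db.ice_tb LIMIT 10;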
