Connecting the Paimon Data Lake to Flink, MySQL, and Hive

I. Start the Flink client and test it

1. Environment preparation

Flink version: 1.16.2

Dependency jars required under lib/:

antlr-runtime-3.5.2.jar
commons-beanutils-1.9.3.jar
commons-pool2-2.4.3.jar
druid-1.1.19.jar
fastjson-1.2.57.jar
flink-cep-1.16.2.jar
flink-connector-files-1.16.2.jar
flink-connector-hive_2.12-1.16.2.jar
flink-connector-jdbc-1.16.2.jar
flink-connector-kafka-1.16.2.jar
flink-connector-starrocks-1.2.7_flink-1.15.jar
flink-csv-1.16.2.jar
flink-dist-1.16.2.jar
flink-json-1.16.2.jar
flink-scala_2.12-1.16.2.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
flink-shaded-zookeeper-3.5.9.jar
flink-table-api-java-uber-1.16.2.jar
flink-table-planner-loader-1.16.2.jar
flink-table-runtime-1.16.2.jar
hive-exec-2.3.4.jar
kafka-clients-2.4.1.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar
mysql-connector-java-8.0.19.jar
paimon-flink-1.16.jar

2. Start the Flink client (YARN session mode)
-- First, configure the Hadoop classpath for YARN: export HADOOP_CLASSPATH=`hadoop classpath`
-- https://nightlies.apache.org/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html
-- note: in Flink 1.16, yarn-session.sh takes -s/-jm/-tm/-nm; the -y* prefixed flags belong to `flink run`
bin/yarn-session.sh -s 3 -jm 2048 -tm 5120 -nm flink_session_testn -d &
-d : run in detached (background) mode
-nm: application name
3. Start the Flink SQL client
bin/sql-client.sh embedded -s yarn-session
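
Before running the connector tests below, a quick sanity check inside the SQL client confirms the session is up and shows which catalogs and tables are visible (a minimal sketch; the output depends on your environment):

SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;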
4. Test connectivity
1) Kafka test

create table dwd_crisps_biz_interview_rt
(
    id                    BIGINT COMMENT 'primary key id',
    inviter_id            BIGINT COMMENT 'interviewer id',
    inviter_org_id        BIGINT COMMENT 'interviewer department id',
    accompany_id          BIGINT COMMENT 'accompanying interviewer id',
    record_type           INT COMMENT 'record type',
    association_id        BIGINT COMMENT 'associated business id',
    association_no        VARCHAR(64) COMMENT 'associated business number',
    create_time           TIMESTAMP COMMENT 'creation time',
    confirm_complete_time TIMESTAMP COMMENT 'completion time',
    invite_time           TIMESTAMP COMMENT 'appointment time',
    process_time          as PROCTIME(),
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'kafka'
    ,'topic' = 'bigdata_dwd_db_crm_biz_topic'
    ,'properties.bootstrap.servers' = 'localhost:9092'
    ,'properties.group.id' = 'flink_client_sql_group_id222'
    ,'scan.startup.mode' = 'earliest-offset'
    ,'value.format' = 'canal-json'
    ,'value.canal-json.ignore-parse-errors' = 'true'
    ,'value.canal-json.table.include' = '^dwd_crisps_biz_interview_rt'
);

select * from dwd_crisps_biz_interview_rt;
2) JDBC test
 create table crisps_ads_goods_detail (
        goods_id bigint,
        sect_code string,
        spu_id bigint,
        goods_json string,
        load_time timestamp
      ) with (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://localhost:3306/crisps_bigdata_ads?useUnicode=true&characterEncoding=UTF-8&useAffectedRows=true&useSSL=false&serverTimezone=UTC',
       'username' = 'root',
       'password' = '123456',
       'table-name' = 'crisps_ads_goods_detail',
       'driver' = 'com.mysql.cj.jdbc.Driver',
       'scan.fetch-size' = '200'
	);
 
 select * from crisps_ads_goods_detail;

II. Using Paimon

1. Create a Paimon catalog
CREATE CATALOG paimon_catalog WITH (
	'type'='paimon',
	'warehouse'='hdfs://hadoop01:4007/data-lake/paimon/catlog/mycatlog'
);
2. Switch to the Paimon catalog
use catalog paimon_catalog;
3. Create a table in the Paimon catalog
CREATE TABLE word_count_detail (
	word STRING PRIMARY KEY NOT ENFORCED,
	cnt BIGINT
);
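
Paimon tables also accept table options in the WITH clause. The sketch below is illustrative only (the table name and bucket count are hypothetical, not part of the original setup); the 'bucket' option controls the number of buckets per partition, so check the Paimon documentation for the defaults in your version:

CREATE TABLE word_count_detail_bucketed (
	word STRING PRIMARY KEY NOT ENFORCED,
	cnt BIGINT
) WITH (
	'bucket' = '4'  -- hypothetical: fixed number of buckets per partition
);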
4. Write data from Kafka into the Paimon table

Note: streaming writes to a Paimon table require checkpointing to be enabled; Paimon commits data on checkpoints, so if no checkpoint interval is set, nothing will actually be written to the table.

Set the checkpoint interval to 10 seconds:

 SET 'execution.checkpointing.interval' = '10 s';

Write the data:

insert into word_count_detail select association_no as word,1 as cnt from default_catalog.default_database.dwd_crisps_biz_interview_rt;

5. Query the number of rows written, in batch mode
-- switch to batch query mode
SET 'sql-client.execution.result-mode' = 'tableau';
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';


SELECT count(*) s  FROM word_count_detail;
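
To read the table the other way round, as a stream rather than a one-off batch count, the runtime mode can be switched back to streaming. A minimal sketch, assuming Paimon's default streaming scan behavior (emit the current snapshot, then keep following new changes):

SET 'execution.runtime-mode' = 'streaming';
-- by default, a streaming read of a Paimon table produces the current snapshot
-- and then continues with incremental changes
SELECT * FROM word_count_detail;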

Parameter explanation

sql-client.execution.result-mode: controls how query results are visualized in the SQL client.

Table mode (table) materializes the result in memory and visualizes it as a regular, paged table. Enable it with:
	SET 'sql-client.execution.result-mode' = 'table';

Changelog mode (changelog) does not materialize or visualize the result; instead, the continuous query produces a stream of results made up of insertions (+) and retractions (-):
	SET 'sql-client.execution.result-mode' = 'changelog';

Tableau mode (tableau) is closer to a traditional database: it prints the result directly to the screen in tabular form. Exactly what is shown depends on the job execution mode (execution.runtime-mode):
	SET 'sql-client.execution.result-mode' = 'tableau';

III. Using a Hive catalog with Paimon

1. Create a catalog backed by the Hive Metastore
-- the thrift URI (e.g. thrift://192.168.4.82:7004) can be found in hive/conf/hive-site.xml
-- candidate ports: 7000 7001 7004

CREATE CATALOG paimon_hive_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://hadoop01:7004',
    'warehouse' = 'hdfs://hadoop01:4007/usr/hive/warehouse'
);

USE CATALOG paimon_hive_catalog;

select count(*) s from  paimon_hive_catalog.crisps_dwd.v2_dwd_trd_order_goods;
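
With the metastore-backed catalog in place, Paimon tables created through it are also registered in the Hive Metastore, so Hive can see them as well. A minimal sketch; the database and table names below are hypothetical, not part of the original setup:

CREATE DATABASE IF NOT EXISTS paimon_db;

CREATE TABLE paimon_db.word_count_hive (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);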


IV. Connecting Flink to Hive

1. Create a Hive catalog
SET 'sql-client.execution.result-mode' = 'tableau';
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'default-database' = 'crisps_dwd',
    'hive-conf-dir' = '/usr/local/service/hive/conf'
); 
select order_goods_id,cus_order_no  from  hive_catalog.crisps_dwd.v2_dwd_trd_order_goods limit 11;
select count(*) s from  hive_catalog.crisps_dwd.v2_dwd_trd_order_goods;
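
Since both catalogs are registered in the same SQL client session, fully-qualified names let a single statement read from the Hive catalog and write into a Paimon table. The sketch below is hypothetical: it reuses the word_count_detail table from section II and picks cus_order_no purely for illustration:

-- hypothetical cross-catalog batch insert: aggregate a Hive table and upsert into Paimon
INSERT INTO paimon_catalog.`default`.word_count_detail
SELECT CAST(cus_order_no AS STRING) AS word, count(*) AS cnt
FROM hive_catalog.crisps_dwd.v2_dwd_trd_order_goods
GROUP BY cus_order_no;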
