赞
踩
flink作为一个计算引擎,是缺少存储介质的,那么数据从哪儿来,到哪儿去,就需要连接器了,链接各种类型数据库,各种类型组件进行数据的抽取、计算、存储等,下面来看看flink都有哪些connector,怎么使用的?

看看目前支持的connector:
这是官方给出的:

有些支持数据源,有些不支持数据源,有些支持无边界流式处理,有些不支持,具体看上图。
我们目前市面上用的比较多的数据库,大概是以下几种:
# 支持jdbc
mysql mongodb postgresql oracle db2 sybase sqlserver hive
# 不支持jdbc
hbase es 文件 消息队列(kafka rabbitmq rocketmq)
CREATE TABLE MyUserTable (
-- declare the schema of the table
`user` BIGINT,
`message` STRING
) WITH (
-- declare the external system to connect to
'connector' = 'kafka',
'topic' = 'topic_name',
'scan.startup.mode' = 'earliest-offset', -- 还有可选从最近offset开始消费:latest-offset
'properties.bootstrap.servers' = 'localhost:9092', --kafka broker连接串
'format' = 'json' -- declare a format for this system
)
注意hbase目前只支持1.4和2.2版本

-- register the HBase table 'mytable' in Flink SQL CREATE TABLE hTable ( rowkey INT, family1 ROW<q1 INT>, family2 ROW<q2 STRING, q3 BIGINT>, family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-1.4', 'table-name' = 'mytable', 'zookeeper.quorum' = 'localhost:2181' ); -- use ROW(...) construction function construct column families and write data into the HBase table. -- assuming the schema of "T" is [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6] INSERT INTO hTable SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
jdbc连接需要添加对应的driver到flink lib里
mysql:点这里
postgresql:点这里
oracle:点这里下载ojdbc8.jar
这是常用的,其他的在网上都能搜得到
-- register a MySQL table 'users' in Flink SQL
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users',
'driver' = 'com.jdbc.mysql.Driver',
'username' = 'xxx',
'password' = 'xxx'
);
es只能做sink不能做source
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'users'
);
可以是服务器本地文件,也可以是hdfs文件,区别就是文件路径描述符的区别:
CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING, ... part_name1 INT, part_name2 STRING ) PARTITIONED BY (part_name1, part_name2) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a directory 'path' = 'hdfs:///path/to/whatever', -- required: path to a directory 'format' = '...', -- required: file system connector requires to specify a format, -- Please refer to Table Formats -- section for more details 'partition.default-name' = '...', -- optional: default partition name in case the dynamic partition -- column value is null/empty string -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly -- reduce the number of file for filesystem sink but may lead data skew, the default value is false. 'sink.shuffle-by-partition.enable' = '...', ... )
另外还有几种特殊的connector:
datagen会按照字段指定的类型,随机生成对应的数据
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen'
)
每一个写入该表的数据,都会标准输出到日志里
CREATE TABLE print_table (
f0 INT,
f1 INT,
f2 STRING,
f3 DOUBLE
) WITH (
'connector' = 'print'
);
这个connector会吞噬一切数据,往这个表里写的数据都会消失,主要用于测试性能。
CREATE TABLE blackhole_table (
f0 INT,
f1 INT,
f2 STRING,
f3 DOUBLE
) WITH (
'connector' = 'blackhole'
);
参考官网链接:
flink connectors
其实每个connector都支持指定类型的format格式方式,下期文章介绍如何指定格式化,可以指定那些格式化。
点个关注呗。

不积跬步无以至千里,不积小流无以成江海。
欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这2个公众号:

喜欢宠物的朋友可以关注:【电巴克宠物Pets】

想知道狗狗怕蚊子吗?扫二维码查看,有惊喜。

一起学习,一起进步。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。