赞
踩
Flink Doris Connector 支持通过 Flink 操作（读取、插入、修改、删除）Doris 中存储的数据，并可以将 Doris 表映射为 DataStream 或 Table。
<!-- Flink Doris Connector. The artifactId suffix (1.13_2.12) appears to encode the Flink and Scala
     versions this build targets; pick the artifact that matches your cluster. -->
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
-- Switch to the test database
use test_db;

-- Create the test table flinktest (aggregate model: pv is summed per key)
CREATE TABLE flinktest (
    siteid   INT         DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv       BIGINT SUM  DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- Insert sample rows
insert into flinktest values
    (1, 1, 'jim', 2),
    (2, 1, 'grace', 2),
    (3, 2, 'tom', 2),
    (4, 3, 'bush', 3),
    (5, 3, 'helen', 3);

-- Check the table contents
select * from flinktest;
-- +--------+----------+----------+------+
-- | siteid | citycode | username | pv   |
-- +--------+----------+----------+------+
-- |      1 |        1 | jim      |    2 |
-- |      5 |        3 | helen    |    3 |
-- |      4 |        3 | bush     |    3 |
-- |      3 |        2 | tom      |    2 |
-- |      2 |        1 | grace    |    2 |
-- +--------+----------+----------+------+
Doris Type | Flink Type |
---|---|
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
DECIMALV2 | DECIMAL |
TIME | DOUBLE |
HLL | Unsupported datatype |
使用 DataStream API 读取 Doris 的代码示例：
package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Reads the rows of the Doris table {@code test_db.flinktest} with the DataStream API
 * ({@link DorisSourceFunction}) and prints each row to stdout.
 *
 * <p>Rows are deserialized by {@link SimpleListDeserializationSchema}; the sample output at the
 * end of this file shows each row printed as a list of column values.
 */
public class Flink_stream_read_doris {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fixed REST port for the locally started Flink environment.
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Doris connection settings and the table to read.
        // NOTE(review): credentials are hard-coded for the demo; load them from configuration in real jobs.
        Properties props = new Properties();
        props.setProperty("fenodes", "hdt-dmcp-ops01:8130");
        props.setProperty("username", "root");
        props.setProperty("password", "123456");
        props.setProperty("table.identifier", "test_db.flinktest");

        env
            .addSource(new DorisSourceFunction(
                new DorisStreamOptions(props), new SimpleListDeserializationSchema()))
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of printStackTrace(): a failed job must not end like a successful run.
            throw new IllegalStateException("Flink job reading test_db.flinktest from Doris failed", e);
        }
    }
}
/*
Console output:
[4, 3, bush, 3]
[2, 1, grace, 2]
[1, 1, jim, 2]
[5, 3, helen, 3]
[3, 2, tom, 2]
*/
Flink 读写 Doris 数据主要有两种方式：DataStream API 和 Flink SQL。使用 DataStream API 写入时，可以提交 JSON 字符串或 RowData 两种格式的数据，示例如下。
使用 DataStream API 写入 JSON 格式数据的代码示例：
package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Writes JSON strings into the Doris table {@code test_db.flinktest} with the DataStream API
 * ({@link DorisSink}).
 *
 * <p>The stream-load properties declare the payload format as JSON. In this example the keys of
 * the JSON object match the table's column names.
 */
public class Flink_stream_write_doris_json {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fixed REST port for the locally started Flink environment.
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Stream Load properties handed to the Doris sink.
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        // NOTE(review): strip_outer_array=true lets Doris accept a JSON array of rows; presumably
        // the connector batches records into such an array - confirm for this connector version.
        pro.setProperty("strip_outer_array", "true");

        env
            .fromElements("{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}")
            .addSink(DorisSink.sink(
                new DorisExecutionOptions.Builder()
                    .setBatchIntervalMs(2000L)   // flush buffered rows at least every 2 s
                    .setEnableDelete(false)
                    .setMaxRetries(3)
                    .setStreamLoadProp(pro)
                    .build(),
                // NOTE(review): credentials are hard-coded for the demo.
                new DorisOptions.Builder()
                    .setFenodes("hdt-dmcp-ops01:8130")
                    .setUsername("root")
                    .setPassword("123456")
                    .setTableIdentifier("test_db.flinktest")
                    .build()));

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of printStackTrace(): a failed load must not end like a successful run.
            throw new IllegalStateException("Flink job writing JSON rows to test_db.flinktest failed", e);
        }
    }
}
/*
Before running: 5 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+

After running: 6 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
*/
使用 DataStream API 写入 RowData 格式数据的代码示例：
package com.zenitera.bigdata.doris; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisSink; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.*; public class Flink_stream_write_doris_rowdata { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()}; String[] fields = {"siteid", "citycode", "username", "pv"}; env .fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}") .map(json -> { JSONObject obj = JSON.parseObject(json); GenericRowData rowData = new GenericRowData(4); rowData.setField(0, obj.getIntValue("siteid")); rowData.setField(1, obj.getShortValue("citycode")); rowData.setField(2, StringData.fromString(obj.getString("username"))); rowData.setField(3, obj.getLongValue("pv")); return rowData; }) .addSink(DorisSink.sink( fields, types, new DorisExecutionOptions.Builder() .setBatchIntervalMs(2000L) .setEnableDelete(false) .setMaxRetries(3) .build(), new DorisOptions.Builder() .setFenodes("hdt-dmcp-ops01:8130") .setUsername("root") .setPassword("123456") .setTableIdentifier("test_db.flinktest") .build()) ); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } } /* 代码执行前: 6 rows select * from flinktest; +--------+----------+----------+------+ | siteid | citycode | username | pv | +--------+----------+----------+------+ | 2 | 1 | grace | 2 | | 3 | 2 | tom 
| 2 | | 5 | 3 | helen | 3 | | 1 | 1 | jim | 2 | | 10 | 1001 | ww | 100 | | 4 | 3 | bush | 3 | +--------+----------+----------+------+ 代码执行后: 7 rows select * from flinktest; +--------+----------+----------+------+ | siteid | citycode | username | pv | +--------+----------+----------+------+ | 1 | 1 | jim | 2 | | 2 | 1 | grace | 2 | | 3 | 2 | tom | 2 | | 5 | 3 | helen | 3 | | 10 | 1001 | ww | 100 | | 100 | 1002 | wang | 100 | | 4 | 3 | bush | 3 | +--------+----------+----------+------+ */
重置 Doris 测试表数据：
use test_db;

-- Reset the table to three known rows before running the Flink SQL examples
truncate table flinktest;
insert into flinktest values
    (1, 1, 'aaa', 1),
    (2, 2, 'bbb', 2),
    (3, 3, 'ccc', 3);

select * from flinktest;
-- +--------+----------+----------+------+
-- | siteid | citycode | username | pv   |
-- +--------+----------+----------+------+
-- |      2 |        2 | bbb      |    2 |
-- |      1 |        1 | aaa      |    1 |
-- |      3 |        3 | ccc      |    3 |
-- +--------+----------+----------+------+
-- 3 rows in set (0.01 sec)
使用 Flink SQL 写入 Doris 的代码示例：
package com.zenitera.bigdata.doris;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.ExecutionException;

/**
 * Maps the Doris table {@code test_db.flinktest} as the Flink SQL table {@code flink_0518}
 * (connector {@code 'doris'}) and inserts one row through it.
 */
public class Flink_SQL_doris {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fixed REST port for the locally started Flink environment.
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // NOTE(review): citycode is SMALLINT in Doris but declared int here; the read example
        // declares it SMALLINT. Align the types if the insert's implicit casts allow it.
        tEnv.executeSql("create table flink_0518("
            + " siteid int, "
            + " citycode int, "
            + " username string, "
            + " pv bigint "
            + ")with("
            + " 'connector' = 'doris', "
            + " 'fenodes' = 'hdt-dmcp-ops01:8130', "
            + " 'table.identifier' = 'test_db.flinktest', "
            + " 'username' = 'root', "
            + " 'password' = '123456' "
            + ")");

        try {
            // executeSql() only submits an INSERT job and returns immediately. Wait for it so main()
            // does not return before the row has been written, and so failures surface here.
            tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ")
                .await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the insert into flink_0518", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Insert into flink_0518 (Doris test_db.flinktest) failed", e);
        }
    }

    /** POJO mirroring the columns of flink_0518; not used by {@link #main(String[])} in this example. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Flink_0518 {
        private Integer siteid;
        private Integer citycode;
        private String username;
        private Long pv;
    }
}
运行上述代码，执行完成后查询 Doris 中对应的表数据进行验证：
-- Verify that the row inserted through Flink SQL is present
SELECT * FROM flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 3 | 3 | ccc | 3 |
| 2 | 2 | bbb | 2 |
| 1 | 1 | aaa | 1 |
| 4 | 4 | wangting | 4 |
+--------+----------+----------+------+
4 rows in set (0.01 sec)
package com.zenitera.bigdata.doris;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Registers the Doris table {@code test_db.flinktest} as the Flink SQL table {@code flink_0520}
 * and prints the result of {@code select *} to stdout.
 */
public class Flink_SQL_doris_read {

    /** DDL mapping flink_0520 onto Doris through the 'doris' connector (demo credentials inline). */
    private static final String CREATE_FLINK_0520 =
        "create table flink_0520("
            + " siteid int, "
            + " citycode SMALLINT, "
            + " username string, "
            + " pv bigint "
            + ")with("
            + " 'connector' = 'doris', "
            + " 'fenodes' = 'hdt-dmcp-ops01:8130', "
            + " 'table.identifier' = 'test_db.flinktest', "
            + " 'username' = 'root', "
            + " 'password' = '123456' "
            + ")";

    public static void main(String[] args) {
        final Configuration flinkConf = new Configuration();
        flinkConf.setInteger("rest.port", 2000);

        final StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        streamEnv.setParallelism(1);

        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
        tableEnv.executeSql(CREATE_FLINK_0520);

        // Run the scan and print each row (with its change-log op) as a table.
        tableEnv
            .sqlQuery("select * from flink_0520")
            .execute()
            .print();
    }
}
/*
Console output:
+----+-------------+----------+---------------+---------+
| op |      siteid | citycode |      username |      pv |
+----+-------------+----------+---------------+---------+
| +I |           1 |        1 |           aaa |       1 |
| +I |           3 |        3 |           ccc |       3 |
| +I |           2 |        2 |           bbb |       2 |
| +I |           4 |        4 |      wangting |       4 |
+----+-------------+----------+---------------+---------+
4 rows in set
*/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。