当前位置:   article > 正文

Apache Doris实时数据分析保姆级使用教程

apache diros

点击上方蓝色字体,选择“设为星标”

回复"面试"获取更多惊喜

7914bc679a1384d96acd6ab0ffd7b81b.png

《大数据面试提升私教训练营第6期低调报名开启~》

Doris安装

集群部署

官网下载地址:

https://doris.apache.org/zh-CN/downloads/downloads.html

d02e8f8f81ebb7eebf664e0dbafa20fe.png

选择二进制下载,源码下载需要自己编译。解压doris文件:

tar -zxvf apache-doris-1.0.0-incubating-bin.tar.gz -C /opt/module/

集群规划

4c7291ca727df7181437ee395bfe7b47.png
FE部署

修改配置文件vim conf/fe.conf

meta_dir = /opt/module/doris-meta

ee5d73955b95af63c1a589e90d2f1cbe.png

集群中分发存储路径和FE配置文件,启动FE。

  1. # 创建meta文件夹存储路径
  2. mkdir /opt/module/doris-meta
  3. # 三台机器都要执行
  4. sh bin/start_fe.sh --daemon
54234c6b0da45cc1bfda0705575ea670.png
BE部署

修改配置文件vim conf/be.conf

  1. # storage_root_path配置存储目录,可以用;来指定多个目录,每个目录后可以跟逗号,指定大小默认GB
  2. storage_root_path = /opt/module/doris_storage1,10;/opt/module/doris_storage2
4138ba7b23a7d80b91e610977aa5fb22.png

集群中分发存储路径和BE配置文件,启动BE

  1. # 创建storage_root_path存储路径
  2. mkdir /opt/module/doris_storage1
  3. mkdir /opt/module/doris_storage2
  4. # 三台机器都要执行
  5. sh bin/start_be.sh --daemon
177e38c5c8ca0ba64ae9b5ac9474f8e3.png
访问Doris PE节点

doris可以使用mysql客户端访问,如果未安装,则需要安装mysql-client。

  1. # 第一次访问不需要密码,可以自行设置密码
  2. mysql -hdoris1 -P 9030 -uroot
  3. # 修改密码
  4. set password for 'root' = password('root');
188b672bb887cd9e45d9cc477600a227.png
添加BE节点

通过mysql客户端登入后,添加be节点,port为be上的heartbeat_service_port端口,默认9050

  1. mysql> ALTER SYSTEM ADD BACKEND "hadoop102:9050";
  2. mysql> ALTER SYSTEM ADD BACKEND "hadoop103:9050";
  3. mysql> ALTER SYSTEM ADD BACKEND "hadoop104:9050";

通过mysql客户端,检测be节点状态,alive必须为true

mysql> SHOW PROC '/backends';
c171a87c5604842bf4f11743707a3c8a.png
BROKER部署

可选,非必须部署,启动BROKER

  1. # 三台集群都要启动
  2. sh bin/start_broker.sh --daemon
ab77b2fc0a31328ef116989fc24decdb.png

使用mysql客户端访问pe,添加broker节点

mysql> ALTER SYSTEM ADD BROKER broker_name "hadoop102:8000","hadoop103:8000","hadoop104:8000";查看broker状态

mysql> SHOW PROC "/brokers";

扩容缩容

Doris可以很方便的扩容和缩容FE、BE、Broker实例。通过页面访问进行监控,访问8030,账户为root,密码默认为空不用填写,除非上述设置了密码使用密码登录http://hadoop102:8030

9035f68dca412653c6d9b646291fee6f.png

FE 扩容和缩容

FE 节点的扩容和缩容过程,不影响当前系统运行。

使用mysql登录客户端后,可以使用sql命令查看FE状态,目前就一台FE。

mysql> SHOW PROC '/frontends';
fc495fe804b6aeb5f9662ba519989212.png 8b8083edf018990888fb85d663ebe00d.png

增加FE节点,FE分为Leader,Follower和Observer三种角色。默认一个集群只能有一个Leader,可以有多个Follower和Observer.其中Leader和Follower组成一个Paxos选择组,如果Leader宕机,则剩下的Follower会成为Leader,保证HA。Observer是负责同步Leader数据的不参与选举。如果只部署一个FE,则FE默认就是Leader

第一个启动的FE自动成为Leader。在此基础上,可以添加若干Follower和Observer。添加Follower或Observer。使用mysql-client连接到已启动的FE,并执行:在doris2部署Follower,doris3上部署Observer

  1. # 执行其中的一个即可,注解如下
  2. # follower/observer_host IP节点位置
  3. # edit_log_port fe.conf配置文件中可以查询到
  4. # ALTER SYSTEM ADD FOLLOWER "follower_host:edit_log_port";
  5. ALTER SYSTEM ADD FOLLOWER "hadoop103:9010";
  6. # ALTER SYSTEM ADD OBSERVER "observer_host:edit_log_port";
  7. ALTER SYSTEM ADD OBSERVER "hadoop104:9010";
a5341a8ba846062579081a4c807c96d5.png

需要重启配置节点的FE,并添加如下参数启动

  1. # --helper参数指定leader地址和端口号
  2. sh bin/start_fe.sh --helper hadoop102:9010 --daemon
  3. sh bin/start_fe.sh --helper hadoop102:9010 --daemon

全部启动完毕后,再通过mysql客户端,查看FE状况

mysql> SHOW PROC '/frontends';
e8d49b33fd3f6b0325f328a66ef5ab6d.png

使用以下命令删除对应的FE节点ALTER SYSTEM DROP FOLLOWER[OBSERVER] "fe_host:edit_log_port";删除Follower FE时,确保最终剩余的Follower(包括 Leader)节点为奇数

  1. ALTER SYSTEM DROP FOLLOWER "hadoop103:9010";
  2. ALTER SYSTEM DROP OBSERVER "hadoop104:9010";

BE 扩容和缩容

增加BE节点,就像上面安装一样在mysql客户端,使用ALTER SYSTEM ADD BACKEND

删除BE节点,使用ALTER SYSTEM DROP BACKEND "be_host:be_heartbeat_service_port";

具体文档请查看官网。

Doris操作手册

创建用户

  1. # 连接doris
  2. mysql -hhadoop102 -P 9030 -uroot
  3. # 创建用户
  4. mysql> create user 'test' identified by 'test';
  5. # 退出使用test即可登录
  6. mysql> exit;
  7. mysql -hhadoop102 -P 9030 -utest -ptest

表操作

  1. # 创建数据库
  2. mysql> create database test_db;
  3. # 赋予test用户test库权限
  4. mysql> grant all  on test_dn to test;
  5. # 使用数据库
  6. mysql> use test_db;
分区表

分区表分为单分区和复合分区

单分区表,建立一张student表。分桶列为id,桶数为10,副本数为1

  1. CREATE TABLE student
  2. (
  3. id INT,
  4. name VARCHAR(50),
  5. age INT,
  6. count  BIGINT SUM DEFAULT '0'
  7. )
  8. AGGREGATE KEY (id,name,age)
  9. DISTRIBUTED BY HASH(id) buckets 10
  10. PROPERTIES("replication_num" = "1");

复合分区表,第一级称为Partition,即分区。用户指定某一维度列做为分区列(当前只支持整型和时间类型的列),并指定每个分区的取值范围。第二级称为Distribution,即分桶。用户可以指定一个或多个维度列以及桶数进行HASH分布

  1. #创建student2表,使用dt字段作为分区列,并且创建3个分区发,分别是:
  2. #P202007 范围值是是小于2020-08-01的数据
  3. #P202008 范围值是2020-08-012020-08-31的数据
  4. #P202009 范围值是2020-09-012020-09-30的数据
  5. CREATE TABLE student2
  6. (
  7. dt DATE,
  8. id INT,
  9. name VARCHAR(50),
  10. age INT,
  11. count  BIGINT SUM DEFAULT '0'
  12. )
  13. AGGREGATE KEY (dt,id,name,age)
  14. PARTITION BY RANGE(dt)
  15. (
  16.   PARTITION p202007 VALUES LESS THAN ('2020-08-01'),
  17.   PARTITION p202008 VALUES LESS THAN ('2020-09-01'),
  18.   PARTITION p202009 VALUES LESS THAN ('2020-10-01')
  19. )
  20. DISTRIBUTED BY HASH(id) buckets 10
  21. PROPERTIES("replication_num" = "1");

复合分区表,第一级称为Partition,即分区。用户指定某一维度列做为分区列(当前只支持整型和时间类型的列),并指定每个分区的取值范围。第二级称为Distribution,即分桶。用户可以指定一个或多个维度列以及桶数进行HASH分布.

  1. #创建student2表,使用dt字段作为分区列,并且创建3个分区发,分别是:
  2. #P202007 范围值是是小于2020-08-01的数据
  3. #P202008 范围值是2020-08-012020-08-31的数据
  4. #P202009 范围值是2020-09-012020-09-30的数据
  5. CREATE TABLE student2
  6. (
  7. dt DATE,
  8. id INT,
  9. name VARCHAR(50),
  10. age INT,
  11. count  BIGINT SUM DEFAULT '0'
  12. )
  13. AGGREGATE KEY (dt,id,name,age)
  14. PARTITION BY RANGE(dt)
  15. (
  16.   PARTITION p202007 VALUES LESS THAN ('2020-08-01'),
  17.   PARTITION p202008 VALUES LESS THAN ('2020-09-01'),
  18.   PARTITION p202009 VALUES LESS THAN ('2020-10-01')
  19. )
  20. DISTRIBUTED BY HASH(id) buckets 10
  21. PROPERTIES("replication_num" = "1");

数据模型

AGGREGATE KEY

AGGREGATE KEY相同时,新旧记录将会进行聚合操作

AGGREGATE KEY模型可以提前聚合数据,适合报表和多维度业务

UNIQUE KEY

UNIQUE KEY相同时,新记录覆盖旧记录。目前UNIQUE KEY和AGGREGATE KEY的REPLACE聚合方法一致。适用于有更新需求的业务。

DUPLICATE KEY

只指定排序列,相同的行并不会合并。适用于数据无需提前聚合的分析业务

数据导入

为适配不同的数据导入需求,Doris系统提供5种不同的导入方式。每种导入方式支持不同的数据源,存在不同的方式(异步、同步)

Broker load

Broker load是一个导入的异步方式,支持的数据源取决于Broker进程支持的数据源

基本原理:用户在提交导入任务后,FE(Doris系统的元数据和调度节点)会生成相应的PLAN(导入执行计划,BE会执行导入计划将输入导入Doris中)并根据BE(Doris系统的计算和存储节点)的个数和文件的大小,将Plan分给多个BE执行,每个BE导入一部分数据。BE在执行过程中会从Broker拉取数据,在对数据转换之后导入系统。所有BE均完成导入,由FE最终决定是否导入是否成功。

测试导入HDFS数据到Doris

编写测试文件,上传到HDFS.

9fa2dca7e945eeeddc7b0cab02fc5b6d.png

创建doris表,测试导入

  1. CREATE TABLE student
  2. (
  3. id INT,
  4. name VARCHAR(50),
  5. age INT,
  6. count  BIGINT SUM DEFAULT '0'
  7. )
  8. AGGREGATE KEY (id,name,age)
  9. DISTRIBUTED BY HASH(id) buckets 10
  10. PROPERTIES("replication_num" = "1");

编写diros导入sql,更多参数请看官网

  1. LOAD LABEL test_db.label1
  2. (
  3.     DATA INFILE("hdfs://bigdata:8020/student")
  4.     INTO TABLE student
  5.     COLUMNS TERMINATED BY ","
  6.     (id,name,age,count)
  7.     SET
  8.     (
  9.         id=id,
  10.         name=name,
  11.         age=age,
  12.         count=count
  13.     )
  14. )
  15. WITH BROKER broker_name
  16. (
  17.     "username"="root"
  18. )
  19. PROPERTIES
  20. (
  21.     "timeout" = "3600"
  22. );

查看doris导入状态

  1. use test_db;
  2. show load;
7130520dae0a8e2d48915335bc85ed65.png

查看数据导入是否成功

fc89ea1f68d57ddc954263d3b15ef04d.png
Routine Load

例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能

从Kafka导入数据到Doris

创建kafka主题

kafka-topics.sh --zookeeper bigdata:2181 --create --replication-factor 1 --partitions 1 --topic test

启动kafka生产者生产数据

  1. kafka-console-producer.sh --broker-list bigdata:9092 --topic test
  2. # 数据格式
  3. {"id":"4","name":"czsqhh","age":"18","count":"50"}

在doris中创建对应表

  1. CREATE TABLE kafka_student
  2. (
  3. id INT,
  4. name VARCHAR(50),
  5. age INT,
  6. count  BIGINT SUM DEFAULT '0'
  7. )
  8. AGGREGATE KEY (id,name,age)
  9. DISTRIBUTED BY HASH(id) buckets 10
  10. PROPERTIES("replication_num" = "1");
3f20fc02919741a68f6081c567a4b2c8.png

创建导入作业,desired_concurrent_number指定并行度

  1. CREATE ROUTINE LOAD test_db.job1 on kafka_student
  2. PROPERTIES
  3. (
  4.     "desired_concurrent_number"="1",
  5.     "strict_mode"="false",
  6.     "format"="json"
  7. )
  8. FROM KAFKA
  9. (
  10.     "kafka_broker_list""bigdata:9092",
  11.     "kafka_topic" = "test",
  12.     "property.group.id" = "test"
  13. );

查看作业状态

SHOW ROUTINE LOAD;
bb6fc251c99d456a751afc5b8c4afba4.png

控制作业

  1. STOP ROUTINE LOAD For jobxxx :停止作业
  2. PAUSE ROUTINE LOAD For jobxxx:暂停作业
  3. RESUME ROUTINE LOAD For jobxxx:重启作业
数据导出

Drois导出数据到HDFS

其他参数详见官网

  1. EXPORT TABLE test_db.student
  2. PARTITION (student)
  3. TO "hdfs://bigdata:8020/doris/student/" 
  4. WITH BROKER broker_name
  5. (
  6.     "username" = "root"
  7. );
ecfdbd41f2c3cc3d21eae595b05128a9.png

Doris代码操作

Spark

引入依赖

  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.apache.spark</groupId>
  4.         <artifactId>spark-core_2.12</artifactId>
  5.         <version>3.0.0</version>
  6.     </dependency>
  7.     <dependency>
  8.         <groupId>org.apache.spark</groupId>
  9.         <artifactId>spark-yarn_2.12</artifactId>
  10.         <version>3.0.0</version>
  11.     </dependency>
  12.     <dependency>
  13.         <groupId>org.apache.spark</groupId>
  14.         <artifactId>spark-sql_2.12</artifactId>
  15.         <version>3.0.0</version>
  16.     </dependency>
  17.     <dependency>
  18.         <groupId>mysql</groupId>
  19.         <artifactId>mysql-connector-java</artifactId>
  20.         <version>5.1.27</version>
  21.     </dependency>
  22.     <dependency>
  23.         <groupId>org.apache.spark</groupId>
  24.         <artifactId>spark-hive_2.12</artifactId>
  25.         <version>3.0.0</version>
  26.     </dependency>
  27.     <dependency>
  28.         <groupId>org.apache.hive</groupId>
  29.         <artifactId>hive-exec</artifactId>
  30.         <version>1.2.1</version>
  31.     </dependency>
  32.     <dependency>
  33.         <groupId>org.apache.spark</groupId>
  34.         <artifactId>spark-streaming_2.12</artifactId>
  35.         <version>3.0.0</version>
  36.     </dependency>
  37.     <dependency>
  38.         <groupId>org.apache.spark</groupId>
  39.         <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  40.         <version>3.0.0</version>
  41.     </dependency>
  42.     <dependency>
  43.         <groupId>com.fasterxml.jackson.core</groupId>
  44.         <artifactId>jackson-core</artifactId>
  45.         <version>2.10.1</version>
  46.     </dependency>
  47.     <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
  48.     <dependency>
  49.         <groupId>com.alibaba</groupId>
  50.         <artifactId>druid</artifactId>
  51.         <version>1.1.10</version>
  52.     </dependency>
  53. </dependencies>

读取doris数据

  1. object ReadDoris {
  2.   def main(args: Array[String]): Unit = {
  3.     val sparkConf = new SparkConf().setAppName("testReadDoris").setMaster("local[*]")
  4.     val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  5.     val df = sparkSession.read.format("jdbc")
  6.       .option("url""jdbc:mysql://bigdata:9030/test_db")
  7.       .option("user""root")
  8.       .option("password""root")
  9.       .option("dbtable""student")
  10.       .load()
  11.     df.show()
  12.     sparkSession.close();
  13.   }
  14. }
b0bfd1c4ffb8df8f4d79ceeb276ad738.png

Flink

引入依赖

  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.apache.flink</groupId>
  4.         <artifactId>flink-java</artifactId>
  5.         <version>1.14.3</version>
  6.     </dependency>
  7.     <dependency>
  8.         <groupId>org.apache.flink</groupId>
  9.         <artifactId>flink-streaming-java_2.12</artifactId>
  10.         <version>1.14.3</version>
  11.     </dependency>
  12.     <dependency>
  13.         <groupId>org.apache.flink</groupId>
  14.         <artifactId>flink-clients_2.12</artifactId>
  15.         <version>1.14.3</version>
  16.     </dependency>
  17.     <dependency>
  18.         <groupId>org.projectlombok</groupId>
  19.         <artifactId>lombok</artifactId>
  20.         <version>1.18.16</version>
  21.     </dependency>
  22.     <dependency>
  23.         <groupId>org.apache.flink</groupId>
  24.         <artifactId>flink-connector-kafka_2.12</artifactId>
  25.         <version>1.14.3</version>
  26.     </dependency>
  27.     <dependency>
  28.         <groupId>org.apache.flink</groupId>
  29.         <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
  30.         <version>1.14.3</version>
  31.     </dependency>
  32.     <dependency>
  33.         <groupId>org.apache.bahir</groupId>
  34.         <artifactId>flink-connector-redis_2.12</artifactId>
  35.         <version>1.1-SNAPSHOT</version>
  36.     </dependency>
  37.     <dependency>
  38.         <groupId>org.apache.flink</groupId>
  39.         <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  40.         <version>1.14.3</version>
  41.     </dependency>
  42.     <dependency>
  43.         <groupId>org.apache.flink</groupId>
  44.         <artifactId>flink-table-planner_2.12</artifactId>
  45.         <version>1.14.3</version>
  46.     </dependency>
  47.     <dependency>
  48.         <groupId>org.apache.flink</groupId>
  49.         <artifactId>flink-table-common</artifactId>
  50.         <version>1.14.3</version>
  51.     </dependency>
  52.     <dependency>
  53.         <groupId>org.apache.flink</groupId>
  54.         <artifactId>flink-csv</artifactId>
  55.         <version>1.14.3</version>
  56.     </dependency>
  57.     <dependency>
  58.         <groupId>org.apache.flink</groupId>
  59.         <artifactId>flink-connector-jdbc_2.12</artifactId>
  60.         <version>1.14.3</version>
  61.     </dependency>
  62.     <dependency>
  63.         <groupId>mysql</groupId>
  64.         <artifactId>mysql-connector-java</artifactId>
  65.         <version>8.0.23</version>
  66.     </dependency>
  67.     <dependency>
  68.         <groupId>org.apache.flink</groupId>
  69.         <artifactId>flink-connector-kafka_2.12</artifactId>
  70.         <version>1.14.3</version>
  71.     </dependency>
  72. </dependencies>

读取数据

  1. public static void main(String[] args) {
  2.     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
  3.     TableEnvironment tEnv = TableEnvironment.create(settings);
  4.     String sourceSql = "CREATE TABLE student (\n" +
  5.             "`id` Integer,\n" +
  6.             "`name` STRING,\n" +
  7.             "`age` Integer\n" +
  8.             ")WITH (\n" +
  9.             "'connector'='jdbc',\n" +
  10.             "'url' = 'jdbc:mysql://bigdata:9030/test_db',\n" +
  11.             "'username'='root',\n" +
  12.             "'password'='root',\n" +
  13.             "'table-name'='student'\n" +
  14.             ")";
  15.     tEnv.executeSql(sourceSql);
  16.     Table table = tEnv.sqlQuery("select * from student");
  17.     table.execute().print();
  18. }
0691418ec9fe671bbb64e8077152bc82.png

如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

04d5fdf4f71019890c1ffb96b108c863.png

5ad5b4c8b29aff61dba6a9c032c346f4.png

2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)

互联网最坏的时代可能真的来了

我在B站读大学,大数据专业

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/202911
推荐阅读
相关标签
  

闽ICP备14008679号