当前位置:   article > 正文

Zookeeper kafka集群 密码认证_kafka账号密码认证

kafka账号密码认证

环境准备:

centos7*3

数据库(提供给可视化)

apache-zookeeper-3.5.8-bin.tar.gz

kafka_2.13-3.1.0.tar.gz

efak-web-3.0.1-bin.tar.gz

jdk_1.8.131

关闭selinux,关闭防火墙或开放对应端口 云服务器需要在云平台安全组开放对应端口

链接:https://pan.baidu.com/s/10gpbmQkSfr1zqhNGyDBT3g

提取码:prbw

链接直接点,如果打不开,就复制到浏览器再打开

搭建zookeeper集群:

上传并解压zookeeper的tar包 然后到conf目录下 找到zoo_sample.cfg改为zoo.cfg 然后打开编辑 编辑如下:

  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/data/zookeeper/zookeeper_data #这里位置可以自定义
  5. clientPort=2181
  6. server.1=服务器1的IP:2888:3888
  7. server.2=服务器2的IP:2888:3888
  8. server.3=服务器3的IP:2888:3888
  9. #有几台服务器就写几台服务器的IP

因为要做集群为了区分各服务器,需要生成Myid文件作为服务器的唯一标识

cd 到配置文件里定义dataDir位置的目录里

  1. root@localhost:~# cd /data/zookeeper/zookeeper_data
  2. root@localhost:~# echo 1 >> myid
  3. root@localhost:~# cat myid
  4. 1

把1这个数字写入myid文件,只要这一个数字就足够了。

注意其他两台机器,myid为2 为3,这个数字是和配置文件中配置的server.1或者server.2对应的。server.1的myid就是1,server.2的myid就是2,以此类推一定不要搞错。

以上操作需要在三台机器上都要进行,唯一需要注意为Myid文件 不要重复

以上操作完以后,就可以启动zookeeper

  1. root@localhost:~# cd /data/zookeeper/bin
  2. root@localhost:~# ./zkServer.sh start # 启动zookeeper
  3. root@localhost:~# ./zkServer.sh status # 查看zookeeper状态
  4. # 查看状态看到 Mode: follower 或者 Mode: lead
  5. # 3台服务器都要进行 上述操作 1台状态为lead 2台状态为follower 就算成功启动zookeeper

搭建kafka集群

上传并解压kafka的tar包 然后到conf目录下 找到server.properties改为server.properties.bak 然后打开server.properties编辑 编辑如下:

  1. #服务器id,这个是每个机器的唯一id。每个机器都是独一无二的,习惯性就按顺序来。
  2. broker.id=1
  3. #监听端口,这里也可以用hostname代替,只要host里面配置好了都可以
  4. listeners = PLAINTEXT://服务器1的IP:9092
  5. #存放当前数据的目录,这里所有机器尽量都一样,这样操作起来比较容易
  6. log.dirs=/data/kafka/kafka-logs
  7. #连接Zookeeper
  8. zookeeper.connect=服务器1:2181,服务器2:2181,服务器3:2181
  9. #以上操作3台几服务器都需要操作 唯一区别为 服务器的Id 以及 监听端口的IP
  10. #例如服务器2
  11. #broker.id=2
  12. #listeners = PLAINTEXT://服务器2的IP:9092
  13. #其余不变

创建log.dirs的目录 各位可以自定义修改 如果目录文件夹没有 需要创建出来 三台机器都需要操作

下面可以启动kafka

  1. root@localhost:~# cd /data/kafka
  2. root@localhost:~# nohup ./bin/kafka-server-start.sh config/server.properties &

正常启动以后,在打印的日志里最后有这样一句话INFO [KafkaServer id=1] started 这样就表示启动成功

三台服务器,均需要启动

下面可以测试一下 查看一下topic列表

  1. root@localhost:~# cd /data/kafka
  2. root@localhost:~# ./bin/kafka-topics.sh --list --zookeeper 服务器IP:2181
  3. # 测试没问题就可以创建topic
  4. root@localhost:~# ./bin/kafka-topics.sh --create --zookeeper 服务器IP:2181 --replication-factor 1 --partitions 1 --topic my-topic
  5. Created topic "my-topic".
  6. # 浏览所有topic命令:
  7. root@localhost:~# ./bin/kafka-topics.sh --list --zookeeper 服务器IP:2181
  8. __consumer_offsets
  9. my-topic
  10. test_topic
  11. testtopic
  12. # 浏览指定topic命令:
  13. root@localhost:~# ./bin/kafka-topics.sh --describe --zookeeper 服务器IP:2181 --topic my-topic
  14. # 生产Console消息:
  15. root@localhost:~# ./bin/kafka-console-producer.sh --broker-list 服务器IP:9092 --topic my-topic
  16. >hello world
  17. >123
  18. >test
  19. # 消费Console消息:
  20. root@localhost:~# ./bin/kafka-console-consumer.sh --bootstrap-server 服务器IP:9092 --topic my-topic --from-beginning
  21. >hello world
  22. >123
  23. >test

Zookeeper验证:

基本上一套下来,就可以使用了。当然我们可以去用zkCli.sh去看下里面到底有没有我们的数据。我们刚刚创建的my-topic这个topic就在里面存着。

  1. root@localhost:~# cd /data/zookeeper/bin
  2. root@localhost:~# ./zkCli.sh
  3. [zk: localhost:2181(CONNECTED) 1] ls /
  4. [zookeeper,cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
  5. [zk: localhost:2181(CONNECTED) 2] ls /brokers/topicsc
  6. [__consumer_offsets,my-topic,test_topic,testtopic] #my-topic在这里

到此一个可用的集群已经搭建完毕,但是没有密码不安全

kafka添加SASL密码认证

首先将搭建好的kafka集群停掉,zookeeper不需要停

  1. root@localhost:~# cd /data/kafka
  2. root@localhost:~# ./kafka-server-stop.sh
  3. root@localhost:~# ps -ef | grep kafka

三台机器都需要上述操作 保证kafka集群完全停止

编辑kafka config目录下的server.properties修改为如下配置

  1. broker.id=1
  2. #默认监控端口,设置9092使用SASL_PLAINTEXT协议
  3. listeners = SASL_PLAINTEXT://服务器1的IP:9092   
  4. #advertised.listeners控制生产者与消费者接入的端口,如果不设置默认都用listeners,设置9092使用SASL_PLAINTEXT协议
  5. advertised.listeners=SASL_PLAINTEXT://服务器1的IP:9092
  6. #Broker内部联络使用的security协议
  7. security.inter.broker.protocol=SASL_PLAINTEXT
  8. #Broker内部联络使用的sasl协议,这里也可以配置多个,比如SCRAM-SHA-512,SCRAM-SHA-256并列使用
  9. sasl.enabled.mechanisms=PLAIN
  10. #Broker允许使用的sasl协议,这里也可以配多个PLAIN,SCRAM-SHA-512,SCRAM-SHA-256
  11. sasl.mechanism.inter.broker.protocol=PLAIN
  12. #设置zookeeper是否使用ACL
  13. zookeeper.set.acl=true
  14. #设置ACL类(低于 2.4.0 版本推荐使用 SimpleAclAuthorizer)
  15. #authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  16. #设置ACL类(高于 2.4.0 版本推荐使用 AclAuthorizer)
  17. authorizer.class.name=kafka.security.authorizer.AclAuthorizer
  18. allow.everyone.if.no.acl.found=true
  19. log.dirs=/data/kafka/kafka-logs
  20. zookeeper.connect=服务器1:2181,服务器2:2181,服务器3:2181
  21. log.retention.hours=720 #数据保存时间 这里设置的是720小时 也就是30天 根据自己情况来设置
  22. #以下为辅助设置,优化配置 可配可不配
  23. #每条最大消息设置为3MB,超过此size会报错,可以自由调整
  24. replica.fetch.max.bytes=3145728
  25. message.max.bytes=3145728
  26. #默认的备份数量,可以自由调整
  27. default.replication.factor=2
  28. #默认的partion数量,可以自由调整
  29. num.partitions=3
  30. #是否允许彻底删除topic,低版本这里设置为false则是隐藏topic
  31. delete.topic.enable=true
  32. #如果topic不存在,是否允许创建一个新的。这里特别推荐设置为false,否则可能会因为手滑多出很多奇奇怪怪的topic出来
  33. auto.create.topics.enable=false

以上操作3台服务器都需要更改 为了需要区别的就是开头的Id 以及 监听服务器端口的IP需要改成本机IP

创建Kafka认证文件

还是在kafka/config目录下,服务端密码和客户端连接密码。

所以直接vim kafka_server_jaas.conf创建一个认证文件

  1. KafkaServer {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"
  4. password="admin-2019"
  5. user_admin="admin-2019"
  6. user_producer="prod-2019"
  7. user_consumer="cons-2019";
  8. };

再创建一个用于命令行的认证文件kafka_client_jaas.conf

  1. KafkaClient {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"
  4. password="admin-2019";
  5. };

随后进到kafka的bin目录下 修改kafka-server-start.sh 启动脚本,指定密码认证连接路径,在export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G 后边添加

还需添加一行export JMX_PORT="9999" 后面用可视化需要用到 最终修改如下

  1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  2. export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G -Djava.security.auth.login.config=/data/kafka/config/kafka_server_jaas.conf"
  3. export JMX_PORT="9999"
  4. fi

修改生产者和消费者的启动脚本kafka-console-producer.shkafka-console-consumer.sh 的密码路径,在export KAFKA_HEAP_OPTS="-Xmx512M后边添加

-Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf"

以上操作都需在3台服务器上操作

随后可以启动kafka服务

  1. root@localhost:~# cd /data/kafka/bin
  2. root@localhost:~# ./kafka-server-start.sh -daemon config/server.properties
  3. # 验证,查看所有topic列表
  4. root@localhost:~# ./kafka-topics.sh --list --zookeeper 服务器IP:2181 security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

搭建kafka web可视化

上传并解压efak-web-3.0.1-bin.tar.gz的tar包 然后到conf目录下 备份system-config.properties 然后再修改 编辑如下:

  1. ######################################
  2. # multi zookeeper & kafka cluster list
  3. # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
  4. ######################################
  5. efak.zk.cluster.alias=cluster1
  6. cluster1.zk.list=服务器IP:2181,服务器IP:2181,服务器IP:2181 #这里填入的是zookeeper连接Ip以及端口
  7. #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:218
  8. ######################################
  9. # kafka sasl authenticate
  10. ######################################
  11. cluster1.efak.sasl.enable=true
  12. cluster1.efak.sasl.protocol=SASL_PLAINTEXT
  13. cluster1.efak.sasl.mechanism=PLAIN
  14. cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-2019";
  15. cluster1.efak.sasl.client.id=
  16. cluster1.efak.blacklist.topics=
  17. cluster1.efak.sasl.cgroup.enable=false
  18. cluster1.efak.sasl.cgroup.topics=
  19. #cluster2.efak.sasl.enable=false
  20. #cluster2.efak.sasl.protocol=SASL_PLAINTEXT
  21. #cluster2.efak.sasl.mechanism=PLAIN
  22. #cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
  23. #cluster2.efak.sasl.client.id=
  24. #cluster2.efak.blacklist.topics=
  25. #cluster2.efak.sasl.cgroup.enable=false
  26. #cluster2.efak.sasl.cgroup.topics=
  27. # 这里设置的是sasl密码连接信息的配置 cluster2和cluster3都可以注释掉 因为我们只有1个集群 只是用了cluster1
  28. ######################################
  29. # kafka mysql jdbc driver address
  30. ######################################
  31. efak.driver=com.mysql.cj.jdbc.Driver
  32. efak.url=jdbc:mysql://数据库IP:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
  33. efak.username=数据库账号
  34. efak.password=数据库密码
  35. # 由于web可视化需要存放一些简单数据 在本地安装一个数据库 创建库名为Ke即可

改完配置就可以启动可视化

  1. root@localhost:~# cd /data/efak-web-3.0.1/bin
  2. root@localhost:~# ./ke.sh start
  3. # 最后按照屏幕提示访问 ip:8048 就可以了

最后贴上参考文档:

Zookeeper + Centos7 详细安装教程_Smallc0de的博客-CSDN博客

kafka+zookeeper集群模式配置SSL认证 – 俗话曰的博客 (suhuayue.com)

Kafka安装配置(SASL/SCRAM动态认证)-CSDN博客

Kafka 如何给集群配置Scram账户认证_Smallc0de的博客-CSDN博客

kafka max.request.size过小的问题_Pianist Of Keyboard的博客-CSDN博客

kafka max.request.size_Pianist Of Keyboard的博客-CSDN博客

kafkaEagle工具安装_kafka-engle_邢一的博客-CSDN博客

zookeeper和kafka的SASL认证以及生产实践-阿里云开发者社区 (aliyun.com)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/983175
推荐阅读
相关标签
  

闽ICP备14008679号