赞
踩
目录
Apache Kafka是一个开源分布式事件流平台,也是一种高吞吐量的分布式发布订阅消息系统,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。
Kafka需要jdk1.8+环境。
tar -zxf kafka_2.12-3.4.0.tgz -C ~/opt/
Kafka可以使用Zookeeper或KPaft两种模式启动。Kafka3.x开始提供KRaft(Kafka Raft,依赖Java 8+ )模式,最新的3.6版本中,Kafka依然兼容zookeeper。
1.启动Zookeeper,可以使用Kafka自带的zookeeper或者使用已部署好的zookeeper环境都可以,如果使用已部署好的zookeeper,需要修改kafka的配置文件(config/server.properties),指定zookeeper地址(zookeeper.connect=localhost:2181,localhost:2182)。这里直接使用Kafka自带zookeeper启动。
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2.启动Kafka服务
./bin/kafka-server-start.sh -daemon config/server.properties
1.生成集群ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
2.格式化存储目录,可以修改Kafka 运行日志(数据)存放的路径(config/kraft/server.properties),默认存放在/tmp下。
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
3. 启动Kafka服务
./bin/kafka-server-start.sh -daemon config/kraft/server.properties
参考2.3一节。
./bin/kafka-server-stop.sh
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1 --replication-factor 1
–partitions:分区数量,
–replication-factor:副本数量,不能大于broker数量
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --partitions 3
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test [--offset latest]
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group group_1
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group group_1
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group group_1
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group group_2
参考3.4节,第3、第4点。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_1
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group_1 --delete
在搭建kafka集群前,需要先搭建、启动好zookeeper集群,具体操作可以参考:搭建环境03:安装zookeeper-CSDN博客
tar -zxf kafka_2.12-3.4.0.tgz -C ~/opt/
修改kafka 服务的配置文件(config/server.properties),修改broker.id和指定zookeeper集群地址:
#broker 的全局唯一编号,不能重复,只能是数字
broker.id=131#不同服务器绑定的端口
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#broker 对外暴露的地址
advertised.listeners=PLAINTEXT://192.168.179.131:9092#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/home/weisx/opt/kafka_zookeeper/kafka-logs
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka_zookeeper,方便管理)
zookeeper.connect=192.168.179.131:2181,192.168.179.132:2181,192.168.179.133:2181/kafka_zookeeper
将上一步修改后的kafka同步到其他服务器上,然后修改broker.id和advertised.listeners。
scp -r kafka_zookeeper 192.168.179.132:~/opt/
scp -r kafka_zookeeper 192.168.179.133:~/opt/
修改192.168.179.132服务器上的broker.id=132,advertised.listeners=PLAINTEXT://192.168.179.132:9092
修改192.168.179.133服务器上的broker.id=133,advertised.listeners=PLAINTEXT://192.168.179.133:9092
依次在集群服务器上启动kafka
./bin/kafka-server-start.sh -daemon config/server.properties
新建topic,验证集群:
依次在集群服务器上关闭kafka
./bin/kafka-server-stop.sh
KRaft模式不依赖外部框架(zookeeper),只需要数个(奇数)Kafka节点就可以直接构成集群,集群中的Kafka节点既可以是Controller节点也可以是Broker节点,还可以是混合节点(同时担任Broker和Controller角色)。
tar -zxf kafka_2.12-3.4.0.tgz -C ~/opt/
修改kafka 服务的配置文件(config/kraft/server.properties),修改node.id和配置节点角色等信息:
#混合节点
process.roles=broker, controller#节点 ID,集群中每个节点id不能重复
node.id=131#controller 服务协议别名
controller.listener.names=CONTROLLER#broker 服务协议别名
inter.broker.listener.name=PLAINTEXT#Controller 列表,其配置格式为id1@host1:port1,id2@host2:port2,id3@host3:port3...
controller.quorum.voters=131@192.168.179.131:9093,132@192.168.179.132:9093,133@192.168.179.133:9093#不同服务器绑定的端口
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#broker 对外暴露的地址
advertised.listeners=PLAINTEXT://192.168.179.131:9092#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/home/weisx/opt/kafka_kraft/kafka-logs
将上一步修改后的kafka同步到其他服务器上,然后修改node.id和advertised.listeners。
scp -r kafka_kraft 192.168.179.132:~/opt/
scp -r kafka_kraft 192.168.179.133:~/opt/
修改192.168.179.132服务器上的node.id=132,
advertised.listeners=PLAINTEXT://192.168.179.132:9092
修改192.168.179.133服务器上的node.id=133,
advertised.listeners=PLAINTEXT://192.168.179.133:9092
生成集群ID,使用该集群ID在各个服务器中格式化存储目录
#131服务器操作
bin/kafka-storage.sh random-uuid
bin/kafka-storage.sh format -t 3Ww7omfAQMm3bCb1HRI5_Q -c config/kraft/server.properties
#132服务器操作
bin/kafka-storage.sh format -t 3Ww7omfAQMm3bCb1HRI5_Q -c config/kraft/server.properties
#133服务器操作
bin/kafka-storage.sh format -t 3Ww7omfAQMm3bCb1HRI5_Q -c config/kraft/server.properties
依次在集群服务器上启动kafka
./bin/kafka-server-start.sh -daemon config/kraft/server.properties
新建topic,验证集群:
依次在集群服务器上关闭kafka
./bin/kafka-server-stop.sh
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。