赞
踩
消息队列(Message Queue)
通用的使用场景可以简单地描述为
消息(Message)
消息队列(Message Queue)
解耦
冗余
扩展性
灵活性 & 峰值处理能力
可恢复性
顺序保证
缓冲
异步通信
举个例子
如何快速收集这些数据,如何实时的分析这些数据,是一个必须要解决的问题,同时,这也形成了一个业务需求模型
面对这些需求,如何高效、稳定的完成数据的生产和消费呢?
最大的特性
现在已被多家大型公司作为多种类型的数据管道和消息系统使用。
一个典型的 kafka 集群包含
kafka
建议 partition 的数量要小于等于集群 broker 的数量,这样消息数据就可以均匀的分布在各个broker 中
Kafka 发布消息通常有两种模式
Kafka 中的 Producer 和 consumer 采用的两种模式
分布式协调技术
脑裂
分布式系统
分布式锁
分布式锁的实现者
经过验证的优势
在主从分布式系统中
解决方法
传统解决单点故障方法隐患
网络问题
master 启动
master 故障
master 恢复
集群主要角色有
server 分为三个角色
每个角色的含义
Leader:领导者角色
follower:跟随着角色
observer:观察者角色
observer
client:客户端角色
修改数据的流程
写的流程
Kafka
四层负载均衡
使用 Zookeeper 进行负载均衡
Offset 在 Zookeeper 中由一个专门节点进行记录,节点路径为
消费到消费者分组
对消费者分组中的消费者的变化注册监听
对 Broker 服务器变化注册监听
进行消费者负载均衡
192.168.10.101
zookeeper 和 kafka 源码包上传XShell
设置 hosts 文件
单节点不需要关闭防火墙、内核机制
首先下载 java 依赖
[root@kafka1 ~]# yum -y install java
解压
[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
移动 zookeeper 软件
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper
进入目录
[root@kafka1 ~]# cd /etc/zookeeper/conf
复制配置文件
[root@kafka1 conf]# mv zoo_sample.cfg zoo.cfg
修改配置文件
- [root@kafka1 conf]# vim zoo.cfg
- dataDir=/etc/zookeeper/zookeeper-data
进入 zookeeper 目录
[root@kafka1 conf]# cd /etc/zookeeper/
创建目录
[root@kafka1 kafka]# mkdir zookeeper-data
启动服务
[root@kafka1 zookeeper]# ./bin/zkServer.sh start
查看服务状态
[root@kafka1 zookeeper]# ./bin/zkServer.sh status
解压 kafka
[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz
移动软件
[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
进入目录
[root@kafka1 ~]# cd /etc/kafka/
修改配置文件
- [root@kafka1 kafka]# vim config/server.properties
- //60行
- log.dirs=/etc/kafka/kafka-logs
创建目录
[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs
启动脚本开启服务
[root@kafka1 kafka]# bin/kafka-server-start.sh config/server.properties &
检查两个端口的开启状态
- [root@kafka1 kafka]# netstat -anpt | grep 2181
- tcp6 0 0 :::2181 :::* LISTEN 2029/java
- tcp6 0 0 ::1:35678 ::1:2181 ESTABLISHED 2109/java
- tcp6 0 0 ::1::2181 ::1:35678 ESTABLISHED 2029/java
- [root@kafka1 kafka]# netstat -anpt | grep 9092
- tcp6 0 0 :::9092 :::* LISTEN 2109/java
- tcp6 0 0 127.0.0.1:38952 127.0.0.1:9092 ESTABLISHED 2109/java
- tcp6 0 0 127.0.0.1:9092 127.0.0.1:38952 ESTABLISHED 2109/java
启动时先启动 zookeeper,关闭时先关闭 kafka
如果要关闭 kafka
[root@kafka1]# ./kafka-server-stop.sh
如果关不了,就 kill 杀死该进程
先创建 topic
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
列出 topic
- [root@localhost bin]# ./kafka-topics.sh --list --zookeeper kafka1:2181
- test
生产消息
- [root@localhost bin]# ./kafka-console-producer.sh --broker-list kafka1:9092 -topic test
- >zhangsan
- >lisi
消费消息(打开一个新的终端,一边生产消息,一边查看消费消息)
- [root@localhost bin]# ./kafka-console-producer.sh --bootstrap-server kafka1:9092 -topic test
- zhangsan
- lisi
删除消息
[root@localhost bin]# ./kafka-topic.sh --delete --zookeeper kafka1:2181 --topic test
这里恢复快照
主机
kafka1:192.168.10.101
kafka2:192.168.10.102
kafka3:192.168.10.103
上传 zookeeper 和 kafka 源码包至 XShell
修改主机 hosts 文件(所有主机都配置)
修改主机名
- [root@localhost ~]# hostnamectl set-hostname kafka1
- [root@localhost ~]# bash
- [root@localhost ~]# hostnamectl set-hostname kafka2
- [root@localhost ~]# bash
- [root@localhost ~]# hostnamectl set-hostname kafka3
- [root@localhost ~]# bash
同步会话,修改文件
- [root@kafka1 ~]# vim /etc/hosts
- //添加主机地址
- 192.168.10.101 kafka1
- 192.168.10.102 kafka2
- 192.168.10.103 kafka3
关闭防火墙、内核机制
- [root@kafka1 ~]# systemctl stop firewalld
- [root@kafka1 ~]# setenforce 0
下载 java 依赖
[root@kafka1 ~]# yum -y install java
解压 zookeeper
[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
移动
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper
进入目录
[root@kafka1 ~]# cd /etc/zookeeper
创建数据目录
[root@kafka1 ~]# mkdir zookeeper-data
切换目录
[root@kafka1 ~]# cd conf/
复制文件
[root@kafka1 conf]# cp zoo_sample.cfg zoo.cfg
修改配置文件
- [root@kafka1 conf~]# vim zoo.cfg
- dataDir=/etc/zookeeper/zookeeper-data
- server.1=192.168.10.101:2888:3888
- server.2=192.168.10.102:2888:3888
- server.3=192.168.10.103:2888:3888
切换到 zookeeper-data 目录
[root@kafka1 conf]# cd /etc/zookeeper/zookeeper-data
创建节点 id 文件(按 server 编号设置这个 id,三个机器不同)
节点1
- [root@kafka1 zookeeper-data]# echo 1 > myid
- [root@kafka1 zookeeper-data]# cat myid
- 1
节点2
- [root@kafka1 zookeeper-data]# echo 2 > myid
- [root@kafka1 zookeeper-data]# cat myid
- 2
节点3
- [root@kafka1 zookeeper-data]# echo 3 > myid
- [root@kafka1 zookeeper-data]# cat myid
- 3
同步会话,切换目录
- [root@kafka1 zookeeper-data]# cd
- [root@kafka1 ~]# cd /etc/zookeeper
- [root@kafka1 zookeeper]# cd bin
三个节点启动服务
- [root@kafka1 bin]# ./zkServer.sh start
- /usr/bin/java
- ZooKeeper JMX enabled by default
- Using config: /etc/zookeeper/bin/../conf/zoo.cfg
- Starting zookeeper ... STARTED
查看状态
- [root@kafkal bin]# ./zkServer.sh status
- /usr/bin/iava
- ZooKeeper JMX enabled by default
- Using config: /etc/zookeeper/bin/../conf/zoo.cfg
- Client port found: 2181. client address: localhost.
- Mode: followerr
kafka 的部署
kafka 的安装(三个节点的配置相同)
解压 kafka
[root@kafak1 ~]# tar zxvf kafka_2.13-2.4.1.tgz
移动
[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
切换目录
- [root@kafka1 ~]# cd /etc/kafka
- [root@kafka1 kafka]# cd config/
三个节点修改配置文件
- [root@kafka1 config]# vim server.properties
- //21行 修改,注意其他两个的id分别是2和3
- broker.id=1
- //31行 修改,其他节点改成各自的IP地址
- listeners=PLAINTEXT://192.168.10.101:9092
- //60行 修改
- log.dirs=/etc/kafka/kafka-logs
- //65行 分片数量,不能超过节点数
- num.partitions=1
- zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:2181
9092 是 kafka 的监听端口
三个节点创建 logs 目录
[root@kafka1 config]# mkdir /etc/kafka/kafka-logs
切换目录
- [root@kafka1 config]# cd ..
- [root@kafka1 kafka]# cd bin
三个节点启动服务
- [root@kafka1 bin]# ./kafka-server-start.sh /etc/kafka/config/server.properties
- [1] 2212
如果不能启动,可以将/etc/kafka/kafka-logs 中的数据清除再试试
检查端口
- [root@kafka1 bin]# netstat -anpt | grep 9092
- tcp6 0 0 192.168.10.101:9092 :::* LISTEN 2121/java
- 212/java
- tcp6 0 0 192.168.10.101:9092 192.168.10.102:58058 ESTABLISHED 2121/java
- [root@kafka1 bin]# netstat -anpt | grep 2181
- tcp6 0 0 :::2181 :::* LISTEN 2121/java
- tcp6 0 0 192.168.10.101:2181 192.168.10.101:34214 ESTABLISHED 2212/java
- tcp6 0 0 192.168.10.101:34214 192.168.10.101:2181 ESTABLISHED 2121/java
- [root@kafka1 bin]# netstat -anpt | grep 2888
- tcp6 0 0 192.168.10.101:59564 192.168.10.103:2888 ESTABLISHED 2121/java
- [root@kafka1 bin]# netstat -anpt | grep 3888
- tcp6 0 0 192.168.10.101:3888 :::* LISTEN 2121/java
- tcp6 0 0 192.168.10.101:3888 192.168.10.103:49496 ESTABLISHED 2121/java
- tcp6 0 0 192.168.10.101:3888 192.168.10.102:58838 ESTABLISHED 2121/java
三个节点取消同步
切花目录
- [root@kafka1 bin]# cd
- [root@kafka1 ~]# cd/etc/kafka/bin
创建 topic 目录
- [root@kafka1 bin]# ./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
- Created topic test.
列出 topic,(任意一个节点)
- [root@kafka2 bin]# ./kafka-topics.sh --list --zookeeper kafka1:2181
- test
- [root@kafka2 bin]# ./kafka-topics.sh --list --zookeeper kafka2:2181
- test
- [root@kafka2 bin]# ./kafka-topics.sh --list --zookeeper kafka3:2181
- test
生产消息
- [root@kafka1 bin]# ./kafka-console-producer.sh --broker-list kafkal:9092 -topic test
- >hyx
- >zhangsan
- >lisi
任意一个节点查看消费消息
- [root@kafka3 bin]# ./kafka-console-consumer.sh --bootstrap-server kafkal:9092 --topic test
- hyx
- zhangsan
- lisi
错误提示
Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.
解决方法
删除所有日志文件
[root@kafka1 kafka]# rm -rf /tmp/kafka-logs/*
pkill 杀死 kafka 的进程号
[root@kafka1 kafka]# netstat -anpt | grep 9092
启动服务
[root@kafka2 kafka]# ./bin/kafka-server-start.sh config/server.properties &
小阿轩yx-zookeeper+kafka群集
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。