赞
踩
在 Kafka 中,SASL 是一种重要的安全协议,用于提供基于身份验证的访问控制。Kafka 使用 SASL 来支持各种身份验证机制,如:
具体信息可以参考官网:kafka安全机制官网-2.7
这里采用SCRAM用于kafka安全机制的实现,而不是采用其他方式实现,主要有如下原因:
安装顺序:jdk–>zookeeper–>kafka
小知识:kafka版本命名约定
kafka: 这部分指的是 Apache Kafka,一个开源的分布式事件流平台。Kafka 提供了一种可靠的、可扩展的发布-订阅消息系统,可以处理大规模的实时数据流。
2.12: 这表示 Scala 的版本。在 Kafka 的情况下,2.12意味着它是使用 Scala 2.12 编译的。Scala 是一种运行在 Java 虚拟机上的多范式编程语言,被用于 Kafka 的实现。
2.7.1: 这是 Kafka 的版本号。在这个例子中,版本号是 2.7.1。版本号通常表示软件的发布版本,新版本通常包含新功能、改进和修复之前版本的 bug。
cp ${KAFKA_HOME}/libs/kafka-clients-2.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/lz4-java-1.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/snappy-java-1.1.7.7.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-api-1.7.30.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-log4j12-1.7.30.jar ${ZOOKEEPER_HOME}/lib
Kafka从2.0.0版本开始就不再支持JDK7及以下版本
export JAVA_HOME=/opt/jdk解压后的文件名
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=./://$JAVA_HOME/lib:$JRE_HOME/lib
source /etc/profile
java -version
ZooKeeper是安装Kafka集群的必要组件,Kafka通过ZooKeeper来实施对元数据信息的管理,包括集群、broker、主题、分区等内容。
export ZOOKEEPER_HOME=/opt/zookeeper解压后的文件名
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile
# ZooKeeper服务器心跳时间,单位为ms
tickTime=2000
# 投票选举新leader的初始化时间
initLimit=10
# leader与follower心跳检测最大客忍时间,响应超过syncLimit*tickTime,leader认为
# fo11ower“死掉”,从服务器列表中别除fol1ower
syncLimit=5
# 数据目录
dataDir=/tmp/zookeeper/data
# 日志目录
dataLogDir=/tmp/zookeeper/log
# ZooKeeper对外服务端口
clientPort=2181
mkdir -p /tmp/zookeeper/data
mkdir -p /tmp/zookeeper/log
zkServer.sh start
zkServer.sh status
以上是关于ZooKeeper单机模式的安装与配置,一般在生产环境中使用的都是集群模式,集群模式的配置也比较简单,相比单机模式而言只需要修改一些配置即可。下面以3台机器为例来配置一个ZooKeeper集群。首先在这3台机器的**/etc/hosts文件中添加3台集群的IP地址与机器域名的映射,示例如下(3个IP地址分别对应3台机器)
然后在这3台机器的zoo.cfg**文件中添加以下配置:
zookeeper和kafka在默认情况下,是没有开启安全认证的,那么任意客户端可以在不需要任何身份认证的情况下访问zookeeper和kafka下的各节点,甚至可以进行节点的增加,修改以及删除的动作。注意,前面的动作是基于客户端能访问服务端所在的网络,如果进行了物理隔绝或者做了防火墙限制,那前述内容就不一定成立。但是,在某些对安全加固要求比较严格的客户或者生产环境中,那就必须开启安全认证才行。除了最基本的身份认证以外,还有针对每个节点的权限访问,但本文不涉及该话题。
进入正题,先从zookeeper开始配置,zookeeper官网提供了认证配置的参考,点击下方官网地址,即可查看详情。配置分两种情况:
如果是非集群模式下,仅配置客户端和服务端的双向认证即可。集群模式下,则需要客户端和服务端的认证以及zookeeper服务器之间的双向认证。
Zookeeper 使用的是Java自带的认证和授权服务(简称:JAAS),详细内容请看官网,该链接是 Java 8 的 JAAS 的介绍。这里为zookeeper和kafka分别在对应配置文件下创建jass配置文件为(文件名可以随意):
注意:本节中的客户端指的kafka,服务端指的是zookeeper
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zookeeper"
password="zookeepersecret”
user_kafka="kafkasecret";
};
# 强制进行SASL认证
sessionRequireClientSASLAuth=true
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOKEEPER_HOME}/conf/zoo_jaas.conf"
Client{
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="kafkasecret";
};
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka-server-jaas.conf kafka.Kafka "$@"
quorum.auth.enableSasl=true # 打开sasl开关, 默认是关的
quorum.auth.learnerRequireSasl=true # ZK做为leaner的时候, 会发送认证信息
quorum.auth.serverRequireSasl=true # 设置为true的时候,learner连接的时候需要发送认证信息,否则拒绝
quorum.auth.learner.loginContext=QuorumLearner # JAAS 配置里面的 Context 名字
quorum.auth.server.loginContext=QuorumServer # JAAS 配置里面的 Context 名字
quorum.cnxn.threads.size=20 # 建议设置成ZK节点的数量乘2
QuorumServer {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_test="test";
};
QuorumLearner {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="test"
password="test";
};
QuorumServer 和 QuorumLearner 都是配置的ZK节点之间的认证配置
# broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
broker.id=0
# broker对外提供的服务入口地址
listeners=PLAINTEXT://localhost:9092
# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
# Kafka所需的ZooKeeper集群地址,为了方便演示,我们假设Kafka和ZooKeeper都安装在本机
zookeeper.connect=localhost:2181/kafka
如果是单机模式,那么修改完上述配置参数之后就可以启动服务。如果是集群模式,那么只需要对单机模式的配置文件做相应的修改即可:确保集群中每个broker的broker.id配置参数的值不一样,以及listeners配置参数也需要修改为与broker对应的IP地址或域名,之后就可以各自启动服务。注意,在启动 Kafka 服务之前同样需要确保 zookeeper.connect参数所配置的ZooKeeper服务已经正确启动。
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin
注意:在配置kafka的server.properties对于zookeeper的连接我们采用的是zookeeper的CHROOT,所以上述命令中也需要指定对应路径,不然启动的kafka时会获取不到生成的SCRAM认证信息!!!
小知识:
ZooKeeper 中的 CHROOT 是指将 ZooKeeper 的命名空间限定在一个特定的路径下。这就是说,ZooKeeper 的所有数据和操作都将在指定的路径下进行,而不是整个 ZooKeeper 服务器上。CHROOT 功能允许在一个 ZooKeeper 集群上运行多个独立的 ZooKeeper 实例,每个实例都有自己的命名空间。
在 ZooKeeper 的配置文件 zoo.cfg 中,CHROOT 通过配置项 chroot 来设置。例如:
chroot=/myapp
在这个例子中,ZooKeeper 就会将其根路径设置为 /myapp,而不是默认的根路径。这样,对于 ZooKeeper 中的所有路径,都将以 /myapp 为根进行解释。这就好比把 ZooKeeper 变成了一个容器,其内部的所有路径都相对于 /myapp 这个容器。
CHROOT 的使用场景包括:
- 隔离命名空间: 允许多个应用在同一个 ZooKeeper 集群上使用不同的命名空间,防止彼此之间的命名冲突。
- 模拟多个独立环境: 允许在同一个 ZooKeeper 集群上模拟多个独立的环境,每个环境有自己的数据和配置。
要注意的是,如果你在使用 CHROOT,ZooKeeper 客户端在连接到 ZooKeeper 服务器时,也需要指定相应的 CHROOT 路径。例如,如果 CHROOT 设置为 /myapp,那么客户端在连接时需要指定 “/myapp” 作为根路径。
总的来说,CHROOT 提供了一种简单而有效的方式,使得在同一个 ZooKeeper 集群上可以支持多个隔离的命名空间。
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type users --entity-name admin
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-512=[password=admin512]' --entity-type users --entity-name admin
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin"
user_admin="admin";
};
# 启用ACL
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 设置本例中admin为超级用户;在Zookeeper的“/kafka/config/users”下存在用户
super.users=User:admin
# 同时启用SCRAM和PLAIN机制
sasl.enabled.mechanisms=SCRAM-SHA-256
# 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-256算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# broker间通讯使用PLAINTEXT,本例中不演示SSL配置
security.inter.broker.protocol=SASL_PLAINTEXT
# 配置listeners使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.64.102:9092
# 配置advertised.listeners
advertised.listeners=SASL_PLAINTEXT://192.168.64.102:9092
如果是集群,上述配置每个节点都应该配置一份!!!
bin/kafka-server-start.sh config/server.properties
如果要在后台运行Kafka服务,那么可以在启动命令中加入-daemon参数或&字符,示例如下:
bin/kafka-server-start.sh -daemon config/server.properties
或者
bin/kafka-server-start.sh config/server.properties &
package com.kafka.adminclient; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaFuture; import java.util.Collections; import java.util.Map; /** * @Author: Jiangxx * @Date: 2023/11/24 * @Description: */ public class KafkaUserOperator { private final AdminClient adminClient; public KafkaUserOperator(AdminClient adminClient) { this.adminClient = adminClient; } public boolean createScramUser(String username, String password) { boolean res = false; //指定一个协议ScramMechanism,迭代次数iterations还没搞清楚干嘛的,设置太小会报错 ScramCredentialInfo scramCredentialInfo = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 10000); //创建Scram用户凭证,用户不存在,会先创建用户 UserScramCredentialAlteration userScramCredentialUpsertion = new UserScramCredentialUpsertion(username, scramCredentialInfo, password); AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialUpsertion)); for (Map.Entry<String, KafkaFuture<Void>> e : alterUserScramCredentialsResult.values().entrySet()) { KafkaFuture<Void> future = e.getValue(); try { future.get(); } catch (Exception exc) { System.err.println("返回信息:" + exc.getMessage()); } res = !future.isCompletedExceptionally(); } return res; } public boolean deleteScramUser(String username) { boolean res = false; //删除Scram用户凭证,删除后用户无权限操作kafka,zk中用户节点还会存在 UserScramCredentialAlteration userScramCredentialDeletion = new UserScramCredentialDeletion(username, ScramMechanism.SCRAM_SHA_256); AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialDeletion)); for (Map.Entry<String, KafkaFuture<Void>> e : alterUserScramCredentialsResult.values().entrySet()) { KafkaFuture<Void> future = e.getValue(); try { future.get(); } catch (Exception exc) { System.err.println("返回信息:" + exc.getMessage()); } res = !future.isCompletedExceptionally(); } return res; } }
package com.kafka.adminclient; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.ConfigResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; /** * @Author: Jiangxx * @Date: 2023/11/24 * @Description: */ public class KafkaTopicOperator { private final Logger logger = LoggerFactory.getLogger(KafkaTopicOperator.class); private final AdminClient adminClient; public KafkaTopicOperator(AdminClient adminClient) { this.adminClient = adminClient; } /** * 创建系统对应的topic * * @param topicName 主题名称 * @param partitions 分区 * @param replicationFactor 副本 * @param retention 数据有效期 * @return boolean */ public boolean createTopic(String topicName, Integer partitions, Integer replicationFactor, Integer retention) { boolean res = false; Set<String> topics = getTopicList(); if (!topics.contains(topicName)) { partitions = partitions == null ? 1 : partitions; replicationFactor = replicationFactor == null ? 1 : replicationFactor; NewTopic topic = new NewTopic(topicName, partitions, replicationFactor.shortValue()); long param = retention * 24 * 60 * 60 * 1000; Map<String, String> configs = new HashMap<>(); configs.put("retention.ms", String.valueOf(param)); topic.configs(configs); CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(topic)); for (Map.Entry<String, KafkaFuture<Void>> e : createTopicsResult.values().entrySet()) { KafkaFuture<Void> future = e.getValue(); try { future.get(); } catch (Exception exc) { logger.warn("创建topic参数异常,返回信息:{}", exc.getMessage()); } res = !future.isCompletedExceptionally(); } } else { res = true; logger.warn("该主题已存在,主题名称:{}", topicName); } return res; } /** * 修改topic数据有效期 * * @param topicName 主题名称 * @param retention 天数 * @return boolean */ public boolean updateTopic(String topicName, Integer retention) { if (retention < 0) { return false; } boolean res = false; Map<ConfigResource, Config> alertConfigs = new HashMap<>(); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); //转换为毫秒 long param = retention * 24 * 60 * 60 * 1000; ConfigEntry configEntry = new ConfigEntry("retention.ms", String.valueOf(param)); Config config = new Config(Collections.singletonList(configEntry)); alertConfigs.put(configResource, config); AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(alertConfigs); for (Map.Entry<ConfigResource, KafkaFuture<Void>> e : alterConfigsResult.values().entrySet()) { KafkaFuture<Void> future = e.getValue(); try { future.get(); } catch (Exception exc) { logger.warn("修改topic参数异常,返回信息:{}", exc.getMessage()); } res = !future.isCompletedExceptionally(); } return res; } /** * 删除topic * * @param topicName 主题 * @return boolean */ public boolean deleteTopic(String topicName) { boolean res = false; Set<String> topics = getTopicList(); if (topics.contains(topicName)) { DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topicName)); for (Map.Entry<String, KafkaFuture<Void>> e : deleteTopicsResult.values().entrySet()) { KafkaFuture<Void> future = e.getValue(); try { future.get(); } catch (Exception exc) { logger.warn("删除topic参数异常,返回信息:{}", exc.getMessage()); } res = !future.isCompletedExceptionally(); } } else { logger.info("topic不存在,名称:{}", topicName); res = true; } return res; } /** * 获取主题列表 * * @return Set */ public Set<String> getTopicList() { Set<String> result = null; ListTopicsResult listTopicsResult = adminClient.listTopics(); try { result = listTopicsResult.names().get(); } catch (Exception e) { logger.warn("获取主题列表失败,失败原因:{}", e.getMessage()); e.printStackTrace(); } return result; } }
package com.kafka.adminclient; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateAclsResult; import org.apache.kafka.clients.admin.DeleteAclsResult; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.acl.*; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.Map; /** * @Author: Jiangxx * @Date: 2023/11/24 * @Description: */ public class AclOperator { private final Logger logger = LoggerFactory.getLogger(AclOperator.class); private final AdminClient adminClient; public AclOperator(AdminClient adminClient) { this.adminClient = adminClient; } /** * 添加权限 * * @param resourceType 资源类型 * @param resourceName 资源名称 * @param username 用户名 * @param operation 权限名称 */ public void addAclAuth(String resourceType, String resourceName, String username, String operation) { ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL); AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW); AclBinding aclBinding = new AclBinding(resource, accessControlEntry); CreateAclsResult createAclsResult = adminClient.createAcls(Collections.singletonList(aclBinding)); for (Map.Entry<AclBinding, KafkaFuture<Void>> e : createAclsResult.values().entrySet()) { KafkaFuture<Void> future = e.getValue(); try { future.get(); boolean success = !future.isCompletedExceptionally(); if (success) { logger.info("创建权限成功"); } } catch (Exception exc) { logger.warn("创建权限失败,错误信息:{}", exc.getMessage()); exc.printStackTrace(); } } } /** * 删除权限 * * @param resourceType 资源类型 * @param resourceName 资源名称 * @param username 用户名 * @param operation 权限名称 */ public void deleteACLAuth(String resourceType, String resourceName, String username, String operation) { ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL); AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW); AclBinding aclBinding = new AclBinding(resource, accessControlEntry); DeleteAclsResult deleteAclsResult = adminClient.deleteAcls(Collections.singletonList(aclBinding.toFilter())); for (Map.Entry<AclBindingFilter, KafkaFuture<DeleteAclsResult.FilterResults>> e : deleteAclsResult.values().entrySet()) { KafkaFuture<DeleteAclsResult.FilterResults> future = e.getValue(); try { future.get(); boolean success = !future.isCompletedExceptionally(); if (success) { logger.info("删除权限成功"); } } catch (Exception exc) { logger.warn("删除权限失败,错误信息:{}", exc.getMessage()); exc.printStackTrace(); } } } private AclOperation getOperation(String operation) { AclOperation aclOperation = null; switch (operation) { case "CREATE": aclOperation = AclOperation.CREATE; break; case "WRITE": aclOperation = AclOperation.WRITE; break; case "READ": aclOperation = AclOperation.READ; break; default: break; } return aclOperation; } private ResourceType getResourceType(String type) { ResourceType resourceType = null; switch (type) { case "Group": resourceType = ResourceType.GROUP; break; case "Topic": resourceType = ResourceType.TOPIC; break; default: break; } return resourceType; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。