赞
踩
正式开篇之前,先简单介绍下该需求产生的背景:
随着部署在生产环境中的kafka消费端应用越来越多
通过人工的方式(去机房使用Kafka监控工具或者命令行查看消费端是否有数据积压状况)越来越让我烦躁,不仅浪费宝贵的工作时间还会受到各种各样的操作限制影响心情.
于是,决定采用自动监控的方式,通过一套程序监控所有生产环境kafka消费端的运行状况.
基于以上需求, 我的程序需要具备如下功能:
1. 能通过配置的方式监控任意集群,任意主题,所有消费者组的消费者消费积压情况
2. 每隔一段时间,通过邮件的方式将所有消费者组积压的偏移量反馈给我,以便我第一时间了解各个程序的运行状况
以下博客正文里所陈述的内容及代码,均经过相关测试,请大家放心使用
本篇博客要点如下:
一.本文所涉及代码的github链接
二.必要的依赖导入
三.消费端监控程序详细代码介绍
本文所涉及到的所有代码和配置均上传到了github,
配置敏感的部分做了下修改,
代码下载后只需要根据实际情况修改相关配置就能实现同样的效果
根据需求描述:
我们首先需要导入kafka相关的api-->用来获取消费者积压的偏移量
需要导入邮件相关的api-->用于将程序的运行情况以邮件的形式告知
同时,还需要引入lombok-->为了打印日志的方便
最终,导入的依赖如下
<dependencies> <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>1.4</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.1</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.8</version> <scope>provided</scope> </dependency> </dependencies>
首先是程序执行的入口, 需求里主要的功能通过该类实现
import java.util.*; import java.util.Map.Entry; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.*; import kafka.javaapi.consumer.SimpleConsumer; import kafka.network.BlockingChannel; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import pers.xmr.bigdata.basic.EmailSendInfo; import pers.xmr.bigdata.basic.EmailSender; import pers.xmr.bigdata.basic.Property; /** * @author xmr * @date 2020/1/6 10:04 * @description 获取Kafka指定消费者组堆积的偏移量, 并发送邮件预警 */ public class KafkaOffsetTools { public static void main(String[] args) throws InterruptedException { String topic = Property.getProperty("topic"); String broker = Property.getProperty("broker"); int port = 9092; String groups = Property.getProperty("group"); String clientId = Property.getProperty("clientId"); int correlationId = 0; while (true) { List<String> seeds = new ArrayList<>(); seeds.add(broker); KafkaOffsetTools kot = new KafkaOffsetTools(); TreeMap<Integer, PartitionMetadata> metadatas = kot.findLeader(seeds, port, topic); List<TopicAndPartition> partitions = new ArrayList<>(); for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) { int partition = entry.getKey(); TopicAndPartition testPartition = new TopicAndPartition(topic, partition); partitions.add(testPartition); } String[] groupArgs = groups.split(","); StringBuilder sb = new StringBuilder(); BlockingChannel channel = new BlockingChannel(broker, port, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000); String servers = Property.getProperty("servers"); for (String group : groupArgs) { long sum = 0L; long sumOffset = 0L; long lag = 0L; KafkaConsumer<String, String> kafkaConsumer = kot.getKafkaConsumer(group, topic, servers); for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) { int partition = entry.getKey(); try { channel.connect(); TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); OffsetFetchRequest fetchRequest = new OffsetFetchRequest(group, partitions, (short) 1, correlationId, clientId); channel.send(fetchRequest.underlying()); OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()); OffsetAndMetadata committed = kafkaConsumer.committed(new TopicPartition(topic, partition)); long partitionOffset = committed.offset(); sumOffset += partitionOffset; String leadBroker = entry.getValue().leader().host(); String clientName = "Client_" + topic + "_" + partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName); long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName); sum += readOffset; System.out.println("group: " + group + " " + partition + ":" + readOffset); consumer.close(); } catch (Exception e) { e.printStackTrace(); channel.disconnect(); } } System.out.println("logSize:" + sum); System.out.println("offset:" + sumOffset); lag = sum - sumOffset; System.out.println("lag:" + lag); sb.append("消费者组 " + group + " 积压的偏移量为: " + lag); sb.append("\n"); } String title = topic + " 消费者消费情况"; kot.sendMail(title, sb.toString()); Thread.sleep(60000 * 30); } } /** * 获取Kafka消费者实例 * * @param group 消费者组 * @param topic 主题名 * @param servers 服务器列表 * @return KafkaConsumer<String, String> */ private KafkaConsumer<String, String> getKafkaConsumer(String group, String topic, String servers) { Properties props = new Properties(); props.put("bootstrap.servers", servers); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("max.poll.records", 100); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(topic)); return consumer; } private KafkaOffsetTools() { } /** * 获取该消费者组每个分区最后提交的偏移量 * * @param consumer 消费者组对象 * @param topic 主题 * @param partition 分区 * @param whichTime 最晚时间 * @param clientName 客户端名称 * @return 偏移量 */ private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } /** * 获取每个partation的元数据信息 * * @param seedBrokers 服务器列表 * @param port 端口号 * @param topic 主题名 * @return */ private TreeMap<Integer, PartitionMetadata> findLeader(List<String> seedBrokers, int port, String topic) { TreeMap<Integer, PartitionMetadata> map = new TreeMap<>(); loop: for (String seed : seedBrokers) { SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup" + new Date().getTime()); List<String> topics = Collections.singletonList(topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { map.put(part.partitionId(), part); } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + topic + ", ] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } return map; } /** * 给定邮件主题和内容发送邮件 * * @param title 邮件主题 * @param content 邮件内容 */ private void sendMail(String title, String content) { EmailSendInfo mailInfo = new EmailSendInfo(); String host = Property.getProperty("host"); String username = Property.getProperty("username"); String password = Property.getProperty("password"); String sendEmailUsers = Property.getProperty("sendEmailUsers"); String ccEmialUsers = Property.getProperty("ccEmailUsers"); mailInfo.setMailServerHost(host); mailInfo.setValidate(true); mailInfo.setUserName(username); mailInfo.setPassword(password); mailInfo.setFromAddress(username); mailInfo.setToAddress(sendEmailUsers); mailInfo.setCcAddress(ccEmialUsers); mailInfo.setSubject(title); mailInfo.setContent(content); //发送文体格式邮件 EmailSender.sendTextMail(mailInfo); } }
考虑到程序的通用性,我把可能不同的部分都抽取出来,作为配置,因此需要一个类来解析配置文件
import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Properties; import java.util.Set; @Slf4j public class Property { private static final Logger logger = LoggerFactory.getLogger(Property.class); private static Properties props; static{ loadProps(); } synchronized static private void loadProps(){ logger.info("开始加载properties文件内容......."); props = new Properties(); InputStream in = null; try { // 取出application.properties文件参数 InputStream ins = Property.class.getClassLoader().getResourceAsStream("application.properties"); Properties propss = new Properties(); propss.load(ins); String active = propss.getProperty("spring.profiles.active"); String propertiesName = "application-" + active + ".properties"; in = Property.class.getClassLoader().getResourceAsStream(propertiesName); props.load(in); //转码处理 Set<Object> keyset = props.keySet(); for (Object objectKey : keyset) { String key = (String) objectKey; //属性配置文件自身的编码 String propertiesFileEncode = "utf-8"; String newValue = new String(props.getProperty(key).getBytes(StandardCharsets.ISO_8859_1), propertiesFileEncode); props.setProperty(key, newValue); } } catch (Exception e) { log.error("配置文件加载异常:"+e.toString()); } finally { try { if(null != in) { in.close(); } } catch (IOException e) { log.error("properties文件流关闭出现异常"); } } log.info("加载properties文件内容完成..........."); log.info("properties文件内容:" + props); } public static String getProperty(String key){ if(null == props) { loadProps(); } return props.getProperty(key); } }
最终,我们是通过邮件的收发来汇报消费者组消费积压情况,因此还需要写一个类用来发送邮件
import javax.mail.*; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import java.util.Date; import java.util.Properties; public class EmailSender { public static boolean sendTextMail(EmailSendInfo mailInfo) { boolean sendStatus = false;//发送状态 // 判断是否需要身份认证 EmailAuthenticator authenticator = null; Properties pro = mailInfo.getProperties(); if (mailInfo.isValidate()) { // 如果需要身份认证,则创建一个密码验证器 authenticator = new EmailAuthenticator(mailInfo.getUserName(), mailInfo.getPassword()); } // 根据邮件会话属性和密码验证器构造一个发送邮件的session Session sendMailSession = Session.getInstance(pro, authenticator); //【调试时使用】开启Session的debug模式 sendMailSession.setDebug(true); try { // 根据session创建一个邮件消息 MimeMessage mailMessage = new MimeMessage(sendMailSession); // 创建邮件发送者地址 Address from = new InternetAddress(mailInfo.getFromAddress()); // 设置邮件消息的发送者 mailMessage.setFrom(from); String toaddr=mailInfo.getToAddress(); String ccaddr=mailInfo.getCcAddress(); mailMessage.addRecipients(Message.RecipientType.TO,toaddr); mailMessage.addRecipients(Message.RecipientType.CC,ccaddr); // 设置邮件消息的主题 mailMessage.setSubject(mailInfo.getSubject(), "UTF-8"); // 设置邮件消息发送的时间 mailMessage.setSentDate(new Date()); // 设置邮件消息的主要内容 String mailContent = mailInfo.getContent(); mailMessage.setText(mailContent, "UTF-8"); // 发送邮件 Transport.send(mailMessage); sendStatus = true; } catch (MessagingException ex) { ex.printStackTrace(); } return sendStatus; } }
因篇幅所限,还有两个实体类这里不再给出,感兴趣的童鞋可以点击如下链接:
github代码链接,自行下载
控制台打印情况 :
其中logSize为各个分区偏移量汇总
offset为各个分区已消费偏移量汇总
lag为消费者组还未消费的偏移量(也就是我们最终想要邮件告知的结果)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a5g07nk4-1578383141717)(https://i.loli.net/2020/01/07/zioK8dNVqaOey3f.png)]
邮件展示内容如下:
这里,给大家安利一个很好的代码示例的网站
包含JavaSE,以及大数据各种组件API使用等
都可以在该网站找到相应示例,为我们的工作学习,提供了很大便利
最后要反思一下自己:
被这个折磨人的运维工作困扰已久,才想起来花几个小时写个监控程序来帮我工作.
作为一个程序员,属实惭愧.
今后所有不能让精神或者物质获得满足的所有重复性劳动,都应该考虑交由程序来完成
至于自己,喝喝茶,偷偷懒,逛逛论坛,就可以了~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。