当前位置:   article > 正文

Kafka在Windows安装运行及入门实例(JAVA)_kafka安装 windows 示例程序 java

kafka安装 windows 示例程序 java

一、安装JDK以及zooeleeper这里省略

二、安装与运行Kafka

下载

http://kafka.apache.org/downloads.html

下载后解压到任意一个目录,笔者的是D:\Java\Tool\kafka_2.11-0.10.0.1

1. 进入Kafka配置目录,D:\Java\Tool\kafka_2.11-0.10.0.1
2. 编辑文件“server.properties
3. 找到并编辑log.dirs=D:\Java\Tool\kafka_2.11-0.10.0.1\kafka-log,这里的目录自己修改成自己喜欢的
4. 找到并编辑zookeeper.connect=localhost:2181。表示本地运行
5. Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181。

运行:

重要:请确保在启动Kafka服务器前,Zookeeper实例已经准备好并开始运行。

1.进入Kafka安装目录D:\Java\Tool\kafka_2.11-0.10.0.1
2.按下Shift+右键,选择“打开命令窗口”选项,打开命令行。
3.现在输入

.\bin\windows\kafka-server-start.bat .\config\server.properties  

并回车。


三、测试

上面的Zookeeper和kafka一直打开

(1)、创建主题

1.进入Kafka安装目录D:\Java\Tool\kafka_2.11-0.10.0.1
2.按下Shift+右键,选择“打开命令窗口”选项,打开命令行。
3.现在输入
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic linlin  


注意不要关了这个窗口!

(2)创建生产者

1.进入Kafka安装目录D:\Java\Tool\kafka_2.11-0.10.0.1
2.按下Shift+右键,选择“打开命令窗口”选项,打开命令行。
3.现在输入

.\binwindows\kafka-console-producer.bat --broker-list localhost:9092 --topic linlin


注意不要关了这个窗口!

(3)创建消费者

1.进入Kafka安装目录D:\Java\Tool\kafka_2.11-0.10.0.1
2.按下Shift+右键,选择“打开命令窗口”选项,打开命令行。
3.现在输入

.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic linlin


注意不要关了这个窗口!

然后在第2个窗口中输入内容,最后记得回车


四、Kafka入门实例

1.整个工程目录如下:


2.pom文件

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>com.lin</groupId>
  5. <artifactId>Kafka-Demo</artifactId>
  6. <version>0.0.1-SNAPSHOT</version>
  7. <dependencies>
  8. <dependency>
  9. <groupId>org.apache.kafka</groupId>
  10. <artifactId>kafka_2.10</artifactId>
  11. <version>0.8.2.0</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>com.101tec</groupId>
  15. <artifactId>zkclient</artifactId>
  16. <version>0.10</version>
  17. </dependency>
  18. <dependency>
  19. <groupId>com.alibaba</groupId>
  20. <artifactId>fastjson</artifactId>
  21. <version>1.2.4</version>
  22. </dependency>
  23. </dependencies>
  24. </project>

3.生产者 KafkaProducer

  1. package com.lin.demo.producer;
  2. import kafka.javaapi.producer.Producer;
  3. import kafka.producer.KeyedMessage;
  4. import kafka.producer.ProducerConfig;
  5. import java.util.Properties;
  6. public class KafkaProducer {
  7. private final Producer<String, String> producer;
  8. public final static String TOPIC = "TEST-TOPIC";
  9. private KafkaProducer() {
  10. Properties props = new Properties();
  11. //此处配置的是kafka的端口
  12. props.put("metadata.broker.list", "localhost:9092");
  13. //配置value的序列化类
  14. props.put("serializer.class", "kafka.serializer.StringEncoder");
  15. //配置key的序列化类
  16. props.put("key.serializer.class", "kafka.serializer.StringEncoder");
  17. props.put("request.required.acks", "-1");
  18. producer = new Producer<String, String>(new ProducerConfig(props));
  19. }
  20. void produce() {
  21. int messageNo = 1000;
  22. final int COUNT = 10000;
  23. while (messageNo < COUNT) {
  24. String key = String.valueOf(messageNo);
  25. String data = "hello kafka message " + key;
  26. producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
  27. System.out.println(data);
  28. messageNo++;
  29. }
  30. }
  31. public static void main(String[] args) {
  32. new KafkaProducer().produce();
  33. }
  34. }

运行,结果:

4.消费者 KafkaConsumer

  1. package com.lin.demo.consumer;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Properties;
  6. import com.lin.demo.producer.KafkaProducer;
  7. import kafka.consumer.ConsumerConfig;
  8. import kafka.consumer.ConsumerIterator;
  9. import kafka.consumer.KafkaStream;
  10. import kafka.javaapi.consumer.ConsumerConnector;
  11. import kafka.serializer.StringDecoder;
  12. import kafka.utils.VerifiableProperties;
  13. /**
  14. * Created by yz.shi on 2018/4/11.
  15. */
  16. public class KafkaConsumer {
  17. private final ConsumerConnector consumer;
  18. private KafkaConsumer() {
  19. Properties props = new Properties();
  20. //zookeeper 配置
  21. props.put("zookeeper.connect", "localhost:2181");
  22. //group 代表一个消费组
  23. props.put("group.id", "jwd-group");
  24. //zk连接超时
  25. props.put("zookeeper.session.timeout.ms", "4000");
  26. props.put("zookeeper.sync.time.ms", "200");
  27. props.put("rebalance.max.retries", "5");
  28. props.put("rebalance.backoff.ms", "1200");
  29. props.put("auto.commit.interval.ms", "1000");
  30. props.put("auto.offset.reset", "smallest");
  31. //序列化类
  32. props.put("serializer.class", "kafka.serializer.StringEncoder");
  33. ConsumerConfig config = new ConsumerConfig(props);
  34. consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
  35. }
  36. void consume() {
  37. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  38. topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));
  39. StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
  40. StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
  41. Map<String, List<KafkaStream<String, String>>> consumerMap =
  42. consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
  43. KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
  44. ConsumerIterator<String, String> it = stream.iterator();
  45. while (it.hasNext())
  46. System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + it.next().message() + "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
  47. }
  48. public static void main(String[] args) {
  49. new KafkaConsumer().consume();
  50. }
  51. }

运行结果:


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/777458
推荐阅读
相关标签
  

闽ICP备14008679号