
Kafka Quick Start


Download Apache Kafka (https://kafka.apache.org/downloads)

This guide demonstrates installation on Windows.

Write the startup scripts; adjust the paths below to match your own installation.

Startup notes

Start ZooKeeper first, then Kafka. To shut down, stop Kafka first, then ZooKeeper.

Mnemonic: the zookeeper always arrives first and leaves last.

Start ZooKeeper (in its own window, since the server script blocks its console):

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Start Kafka (likewise in its own window):

call bin/windows/kafka-server-start.bat config/server.properties

Test commands, mainly for creating and exercising the topic 'test-topic':

# Create a topic (window 1)
bin\windows> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --create
# List topics
bin\windows> kafka-topics.bat --bootstrap-server localhost:9092 --list
bin\windows> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --describe
# Change the number of partitions for a topic
bin\windows> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --alter --partitions 2
# Produce messages (window 2): send messages to test-topic
bin\windows> kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic
>hello kafka
# Consume messages (window 3): consume messages from test-topic
# (add --from-beginning to also read messages produced before this consumer started)
bin\windows> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic
Creating topics with the Java AdminClient:

package com.ldj.kafka.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.*;

/**
 * User: ldj
 * Date: 2024/6/13
 * Time: 0:00
 * Description: create topics
 */
public class AdminTopic {

    public static void main(String[] args) {
        Map<String, Object> adminConfigMap = new HashMap<>();
        adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        AdminClient adminClient = AdminClient.create(adminConfigMap);

        // Let Kafka's default assignment algorithm place the partitions.
        // NewTopic(name, numPartitions, replicationFactor); note that the
        // replication factor of 2 for topic-02 needs at least 2 brokers.
        NewTopic topic1 = new NewTopic("topic-01", 1, (short) 1);
        NewTopic topic2 = new NewTopic("topic-02", 2, (short) 2);
        CreateTopicsResult addResult1 = adminClient.createTopics(Arrays.asList(topic1, topic2));

        /*
         * Manually assign partitions for topic-03 (requires brokers with ids 1, 2 and 3).
         * Partition 0 of topic-03 has 2 replicas, one on broker id=1 and one on broker id=2.
         * The first broker id in each list hosts the leader (which handles writes); the rest
         * host followers (backups). So the leader of partition 0 is on broker 1, and the
         * leader of partition 2 is on broker 3.
         */
        Map<Integer, List<Integer>> partition = new HashMap<>();
        partition.put(0, Arrays.asList(1, 2));
        partition.put(1, Arrays.asList(2, 3));
        partition.put(2, Arrays.asList(3, 1));
        NewTopic topic3 = new NewTopic("topic-03", partition);
        CreateTopicsResult addResult2 = adminClient.createTopics(Collections.singletonList(topic3));

        //DeleteTopicsResult delResult = adminClient.deleteTopics(Arrays.asList("topic-02"));

        adminClient.close();
    }
}
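As a sanity check, the topics just created can be listed and described from Java as well, mirroring the --list/--describe commands earlier. A minimal sketch; the class name DescribeTopic is illustrative, not from the original post:

package com.ldj.kafka.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DescribeTopic {

    public static void main(String[] args) throws Exception {
        Map<String, Object> adminConfigMap = new HashMap<>();
        adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(adminConfigMap);

        // Same information as kafka-topics.bat --list
        Set<String> names = adminClient.listTopics().names().get();
        System.out.println(names);

        // Same information as kafka-topics.bat --describe: partition count,
        // plus leader and replica placement for each partition
        Map<String, TopicDescription> descriptions =
                adminClient.describeTopics(Collections.singletonList("topic-01")).all().get();
        descriptions.forEach((topic, desc) ->
                desc.partitions().forEach(p ->
                        System.out.printf("%s partition=%d leader=%s replicas=%s%n",
                                topic, p.partition(), p.leader(), p.replicas())));

        adminClient.close();
    }
}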
Producer example:

package com.ldj.kafka.producer;

import com.alibaba.fastjson.JSON;
import com.ldj.kafka.model.UserEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;

/**
 * User: ldj
 * Date: 2024/6/12
 * Time: 21:08
 * Description: producer
 */
public class KfkProducer {

    public static void main(String[] args) throws Exception {
        // Producer configuration
        Map<String, Object> producerConfigMap = new HashMap<>();
        producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Batch size for batched sends; note this is in bytes
        // (2 effectively disables batching -- the default is 16384)
        producerConfigMap.put(ProducerConfig.BATCH_SIZE_CONFIG, 2);

        // Ack level for delivery:
        //   0      - ack once the message reaches the broker (fast, but unsafe)
        //   1      - ack once the leader replica has persisted it (compromise)
        //   -1/all - ack once the leader and all follower replicas have persisted it (safe, but slow)
        producerConfigMap.put(ProducerConfig.ACKS_CONFIG, "all");

        // Number of retries on send failure. Retries can duplicate messages (hence
        // idempotence below) and reorder them (the broker checks that sequence numbers
        // are consecutive; out-of-order batches are re-sorted in the buffer). For the
        // idempotence check the broker caches the last 5 batches per partition and
        // compares retried batches against them, so idempotence is only guaranteed
        // within one partition and one producer session; cross-session idempotence
        // requires transactions (the producer id is regenerated on restart).
        producerConfigMap.put(ProducerConfig.RETRIES_CONFIG, 3);

        // Kafka's idempotent producer (dedup key: producer id + partition + sequence
        // number); disabled by default in older clients
        producerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Cross-session idempotence additionally requires transactions; skipped here
        //producerConfigMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id");

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfigMap);

        //TODO transaction initialization
        //producer.initTransactions();

        // Build messages: ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
        try {
            //TODO begin the transaction
            //producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                UserEntity userEntity = new UserEntity()
                        .setUserId(2436687942335620L + i)
                        .setUsername("lisi")
                        .setGender(1)
                        .setAge(18);

                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "test-topic",
                        userEntity.getUserId().toString(),
                        JSON.toJSONString(userEntity));

                // Send the record to the broker
                Future<RecordMetadata> future = producer.send(record, (RecordMetadata metadata, Exception e) -> {
                    if (Objects.isNull(e)) {
                        System.out.printf("[%s] message sent successfully!", userEntity.getUserId());
                    } else {
                        System.out.printf("[%s] message send failed! err: %s", userEntity.getUserId(), e.getCause());
                    }
                });

                // Without the future.get() below, sending is asynchronous: a background
                // thread drains the buffer and the callback fires later. With it, the send
                // becomes synchronous, blocking until the broker's metadata comes back.
                System.out.printf("sending message [%s]----", userEntity.getUserId());
                RecordMetadata recordMetadata = future.get();
                System.out.println(recordMetadata.offset());
            }
            //TODO commit the transaction once after all sends succeed
            //producer.commitTransaction();
        } finally {
            //TODO abort the transaction on failure
            //producer.abortTransaction();
            // Close the producer
            producer.close();
        }
    }
}
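The producer imports com.ldj.kafka.model.UserEntity, but the post doesn't include that class. Below is a minimal stand-in so the example compiles, inferred from the chained setter calls above; the field types are assumptions:

package com.ldj.kafka.model;

/**
 * Minimal stand-in for the UserEntity referenced by KfkProducer.
 * Field types are inferred from the producer code; the fluent setters
 * return this so calls can be chained, and fastjson serializes via getters.
 */
public class UserEntity {

    private Long userId;
    private String username;
    private Integer gender;
    private Integer age;

    public Long getUserId() { return userId; }
    public String getUsername() { return username; }
    public Integer getGender() { return gender; }
    public Integer getAge() { return age; }

    public UserEntity setUserId(Long userId) { this.userId = userId; return this; }
    public UserEntity setUsername(String username) { this.username = username; return this; }
    public UserEntity setGender(Integer gender) { this.gender = gender; return this; }
    public UserEntity setAge(Integer age) { this.age = age; return this; }
}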
Consumer example:

package com.ldj.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * User: ldj
 * Date: 2024/6/12
 * Time: 21:10
 * Description: consumer
 */
public class KfkConsumer {

    public static void main(String[] args) {
        // Consumer configuration
        Map<String, Object> consumerConfigMap = new HashMap<>();
        consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Consumer group this consumer belongs to
        consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test123456");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigMap);

        // Subscribe to the topic (an overload also accepts a ConsumerRebalanceListener)
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                // Internally the records are stored as Map<TopicPartition, List<ConsumerRecord<K, V>>>
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            // Close the consumer
            consumer.close();
        }
    }
}
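Because ConsumerRecords is organized per partition internally (the Map<TopicPartition, List<ConsumerRecord<K, V>>> structure noted in the comment above), the records from one poll can also be walked partition by partition. A minimal sketch of that variant; the class name is mine, not from the original post:

package com.ldj.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.util.List;

public final class PartitionAwarePrinter {

    // Drop-in replacement for the for-loop above: iterates partition by
    // partition, which makes the partition/offset structure visible.
    public static void print(ConsumerRecords<String, String> records) {
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        tp.partition(), record.offset(), record.value());
            }
        }
    }
}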
