当前位置:   article > 正文

RocketMQ 第一天:Spring Boot 集成 RocketMQ demo_RocketMQ

RocketMQ

首先判断 rocketMQ 是否启动成功 

 pom.xml 文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huangquan</groupId>
    <artifactId>rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.1.1.RELEASE</version>
      <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
      <!-- Single source of truth for the RocketMQ version; referenced by every RocketMQ artifact below
           so the declared version and the resolved version cannot drift apart. -->
      <rocketmq.version>4.4.0</rocketmq.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
      </dependency>

      <!-- RocketMQ artifacts -->
      <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-common</artifactId>
        <version>${rocketmq.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq.version}</version>
      </dependency>
    </dependencies>
  </project>

基础配置   

  1. # 消费者的组名
  2. apache.rocketmq.consumer.PushConsumer=PushConsumer
  3. # 生产者的组名
  4. apache.rocketmq.producer.producerGroup=Producer
  5. # NameServer地址;如果 RocketMQ 部署在公网服务器(比如阿里云服务器)上,要写公网 IP
  6. apache.rocketmq.namesrvAddr=电脑ip:9876

消费者(RocketMQServer)

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  3. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  4. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  5. import org.apache.rocketmq.common.message.MessageExt;
  6. import org.apache.rocketmq.remoting.common.RemotingHelper;
  7. import org.springframework.beans.factory.annotation.Value;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.PostConstruct;
  10. /**
  11. * Created by zhisheng_tian on 2018/2/6
  12. */
  13. @Component
  14. public class RocketMQServer {
  15. /**
  16. * 消费者的组名
  17. */
  18. @Value("${apache.rocketmq.consumer.PushConsumer}")
  19. private String consumerGroup;
  20. /**
  21. * NameServer 地址
  22. */
  23. @Value("${apache.rocketmq.namesrvAddr}")
  24. private String namesrvAddr;
  25. @PostConstruct
  26. public void defaultMQPushConsumer() {
  27. //消费者的组名
  28. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
  29. //指定NameServer地址,多个地址以 ; 隔开
  30. consumer.setNamesrvAddr(namesrvAddr);
  31. try {
  32. //订阅PushTopic下Tag为push的消息
  33. consumer.subscribe("TopicTest", "push");
  34. //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
  35. //如果非第一次启动,那么按照上次消费的位置继续消费
  36. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  37. consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
  38. try {
  39. for (MessageExt messageExt : list) {
  40. System.out.println("messageExt: " + messageExt);//输出消息内容
  41. String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
  42. System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容
  43. }
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
  47. }
  48. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
  49. });
  50. consumer.start();
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }

 生产者(RocketMQClient)

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.client.producer.SendResult;
  3. import org.apache.rocketmq.common.message.Message;
  4. import org.apache.rocketmq.remoting.common.RemotingHelper;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.util.StopWatch;
  8. import javax.annotation.PostConstruct;
  9. /**
  10. * Created by zhisheng_tian on 2018/2/6
  11. */
  12. @Component
  13. public class RocketMQClient {
  14. /**
  15. * 生产者的组名
  16. */
  17. @Value("${apache.rocketmq.producer.producerGroup}")
  18. private String producerGroup;
  19. /**
  20. * NameServer 地址
  21. */
  22. @Value("${apache.rocketmq.namesrvAddr}")
  23. private String namesrvAddr;
  24. @PostConstruct
  25. public void defaultMQProducer() {
  26. //生产者的组名
  27. DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
  28. //指定NameServer地址,多个地址以 ; 隔开
  29. producer.setNamesrvAddr(namesrvAddr);
  30. try {
  31. /**
  32. * Producer对象在使用之前必须要调用start初始化,初始化一次即可
  33. * 注意:切记不可以在每次发送消息时,都调用start方法
  34. */
  35. producer.start();
  36. //创建一个消息实例,包含 topic、tag 和 消息体
  37. //如下:topic 为 "TopicTest",tag 为 "push"
  38. Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
  39. StopWatch stop = new StopWatch();
  40. stop.start();
  41. for (int i = 0; i < 10; i++) {
  42. SendResult result = producer.send(message);
  43. System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
  44. }
  45. stop.stop();
  46. System.out.println("----------------发送10条消息耗时:" + stop.getTotalTimeMillis());
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. } finally {
  50. producer.shutdown();
  51. }
  52. }
  53. }

测试方法

  1. import com.rocketmq.config.RocketMQClient;
  2. import org.apache.rocketmq.client.producer.SendCallback;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.common.message.Message;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. /**
  9. * 生产者
  10. */
  11. @RestController
  12. public class ProducerController {
  13. @Autowired
  14. private RocketMQClient rocketMQClient;
  15. @GetMapping("/syncNews")
  16. public String syncNews(){
  17. rocketMQClient.defaultMQProducer();
  18. System.out.println("sssssssssss");
  19. return "success";
  20. }
  21. }

如果启动报 RemotingTooMuchRequestException: sendDefaultImpl call timeout

 即是生产者(发送端)配置问题(sendDefaultImpl 是生产者发送消息时调用的方法),加上这句代码就可以了

 如果还报错 可以参考 整合RocketMq提示RemotingTooMuchRequestException: sendDefaultImpl call timeout

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/49678
推荐阅读
相关标签