当前位置:   article > 正文

rocketmq消息注解基于springboot的简单应用及默认详细配置_rocketmq springboot 配置

rocketmq springboot 配置

概述

        rocketmq消息组件在springboot框架中的应用,使用rocketmq的整合包进行编码实现。

编码参考

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.2.3</version>
  5. </dependency>
  • 基本配置
  1. rocketmq:
  2. name-server: 192.168.15.175:9876;192.168.15.77:9876
  3. producer:
  4. group: ta-cipher-encode

rocketmq.name-server:  rocketmq集群地址,单点或集群

rocketmq.producer.group : 生产者组名,用于标识一组具有相同功能的生产者

  • 其他配置

Name Server相关配置:
        rocketmq.namesrv.domain:Name Server的域名,用于自动发现Name Server地址。

Producer相关配置:
        rocketmq.producer.group:生产者组名,用于标识一组具有相同功能的生产者。
        rocketmq.producer.send-msg-timeout:发送消息的超时时间,默认为3秒。
        rocketmq.producer.compress-msg-body-over-howmuch:消息体大于指定字节大小时启用压缩。

Consumer相关配置:
        rocketmq.consumer.group:消费者组名,用于标识一组具有相同功能的消费者。
        rocketmq.consumer.consume-thread-min:消费者线程池的最小线程数。
        rocketmq.consumer.consume-thread-max:消费者线程池的最大线程数。
        rocketmq.consumer.consume-message-batch-max-size:批量消费消息时每次拉取的最大消息数量。
        rocketmq.consumer.pull-interval:拉取消息间隔时间,默认为0,表示尽可能快地拉取消息。

Message相关配置:
        rocketmq.message.max-size:消息的最大大小,默认为4MB。
        rocketmq.message.compress-level:消息压缩级别,可选值为0(不压缩)到9(最高压缩率)。
        rocketmq.message.timeout:消息的过期时间,默认为3天。

集群模式相关配置:
        rocketmq.broker.cluster.name:Broker集群的名称。
        rocketmq.broker.cluster.slave-read-only:Slave节点是否只读,默认为true。

  • 生产端代码
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @Slf4j
  7. public class RmqProvdier{
  8. @Autowired
  9. private RocketMQTemplate rocketMQTemplate;
  10. public boolean send(String message) {
  11. try {
  12. // 发送消息
  13. rocketMQTemplate.convertAndSend("ta-cipher-persist", message);
  14. } catch (Exception e) {
  15. log.error("send message:{}", message, e);
  16. return false;
  17. }
  18. return true;
  19. }
  20. }
  • 消费端代码
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  3. import org.apache.rocketmq.spring.core.RocketMQListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @Slf4j
  7. @RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode")
  8. public class RocketMQConsumer implements RocketMQListener<String> {
  9. @Override
  10. public void onMessage(String message) {
  11. try {
  12. log.info("message:{}", message);
  13. } catch (Exception e) {
  14. log.error("errorMessage:{}", message);
  15. // 抛出异常会重新消费消息
  16. throw new RuntimeException("Message processing failed", e);
  17. }
  18. }
  19. }

@RocketMQMessageListener参数

  • topic 消费这从那个topic中读取消息
  • consumerGroup 当前消费者的消费组
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/713026
推荐阅读
相关标签
  

闽ICP备14008679号