当前位置:   article > 正文

RocketMQ4.4笔记 (三)springboot操作rocketmq_rocketmq enablemsgtrace配置不生效

rocketmq enablemsgtrace配置不生效

三、Springboot操作RocketMQ

1、引入依赖添加配置文件

pom.xml

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.0.3</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

application.yml

rocketmq:
  nameServer: 192.168.179.128:9876
  producer:
    group: demo-group
  • 1
  • 2
  • 3
  • 4

2、普通消费者生产者

1、添加配置Topic

demo:
  topic:
    signer: demo-topic-signer
  • 1
  • 2
  • 3

2、生产者

@Component
@Log4j2
public class RocketMqSender {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value("${demo.topic.signer:}")
    private String demoSignerTopic;

    public void sendDemoSignerMessage(String str) {
        rocketMQTemplate.convertAndSend(demoSignerTopic, str);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、消费者

@Component
@RocketMQMessageListener(
    topic = "demo-topic-signer",	//指定topic
    consumerGroup = "demo-consumer-signer",	//指定消费组
    enableMsgTrace = false)			//是否启用消息轨迹,默认true	
public class RocketMqSignerReceiver implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("signer1:"+s);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2、广播订阅模式

1、添加配置文件

demo:
  topic:
    fanout: demo-topic-fanout
  • 1
  • 2
  • 3

2、生产者

@Component
@Log4j2
public class RocketMqSender {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value("${demo.topic.fanout:}")
    private String demoFanoutTopic;

    public void sendDemoFanoutMessage(String str) {
        rocketMQTemplate.convertAndSend(demoFanoutTopic, str);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3、消费者

可以配置多个消费者指定同一个消费组,同一个topic

@Component
@RocketMQMessageListener(
    topic = "demo-topic-fanout", 		//主题
    consumerGroup = "demo-consumer", 	//消费组
    enableMsgTrace = false, 			//是否启用消息轨迹
    messageModel = MessageModel.BROADCASTING)	//开启广播模式
public class RocketMqFanoutReceiver implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("q1:"+s);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3、路由tag过滤模式

1、添加配置文件

demo:
  topic:
    signer: demo-topic-signer
    fanouttag: demo-topic-fanouttag
  • 1
  • 2
  • 3
  • 4

2、生产者

@Component
@Log4j2
public class RocketMqSender {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value("${demo.topic.fanouttag:}")
    private String demoFanoutTagTopic;
	//第一个参数 topic
    //第二个参数 tag
    //发送的格式为 topic:tag		只可以指定一个tag
    public void sendDemoFanoutTagMessage(String str, String expr) {
        log.info("发送的Topic={},内容={}", demoFanoutTagTopic + expr, str);
        rocketMQTemplate.convertAndSend(demoFanoutTagTopic + expr, str);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

3、消费组

可以配置多个消费组在同一个topic指定不同的tag来过滤消息获取

//消费者1
@Component
@RocketMQMessageListener(
    topic = "demo-topic-fanouttag", 
    consumerGroup = "demo-consumer-tag-1",
  	enableMsgTrace = false,
    selectorExpression = "v1||v3")	//可以指定多个tag 用||来做或运算
public class RocketMqFanoutTagReceiver implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("tag1:"+s);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
//消费者2
@Component
@RocketMQMessageListener(
    topic = "demo-topic-fanouttag", 
    consumerGroup = "demo-consumer-tag-2",
  	enableMsgTrace = false,
    selectorExpression = "v1")	//可以指定多个tag 用||来做或运算
public class RocketMqFanoutTagReceiver implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("tag2:"+s);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
//消费者3
@Component
@RocketMQMessageListener(
    topic = "demo-topic-fanouttag", 
    consumerGroup = "demo-consumer-tag-3",
  	enableMsgTrace = false,
    selectorExpression = "v2")	//可以指定多个tag 用||来做或运算
public class RocketMqFanoutTagReceiver implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("tag3:"+s);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

测试

发送demo-topic-fanouttag:v1 消费者1和消费组2可以接收消息

发送demo-topic-fanouttag:v2 只有消费者3可以收到

发送demo-topic-fanouttag:v3 只有消费组1可以收到

RocketMQ4.4笔记 (一)入门以及安装
RocketMQ4.4笔记 (二)java操作rocketmq入门
RocketMQ4.4笔记 (三)springboot操作rocketmq
RocketMQ4.4笔记 (四)RocketMQ集群

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号