赞
踩
之前安装好了RocketMQ,这一篇就简单记录一下Spring boot是怎么集成RocketMQ的,如果有需要安装RocketMQ的同学看这一篇,Linux在线安装RocketMQ,如果没有linux环境的同学也可以本地启动,只需要有java环境即可。
如果没有项目先创建一个spring boot项目
引入RocketMQ的依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
导入依赖后需要在项目的yml配置文件中加入RocketMQ的相关配置,配置内容如下:
server:
port: 8181
spring:
application:
name: rocketmq-producer
rocketmq:
name-server: 111.xxx.xxx.xxx:9876
producer:
group: test-grop
前两项都无需关注,后面rocketmq内的配置需要注意一下,主要是name-server配置rocketmq的外网服务的ip跟端口,分组就写个测试分组。
还有其他的配置都已经有了默认值,默认值可以去类中看,根据实际需要进行自定义配置。
生产者发送消息一般都是通过rocketMQTemplate来发送消息,原理是在引入starter包后会有自动配置类RocketMQAutoConfiguration,里面定义了几个bean
如果不重写覆盖bean会默认使用这些bean的内容。如果没有建立topic可以手动执行命令创建topic。
./mqadmin updateTopic -n localhost:9876 -t test_topic
写一个controller类,来进行生产者发送消息的模拟。
@RestController public class RocketController { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送同步消息 */ @GetMapping("/rocket") public void rocket() { SendResult sendResult = rocketMQTemplate.syncSend("test_topic", "哈喽啊"); System.out.println("同步内容"+sendResult.toString()); } /** * 发送过滤消息 */ @GetMapping("/rocket/tag2") public void rocketTag1() { rocketMQTemplate.convertAndSend("test_topic:tag1", "哈喽啊,只有Tag1的消费者可以接受"); } /** * 异步消息 */ @GetMapping("/rocket/tag1") public void rocketTag2() { rocketMQTemplate.asyncSend("test-topic", "哈喽啊,这是一条异步消息", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); } }
发送消息内容可以选择同步发送,异步发送,单向发送,发送过滤消息等发送方式
消费者的消费策略可以在RocketMQMessageListener注解上可以进行配置,主要是 topic、consumerGroup、selectorExpression 这三个参数。
@RocketMQMessageListener(
topic = "test_topic",
consumerGroup = "consumer-group-test",
selectorExpression = "tag1 || tag2")
@Component
public class TestConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("receive message: " + msg);
}
}
分析一下参数内容
具体的其他配置项也可以去RocketMQMessageListener源码中查看。
在生产者中定义了三种消息的发生方式,其中一个过滤发送方式,一个同步发送消息一个同步发送消息,都执行一下。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IM7okXLX-1666347780897)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/73cc518423fc47b99356b418273839a7~tplv-k3u1fbpfcp-watermark.image?)]
日志的第一条是同步消息发送产生的确认回执消息,第二条是异步消息产生的,第三天则是消费者产生的,因为消费者筛选了tag,所以只有过滤消息发送的tag1或者tag2的消息消费者才消费。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。