赞
踩
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<rocketmq.version>2.0.4</rocketmq.version>
rocketmq:
name-server: 192.168.1.8:9876 # 访问地址
producer:
group: Pro_Group # 必须指定group
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2


package com.base.framework.iot.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Slf4j
@Component
public class MQConsumerService {
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
// selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
// @Service
// @RocketMQMessageListener(topic = “RLT_TEST_TOPIC”, selectorExpression = “tag1”, consumerGroup = “Con_Group_One”)
// public class ConsumerSend implements RocketMQListener {
// // 监听到消息就会执行此方法
// @Override
// public void onMessage(User user) {
// log.info(“监听到消息:user={}”, JSON.toJSONString(user));
// }
// }
// 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,不然生产者发送一条消息,这两个都会去消费,类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行 @Service @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two") public class ConsumerSend2 implements RocketMQListener<String> { @Override public void onMessage(String str) { log.info("监听到消息:str={}", str); } } // MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型 @Service @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three") public class Consumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); String msg = new String(body); log.info("监听到消息:msg={}", msg); } }
}
package com.base.framework.iot.rocketmq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
mq生产者
*/
@Slf4j
@Component
public class MQProducerService {
@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;
// 建议正常规模项目统一用一个TOPIC
private static final String topic = “RLT_TEST_TOPIC”;
// 直接注入使用,用于发送消息到broker服务器
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
/**
/**
发送异步消息(适合对响应时间敏感的业务场景即发送端不能容忍长时间地等待Broker的响应)
*/
public void sendAsyncMsg(String msgBody) {
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
}
@Override
public void onException(Throwable throwable) {
// 处理消息发送异常逻辑
}
});
}
/**
/**
/**
}
package com.base.framework.iot.rocketmq;
import com.base.framework.common.response.Result;
import com.github.xiaoymin.knife4j.annotations.ApiSort;
import io.swagger.annotations.Api;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
rocketmq 前端控制层
@author lingchen
@since 2021-01-26
*/
@Api(tags = “rocketmq相关”)
@ApiSort(10)
@Validated
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {
@Autowired
private MQProducerService mqProducerService;
// @GetMapping("/send")
// public void send() {
// User user = User.getUser();
// mqProducerService.send(user);
// }
@GetMapping("/sendTag")
public Result<SendResult> sendTag() {
SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息");
return Result.success(sendResult);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。