赞
踩
本文目的是:教会你使用Spring Boot集成RocketMQ。
<!-- rocketmq 依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency> <!-- 还有其它需要的jar包自由引入(注:fastjson不要使用低于1.2.60版本,会有安全漏洞) --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency>
#rocketmq的配置
rocketmq:
name-server: 127.0.0.1:9876 # rocketmq访问地址
producer:
group: cxccccccc # 生产者组
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

@Autowired private MQProducerService producerService; @PostMapping("/test") @ResponseBody public Object test(@RequestBody LinkedHashMap<String,Object> params) { int type = Integer.parseInt(params.get("type").toString()); switch (type) { case 1: producerService.send(new AjaxResult()); break; case 2: // syncSend() 同步发送消息。 return producerService.sendMsg("{\"code\": 200,\"msg\": \"操作成功\",\"data\":{}}"); case 3: // asyncSend() 发送成功后会执行回调函数,执行响应的代码。 producerService.sendAsyncMsg("你好啊,这里是消息内容"); break; case 4: // 会延迟xx s后才会到MQ里面。 SendResult sendResult = producerService.sendDelayMsg("hello,this is the message waiting to consume.", 2); return sendResult; case 5: // 发送单向消息,发完就发完了,没有返回值。 producerService.sendOneWayMsg("今天是美好的一天"); break; case 6: // 发送的消息带有tag 标签、带有KEY业务标识。 SendResult sendResult1 = producerService.sendTagMsg("今天是蛋糕的一天"); return sendResult1; default: throw new IllegalStateException("Unexpected value: " + type); } return null; }
@Slf4j @Component public class MQProducerService { /** * UCS请求日志的rocketmq的主题 */ private static final String topic = "MSG_DingAttendance"; /** * 直接注入使用,用于发送消息到broker服务器 */ @Autowired private RocketMQTemplate rocketMQTemplate; /** * 普通发送(这里的参数对象ajaxResult可以随意定义,可以发送个对象,也可以是字符串等) * * @param ajaxResult */ public void send(AjaxResult ajaxResult) { rocketMQTemplate.convertAndSend(topic + ":tag1", ajaxResult); } /** * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失信息) * msgBody也可以是对象,SendResult为返回的发送结果) * * @param msgBody * @return */ public SendResult sendMsg(String msgBody) { SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build()); log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult)); return sendResult; } /** * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) * (适合对响应时间敏感的业务场景) * * @param msgBody */ public void sendAsyncMsg(String msgBody) { rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 System.out.println("sendAsyncMsg发送成功"); } @Override public void onException(Throwable throwable) { // 处理消息发送异常逻辑 System.out.println("sendAsyncMsg发送失败"); } }); } /** * 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时) * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h * * @param msgBody 消息体 * @param delayLevel 延时等级 */ public SendResult sendDelayMsg(String msgBody, int delayLevel) { return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), 3000, delayLevel); } /** * 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志) * * @param msgBody */ public void sendOneWayMsg(String msgBody) { rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build()); } /** * 发送带tag的消息,直接在topic后面加上“:tag”,key在Header里面设置KEYS * * @param msgBody */ public SendResult sendTagMsg(String msgBody) { String key = String.valueOf(IdUtils.createSnowflake()); Message message = new Message(topic, key, msgBody.getBytes()); return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(message).setHeader("KEYS",key).build()); } }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。