赞
踩
工作了这么多年,rocketmq还没有用过,由于现在的工作中涉及到了,周六吃完午饭就开始搞,结果到现在3点钟才把环境弄好,测试代码搞起。
整个流程分成两步
参考文章:
https://blog.csdn.net/baidu_33256174/article/details/129599300
~/ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
apache/rocketmq 4.5.0 04b1d4c1d001 About an hour ago 1.28GB
bitnami/mysql 5.7 4164f4e78f8e 2 days ago 415MB
redis latest 37a7207b0faf 3 days ago 149MB
apacherocketmq/rocketmq-dashboard latest eae6c5db5d11 20 months ago 738MB
candice0630/rocketmq-console-ng 2.0 c3494a6e4d86 2 years ago 357MB
apacherocketmq/rocketmq 4.5.0 fa3c9d27c922 3 years ago 493MB
~/
参考 https://www.jianshu.com/p/6ad529a16677
偷懒直接 docker pull candice0630/rocketmq-console-ng:2.0
文章提供的步骤如下
broker.conf
文件# 所属集群名称,如果节点较多可以配置多个 brokerClusterName = DefaultCluster #broker名称,master和slave使用相同的名称,表明他们的主从关系 brokerName = broker-a #0表示Master,大于0表示不同的slave brokerId = 0 #表示几点做消息删除动作,默认是凌晨4点 deleteWhen = 04 #在磁盘上保留消息的时长,单位是小时 fileReservedTime = 48 #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制; brokerRole = ASYNC_MASTER #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功 状态,ASYNC_FLUSH不需要; flushDiskType = ASYNC_FLUSH # 设置broker节点所在服务器的ip地址、物理ip,不能用127.0.0.1、localhost、docker内网ip brokerIP1 = 192.168.18.102
rocketmq.yml
文件version: '3' services: namesrv: image: apache/rocketmq:4.5.0 container_name: rmqnamesrv ports: - 9876:9876 command: sh mqnamesrv broker: image: apache/rocketmq:4.5.0 container_name: rmqbroker ports: - 10909:10909 - 10911:10911 - 10912:10912 volumes: - /Users/lixi/rocketmq/broker.conf:/home/rocketmq/rocketmq-4.5.0/conf/broker.conf command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-4.5.0/conf/broker.conf depends_on: - namesrv mqconsole: image: candice0630/rocketmq-console-ng:2.0 container_name: rmqdashboard ports: - 8181:8080 environment: JAVA_OPTS: -Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false depends_on: - namesrv
使用docker-compose -f rocketmq.yml up -d
启动后,正常会有下面的容器
http://localhost:8181
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>lrocket</artifactId> <version>0.0.1-SNAPSHOT</version> <name>lrocket</name> <description>lrocket</description> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
# rocketmq 配置项,对应 RocketMQProperties 配置类 rocketmq: name-server: 127.0.0.1:9876 # RocketMQ Namesrv # Producer 配置项 producer: group: demo-producer-group # 生产者分组 send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。 compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。 retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。 retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 secret-key: # Secret Key enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。 # Consumer 配置项 consumer: listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。 test-consumer-group: topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
package com.example.lrocket.controler; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.time.LocalDateTime; @RestController public class RocketController { @Resource private RocketMQTemplate rocketMQTemplate; // 延时消息,RocketMQ支持这几个级别的延时消息,自定义需要修改broker配置文件 // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h @GetMapping("/rocket/delayMsg/send") public String rocketDelayMsgSend() { LocalDateTime currentDateTime = LocalDateTime.now(); rocketMQTemplate.syncSend("rocket-topic-2:tag-2", MessageBuilder.withPayload(currentDateTime.toString()).build(), 2000, 3); return currentDateTime.toString(); } @GetMapping("/rocket/send") public String rocketMsgSend() { LocalDateTime currentDateTime = LocalDateTime.now(); rocketMQTemplate.syncSend("rocket-topic-2:tag-2", MessageBuilder.withPayload(currentDateTime.toString()).build(), 2000, 0); return currentDateTime.toString(); } }
package com.example.lrocket.listener; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; @Component public class RokcetServiceListener { @Service @RocketMQMessageListener(consumerGroup = "consumer-group-1", topic = "rocket-topic-2") public class Consumer1 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("consumer1 rocket收到消息: " + s); } } //MessageModel.BROADCASTING 广播消息模式 @Service @RocketMQMessageListener(consumerGroup = "consumer-group-2", topic = "rocket-topic-2", selectorExpression = "tag-2", messageModel = MessageModel.BROADCASTING) public class Consumer2 implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("consumer2 rocket收到消息:" + s); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。