当前位置:   article > 正文

6-rocketmq-springboot整合

enable-msg-trace

官方手册

https://github.com/apache/rocketmq-spring/wiki/用户手册

引包

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.1.0</version>
  5. </dependency>

修改application.properties

  1. ## application.properties
  2. rocketmq.name-server=127.0.0.1:9876
  3. rocketmq.producer.group=my-group

注意:

请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

编写代码

  1. @SpringBootApplication
  2. public class ProducerApplication implements CommandLineRunner{
  3. @Resource
  4. private RocketMQTemplate rocketMQTemplate;
  5. public static void main(String[] args){
  6. SpringApplication.run(ProducerApplication.class, args);
  7. }
  8. public void run(String... args) throws Exception {
  9. //send message synchronously
  10. rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
  11. //send spring message
  12. rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
  13. //send messgae asynchronously
  14. rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
  15. @Override
  16. public void onSuccess(SendResult var1) {
  17. System.out.printf("async onSucess SendResult=%s %n", var1);
  18. }
  19. @Override
  20. public void onException(Throwable var1) {
  21. System.out.printf("async onException Throwable=%s %n", var1);
  22. }
  23. });
  24. //Send messages orderly
  25. rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")
  26. //rocketMQTemplate.destroy(); // notes: once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
  27. }
  28. @Data
  29. @AllArgsConstructor
  30. public class OrderPaidEvent implements Serializable{
  31. private String orderId;
  32. private BigDecimal paidMoney;
  33. }
  34. }

接收消息

rongtong edited this page on 25 Dec 2019 · 1 revision

修改application.properties

  1. ## application.properties
  2. rocketmq.name-server=127.0.0.1:9876

注意:

请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

编写代码

  1. @SpringBootApplication
  2. public class ConsumerApplication{
  3. public static void main(String[] args){
  4. SpringApplication.run(ConsumerApplication.class, args);
  5. }
  6. @Slf4j
  7. @Service
  8. @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
  9. public class MyConsumer1 implements RocketMQListener<String>{
  10. public void onMessage(String message) {
  11. log.info("received message: {}", message);
  12. }
  13. }
  14. @Slf4j
  15. @Service
  16. @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
  17. public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
  18. public void onMessage(OrderPaidEvent orderPaidEvent) {
  19. log.info("received orderPaidEvent: {}", orderPaidEvent);
  20. }
  21. }
  22. }

事务消息

rongtong edited this page on 25 May · 2 revisions

修改application.properties

  1. ## application.properties
  2. rocketmq.name-server=127.0.0.1:9876
  3. rocketmq.producer.group=my-group

注意:

请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

编写代码

  1. @SpringBootApplication
  2. public class ProducerApplication implements CommandLineRunner{
  3. @Resource
  4. private RocketMQTemplate rocketMQTemplate;
  5. public static void main(String[] args){
  6. SpringApplication.run(ProducerApplication.class, args);
  7. }
  8. public void run(String... args) throws Exception {
  9. try {
  10. // Build a SpringMessage for sending in transaction
  11. Message msg = MessageBuilder.withPayload(..)...;
  12. // In sendMessageInTransaction(), the first parameter transaction name ("test")
  13. // must be same with the @RocketMQTransactionListener's member field 'transName'
  14. rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
  15. } catch (MQClientException e) {
  16. e.printStackTrace(System.out);
  17. }
  18. }
  19. // Define transaction listener with the annotation @RocketMQTransactionListener
  20. @RocketMQTransactionListener
  21. class TransactionListenerImpl implements RocketMQLocalTransactionListener {
  22. @Override
  23. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  24. // ... local transaction process, return bollback, commit or unknown
  25. return RocketMQLocalTransactionState.UNKNOWN;
  26. }
  27. @Override
  28. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  29. // ... check transaction status and return bollback, commit or unknown
  30. return RocketMQLocalTransactionState.COMMIT;
  31. }
  32. }
  33. }

消息轨迹

rongtong edited this page on 25 Dec 2019 · 1 revision

Producer 端要想使用消息轨迹,需要多配置两个配置项:

  1. ## application.properties
  2. rocketmq.name-server=127.0.0.1:9876
  3. rocketmq.producer.group=my-group
  4. rocketmq.producer.enable-msg-trace=true
  5. rocketmq.producer.customized-trace-topic=my-trace-topic

Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener 中进行配置对应的属性:

  1. @Service
  2. @RocketMQMessageListener(
  3. topic = "test-topic-1",
  4. consumerGroup = "my-consumer_test-topic-1",
  5. enableMsgTrace = true,
  6. customizedTraceTopic = "my-trace-topic"
  7. )
  8. public class MyConsumer implements RocketMQListener<String> {
  9. ...
  10. }

注意:

默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 rocketmq.consumer.customized-trace-topic 配置项,不需要为在每个 @RocketMQMessageListener 配置。

ACL功能

rongtong edited this page on 25 Dec 2019 · 1 revision

Producer 端要想使用 ACL 功能,需要多配置两个配置项:

  1. ## application.properties
  2. rocketmq.name-server=127.0.0.1:9876
  3. rocketmq.producer.group=my-group
  4. rocketmq.producer.access-key=AK
  5. rocketmq.producer.secret-key=SK

Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置

  1. @Service
  2. @RocketMQMessageListener(
  3. topic = "test-topic-1",
  4. consumerGroup = "my-consumer_test-topic-1",
  5. accessKey = "AK",
  6. secretKey = "SK"
  7. )
  8. public class MyConsumer implements RocketMQListener<String> {
  9. ...
  10. }

注意:

可以不用为每个 @RocketMQMessageListener 注解配置 AK/SK,在配置文件中配置 rocketmq.consumer.access-keyrocketmq.consumer.secret-key 配置项,这两个配置项的值就是默认值

请求 应答语义支持

rongtong edited this page on 21 Feb · 2 revisions

RocketMQ-Spring 提供 请求/应答 语义支持。

  • Producer端

发送Request消息使用SendAndReceive方法

注意

同步发送需要在方法的参数中指明返回值类型

异步发送需要在回调的接口中指明返回值类型

  1. @SpringBootApplication
  2. public class ProducerApplication implements CommandLineRunner{
  3. @Resource
  4. private RocketMQTemplate rocketMQTemplate;
  5. public static void main(String[] args){
  6. SpringApplication.run(ProducerApplication.class, args);
  7. }
  8. public void run(String... args) throws Exception {
  9. // 同步发送request并且等待String类型的返回值
  10. String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
  11. System.out.printf("send %s and receive %s %n", "request string", replyString);
  12. // 异步发送request并且等待User类型的返回值
  13. rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
  14. @Override public void onSuccess(User message) {
  15. System.out.printf("send user object and receive %s %n", message.toString());
  16. }
  17. @Override public void onException(Throwable e) {
  18. e.printStackTrace();
  19. }
  20. }, 5000);
  21. }
  22. @Data
  23. @AllArgsConstructor
  24. public class User implements Serializable{
  25. private String userName;
  26. private Byte userAge;
  27. }
  28. }
  • Consumer端

需要实现RocketMQReplyListener<T, R> 接口,其中T表示接收值的类型,R表示返回值的类型。

  1. @SpringBootApplication
  2. public class ConsumerApplication{
  3. public static void main(String[] args){
  4. SpringApplication.run(ConsumerApplication.class, args);
  5. }
  6. @Service
  7. @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
  8. public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
  9. @Override
  10. public String onMessage(String message) {
  11. System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
  12. return "reply string";
  13. }
  14. }
  15. @Service
  16. @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
  17. public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User>{
  18. public void onMessage(User user) {
  19. System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
  20. User replyUser = new User("replyUserName",(byte) 10);
  21. return replyUser;
  22. }
  23. }
  24. @Data
  25. @AllArgsConstructor
  26. public class User implements Serializable{
  27. private String userName;
  28. private Byte userAge;
  29. }
  30. }

常见问题

rongtong edited this page on 25 Dec 2019 · 1 revision

  1. 生产环境有多个nameserver该如何连接?

    rocketmq.name-server支持配置多个nameserver地址,采用;分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876

  2. rocketMQTemplate在什么时候被销毁?

    开发者在项目中使用rocketMQTemplate发送消息时,不需要手动执行rocketMQTemplate.destroy()方法, rocketMQTemplate会在spring容器销毁时自动销毁。

  3. 启动报错:Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please

    RocketMQ在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个consumerGroup下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。建议consumerGrouptopic一一对应。

  4. 发送的消息内容体是如何被序列化与反序列化的?

    RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]

  5. 如何指定topic的tags?

    RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName:前面表示topic的名称,后面表示tags名称。

    注意:

    tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

  6. 发送消息时如何设置消息的key?

    可以通过重载的xxxSend(String destination, Message<?> msg, ...)方法来发送消息,指定msgheaders来完成。示例:

    1. Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
    2. rocketMQTemplate.send("topic-test", message);

    同理还可以根据上面的方式来设置消息的FLAGWAIT_STORE_MSG_OK以及一些用户自定义的其它头信息。

    注意:

    在将Spring的Message转化为RocketMQ的Message时,为防止header信息与RocketMQ的系统属性冲突,在所有header的名称前面都统一添加了前缀USERS_。因此在消费时如果想获取自定义的消息头信息,请遍历头信息中以USERS_开头的key即可。

  7. 消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?

    消费者在实现RocketMQListener接口时,只需要起泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。

    1. @Slf4j
    2. @Service
    3. @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    4. public class MyConsumer2 implements RocketMQListener<MessageExt>{
    5. public void onMessage(MessageExt messageExt) {
    6. log.info("received messageExt: {}", messageExt);
    7. }
    8. }
  8. 如何指定消费者从哪开始消费消息,或开始消费的位置?

    消费者默认开始消费的位置请参考:RocketMQ FAQ。 若想自定义消费者开始的消费位置,只需在消费者类添加一个RocketMQPushConsumerLifecycleListener接口的实现即可。 示例如下:

    1. @Slf4j
    2. @Service
    3. @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    4. public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    5. @Override
    6. public void onMessage(String message) {
    7. log.info("received message: {}", message);
    8. }
    9. @Override
    10. public void prepareStart(final DefaultMQPushConsumer consumer) {
    11. // set consumer consume message from now
    12. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    13. consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    14. }
    15. }

    同理,任何关于DefaultMQPushConsumer的更多其它其它配置,都可以采用上述方式来完成。

  9. 如何发送事务消息?

    在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板RocketMQTemplate, 调用方法sendMessageInTransaction()来进行消息的发布。 注意:从RocketMQ-Spring 2.1.0版本之后,注解@RocketMQTransactionListener不能设置txProducerGroup、ak、sk,这些值均与对应的RocketMQTemplate保持一致

  10. 如何声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate?

    第一步: 定义非标的RocketMQTemplate使用你需要的属性,可以定义与标准的RocketMQTemplate不同的nameserver、groupname等。如果不定义,它们取全局的配置属性值或默认值。

    1. // 这个RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 与所定义的类名相同(但首字母小写)
    2. @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
    3. , ... // 定义其他属性,如果有必要。
    4. )
    5. public class ExtRocketMQTemplate extends RocketMQTemplate {
    6. //类里面不需要做任何修改
    7. }

    第二步: 使用这个非标RocketMQTemplate

    1. @Resource(name = "extRocketMQTemplate") // 这里必须定义name属性来指向上述具体的Spring Bean.
    2. private RocketMQTemplate extRocketMQTemplate;

    接下来就可以正常使用这个extRocketMQTemplate了。

  11. 如何使用非标的RocketMQTemplate发送事务消息?

    首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,注解字段的rocketMQTemplateBeanName指明为非标的RocketMQTemplate的Bean name(若不设置则默认为标准的RocketMQTemplate),比如非标的RocketMQTemplate Bean name为“extRocketMQTemplate",则代码如下:

    1. @RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
    2. class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    3. @Override
    4. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    5. // ... local transaction process, return bollback, commit or unknown
    6. return RocketMQLocalTransactionState.UNKNOWN;
    7. }
    8. @Override
    9. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    10. // ... check transaction status and return bollback, commit or unknown
    11. return RocketMQLocalTransactionState.COMMIT;
    12. }
    13. }

    然后使用extRocketMQTemplate调用sendMessageInTransaction()来发送事务消息。

  12. MessageListener消费端,是否可以指定不同的name-server而不是使用全局定义的'rocketmq.name-server'属性值 ?

    1. @Service
    2. @RocketMQMessageListener(
    3. nameServer = "NEW-NAMESERVER-LIST", // 可以使用这个optional属性来指定不同的name-server
    4. topic = "test-topic-1",
    5. consumerGroup = "my-consumer_test-topic-1",
    6. enableMsgTrace = true,
    7. customizedTraceTopic = "my-trace-topic"
    8. )
    9. public class MyNameServerConsumer implements RocketMQListener<String> {
    10. ...
    11. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/49675
推荐阅读
相关标签
  

闽ICP备14008679号