当前位置:   article > 正文

一文搞懂Spring AMQP

convertandsend(object message, messagepostprocessor messagepostprocessor)

1. RabbitAdmin

  • rabbitAdmin能够完成队列,交换器的声明创建、删除、清空等操作。

  • 使用config配置的方式创建如下:

  1. @Bean
  2. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory ) {
  3. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  4. rabbitAdmin.setAutoStartup(true);
  5. return rabbitAdmin;
  6. }
  • 简单使用:创建一个direct交换器,绑定了一个队列

  1. @Test
  2. public void test1(){
  3. DirectExchange direct_1 = new DirectExchange("direct_1", true, false);
  4. rabbitAdmin.declareExchange(direct_1);
  5. Queue direct_q_1 = new Queue("direct_q_1", true);
  6. rabbitAdmin.declareQueue(direct_q_1);
  7. Binding binding = BindingBuilder.bind(direct_q_1).to(direct_1).with("direct");
  8. rabbitAdmin.declareBinding(binding);
  9. }

2. MessageConvert

  • 消息转换器,在发送消息和接收消息的时候将消息内容转换成指定的格式。

  • 默认的消息转换器是SimpleMessageConverter,此转换器的功能就是将发送的消息体转换成字节数组(Object,String,Serializable),rabbitTemplate中会用到消息转换器的方法如下:

  1. void convertAndSend(Object message) throws AmqpException;
  2. void convertAndSend(String routingKey, Object message) throws AmqpException;
  3. void convertAndSend(String exchange, String routingKey, Object message)
  4.     throws AmqpException;
  5. void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
  6.     throws AmqpException;
  7. void convertAndSend(String routingKey, Object message,
  8.     MessagePostProcessor messagePostProcessor) throws AmqpException;
  9. void convertAndSend(String exchange, String routingKey, Object message,
  10.     MessagePostProcessor messagePostProcessor) throws AmqpException;
  11. Object receiveAndConvert() throws AmqpException;
  12. Object receiveAndConvert(String queueName) throws AmqpException;
  • 实现自己的消息转换器后调用rabbitTemplate的API(public void setMessageConverter(MessageConverter messageConverter))设置即可。

  • 在与SpringBoot整合时,可以注入自己的消息转换器,amqp提供了Jackson2JsonMessageConverter,使用JackSon将消息内容转换为json字符串,配置如下:

  1. /**
  2. * 注入JackSon的MessageConverter,用于收发消息的格式化成json数据
  3. * @param ObjectMapper 这个是jackson自动注入的,详情请看JacksonAutoConfiguration
  4. */
  5. @Bean
  6. public Jackson2JsonMessageConverter messageConverter(ObjectMapper ){
  7. return new Jackson2JsonMessageConverter(objectMapper);
  8. }
  9. /**
  10. * 重新注入RabbitTemplate,并且设置相关属性
  11. */
  12. @Bean
  13. @Primary
  14. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter, CustomConfirmCallBack confirmCallBack, CustomReturnCallBack returnCallBack){
  15. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  16. template.setMandatory(true);
  17. //设置消息转换器
  18. template.setMessageConverter(messageConverter);
  19. template.setReturnCallback(returnCallBack);
  20. template.setConfirmCallback(confirmCallBack);
  21. return template;
  22. }

3. TTL

  • TTL表示消息或者队列的生命周期,在消息发送或者队列创建的时候可以设置消息的存活时间,如果此条消息或者队列中的所有消息到达指定时间的还是没有被消费,那么消息将会被清空或者存入死信队列中。

3.1 消息TTL
  • 在发送消息的时候指定的TTL(MessageProperties)API如下:

    • public void setExpiration(String expiration):单位毫秒

3.2 队列TTL
  • 在创建队列的时候指定过期时间,属性名称是x-message-ttl,设置在argument

4. 生产端消息ack和nack

  • 消息确认机制,生产者发送消息可能因为网络、交换机不存在等其他问题导致消息投递失败,消息ack机制可以在消息投递之后针对失败或者成功做一些业务的处理。

  • ConfirmCallback保证的是消息能否到达exchange,只要是成功到达了交换机,ack为true

4.1 实现步骤

  • 设置connectionFacotry的发布确认模式为ConfirmType.CORRELATED,代码如下:

  1. //设置消息发送ack,默认none
  2. connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
  • 自定义RabbitTemplate.ConfirmCallback的实现类,重写其中的方法,如下:

  1. /**
  2. * @Description 消息确认回调,在消息发出之后回调
  3. * @Author CJB
  4. * @Date 2020/2/21 15:36
  5. */
  6. @Component
  7. public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
  8. /**
  9. *
  10. * @param correlationData 发送消息时携带的参数,在业务上能够唯一识别,比如主键id等
  11. * @param ack 消息是否发送成功的标志,true成功,false失败
  12. * @param cause 消息发送失败的原因
  13. */
  14. @Override
  15. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  16. System.err.println(correlationData.getId()+"---->"+ack+"--->"+cause);
  17. //消息投递失败执行逻辑,比如消息入库,设置失败标记等操作
  18. if (!ack){
  19. System.err.println("消息投递失败");
  20. }
  21. }
  • 在RabbitTemplate中设置自定义的ack回调的类:

template.setConfirmCallback(myConfirmCallback);

5. 生产端的消息Return机制

  • 用于处理一些路由不可达的消息,比如发送消息时指定的路由投递不到相应的队列,此时ReturnListener就会监听到这些消息进行处理。

5.1 实现步骤

  • 设置ConnectionFactorypublisherReturns为true:

  1. //设置开启发布消息的Return监听
  2. connectionFactory.setPublisherReturns(true);

  • 设置RabbitTemplatemandatory为true,或者mandatory-expression执行的结果为true。

template.setMandatory(true);
  • 自定义实现RabbitTemplate.ReturnCallback

  1. /**
  2. * @Description ReturnListener的监听,处理发送消息时路由不可达的消息
  3. * @Author CJB
  4. * @Date 2020/2/21 17:04
  5. */
  6. @Component
  7. public class MyReturnCallBack implements RabbitTemplate.ReturnCallback {
  8. /**
  9. * 在消息路由不可达会回调此方法,用于处理这些消息,比如记录日志,消息补偿等等操作
  10. * @param message 投递的消息
  11. * @param replyCode 响应的状态吗
  12. * @param replyText 响应的文本
  13. * @param exchange 交换机
  14. * @param routingKey 路由键
  15. */
  16. @Override
  17. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  18. System.err.println("message:"+new String(message.getBody()));
  19. System.err.println("replyCode:"+replyCode);
  20. System.err.println("replyText:"+replyText);
  21. System.err.println("exchange:"+exchange);
  22. System.err.println("routingKey:"+routingKey);
  23. }
  24. }

  • 在RabbitTemplate中设置自己实现的监听器:

template.setReturnCallback(myReturnCallBack);

6. 消费端的消息异步监听

  • 异步监听消息需要设置一个监听器,一旦监听的队列中有消息发送,此监听器将会起作用。

6.1 实现步骤

  • 注入SimpleMessageListenerContainer

  1. @Bean
  2. public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  4. //添加监听的队列
  5. container.addQueueNames("queue1");
  6. //设置消费者ack消息的模式,默认是自动,此处设置为手动
  7. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  8. //设置消费者的并发数量
  9. container.setConcurrentConsumers(1);
  10. //设置单个消费请求能够处理的消息条数,默认250
  11. container.setPrefetchCount(250);
  12. //设置最大的并发数量
  13. container.setMaxConcurrentConsumers(10);
  14. //设置消费者的tag的生成策略,队列的名字+"_"+UUID
  15. container.setConsumerTagStrategy(queue -> queue+"_"+ UUID.randomUUID().toString());
  16. //设置消息监听器
  17. container.setMessageListener(customMessageListener1());
  18. return container;
  19. }
  • 自定义一个消息监听器MessageListener的实现类,此处有两个接口:

    • MessageListener:实现该接口,重写其中的方法,不过此种的实现没有channel对象。

    • ChannelAwareMessageListener:其中重写的方法除了Message对象,还提供了Channel对象,用于手动ack等操作。

  1. /**
  2. * 自定义Message监听器
  3. * @return
  4. */
  5. @Bean
  6. public MessageListener customMessageListener(){
  7. return msg-> System.err.println("消费者:"+new String(msg.getBody()));
  8. }
  9. @Bean
  10. public ChannelAwareMessageListener customMessageListener1(){
  11. return (msg,chanel)->{
  12. long deliveryTag = msg.getMessageProperties().getDeliveryTag();
  13. try{
  14. System.err.println("message:"+new String(msg.getBody()));
  15. System.err.println("properties:"+ deliveryTag);
  16. //.....执行系列的逻辑
  17. //逻辑顺利执行完成之后执行ack
  18. chanel.basicAck(deliveryTag,false);
  19. }catch (Exception ex){
  20. //记录日志等操作
  21. //消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中
  22. chanel.basicNack(deliveryTag,false,false);
  23. }
  24. };
  25. }

7. 消费端的并发

  • 默认一个队列只有一个消费者监听,但是我们可以同时设置多个消费者监听这个消息,提高消息消费的效率。

  • SimpleMessageListenerContainer中的两个属性可以完成设置,如下:

    • concurrentConsumers:消费者的数量,默认1

    • maxConcurrentConsumers:最大消费者的数量。

8. 消费端限流(流量削峰)

  • 假设rabbitmq服务器有上万条信息没有处理,当开启一个消费端的话,那么就有可能出现服务器卡死的情况。

  • Rabbitmq提供了一种qos(服务质量保证)功能,即在非确认消息的前提下(手动确认消息),如果一定数目的消息(基于consumer或者channel的qos的设置)未被确认前(没有ack或者nack),不进行消费新的消息。

  • amqp实现如下:

    • SimpleMessageListener中有一个属性prefetchCount,该属性用来限制消费端的同时处理的请求,默认是250,使用spring AMQP直接设置即可,与SpringBoot整合,配置如下:

  1. spring:
  2. rabbitmq:
  3.   listener:
  4.     simple:
  5.       prefetch: 1

9. 消费端的 消息ack

  • 默认是自动ack的,即是在接收到这条消息之后无论有没有正确消费,这条消息都会从队列中删除。当然可以设置手动ack,即是在消费者接收消息,正确处理完成之后,手动确认ack,那么此条消息才会从队列中删除。

9.1 API(Channel)

  • void basicAck(long deliveryTag, boolean multiple):ack消息

    • ‍deliveryTag:Message中的属性

    • multiple:是否批量ack消息

  • void basicNack(long deliveryTag, boolean multiple, boolean requeue):nack消息

    • requeue:是否重回队列,如果设置了重回队列,那么这条消息会被重新进入队列中的最后一条消息,如果设置了false并且此队列设置了死信队列,那么将会被放入死信队列中。   

9.2 实现步骤

  • 设置消费者的确认模式为手动确认,使用的是SimpleMessageListenerContainer的API

  1. //设置消费者ack消息的模式,默认是自动,此处设置为手动
  2. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);  
  • 消息异步监听的实现类是ChannelAwareMessageListener,通过自己的业务逻辑判断何时需要ack何时需要nack

  1. @Bean
  2. public ChannelAwareMessageListener customMessageListener1(){
  3. return (msg,chanel)->{
  4. long deliveryTag = msg.getMessageProperties().getDeliveryTag();
  5. try{
  6. System.err.println("message:"+new String(msg.getBody()));
  7. System.err.println("properties:"+ deliveryTag);
  8. //.....执行系列的逻辑
  9. //逻辑顺利执行完成之后执行ack
  10. chanel.basicAck(deliveryTag,false);
  11. }catch (Exception ex){
  12. //记录日志等操作
  13. //消息执行出现异常,nack,设置不重回队列,如果设置了死信队列,那么将会到死信队列中
  14. chanel.basicNack(deliveryTag,false,false);
  15. }
  16. };
  17. }

10. 消息重回队列

  • 重回队列的机制即是消息在nack之后如果设置了重回队列,那么此条消息将会被重新放入到此队列中的最后一条,之后将会被重新投递到消费端消费。

  • 重回队列的机制并不支持使用,如果是业务逻辑上的异常导致消息重回队列,那么重新消费也是没有多大意义。在实际的工作上可以采用补偿机制解决。

  • 此处不再说明实现的方式,个人认为很鸡肋的功能。

11. 死信队列

  • 消息变成死信的情况如下(前提:消息所在队列设置了死信队列):

    • 消息被拒绝(nack/reject)并且requeue=false(不设置重回队列)

    • 消息的TTL过期

    • 队列达到最大长度

  • 死信队列在rabbitmq中其实是一个exchange,只是普通的交换机和队列。

  • 想要消息被拒绝或者过期之后能够回到死信队列中,需要在队列声明的时候添加一个x-dead-letter-exchange,指定死信的交换机

  1. String exchange="a_exchange";
  2. String queueName="a_queue";
  3. TopicExchange topicExchange = new TopicExchange(exchange, true, true);
  4. Map<String,Object> arguments=new HashMap<>();
  5. //指定死信队列,dlx-exchange是死信交换机
  6. arguments.put("x-dead-letter-exchange", "dlx-exchange");
  7. //设置死信队列的路由键,需要根据这个路由键找到对应的队列
  8. arguments.put("x-dead-letter-routing-key", "dlx-key");
  9. Queue queue = new Queue(queueName, true, false, false, arguments);
  10. Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("test.#");
  11. rabbitAdmin.declareQueue(queue);
  12. rabbitAdmin.declareExchange(topicExchange);
  13. rabbitAdmin.declareBinding(binding);
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号