当前位置:   article > 正文

全面认识RabbitMQ各组件及消息传递机制_validatequeuenamelength

validatequeuenamelength

一、virtual host虚拟主机

每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。一个Broker里可以开有多个VirtualHost,它的作用是用作不同用户的权限分离。 这个特性在做多租户的时候较方便

1.vhost特性

(1)RabbitMQ默认的vhost是“/”开箱即用;

(2)多个vhost是隔离的,多个vhost无法通讯,并且不用担心命名冲突(队列和交换器和绑定),实现了多层分离;

(3)创建用户的时候必须指定vhost;

2.vhost操作

可以通过rabbitmqctl工具命令

创建vhost:

rabbitmqctl add_vhost[vhost_name]

删除vhost:

rabbitmqctl delete_vhost[vhost_name]

查看所有的vhost:

rabbitmqctl list_vhosts

二、connection连接

是Publisher/Consumer和Broker之间的TCP连接。

断开连接的操作只会在Publisher/Consumer端进行,Broker不会断开连接,除非出现网络故障或者Broker服务出现问题,Broker服务宕了。

三、channel信道

是建立在真实的TCP连接内的虚拟连接(是我们与RabbitMQ打交道的最重要的一个接口)。仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的,需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。AMQP的命令都是通过信道发送出去的(我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等)。

每条信道都会被指派一个唯一ID。在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务,理论上无限制,减少TCP创建和销毁的开销,实现共用TCP的效果。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。

注意

  1. 一个生产者或一个消费者与MQ服务器之间只有一条TCP连接

  1. RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。

四、exchange(交换机)

生产者发消息发往交换机,交换机会自己投递消息到绑定的queue队列

这里有几个点需要注意

1.exchange根据什么规则把消息投递到哪些queue中?

exchange有4种类型

direct

对routing_key进行严格匹配,当消息来到的时候,只有exchange与某queue绑定的routing_key完全匹配才将消息投递到该queue

topic

对routing_key进行通配符模糊匹配,满足条件的queue都能收到消息,这里的routing_key以"."分隔,*匹配一个单词,#匹配多个单词,如果同一个queue满足多个条件不会被投递多次

headers

根据消息体内的headers属性匹配,绑定的时候可以制定键值对。不依赖routing_key匹配

fanout

转发消息到所有绑定队列,不依赖routing_key匹配。在不需要路由的时候,一般是使用的这个类型的exchange

x-delay-message

Rabbitmq实现了一个rabbitmq_delayed_message_exchange插件来实现延时队列x-delay-message。

2.exchange如何持久化?

如果不设置持久化,broker挂了,再重启,这个exchange就不存在了。

在客户端声明exchange的时候有个入参来控制是否持久化

Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

durable即是是否持久化,而autoDelete则是,当没有queue绑定的时候是否自动删除这个exchange

3.生产者ACK机制

rabbitmq生产者确保消息一定送达(事务或者confirm机制)

4.投递方法(basicPublish)中的mandatory和immediate

mandatory

当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);

当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。

immediate

当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上没有消费者,那么这条消息不会放入队列中。

当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

换句话说,无法找到一个消费者时,消息返还给生产者。

五、queue队列

消息队列,先进先出,有缓存消息的能力

1.有3种类型的队列

持久化队列

这样消息会落盘,没有消费的消息重启后不会丢

临时队列

则没有持久化,堆积的数据在rabbitmq重启后会丢失

自动队列

当没有消费者消费这个队列的时候,队列会自动删除

2.队列如何持久化?

在channel声明队列的时候可以设置

  1. public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
  2.     validateQueueNameLength(queue);
  3.     return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
  4. }

durable

是否持久化队列。

exclusive

是否为独占(排他)队列。

exclusive队列的特点是:只对首次声明它的连接(Connection)可见,注意,是首次声明它的connection不是channel。会在其连接断开的时候自动删除。

autoDelete

是否自动删除,如果为是,当没有消费者消费这个队列的时候,这个队列会被自动删除。

注意,这里只是说持久化队列,持久化队列在rabbitmq重启后依旧存在,如果需要未消费的消息在重启后依旧存在,还需要持久化消息。

3.是否支持消息回溯?

不支持。rabbitmq不像kafka那样,并没有偏移量。rabbitmq即使是持久化队列+持久化消息,在被消费后该数据会被标记为删除,等待回收。

4.mirror queue镜像模式 (高可用)

RabbitMQ 以队列维度提供高可用的解决方案——镜像队列。

配置镜像队列规则后,新创建的队列按照规则成为镜像队列。每个镜像队列都包含一个主节点(Leader)和若干个从节点(Follower),其中只有主节点向外提供服务(生产消息和消费消息),从节点仅仅接收主节点发送的消息。

从节点会准确地按照主节点执行命令的顺序执行动作,所以从节点的状态与主节点应是一致的。

5.消费者ACK机制

  1. try{
  2.     //设置broker每次只从推送队列里面的1条消息到消费者,只有在确认这条消息"成功消费"后,才会继续推送
  3. channel.basicQos(0, 1, false);
  4. //成功确认 是否批量,true:将一次性ack所有小于deliveryTag的消息。
  5. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  6. } catch (Exception e) {
  7. String productIdStr = new String(message.getBody());
  8. String productIdStrReplace = productIdStr.replace("\"", "");
  9. log.error("【MQ消费异常】 参数:{},异常信息:{}", productId, e.getMessage(), e);
  10. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  11. }

6.持久化消息怎么搞

如果只是持久化队列,没有持久化消息,那么重启后,队列存在,消息不存在了。

持久化消息的设置在channel.basicPublish方法的入参中

  1. void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
  2. void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
  3. void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

入参中的 BasicProperties props用于设置消息是否持久化

  1. public BasicProperties(
  2.             //消息类型如:text/plain
  3. String contentType,
  4.             //编码
  5. String contentEncoding,
  6. Map<String,Object> headers,
  7.             //消息是否持久化 1:nonpersistent 2:persistent
  8. Integer deliveryMode,
  9.             //优先级
  10.             Integer priority,
  11. String correlationId,
  12.             //反馈队列
  13. String replyTo,
  14.             //expiration到期时间
  15. String expiration,
  16. String messageId,
  17. Date timestamp,
  18. String type,
  19. String userId,
  20. String appId,
  21. String clusterId)

deliveryMode=1表示不需要持久化消息,deliveryMode=2表示需要持久化消息

7.消息是怎么分发给消费者的?

Round-robin dispatching 循环分发(默认)

若存在多个consumer,每个consumer的负载可能不同,有些处理的快,有些处理的慢,RabbitMQ并不管这些,只是简单的以round-robin的方式分配message,这可能造成某些consumer积压很多任务处理不完,而一些consumer长期处于饥饿状态

Fair dispatch 公平分发

通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它

channel.basic_qos(prefetch_count=1) 

注意,这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

RabbitMQ注解

RabbitAdmin 是对 rabbitmq 管理的封装,比如对交换机,队列,绑定的定义

RabbitTemplate 是对消息的发送和接收的封装

@RabbitListener

指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。

@QueueBinding

将交换机和队列绑定

key = {"item.insert", "item.update"} 返回绑定的路由密钥或模式,多个元素将导致多个绑定

@Queue

声明队列 (durable = "true" 表示持久化的)

@Exchange

声明交换机(type = ExchangeTypes.TOPIC 表示交换机类型)

创建Connection

  1. @Data
  2. public abstract class ApiAbstractRabbitConfiguration {
  3. protected String addresses;
  4. protected String username;
  5. protected String password;
  6. protected String virtualhost;
  7. protected ConnectionFactory connectionFactory() {
  8. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  9. connectionFactory.setAddresses(addresses);
  10. connectionFactory.setUsername(username);
  11. connectionFactory.setPassword(password);
  12. connectionFactory.setVirtualHost(virtualhost);
  13. return connectionFactory;
  14. }
  15. }

初始化RabbitTemplate

  1. @Bean(name = "mtApiRabbitTemplate")
  2. @Primary
  3. public RabbitTemplate mtRabbitTemplate(@Qualifier("mtApiConnectionFactory") ConnectionFactory connectionFactory) {
  4. return new RabbitTemplate(connectionFactory);
  5. }

设置RabbitListener为手动确认消息

  1. @Bean(name = "mtApiFactory")
  2. public SimpleRabbitListenerContainerFactory mtFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("mtApiConnectionFactory") ConnectionFactory connectionFactory) {
  3. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  4. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  5. configurer.configure(factory, connectionFactory);
  6. return factory;
  7. }

绑定队列

  1. @Bean
  2. public Binding ordinaryOrderAutoCancelBindingNotify() {
  3. return BindingBuilder.bind(ordinaryOrderAutoCancelImmediateQueue()).to(apiDelayExchange()).with(DELAY_ROUTING_KEY_ORDINARY_ORDER_AUTOCANCEL).noargs();
  4. }

初始化RabbitAdmin,声明队列、交换机、绑定

  1. @Bean(value = "mtRabbitApi")
  2. public RabbitAdmin mtRabbitAdmin(@Qualifier("mtApiConnectionFactory") ConnectionFactory connectionFactory) {
  3. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  4. rabbitAdmin.declareQueue(ordinaryOrderAutoCancelImmediateQueue());
  5. rabbitAdmin.declareExchange(apiDelayExchange());
  6. rabbitAdmin.declareBinding(ordinaryOrderAutoCancelBindingNotify());
  7. return new RabbitAdmin(connectionFactory);
  8. }

发送消息

  1. public void sendAutoCancelMessage(String msg, int delayTime,String flag) {
  2. try {
  3. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  4. log.info(String.format("sendDelay.sendOrdinaryAutoCancelMessage.id=%s.msg=%s", correlationData.getId(), msg));
  5. this.rabbitTemplate.convertAndSend(ApiMTRabbitConfiguration.DELAYED_EXCHANGE_XDELAY, ApiMTRabbitConfiguration.DELAY_ROUTING_KEY_ORDINARY_ORDER_AUTOCANCEL, msg, message -> {
  6. //持久化消息
  7. message.getMessageProperties().setHeader("source", flag);
  8. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  9. message.getMessageProperties().setDelay(delayTime);
  10. log.info("Delay sent.time:{}", LocalDateTime.now());
  11. return message;
  12. }, correlationData);
  13. } catch (AmqpException e) {
  14. log.error("订单自动取消MQ异常,errorMessage:{}", e.getMessage(), e);
  15. }
  16. }

接收消息

  1. @RabbitListener(queues = ApiMTRabbitConfiguration.ORDINARY_ORDER_AUTOCANCEL_QUEUE_XDELAY_QUEUE_XDELAY, containerFactory = "mtApiFactory")
  2. public void ordinaryOrderAutoCancelMessage(Message message, Channel channel,
  3.     @Headers Map<String, Object> heads) throws IOException {
  4.    
  5. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/52809
推荐阅读
相关标签
  

闽ICP备14008679号