赞
踩
在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:
上述场景经常产生死信,即消息在这些场景中时,被称为死信。
死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。
死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。
那么,我们到底如何来使用死信队列呢?
在 RabbitMQ 中,死信队列的标识为 x-dead-letter-exchange ,通过观察死信队列的标识,我们不难发现,其标识最后为 exchange ,即 RabbitMQ 中的交换机,RabbitMQ 中的死信队列就是由死信交换机而得出的。
要想使用死信队列,我们需要首先声明一个普通的消息队列,并将死信队列的标识绑定到这个普通的消息队列上, 这个过程需要我们在生产端进行配置,代码如下所示:
// 使用 ConnectionFactory 创建了一个客户端连接 RabbitMQ Server 的连接。 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("xx"); connectionFactory.setPort("5672"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 使用建立好的连接,来创建了一个频道 channel 。 Channel channel = connection.createChanel(); // 声明了一个普通队列的额外参数的 Map , // 这个 Map 的 key 就是死信队列的标识,value 就是我们后续声明的真正的死信交换机的名称。 Map<String, Object> argumentsMap = new HashMap(); argumentsMap.put("x-dead-letter-exchange", "dlx_exchange"); /* 使用 channel 的 exchangeDeclare 方法和 queueDeclare 方法, 分别声明了一个名为 dlx_common_exchange 的交换机和名为 dlx_common_queue 的普通消息队列, 之所以名称中有 common ,是因为要对这个交换机和队列做一个标识,表示该交换机和队列是绑定了死信队列的。 */ channel.exchangeDeclare("dlx_common_exchange", "direct", true, false, null); channel.queueDeclare("dlx_common_queue", true, false, false, argumentsMap); /* 使用 channel 的 queueBind 方法来讲声明的普通交换机和消息队列进行绑定,并且制定了 routingKey , 这样消息就可以经 dlx_common_exchange 根据 routingKey 来路由到 dlx_common_queue 中。 */ channel.queueBind("dlx_common_queue", "dlx_common_exchange", routingKey);
声明了要绑定死信队列的普通队列之后,最后我们需要声明真正的死信队列
// 使用 chanel 的 exchangeDeclare 方法来声明了一个名为 dlx_exchange 的交换机。
channel.exchangeDeclare("dlx_exchange", "direct", true, false, null);
// 使用 channel 的 queueDeclare 方法来声明了一个名为 dlx_queue 的队列。
channel.queueDeclare("dlx_queue", true, false, false, null);
// 使用 channel 的 queueBind 方法,来将 dlx_exchange 的交换机与 dlx_queue 队列进行了绑定。
channel.queueBind("dlx_queue", "dlx_exchange", routingKey);
在生产环境中,使用MQ的时候设计两个队列:
比如说消费消息的时候,数据库故障了,此时无法将数据落盘,那么消费者每次消费一条消息,尝试落盘持久化的时候,都会遇到数据库报错。此时消费者就可以把这条消息拒绝访问,或者标志位处理失败!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。