当前位置:   article > 正文

Rabbitmq_rabbit配置多个消费者

rabbit配置多个消费者

Rabbitmq

null 默认交换机 direct模式

RabbitTemplate使用模板

RabbitMqConfig:消息队列配置类,包括拿到application的配置,定义队列,交换机,交换机绑定队列指定routingkey
RabbitmqSend:消息发送类,使用时直接将此自动注入然后使用,主要的方法就是将消息放入每个routkey指定的队列中
MsgReceiver:负责接受消息并处理他们

convertSendAndReceive(…):可以同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才触发另一个convertSendAndReceive(…),也就是才会接收下一条消息。RPC调用方式。
  • 1

convertAndSend(…):使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。

convertAndSend(String exchange, String routingKey, Object message)
  • 1

参数1:交换机名称;参数2:路由键,这里没有使用到路由键,所以为空;参数3:发送的消息内容。

spring rabbitmq为listener配置并发消费者数量

我们的项目原来插入队列的消息相对少,随着插入队列消息的不断增多,发现队列中消息堆积越来越多,最多时高大30多万条。
寻找问题瓶颈在于队列消费者比较少,每个监听器只有一个消费者,当队列消费的速度小于队列生产者插入队列中消息个数时就造成了队列消息堆积。
查找spring AMQP文档,我们使用的版本比较低是1.1.4,现在都是1.5的版本了。
此处说的是1.1.4我们现用的版本,文档地址:http://docs.spring.io/spring-amqp/docs/1.1.4.RELEASE/reference/htmlsingle/#containerAttributes
查看Message Listener Container Configuration发现listener-container有个属性concurrency就是用来为每个listener配置并发的消费者个数的。
原文如下:

concurrency The number of concurrent consumers to start for each listener.

所以只需简单的修改配置文件就可以了,如下

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency= "2">
        <rabbit:listener ref="messageListener" queue-names="some.queue"/>
</rabbit:listener-container>
  • 1
  • 2
  • 3

配置绑定最好用代码,不要在web界面

队列最好声明在消费者,消费者消费对列为空时,会报错。

默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除

如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则会立即发送,当 Message 被消费者正确接收时,就会被从 Queue 中移除

@RabbitListener和@RabbitHandler搭配使用

@RabbitListener可以标注在类上面,当使用在类上面的时候,需要配合@RabbitHandler注解一起使用,@RabbitListener标注在类上面表示当有收到消息的时候,就交给带有@RabbitHandler的方法处理,具体找哪个方法处理,需要跟进MessageConverter转换后的java对象。

springboot + rabbitmq 消费者消息确认 (Ack)

开启消息确认

消费者

channel.basicAck(deliveryTag, false);

deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

channel.basicNack(deliveryTag, false, true);

deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列

channel.basicReject(deliveryTag:, false);

deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列

总结: 消息正确消费了,使用channel.basicAck(deliveryTag, false)进行确认。

如果消息消费过程中发生异常,如果想把消息重新放回消息队列中给其他消费者或者下次消费,

则使用 channel.basicNack(deliveryTag, false, true),或者 channel.basicReject(deliveryTag:, true);

如果消息异常,而且不想再把消息重新放入队列中

则使用 channel.basicNack(deliveryTag, false, false), channel.basicReject(deliveryTag:, false);

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/386671
推荐阅读
相关标签
  

闽ICP备14008679号