当前位置:   article > 正文

RabbitMQ高级特性 - 消息分发(限流、负载均衡)

RabbitMQ高级特性 - 消息分发(限流、负载均衡)

RabbitMQ 消息分发


概述

RabbitMQ 的队列在有多个消费者订阅时,默认会通过轮询的机制将消息分发给不同的消费者,但是有些消费者消费速度慢,有些消费者消费速度快,就会导致消费速度慢的消费者影响整个的任务的吞吐量下降.

例如,公司有1个正式员工和1个实习生,现在有 10 个任务分配平均给他们(各 5 个),而由于实习生干活比较慢,就会导致整个完成任务的吞吐量下降.

消息分发机制给 “正式工” 多分一些任务,给 “实习生” 少分一些任务.

如何实现消费分发机制(限制每个队列消息数量)

可以在配置文件中配置 prefetchCount(或者使用原生的 channel.basicQos(int prefetchCount)),来限制当前消息通道上(channel)的每一个消费所能保持的最大未确认消息的数量.

例如 prefetchCount 为 10,并且一个 channel 上有两个消费者,那么每个消费者都最多接收 10 条未确认的消息. 此时整个 channel 上未确认消息总数可能达到 20 条.

具体使用:例如配置 prefetch = 5,那么 RabbitMQ 就会为消费者计数. 发送一条消息计数+1,消费一条消息计数-1,当达到了上限5,mq队列 就不会再发送消息,直到消费者确认了某条消息(类似 TCP 中的华滑动窗口).

使用场景

限流

背景

假设,订单系统每秒最多处理 1000 请求,正常情况下,该订单系统可以满足日常使用.
但是在突发的秒杀场景下,请求瞬间增多,每秒 1w qps,这不得把订单系统打成筛子.

问题:mq 在中间的话,不是已经有削峰填谷的作用了么?为什么还要使用 mq 的 prefetch 限流机制?
尽管消息队列可以延缓高峰压力,但消费者的处理能力还是有限的(如果不配置 prefetch,消费者自身从队列中取消息的量是不可控的). 如果消费者一次性取走过多的消息,就可能会导致资源紧张. prefetch 限流就是用来控制每个消费者取消息的数量,确保消费者不会过载.

实现 demo

假设限制未确认消息上限为 5,发送消息数量为 20.

a)配置 prefetch 参数,设置应答方式为手动应答.

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual # 手动确认
        prefetch: 5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

b)配置交换机队列

@Configuration
class MQConfig {

    @Bean
    fun transQueue() = Queue(MQConst.TRANS_QUEUE)

    @Bean
    fun qosExchange() = DirectExchange(MQConst.QOS_EXCHANGE)
    @Bean
    fun qosQueue() = Queue(MQConst.QOS_QUEUE)
    @Bean
    fun qosBinding(): Binding = BindingBuilder
        .bind(qosQueue())
        .to(qosExchange())
        .with(MQConst.QOS_BINDING_KEY)

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

c)接口(生产者)

@RestController
@RequestMapping("/mq")
class MQApi(
    val rabbitTemplate: RabbitTemplate,
) {
    @RequestMapping("/qos")
    fun qos(): String {
        for (i in 1..20) {
            rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")
        }
        return "ok"
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

d)消费者

@Component
class QosListener {

    @RabbitListener(queues = [MQConst.QOS_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel
    ) {
        val deliverTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")
            // 这里不主动应答,模拟超长业务
            // channel.basicAck(deliverTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliverTag, false, true)
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

e)效果如下:
可以观察到,消费者只接收到 5 个消息,但由于没有主动应答,队列 就不会给消费者发送新的消息.
在这里插入图片描述
在这里插入图片描述

Ps:此时如果直接关闭程序,这 5 个为应答的消息就会重回队列,成为 Ready 状态.
如下可以直接清理掉这些消息:
在这里插入图片描述

非公平发送(负载均衡)

背景

假设有两个消费者,mq 默认会按照轮询的策略将消息分发给消费者.

*但有一个中情况就比较尴尬:打个比方 一个是正式工,另一个是实习生,正式工就处理的很快,而实习生就很慢,就会造成整个任务的进度被拖慢. *

因此我们可以通过 负载均衡 的方式,让处理的快的消费者多处理一些,处理慢的消费者少处理一些.

具体的:只需要配置 prefetch,并开启自动应答即可. 这样一来,处理的快的消费者,自动应答的就更快,接收的消息也就更多.

实现 demo

a)配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual # 手动确认
        prefetch: 1 # 具体配置为多少,需要根据实际业务以及系统承受能力(压测)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

b)生产者

    @RequestMapping("/qos")
    fun qos(): String {
        for (i in 1..20) {
            rabbitTemplate.convertAndSend(MQConst.QOS_EXCHANGE, MQConst.QOS_BINDING_KEY, "qos msg $i")
        }
        return "ok"
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

c)两个消费者

@Component
class QosListener {

    @RabbitListener(queues = [MQConst.QOS_QUEUE])
    fun fastHandMessage(
        message: Message,
        channel: Channel
    ) {
        val deliverTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")
            Thread.sleep(1000)
            println("正式工: 任务处理完成!")
            channel.basicAck(deliverTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliverTag, false, true)
        }
    }

    @RabbitListener(queues = [MQConst.QOS_QUEUE])
    fun slowHandMessage(
        message: Message,
        channel: Channel
    ) {
        val deliverTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, charset("UTF-8"))}, ${message.messageProperties.messageId}")
            Thread.sleep(2000)
            println("实习生: 任务处理完成!")
            channel.basicAck(deliverTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliverTag, false, true)
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

d)效果如下:
在这里插入图片描述

在这里插入图片描述

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/928388
推荐阅读
相关标签
  

闽ICP备14008679号