当前位置:   article > 正文

基于RabbitMQ原理的自定义消息队列实现

基于RabbitMQ原理的自定义消息队列实现

文章目录

1. 什么是消息队列

与消息队列类似的是阻塞队列(Blocking Queue),它是一个生产者消费者模型 (是在一个进程内部进行的)。

而所谓的消息队列(Message Queue),就是把阻塞队列这样的数据结构,单独提取成了一个程序,进行独立部署 -> 生产者消费模型 (进程和进程之间 / 服务和服务之间);一般是部署在分布式系统上的,就是说整个服务器程序,不是一个单一的程序了,而是由一组服务器构成的集群。

生产者消费者模型作用

  1. 解耦合
  • 本来有个分布式系统,A 服务器 调用 B 服务器(A 给 B 发请求,B 给 A 返回响应),此时 A 和 B 的耦合是比较大的。
  • 引入消息队列后,A 把请求发送到消息队列,B 再从消息队列获取到请求(像写代码一样,追求的是高内聚,低耦合)。
  1. 削峰填谷
  • 比如 A 是入口服务器,A 调用 B 完成一些具体业务,如果是 A 和 B 直接通信,如果突然 A 收到一组用户的请求的峰值,此时 B 也会随着受到峰值(每个物理上的服务器,硬件资源都是有上限的,包括但不限于 CPU,内存,硬盘,网络带宽)。
  • 引入消息队列后,A 把请求发送到消息队列,B 再从消息队列获取到请求;此时虽然 A 收到很多请求,队列也收到了很多请求,但是 B 仍旧可以按照原来的节奏处理请求,不至于说一下就收到太多的并发量。
  • 举个例子:高铁火车站,进站口。 乘客好 比A ,进站口好比 B,是有限的,就需要一个队列来排队,这样不管人多少,就不会影响到乘客进站以后的坐车。

市面上一些知名的消息队列

  • RabbitMQ
  • Kafka
  • RocketMQ
  • ActiveMQ

2. 需求分析

2.1. 核心概念1

  1. 生产者(Producer)
  2. 消费者(Consumer)
  3. 中间人(Broker)
  4. 发布(Push) :生产者向中间人这里投递消息的过程
  5. 订阅(Subscribe):哪些消费者要从中间人取数据,这个注册的过程,称为 “订阅”
  6. 消费 (Consume) :消费者从中间人这里取数据的动作

一个生产者,一个消费者

img

N 个生产者,N 个消费者

img

2.2. 核心概念2

Broker server 内部也涉及一些关键概念(是为了如何进出队列)

  1. 虚拟主机(Virtual Host),类似于 MySQL 中的 database,算是一个 “逻辑” 上的数据集合。
  • 一个 Broker server 上可以组织多种不同类别数据,可以使用 Virtual Host 做出逻辑上的区分。
  • 实际开发中,一个 Broker server 也可能同时用来管理多个 业务线上的数据,就可以使用 Virtual Host 做出逻辑上的区分。
  1. 交换机(Exchange)
  • 生产者把消息投递给 Broker Server,实际上是把消息先交给了 (公司某一层楼)Broker Server 上的交换机,再由交换机把消息交给对应的队列。 (交换机类似于公司的“前台小姐姐”)。
  1. 队列(Queue)
  • 真正用来存储处理消息的实体,后续消费者也是从对应的队列中取数据。
  • 一个大的消息队列中,可以有很多具体的小队列。
  1. 绑定(Binding)
  • 把交换机和队列之间,建立关系。
  • 可以把 交换机 和 队列 视为,数据库中 多对多的关系。可以想象,在 MQ 中,也是有一个这样的中间表,所谓的 “绑定’其实就是中间表中的一项
  1. 消息(Message)
  • 具体来说,是 服务器 A 发给 B 的请求(通过MQ转发), 服务器 B 给 服务器 A 返回的响应(通过MQ转发)。
  • 一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义。

img

RabbitMQ 就是按照这些概念来组织的(AMQP协议),而项目的实现也是参照 RabbitMq 的。

2.3. 核心API

消息队列服务器(Broker Server),要提供的核心 API 有下面几个

  1. 创建队列(queueDeclare)
  • 此处不用 Create 这样的术语,原因是 Create 仅仅是创建;而 Declare 起到的效果是,不存在则创建,存在就啥也不做
  1. 销毁队列(queueDelete)

  2. 创建交换机(exchangeDeclare)

  3. 销毁交换机(exchageDelete)

  4. 创建绑定(queueBind)

  5. 解除绑定(queueUnbind)

  6. 发布消息(basicPublish)

  7. 订阅消息(basicConsume)

  8. 确认消息(basicAck)

  • 这个 API 起到的效果,是可以让消费者显式的告诉 broker server,这个消息我处理完毕了,提高整个系统的可靠性,保证消息处理没有遗漏

RabbitMQ 提供了 肯定(已读已回) 和 否定的 (已读未回)确认,此处我们项目就只有 肯定确认。
在这里的项目中,并没有搞一个 API 叫 “消费消息”,即让消费者通过这个 API 从服务器上取走消息。
对于 MQ 和 消费者之间的工作模式有两种,一是 Push(推),就是 Broker 把收到的数据,主动的发送给订阅的消费者,RabbitMQ 只支持 推 的方式;二是 Pull (拉),就是消费者主动调用 Broker 的 API 取数据,Kafka 就能支持 拉。

2.4. 交换机类型

交换机在转发消息的时候,是有一套转发规则的
提供了几种不同的 交换机类型 (ExchangType)来描述这里不同的转发规则
RabbitMQ 主要实现了四种交换机类型(也是由 AMQP 协议定义的)

  • Direct 直接交换机
  • Fanout 扇出交换机
  • Topic 主题交换机
  • Header 消息头交换机

项目中实现了前三种

  1. Direct 直接交换机
    a. 生产者发送消息时,会指定一个“目标队列”的名字(此时的 routingKey 就是 队列的名字)
    b. 交换机收到后,就看看绑定的队列里面,有没有匹配的队列
    c. 如果有,就转发过去(把消息塞进对应的队列中)
    d. 如果没有,消息直接丢弃

img

  1. Fanout 扇出交换机
    a. 会把消息放到交换机绑定的每个队列
    b. 只要和这个交换机绑定任何队列都会转发消息

img

  1. Topic 主题交换机,有两个关键概念
    a. bindingKey:把队列和交换机绑定的时候,指定一个单词(像是一个暗号一样)
    b. routingKey:生产者发送消息的时候,也指定一个单词

如果当前 bindingKey 和 routingKey 对上了,就可以把消息转发到对应的队列

img

  1. 上述三种交换机类型,就像 QQ 群发红包
  • 专属红包,直接交换机
  • 发个10块钱红包,大家都能领 10 块钱红包,扇出交换机
  • 我发个口令红包,只有输入对应口令才能领到红包,主题交换机
package com.example.mq.mqserver.core;

public enum ExchangeType {
    DIRECT(0),
    FANOUT(1),
    TOPIC(2);

    private final int type;

    private ExchangeType(int type) {
        this.type = type;
    }

    public int getType() {
        return type;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.5. 持久化

上述 虚拟机、交换机、队列、绑定、消息,需要存储起来。此时内存和硬盘各存储一份,内存为主,硬盘为辅。

  • 交换机、队列、绑定:这几个部分可能会进行频繁的增删改查,使用数据库管理存储
  • 消息:存储在文件中,一是因为消息是不需要进行复杂的增删改查,二是数据库性能也有限

在内存中存储的原因:

对于 MQ 来说,能够高效的转发处理数据,是非常关键的指标! 因此对于使用内存来组织数据,得到的效率,就比放硬盘要高很多。

在硬盘中存储原因:

为了防止内存中数据随着进程重启 / 主机重启而丢失。

2.6. 网络通信

其他的服务器(生产者 / 消费者)通过网络,是要和 Broker Server 进行交互的。

此处设定,使用 TCP + 自定义的应用层协议 实现 生产者 / 消费者 和 BrokerServer 之间的交互工作

应用层协议主要工作:就是让客户端可以通过网络,调用 brokerserver 提供的编程接口

img

因此,客户端这边也要提供上述 API,只有服务器是真正干实事的;客户端只是发送 / 接受响应

img

客户端的本地方法调用,实际上就好像调用了一个远端服务器的方法一样 (远程过程调用 RPC,可以视为是编写客户端服务器程序,通信过程的一种设计思想)。

客户端除了提供上述 9 个和服务器方法对应的方法之外,还需要提供 4 个 额外的方法,支撑其他工作

  1. 创建 Connection
  2. 关闭 Connection
  • 此处用的 TCP 连接,一个 Connection 对象,就代表一个 TCP连接
  1. 创建 Channel
  • 一个 Connection 里面包含多个 Channel,每个 Channel 上传输的数据都是互不相干的
  • TCP 中,建立 / 断开一个连接,成本挺高的,因此很多时候不希望频繁建立断开 TCP 连接
  • 所以定义一个 Channel ,不用的时候,销毁 Channel,此处 Channel 是逻辑概念,比 TCP 轻量很多
  1. 关闭 Channel

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/811376?site
推荐阅读
相关标签