当前位置:   article > 正文

SpringBoot项目:RabbitMQ&MongoDB整合后台消息模块_mongo和mq消息管理设计方案

mongo和mq消息管理设计方案

RabbitMQ&MongoDB整合后台消息模块

1. 消息模块设计原理

消息和用户是一对一关系,例如通过一对一才能知道哪个消息被用户已读,但MYSQL是无法支持海量数据库存储的。所以我们选择使用MongoDB存储消息记录(海量低价值的数据),MongoDB没有表结构,只有集合。message存储消息主体(正文、日期、发送人),message_re存储接收人(接收人、用户是否阅读等数据)

如果瞬间写入海量记录,数据库正常的CRUD会受到影响,我们需要使用消息队列实现削峰填谷,即把消息发送到消息队列(RabbitMQ),当用户登陆时再把消息提取出来保存到MongoDB。

消息日积月累,MongoDB也会有撑不下来的时候,我们需要搞冷热数据分离,热数据定期归档,根据数据被使用的频率可以划分为热数据和冷数据,例如,一年内的数据被看做事热数据,超过一年的数据被当作冷数据。每天定期把冷数据从MongoDB_1转移到MongoDB_2,这样MongoDB_1的数据量减少就提高了速度,MongoDB_2存放数据量很大,但冷数据很少被使用,仅仅充当归档库而已。冷数据定期销毁,释放存储空间。假设五年以上的冷数据就被当成超期数据,删除超期数据之后归档库的空间就增加了。

2. 消息发送和收取

引入消息队列MQ,在Web系统中,先CRUD消息主体(不包含接收人等信息)到MongoDB,用异步多线程把消息的ref集合发送到消息队列。每一个消息的topic则对应一个用户ID。

当用户登录系统后,Web系统利用异步线程方式从消息队列中把消息接收回来,把ref消息写入到MongoDB里。

3. RabbitMQ入门

RabbitMQ与Kafka不同,RabbitMQ既支持消息异步收发,又支持同步收发。能适应各种业务场景的优点便显现出来了。

将rabbitmq.tar.gz文件上传到CentOS系统,把镜像导入Docker环境

docker load < rabbitmq.tar.gz
  • 1

创建rabbitmq容器

docker run -it -d --name mq -p 15672:15672 -p 5672:5672 rabbitmq
  • 1
3.1 五种队列模式
3.1.1 简单模式

在这里插入图片描述

一个生产者(发送方)对应一个消费者(接收方),生产者和消费者必须是一对一的关系。

3.1.2 Work模式

在这里插入图片描述

一个生产者对应多个消费者,但只有一个消费者能获得消息,具有排他性。

3.1.3 发布/订阅模式

在这里插入图片描述

一个生产者首先将消息发送到fanout交换器,交换器绑定多个队列,然后与之对应的所有消费真呢个收到消息,不具有排他性。

3.1.4 路由模式

在这里插入图片描述

生产者将消息发送到direct交换器,交换器按照关键字key把消息路由到某个队列,即符合规则才会转发到该队列。

3.1.5 主题模式

在这里插入图片描述

生产者将消息发送到topic交换器,交换器按照复杂的规则将消息路由到某个队列

3.2 消息持久化

消息的可靠性是RabbitMQ的特色,可靠性由消息持久化实现,可以防止在异常情况下丢失数据,交换器和队列都能持久化。

3.3 消息过期时间

默认情况下,消息是无限期存储在RabbitMQ上的,但我们可以设置过期时间,到期后无论是否已经被接受都会被RabbitMQ删除。

3.4 ACK应答

消费者接收消息之后,必须返回一个Ack应答,那么RabbitMQ才会认为这个消息接收成功,如果想要删除这条消息,消费者发送Ack应答的时候,附带一个deliverTag标志位就可以了。

3.5 同步接收和异步接收(针对小程序)

异步接收消息消耗的系统资源较少,但是微信小程序和后端项目之间并不是长连接,后端项目异步方式接收到队列中的消息也无法推送移动端的小程序。所以后端的Java项目采用同步的方式接收队列的消息,在移动端,我们创建定时器,向后端Java项目发出轮询请求,后端项目接收到轮询秦桧后,用同步方式接收队列的消息,然后把消息存储在MongoDB上面。

4. 消息模块数据模型设计

4.1 创建POJO映射类

MongoDB中没有数据表的概念,而是采用集合Collection存储数据,每一个数据就是一个Document,文档结构其实就是我们常用的JSON。

4.1.1 Message集合
字段 类型 备注
_id UUID 自动生成的主键值
uuid UUID UUID值,并设置有唯一索引,防止消息被重复消费
senderId Integer 发送者ID,就是用户ID,如果是系统发出,则ID为0
senderPhoto String 发送者头像URL
senderName String 发送者名称
msg String 消息内容
sendTime Date 发送时间

针对重复消费:小程序每个5分钟进行轮询,如果积压得消息太多,Java系统没有接受完消息,这时候新的轮询到来,就会产生两个消费者共同接收同一个消息的情况,数据库就添加了同样的记录,如果每条MQ消息都有唯一的UUID值,第一个消费者把消息保存到数据库后,第二个消费者就无法继续保存了。

创建MessageEntity类映射message集合

@Data
@Document(collation = "message")
//标明由mongo来维护该表,collection里面的名字对应着mongodb里面的文档
public class MessageEntity implements Serializable {
   
    //Serializable启用其序列化功能的接口
    //serializable接口的作用:存储对象在存储介质中,以便在下次使用的时候,可以很快捷的重建一个副本;便于数据传输,尤其是在远程调用的时候。
    @Id
    private String _id;

    @Indexed(unique = true)//唯一索引
    private String uuid;

    @Indexed//加索引后以该字段为条件检索将大大提高速度
    private Integer sendId;

    //可设置默认值为系统头像
    private String senderPhoto="XXX";

    private String senderName;

    private String msg;

    @Indexed
    private Date sendTime;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
4.1.2 Message_ref集合

虽然message集合记录是消息,里面有接收者ID,但群发消息是接收者ID空,这时候需要用上message_ref集合来记录接收人和已读状态。

字段 类型 备注
_id UUID 主键
messageId UUID message记录的_id
receiverId String 接收人ID
readFlag Boolean 是否已读
lastFlag Boolean 是否为新接收的消息

创建MessageRefEntity类映射message_ref集合

public class MessageRefEntity implements Serializable {
   
    @Id
    private String _id;

    @Indexed
    private String messageId;

    @Indexed
    private Integer receiverId;

    @Indexed
    private Boolean readFlag;

    @Indexed
    private Boolean lastFlag;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
4.1.3 MongoDB的联合查询

MongoDB从3.X开始支持集合的连接查询,也就相当于MYSQL的表连接。

先插入两条数据

db.message.insert({
   
	_id:ObjectId("600bea9ab5bafb311f147506"),
	uuid:"bfcb7c47-5886-c528-5127-ce285bc2322a",
	senderId:0,
	senderPhoto:"https://static-1258386385.cos.ap-beijing.myqcloud.com/img/System.jpg",
	senderName:"Emos系统",
	msg:"HelloWord",
	sendTime:ISODate("2021-01-23T17:21:30Z")
});

db.message_ref.insert({
   
	_id:ObjectId("600beaf0d6310000830036f3"),
	messageId:"600bea9ab5bafb311f147506",
	receiverId:1,
	readFlag:false,
	lastFlag:true
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

再使用语句连接

db.message.aggregate(
    //数据类型转换,将Oject转成String去连接
	{
   
        //set定义变量
		$set:{
   
            //临时变量Id
			"id":{
   $toString:"$_id"}
		}
	},
    //
	{
   
		$lookup:{
   
            //连接message_ref,message出Id,跟message_ref的messageId连接
			from:"message_ref",
			localField:"id",
			foreignField:"messageId",
            //从message_ref取出来的数据保存在ref
			as:"ref"
		}
	},
    //寻找receiverId=1的消息内容,match为查询条件
	{
    $match:{
   "ref.receiverId":1} },
    //按照发送时间降序
	{
    $sort:{
   sendTime:-1} },
    //从0开始往后取50条数据
	{
    $skip:0 },
	{
    $limit:50 }
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

5. 后端持久层

提供的功能包括:

  • 刷新消息模块
  • 获取分页消息
  • 根据ID查询消息
  • 把未读消息更新成已读
  • 删除消息
5.1 创建MessageDao类
@Repository
public class MessageDao {
   

    @Autowired
    private MongoTemplate mongoTemplate;

    //插入数据
    public String insert(MessageEntity entity){
   
        //把北京时间转成格林时间再存到MongoDB
        Date sendTime  = entity.getSendTime();
        //偏移8个小时
        sendTime = DateUtil.offset(sendTime, DateField.HOUR,8);
        entity.setSendTime(sendTime);
        entity = mongoTemplate.save(entity);
        return entity.get_id();
    }
    //按照分页查询消息,注意第二个参数为long
    public List<HashMap> searchMessageByPage(int userId, long start, int length){
   
        JSONObject json = new JSONObject();
        //数据类型转换
        json.set("$toString","$_id");
        //集合连接
        Aggregation aggregation = Aggregation.newAggregation(
                //设置临时变量id,值为json中拿,
                Aggregation.addFields().addField("id").withValue(json).build(),
                //集合联合查询
                Aggregation.lookup("message_ref","id","messageId","ref"),
                //match设置查询条
                Aggregation.match(Criteria.where("ref.receiverId").is(userId)),
                //排序
                Aggregation.sort(Sort.by(Sort.Direction.DESC,"sendTime")),
                Aggregation.skip(start),
                Aggregation.limit(length
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/870616
推荐阅读
相关标签
  

闽ICP备14008679号