赞
踩
消息和用户是一对一关系,例如通过一对一才能知道哪个消息被用户已读,但MYSQL是无法支持海量数据库存储的。所以我们选择使用MongoDB存储消息记录(海量低价值的数据),MongoDB没有表结构,只有集合。message存储消息主体(正文、日期、发送人),message_re存储接收人(接收人、用户是否阅读等数据)。
如果瞬间写入海量记录,数据库正常的CRUD会受到影响,我们需要使用消息队列实现削峰填谷,即把消息发送到消息队列(RabbitMQ),当用户登陆时再把消息提取出来保存到MongoDB。
消息日积月累,MongoDB也会有撑不下来的时候,我们需要搞冷热数据分离,热数据定期归档,根据数据被使用的频率可以划分为热数据和冷数据,例如,一年内的数据被看做事热数据,超过一年的数据被当作冷数据。每天定期把冷数据从MongoDB_1转移到MongoDB_2,这样MongoDB_1的数据量减少就提高了速度,MongoDB_2存放数据量很大,但冷数据很少被使用,仅仅充当归档库而已。冷数据定期销毁,释放存储空间。假设五年以上的冷数据就被当成超期数据,删除超期数据之后归档库的空间就增加了。
引入消息队列MQ,在Web系统中,先CRUD消息主体(不包含接收人等信息)到MongoDB,用异步多线程把消息的ref集合发送到消息队列。每一个消息的topic则对应一个用户ID。
当用户登录系统后,Web系统利用异步线程方式从消息队列中把消息接收回来,把ref消息写入到MongoDB里。
RabbitMQ与Kafka不同,RabbitMQ既支持消息异步收发,又支持同步收发。能适应各种业务场景的优点便显现出来了。
将rabbitmq.tar.gz文件上传到CentOS系统,把镜像导入Docker环境
docker load < rabbitmq.tar.gz
创建rabbitmq容器
docker run -it -d --name mq -p 15672:15672 -p 5672:5672 rabbitmq
一个生产者(发送方)对应一个消费者(接收方),生产者和消费者必须是一对一的关系。
一个生产者对应多个消费者,但只有一个消费者能获得消息,具有排他性。
一个生产者首先将消息发送到fanout交换器,交换器绑定多个队列,然后与之对应的所有消费真呢个收到消息,不具有排他性。
生产者将消息发送到direct交换器,交换器按照关键字key把消息路由到某个队列,即符合规则才会转发到该队列。
生产者将消息发送到topic交换器,交换器按照复杂的规则将消息路由到某个队列
消息的可靠性是RabbitMQ的特色,可靠性由消息持久化实现,可以防止在异常情况下丢失数据,交换器和队列都能持久化。
默认情况下,消息是无限期存储在RabbitMQ上的,但我们可以设置过期时间,到期后无论是否已经被接受都会被RabbitMQ删除。
消费者接收消息之后,必须返回一个Ack应答,那么RabbitMQ才会认为这个消息接收成功,如果想要删除这条消息,消费者发送Ack应答的时候,附带一个deliverTag标志位就可以了。
异步接收消息消耗的系统资源较少,但是微信小程序和后端项目之间并不是长连接,后端项目异步方式接收到队列中的消息也无法推送移动端的小程序。所以后端的Java项目采用同步的方式接收队列的消息,在移动端,我们创建定时器,向后端Java项目发出轮询请求,后端项目接收到轮询秦桧后,用同步方式接收队列的消息,然后把消息存储在MongoDB上面。
MongoDB中没有数据表的概念,而是采用集合Collection存储数据,每一个数据就是一个Document,文档结构其实就是我们常用的JSON。
字段 | 类型 | 备注 |
---|---|---|
_id | UUID | 自动生成的主键值 |
uuid | UUID | UUID值,并设置有唯一索引,防止消息被重复消费 |
senderId | Integer | 发送者ID,就是用户ID,如果是系统发出,则ID为0 |
senderPhoto | String | 发送者头像URL |
senderName | String | 发送者名称 |
msg | String | 消息内容 |
sendTime | Date | 发送时间 |
针对重复消费:小程序每个5分钟进行轮询,如果积压得消息太多,Java系统没有接受完消息,这时候新的轮询到来,就会产生两个消费者共同接收同一个消息的情况,数据库就添加了同样的记录,如果每条MQ消息都有唯一的UUID值,第一个消费者把消息保存到数据库后,第二个消费者就无法继续保存了。
创建MessageEntity类映射message集合
@Data @Document(collation = "message") //标明由mongo来维护该表,collection里面的名字对应着mongodb里面的文档 public class MessageEntity implements Serializable { //Serializable启用其序列化功能的接口 //serializable接口的作用:存储对象在存储介质中,以便在下次使用的时候,可以很快捷的重建一个副本;便于数据传输,尤其是在远程调用的时候。 @Id private String _id; @Indexed(unique = true)//唯一索引 private String uuid; @Indexed//加索引后以该字段为条件检索将大大提高速度 private Integer sendId; //可设置默认值为系统头像 private String senderPhoto="XXX"; private String senderName; private String msg; @Indexed private Date sendTime; }
虽然message集合记录是消息,里面有接收者ID,但群发消息是接收者ID空,这时候需要用上message_ref集合来记录接收人和已读状态。
字段 | 类型 | 备注 |
---|---|---|
_id | UUID | 主键 |
messageId | UUID | message记录的_id |
receiverId | String | 接收人ID |
readFlag | Boolean | 是否已读 |
lastFlag | Boolean | 是否为新接收的消息 |
创建MessageRefEntity类映射message_ref集合
public class MessageRefEntity implements Serializable { @Id private String _id; @Indexed private String messageId; @Indexed private Integer receiverId; @Indexed private Boolean readFlag; @Indexed private Boolean lastFlag; }
MongoDB从3.X开始支持集合的连接查询,也就相当于MYSQL的表连接。
先插入两条数据
db.message.insert({ _id:ObjectId("600bea9ab5bafb311f147506"), uuid:"bfcb7c47-5886-c528-5127-ce285bc2322a", senderId:0, senderPhoto:"https://static-1258386385.cos.ap-beijing.myqcloud.com/img/System.jpg", senderName:"Emos系统", msg:"HelloWord", sendTime:ISODate("2021-01-23T17:21:30Z") }); db.message_ref.insert({ _id:ObjectId("600beaf0d6310000830036f3"), messageId:"600bea9ab5bafb311f147506", receiverId:1, readFlag:false, lastFlag:true });
再使用语句连接
db.message.aggregate( //数据类型转换,将Oject转成String去连接 { //set定义变量 $set:{ //临时变量Id "id":{ $toString:"$_id"} } }, // { $lookup:{ //连接message_ref,message出Id,跟message_ref的messageId连接 from:"message_ref", localField:"id", foreignField:"messageId", //从message_ref取出来的数据保存在ref as:"ref" } }, //寻找receiverId=1的消息内容,match为查询条件 { $match:{ "ref.receiverId":1} }, //按照发送时间降序 { $sort:{ sendTime:-1} }, //从0开始往后取50条数据 { $skip:0 }, { $limit:50 } )
提供的功能包括:
@Repository public class MessageDao { @Autowired private MongoTemplate mongoTemplate; //插入数据 public String insert(MessageEntity entity){ //把北京时间转成格林时间再存到MongoDB Date sendTime = entity.getSendTime(); //偏移8个小时 sendTime = DateUtil.offset(sendTime, DateField.HOUR,8); entity.setSendTime(sendTime); entity = mongoTemplate.save(entity); return entity.get_id(); } //按照分页查询消息,注意第二个参数为long public List<HashMap> searchMessageByPage(int userId, long start, int length){ JSONObject json = new JSONObject(); //数据类型转换 json.set("$toString","$_id"); //集合连接 Aggregation aggregation = Aggregation.newAggregation( //设置临时变量id,值为json中拿, Aggregation.addFields().addField("id").withValue(json).build(), //集合联合查询 Aggregation.lookup("message_ref","id","messageId","ref"), //match设置查询条 Aggregation.match(Criteria.where("ref.receiverId").is(userId)), //排序 Aggregation.sort(Sort.by(Sort.Direction.DESC,"sendTime")), Aggregation.skip(start), Aggregation.limit(length
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。