当前位置:   article > 正文

2024.2.19 模拟实现 RabbitMQ —— 虚拟主机设计_rabbitmq虚拟主机

rabbitmq虚拟主机

目录

引言

实现 VirtualHost 类

属性

交换机相关操作

队列相关操作

绑定相关操作

消息相关操作

关于线程安全问题

针对 VirtualHost 单元测试


引言

  • 虚拟主机的概念类似于 MySQL 的 database,用于将 交换机、队列、绑定、消息 进行逻辑上的隔离
  • 虚拟主机不仅仅要管理数据,还需要提供一些 核心 API,供上层代码进行调用
  • 就是将之前写的 内存 和 硬盘 的数据管理给串起来
  • 即整个程序的核心业务逻辑

核心 API:

  1. 创建交换机 exchangeDeclare
  2. 删除交换机 exchangeDelete
  3. 创建队列 queueDeclare
  4. 删除队列 queueDelete
  5. 创建绑定 queueBind
  6. 删除绑定 queueUnbind
  7. 发送消息 basicPublish
  8. 订阅消息 basicConsume
  9. 确认消息 basicAck

注意点一:

  • 此处各 API 的取名,各 API 中设定的参数,均参考于 RabbitMQ

注意点二:

  • 此处我们仅实现单个虚拟主机,并不打算实现添加/删除虚拟主机的 API
  • 但是会在设计数据结构上留下这样的扩展空间

实例理解

  • 虚拟主机存在目的,就是为了保证隔离,即不同虚拟主机之间的内容互不影响
  • 当 虚拟主机1 中创建了一个名为 "testExchange" 的交换机
  • 而 虚拟主机2 中也创建了一个名为 "testExchange" 的交换机
  • 虽然这两个交换机的名字相同,但是却处于不同虚拟主机中,所以需要区分开来

问题:

  • 如何表示 交换机 与 虚拟主机 之间的从属关系?

可选方案:

  • 方案一:参考数据库设计 "一对多" 的方案,给交换机表添加个属性,虚拟主机 id/name
  • 方案二:重新约定交换机的名字,即  新交换机名字 = 虚拟主机名字 + 交换机真实名字
  • 方案三:给每个虚拟主机,分配一组不同的数据库和文件(比方案二麻烦,但更优雅)

回答:

  • 此处我们选择方案二!
  • 约定在 VirtualHost 中的核心 api 里,对 exchangeName 和 queueName 做一个转换
  • 在该层代码中进行转换后,后续代码 MemoryDataCenter、DiskDataCenter 无需调整

  • 按照这个方式,也可以去区分不同的队列

  • 绑定 与 交换机和队列 相关,通过上述操作,绑定自然也就被隔离开了!
  • 消息 与 队列 相关,因为队列名已经区分开了,消息自然也就被区分开了!

实现 VirtualHost 类

属性

  1. @Getter
  2. public class VirtualHost {
  3. private String virtualHostName;
  4. private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
  5. private DiskDataCenter diskDataCenter = new DiskDataCenter();
  6. private Router router = new Router();
  7. private ConsumerManager consumerManager = new ConsumerManager(this);
  8. // 操作交换机的锁对象
  9. private final Object exchangeLocker = new Object();
  10. // 操作队列的锁对象
  11. private final Object queueLocker = new Object();
  12. public VirtualHost(String name) {
  13. this.virtualHostName = name;
  14. // 对于 MemoryDataCenter 来说,不需要额外的初始化操作的,只要对象 new 出来就行
  15. // 但是,针对 DiskDataCenter 来说,则需要进行初始化操作,建库建表和初始化数据的设定
  16. // 另外还需要针对硬盘的数据,进行恢复到内存中
  17. diskDataCenter.init();
  18. try {
  19. memoryDataCenter.recovery(diskDataCenter);
  20. } catch (IOException | MqException | ClassNotFoundException e) {
  21. e.printStackTrace();
  22. System.out.println("[VirtualHost] 恢复内存数据失败!");
  23. }
  24. }
  25. }
  • Router 类用于实现交换机的转发规则,验证 bindingKey 和 routingKey 的合法性
  • ConsumerManager 类用于实现消费消息的核心逻辑

交换机相关操作

  1. // 创建交换机
  2. // 如果交换机不存在就创建,如果存在 则直接返回
  3. // 返回值是 boolean,创建成功,返回 true,失败返回 false
  4. public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable,
  5. boolean autoDelete, Map<String,Object> arguments) {
  6. // 把交换机的名字,加上虚拟主机作为前缀
  7. exchangeName = virtualHostName + exchangeName;
  8. try {
  9. synchronized (exchangeLocker) {
  10. // 1、判定该交换机是否已经存在,直接通过内存查询
  11. Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
  12. if (existsExchange != null) {
  13. // 该交换机已经存在
  14. System.out.println("[VirtualHost] 交换机已经存在!exchangeName = " + exchangeName);
  15. return true;
  16. }
  17. // 2、真正创建交换机,先构造 Exchange 对象
  18. Exchange exchange = new Exchange();
  19. exchange.setName(exchangeName);
  20. exchange.setType(exchangeType);
  21. exchange.setDurable(durable);
  22. exchange.setAutoDelete(autoDelete);
  23. exchange.setArguments(arguments);
  24. // 3、把交换机对象写入硬盘
  25. if(durable) {
  26. diskDataCenter.insertExchange(exchange);
  27. }
  28. // 4、把交换机对象写入内存
  29. memoryDataCenter.insertExchange(exchange);
  30. System.out.println("[VirtualHost] 交换机创建完成!exchangeName = " + exchangeName);
  31. // 上述逻辑,先写硬盘,后写内存,目的就是因为硬盘更容易写失败,如果硬盘写失败了,内存就不写了
  32. // 要是先写内存,内存写成功了,硬盘写失败了,还需要把内存的数据给再删掉,就比较麻烦了
  33. }
  34. return true;
  35. }catch (Exception e) {
  36. System.out.println("[VirtualHost] 交换机创建失败!exchangeName = " + exchangeName);
  37. e.printStackTrace();
  38. return false;
  39. }
  40. }
  41. // 删除交换机
  42. public boolean exchangeDelete(String exchangeName) {
  43. exchangeName = virtualHostName + exchangeName;
  44. try {
  45. synchronized (exchangeLocker) {
  46. // 1、先找到对应的交换机
  47. Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
  48. if(toDelete == null) {
  49. throw new MqException("[VirtualHost] 交换机不存在无法删除!");
  50. }
  51. // 2、删除硬盘上的数据
  52. if(toDelete.isDurable()) {
  53. diskDataCenter.deleteExchange(exchangeName);
  54. }
  55. // 3、删除内存中的交换机数据
  56. memoryDataCenter.deleteExchange(exchangeName);
  57. System.out.println("[VirtualHost] 交换机删除成功!exchangeName = " + exchangeName);
  58. }
  59. return true;
  60. }catch (Exception e) {
  61. System.out.println("[VirtualHost] 交换机删除失败!exchangeName = " + exchangeName);
  62. e.printStackTrace();
  63. return false;
  64. }
  65. }

队列相关操作

  1. // 创建队列
  2. public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
  3. Map<String,Object> arguments) {
  4. // 把队列的名字,给拼接上虚拟主机的名字
  5. queueName = virtualHostName + queueName;
  6. try {
  7. synchronized (queueLocker) {
  8. // 1、判定队列是否存在
  9. MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
  10. if(existsQueue != null) {
  11. System.out.println("[VirtualHost] 队列已经存在!queueName = " + queueName);
  12. return true;
  13. }
  14. // 2、创建队列对象
  15. MSGQueue queue = new MSGQueue();
  16. queue.setName(queueName);
  17. queue.setDurable(durable);
  18. queue.setExclusive(exclusive);
  19. queue.setAutoDelete(autoDelete);
  20. queue.setArguments(arguments);
  21. // 3、写硬盘
  22. if(durable) {
  23. diskDataCenter.insertQueue(queue);
  24. }
  25. // 4、写内存
  26. memoryDataCenter.insertQueue(queue);
  27. System.out.println("[VirtualHost] 队列创建成功!queueName = " + queueName);
  28. }
  29. return true;
  30. }catch (Exception e) {
  31. System.out.println("[VirtualHost] 队列创建失败!queueName = " + queueName);
  32. e.printStackTrace();
  33. return false;
  34. }
  35. }
  36. // 删除队列
  37. public boolean queueDelete(String queueName) {
  38. queueName = virtualHostName + queueName;
  39. try {
  40. synchronized (queueLocker) {
  41. // 1、根据队列名字,查询下当前的队列对象
  42. MSGQueue queue = memoryDataCenter.getQueue(queueName);
  43. if(queue == null) {
  44. throw new MqException("[VirtualHost] 队列不存在!无法删除!queueName = " + queueName);
  45. }
  46. // 2、删除硬盘数据
  47. if(queue.isDurable()) {
  48. diskDataCenter.deleteQueue(queueName);
  49. }
  50. // 3、删除内存数据
  51. memoryDataCenter.deleteQueue(queueName);
  52. System.out.println("[VirtualHost] 删除队列成功!queueName = " + queueName);
  53. }
  54. return true;
  55. }catch (Exception e) {
  56. System.out.println("[VirtualHost] 删除队列失败!queueName = " + queueName);
  57. e.printStackTrace();
  58. return false;
  59. }
  60. }

绑定相关操作

  1. public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
  2. queueName = virtualHostName + queueName;
  3. exchangeName = virtualHostName + exchangeName;
  4. try {
  5. synchronized (exchangeLocker) {
  6. synchronized (queueLocker) {
  7. // 1、判定当前的绑定是否已经存在了
  8. Binding exchangeBinding = memoryDataCenter.getBinding(exchangeName,queueName);
  9. if(exchangeBinding != null) {
  10. throw new MqException("[VirtualHost] binding 已经存在! queueName = " + queueName + ", exchangeName = " + exchangeName);
  11. }
  12. // 2、验证 bindingKey 是否合法
  13. if(!router.checkBindingKey(bindingKey)) {
  14. throw new MqException("[VirtualHost] bindingKey 非法! bindingKey = " + bindingKey);
  15. }
  16. // 3、创建 Binding 对象
  17. Binding binding = new Binding();
  18. binding.setExchangeName(exchangeName);
  19. binding.setQueueName(queueName);
  20. binding.setBindingKey(bindingKey);
  21. // 4、获取一下对应的交换机和队列,如果交换机或者队列不存在,这样的绑定也是无法创建的
  22. MSGQueue queue = memoryDataCenter.getQueue(queueName);
  23. if(queue == null) {
  24. throw new MqException("[VirtualHost] 队列不存在! queueName = " + queueName);
  25. }
  26. Exchange exchange = memoryDataCenter.getExchange(exchangeName);
  27. if(exchange == null) {
  28. throw new MqException("[VirtualHost] 交换机不存在! exchangeName = " + exchangeName);
  29. }
  30. // 5、先写硬盘
  31. if(queue.isDurable() && exchange.isDurable()) {
  32. diskDataCenter.insertBinding(binding);
  33. }
  34. // 6、写入内存
  35. memoryDataCenter.insertBinding(binding);
  36. System.out.println("[VirtualHost] 绑定创建成功!queueName = " + queueName + ", exchangeName = " + exchangeName);
  37. }
  38. }
  39. return true;
  40. }catch (Exception e) {
  41. System.out.println("[VirtualHost] queueBind 失败!queueName = " + queueName + ", exchangeName = " + exchangeName);
  42. e.printStackTrace();
  43. return false;
  44. }
  45. }
  46. public boolean queueUnbind(String queueName, String exchangeName) {
  47. queueName = virtualHostName + queueName;
  48. exchangeName = virtualHostName + exchangeName;
  49. try {
  50. synchronized (exchangeLocker) {
  51. synchronized (queueLocker) {
  52. // 1、获取 binding 看是否已经存在
  53. Binding binding = memoryDataCenter.getBinding(exchangeName,queueName);
  54. if(binding == null) {
  55. throw new MqException("[VirtualHost] 删除绑定失败!绑定不存在!queueName = " + queueName + ", exchangeName = " + exchangeName);
  56. }
  57. // 2、无论绑定是否持久化了,都尝试从硬盘删一下,就算不存在,这个删除也无副作用
  58. diskDataCenter.deleteBinding(binding);
  59. // 3、删除内存的数据
  60. memoryDataCenter.deleteBinding(binding);
  61. System.out.println("[VirtualHost] 删除绑定成功!");
  62. }
  63. }
  64. return true;
  65. }catch (Exception e) {
  66. System.out.println("[VirtualHost] 删除绑定失败!!");
  67. e.printStackTrace();
  68. return false;
  69. }
  70. }

注意:

  • 观察下图红框代码

问题:

  • 如果在解除该绑定之前,该绑定中的 交换机 或 队列 已经被删除了
  • 那么此时我们采用上述逻辑就无法解除绑定了

方案一:

  • 参考类似于 MySQL 的外键一样,删除队列/交换机的时候,判定一下看当前交换机/队列 是否存在对应的绑定,如果存在,则禁止删除队列/交换机,要求先解除绑定,再尝试删除队列/交换机
  • 优点:更严谨
  • 缺点:更麻烦,尤其是,查看当前的队列是否有对应的绑定的时候

  • 由上图我们给 Binding 设定的内存数据结构可知,查看一个交换机有哪些绑定是比较容易的,但是查看一个队列有哪些绑定是比较难的!

方案二:

  • 删除绑定时,干脆不校验交换机/队列存在,直接就尝试删除
  • 优点:简单
  • 缺点:没那么严谨,也还好

回答:

  • 此处,我们直接采取第二种方法!

  • 无论 exchange 或 queue 是否持久化了,均尝试从硬盘上删除一下
  • 因为即使 exchange 或 queue 未持久化到硬盘上,底层调用的 delete 语句也不会有什么副作用


消息相关操作

  1. // 发送消息到指定的交换机/队列中
  2. public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body) {
  3. try {
  4. // 1、转换交换机的名字
  5. exchangeName = virtualHostName + exchangeName;
  6. // 2、检查 routingKey 是否合法
  7. if(!router.checkRoutingKey(routingKey)) {
  8. throw new MqException("[VirtualHost] routingKey 非法!routingKey = " + routingKey);
  9. }
  10. // 3、查找交换机对象
  11. Exchange exchange = memoryDataCenter.getExchange(exchangeName);
  12. if(exchange == null) {
  13. throw new MqException("[VirtualHost] 交换机不存在!exchangeName = " + exchangeName);
  14. }
  15. // 4、判定交换机类型
  16. if(exchange.getType() == ExchangeType.DIRECT) {
  17. // 按照直接交换机的方式来转发消息
  18. // 以 routingKey 作为队列的名字,直接把消息写入到指定的队列中
  19. // 此时,可以无视绑定关系
  20. String queueName = virtualHostName + routingKey;
  21. // 5、构造消息对象
  22. Message message = Message.createMessageWithId(routingKey,basicProperties,body);
  23. // 6、查找该队列对应的对象
  24. MSGQueue queue = memoryDataCenter.getQueue(queueName);
  25. if(queue == null) {
  26. throw new MqException("[VirtualHost] 队列不存在!queueName = " + queueName);
  27. }
  28. // 7、队列存在,直接给队列中写入消息
  29. sendMessage(queue,message);
  30. }else {
  31. // 按照 fanout 和 topic 的方式来转发
  32. // 5、找到该交换机关联的所有绑定,并遍历这些绑定对象
  33. ConcurrentHashMap<String,Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
  34. for (Map.Entry<String,Binding> entry : bindingsMap.entrySet()) {
  35. // 1)获取到绑定对象,判定对应的的队列是否存在
  36. Binding binding = entry.getValue();
  37. MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
  38. if(queue == null) {
  39. // 此处无需抛出异常,可能此处有多个这样的队列
  40. // 希望不要因为一个队列影响到其他队列的消息的传输
  41. System.out.println("[VirtualHost] basicPublish 发送消息时,队列不存在!queueName = " + binding.getQueueName());
  42. continue;
  43. }
  44. // 2)构造消息对象
  45. Message message = Message.createMessageWithId(routingKey,basicProperties,body);
  46. // 3)判定这个消息时否能转发给该队列
  47. // 如果是 fanout,所有绑定的队列都要转发的
  48. // 如果是 topic,还需要判定下,bindingKey 和 routingKey 是不是匹配
  49. if(!router.route(exchange.getType(),binding,message)){
  50. continue;
  51. }
  52. // 真正转发消息给队列
  53. sendMessage(queue,message);
  54. }
  55. }
  56. return true;
  57. }catch (Exception e) {
  58. System.out.println("[VirtualHost] 消息发送失败!");
  59. e.printStackTrace();
  60. return false;
  61. }
  62. }
  63. private void sendMessage(MSGQueue queue,Message message) throws IOException, MqException, InterruptedException {
  64. // 此处发送消息,就是把消息写入到硬盘 和 内存上
  65. int deliverMode = message.getDeliverMode();
  66. // deliverMode 为 1 表示不持久化,deliverMode 为 2 表示持久化
  67. if(deliverMode == 2) {
  68. diskDataCenter.sendMessage(queue,message);
  69. }
  70. // 写入内存
  71. memoryDataCenter.sendMessage(queue,message);
  72. // 通知消费者可以消费消息了
  73. consumerManager.notifyConsume(queue.getName());
  74. }
  75. // 订阅消息
  76. // 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者
  77. // consumerTag:表示消费者的身份标识
  78. // autoAck:消息被消费完成后,应答的方式,为 true 自动应答,为 false 手动应答
  79. // consumer:是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子
  80. public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
  81. // 构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把这个 Consumer 对象添加到该队列中
  82. queueName = virtualHostName + queueName;
  83. try {
  84. consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);
  85. System.out.println("[VirtualHost] basicConsume 成功! queueName = " + queueName);
  86. return true;
  87. }catch (Exception e) {
  88. System.out.println("[VirtualHost] basicConsume 失败! queueName = " + queueName);
  89. e.printStackTrace();
  90. return false;
  91. }
  92. }
  93. public boolean basicAck(String queueName,String messageId) {
  94. queueName = virtualHostName + queueName;
  95. try {
  96. // 1、获取到消息和队列
  97. Message message = memoryDataCenter.getMessage(messageId);
  98. if(message == null) {
  99. throw new MqException("[VirtualHost] 要确认的消息不存在!messageId = " + messageId);
  100. }
  101. MSGQueue queue = memoryDataCenter.getQueue(queueName);
  102. if(queue == null) {
  103. throw new MqException("[VirtualHost] 要确认的消息不存在!queueName = " + queueName);
  104. }
  105. // 2、删除硬盘上的数据
  106. if(message.getDeliverMode() == 2) {
  107. diskDataCenter.deleteMessage(queue,message);
  108. }
  109. // 3、删除消息中心的数据
  110. memoryDataCenter.removeMessage(messageId);
  111. // 4、删除待确认的集合中的消息
  112. memoryDataCenter.removeMessageWaitAck(queueName,messageId);
  113. System.out.println("[VirtualHost] basicAck 成功!消息被确认成功!queueName = " + queueName
  114. + ", messageId = " + messageId);
  115. return true;
  116. }catch (Exception e) {
  117. System.out.println("[VirtualHost] basicAck 失败!消息确认失败!queueName = " + queueName
  118. + ", messageId = " + messageId);
  119. e.printStackTrace();
  120. return false;
  121. }
  122. }

关于线程安全问题

  • 针对创建交换机 exchangeDeclare 方法加锁

  • 针对删除交换机 exchangeDelete 方法加锁

  • 如上图所示,此处我们是针对 exchangeLocker 对象进行加锁的,从而导致这个锁的粒度还是比较大的
  • 比如 创建/删除 交换机A 时,此时就会影响到 交换机B 的创建/删除

注意:

  • 此处我们确实可以做出一系列调整,加一个更细粒度的锁,但是也没啥必要
  • 对于 Broker Server 来说,创建交换机、创建绑定、创建队列、删除交换机、删除绑定、删除队列,这些均属于低频操作!
  • 既然是低频操作,所以遇到两个线程都去操作创建队列之类的情况本身就概率很低了
  • 因此,对于绝大多数情况来说,是不会触发锁冲突的
  • 再加之 synchronized 最初为偏向锁状态,该状态下加锁成本也还好,只有遇到竞争才会真正加锁
  • 当然,为了应对一些少数的极端情况,此处加锁还是有一定必要的

问题:

  • 既然在这一层代码加锁了
  • 里面的 MemoryDataCenter 中的操作是否就不必加锁了?
  • 是否之前的加锁就没有意义了?

回答:

  • 我们并不知道 MemoryDataCenter 的方法是给哪个类进行调用的
  • 因为当前 VirtualHost 自身是保证了线程安全的
  • 所以 VirtualHost 内部调用的 MemoryDataCenter 中不加锁也问题不大
  • 但是如果是另一个自身未保证线程安全的类,也多线程调用 MemoryCenter 呢?

针对 VirtualHost 单元测试

  • 编写测试用例代码是十分重要的!
  1. package com.example.demo;
  2. import com.example.demo.common.Consumer;
  3. import com.example.demo.mqserver.VirtualHost;
  4. import com.example.demo.mqserver.core.BasicProperties;
  5. import com.example.demo.mqserver.core.ExchangeType;
  6. import org.apache.tomcat.util.http.fileupload.FileUtils;
  7. import org.junit.jupiter.api.AfterEach;
  8. import org.junit.jupiter.api.Assertions;
  9. import org.junit.jupiter.api.BeforeEach;
  10. import org.junit.jupiter.api.Test;
  11. import org.springframework.boot.SpringApplication;
  12. import org.springframework.boot.test.context.SpringBootTest;
  13. import java.io.File;
  14. import java.io.IOException;
  15. @SpringBootTest
  16. public class VirtualHostTests {
  17. private VirtualHost virtualHost = null;
  18. @BeforeEach
  19. public void setUp() {
  20. DemoApplication.context = SpringApplication.run(DemoApplication.class);
  21. virtualHost = new VirtualHost("default");
  22. }
  23. @AfterEach
  24. public void tearDown() throws IOException {
  25. DemoApplication.context.close();
  26. virtualHost = null;
  27. // 把硬盘的目录删除掉
  28. File dataDir = new File("./data");
  29. FileUtils.deleteDirectory(dataDir);
  30. }
  31. @Test
  32. public void testExchangeDeclare() {
  33. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
  34. true,false,null);
  35. Assertions.assertTrue(ok);
  36. }
  37. @Test
  38. public void testExchangeDelete() {
  39. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
  40. true,false,null);
  41. Assertions.assertTrue(ok);
  42. ok = virtualHost.exchangeDelete("testExchange");
  43. Assertions.assertTrue(ok);
  44. }
  45. @Test
  46. public void testQueueDeclare() {
  47. boolean ok = virtualHost.queueDeclare("testQueue",true,
  48. false,false,null);
  49. Assertions.assertTrue(ok);
  50. }
  51. @Test
  52. public void testQueueDelete() {
  53. boolean ok = virtualHost.queueDeclare("testQueue",true,
  54. false,false,null);
  55. Assertions.assertTrue(ok);
  56. ok = virtualHost.queueDelete("testQueue");
  57. Assertions.assertTrue(ok);
  58. }
  59. @Test
  60. public void testQueueBind() {
  61. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
  62. true,false,null);
  63. Assertions.assertTrue(ok);
  64. ok = virtualHost.queueDeclare("testQueue",true,
  65. false,false,null);
  66. Assertions.assertTrue(ok);
  67. ok = virtualHost.queueBind("testQueue","testExchange","testBindingKey");
  68. Assertions.assertTrue(ok);
  69. }
  70. @Test
  71. public void testQueueUnbind() {
  72. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
  73. true,false,null);
  74. Assertions.assertTrue(ok);
  75. ok = virtualHost.queueDeclare("testQueue",true,
  76. false,false,null);
  77. Assertions.assertTrue(ok);
  78. ok = virtualHost.queueBind("testQueue","testExchange","testBindingKey");
  79. Assertions.assertTrue(ok);
  80. ok = virtualHost.queueUnbind("testQueue","testExchange");
  81. Assertions.assertTrue(ok);
  82. }
  83. @Test
  84. public void testBasicPublish() {
  85. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
  86. true,false,null);
  87. Assertions.assertTrue(ok);
  88. ok = virtualHost.queueDeclare("testQueue",true,
  89. false,false,null);
  90. Assertions.assertTrue(ok);
  91. ok = virtualHost.basicPublish("testExchange","testQueue",null,
  92. "hello".getBytes());
  93. Assertions.assertTrue(ok);
  94. }
  95. // 先订阅队列,后发送消息
  96. @Test
  97. public void testBasicConsume1() throws InterruptedException {
  98. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
  99. true,false,null);
  100. Assertions.assertTrue(ok);
  101. ok = virtualHost.queueDeclare("testQueue",true,
  102. false,false,null);
  103. Assertions.assertTrue(ok);
  104. // 先订阅队列
  105. ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
  106. @Override
  107. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
  108. // 消费者自身设定的回调方法
  109. System.out.println("messageId = " + basicProperties.getMessageId());
  110. System.out.println("body = " + new String(body,0,body.length));
  111. Assertions.assertEquals("testQueue",basicProperties.getRoutingKey());
  112. Assertions.assertEquals(1,basicProperties.getDeliverMode());
  113. Assertions.assertArrayEquals("hello".getBytes(),body);
  114. }
  115. });
  116. Assertions.assertTrue(ok);
  117. Thread.sleep(500);
  118. // 再发送消息
  119. ok = virtualHost.basicPublish("testExchange","testQueue",null,
  120. "hello".getBytes());
  121. Assertions.assertTrue(ok);
  122. }
  123. // 先发送消息,后订阅队列
  124. @Test
  125. public void testBasicConsume2() throws InterruptedException {
  126. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
  127. true,false,null);
  128. Assertions.assertTrue(ok);
  129. ok = virtualHost.queueDeclare("testQueue",true,
  130. false,false,null);
  131. Assertions.assertTrue(ok);
  132. // 先发送消息
  133. ok = virtualHost.basicPublish("testExchange","testQueue",null,
  134. "hello".getBytes());
  135. Assertions.assertTrue(ok);
  136. // 再订阅队列
  137. ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
  138. @Override
  139. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
  140. // 消费者自身设定的回调方法
  141. System.out.println("messageId = " + basicProperties.getMessageId());
  142. System.out.println("body = " + new String(body,0,body.length));
  143. Assertions.assertEquals("testQueue",basicProperties.getRoutingKey());
  144. Assertions.assertEquals(1,basicProperties.getDeliverMode());
  145. Assertions.assertArrayEquals("hello".getBytes(),body);
  146. }
  147. });
  148. Assertions.assertTrue(ok);
  149. Thread.sleep(500);
  150. }
  151. @Test
  152. public void testBasicConsumeFanout() throws InterruptedException {
  153. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT,
  154. false,false,null);
  155. Assertions.assertTrue(ok);
  156. ok = virtualHost.queueDeclare("testQueue1",false,
  157. false,false,null);
  158. Assertions.assertTrue(ok);
  159. ok = virtualHost.queueBind("testQueue1","testExchange","");
  160. Assertions.assertTrue(ok);
  161. ok = virtualHost.queueDeclare("testQueue2",false,
  162. false,false,null);
  163. Assertions.assertTrue(ok);
  164. ok = virtualHost.queueBind("testQueue2","testExchange","");
  165. Assertions.assertTrue(ok);
  166. // 往交换机中发送一个消息
  167. ok = virtualHost.basicPublish("testExchange","",null,"hello".getBytes());
  168. Assertions.assertTrue(ok);
  169. Thread.sleep(500);
  170. // 两个消费者订阅上述的两个队列
  171. ok = virtualHost.basicConsume("testConsumer", "testQueue1", true, new Consumer() {
  172. @Override
  173. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
  174. System.out.println("consumerTag = " + consumerTag);
  175. System.out.println("messageId = " + basicProperties.getMessageId());
  176. Assertions.assertArrayEquals("hello".getBytes(),body);
  177. }
  178. });
  179. Assertions.assertTrue(ok);
  180. ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {
  181. @Override
  182. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
  183. System.out.println("consumerTag = " + consumerTag);
  184. System.out.println("messageId = " + basicProperties.getMessageId());
  185. Assertions.assertArrayEquals("hello".getBytes(),body);
  186. }
  187. });
  188. Assertions.assertTrue(ok);
  189. Thread.sleep(500);
  190. }
  191. @Test
  192. public void testBasicConsumeTopic() throws InterruptedException {
  193. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC,
  194. false,false,null);
  195. Assertions.assertTrue(ok);
  196. ok = virtualHost.queueDeclare("testQueue1",false,
  197. false,false,null);
  198. Assertions.assertTrue(ok);
  199. ok = virtualHost.queueBind("testQueue1","testExchange","aaa.*.bbb");
  200. Assertions.assertTrue(ok);
  201. ok = virtualHost.basicPublish("testExchange","aaa.ccc.bbb",null,"hello".getBytes());
  202. Assertions.assertTrue(ok);
  203. ok = virtualHost.basicConsume("testConsumer", "testQueue1", true, new Consumer() {
  204. @Override
  205. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
  206. System.out.println("consumerTag = " + consumerTag);
  207. System.out.println("messageId = " + basicProperties.getMessageId());
  208. Assertions.assertArrayEquals("hello".getBytes(),body);
  209. }
  210. });
  211. Assertions.assertTrue(ok);
  212. Thread.sleep(500);
  213. }
  214. @Test
  215. public void testBasicAck() throws InterruptedException {
  216. boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
  217. true,false,null);
  218. Assertions.assertTrue(ok);
  219. ok = virtualHost.queueDeclare("testQueue",true,
  220. false,false,null);
  221. Assertions.assertTrue(ok);
  222. // 先发送消息
  223. ok = virtualHost.basicPublish("testExchange","testQueue",null,
  224. "hello".getBytes());
  225. Assertions.assertTrue(ok);
  226. // 再订阅队列 【要改的地方,把 autoAck 改成 false】
  227. ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {
  228. @Override
  229. public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
  230. // 消费者自身设定的回调方法
  231. System.out.println("messageId = " + basicProperties.getMessageId());
  232. System.out.println("body = " + new String(body,0,body.length));
  233. Assertions.assertEquals("testQueue",basicProperties.getRoutingKey());
  234. Assertions.assertEquals(1,basicProperties.getDeliverMode());
  235. Assertions.assertArrayEquals("hello".getBytes(),body);
  236. // 【要改的地方,新增手动调用 basicAck】
  237. boolean ok = virtualHost.basicAck("testQueue",basicProperties.getMessageId());
  238. Assertions.assertTrue(ok);
  239. }
  240. });
  241. Assertions.assertTrue(ok);
  242. Thread.sleep(500);
  243. }
  244. }
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号