当前位置:   article > 正文

RabbitMQ 学习:Work Queues 轮询(工作队列)_rabbitmq的轮询

rabbitmq的轮询

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。

相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。

在后台运行的工作进程将弹出任务并最终执行作业。

当有多个工作线程时,这些工作线程将一起处理这些任务。工作线程就相当于消费者

        当生产者给队列发送大量消息(非常多,超级多)时,此时很多消息停留在队列当中无法被一个或多个线程(消费者)及时的处理,这时候会考虑增加线程(消费者),由原来的单一线程(消费者)调整为多个线程(消费者),但是又要遵循一个原则(一个消息只能被处理一次,避免重复消费)。

        而rabbitmq的工作队列(又叫任务队列)就有一个特点叫轮询分发消息,采用轮询机制给各处理线程分发消息,保证每个消息都会被消费并且只被消费一次,从而达到避免重复分发消息的目的。各处理线程(消费者)之间是竞争关系,轮流抢夺消费信息。

 工作队列代码实现:

一、创建工具类

  1. package com.zzuli.rabbitmq.utils;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. /**
  6. * @Author WangXiaoSong
  7. * @Date 2023/4/11 21:42
  8. * @Description 此类为连接工厂创建信道的工具类
  9. */
  10. public class RabbitMqUtils {
  11. public static Channel getChannel() throws Exception{
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("192.168.80.132");
  14. factory.setUsername("admin");
  15. factory.setPassword("root");
  16. factory.setVirtualHost("/");
  17. Connection connection = factory.newConnection();
  18. return connection.createChannel();
  19. }
  20. }

二、创建工作线程

  1. package com.zzuli.rabbitmq.two;
  2. import com.rabbitmq.client.CancelCallback;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import com.rabbitmq.client.Delivery;
  6. import com.zzuli.rabbitmq.utils.RabbitMqUtils;
  7. /**
  8. * @Author WangXiaoSong
  9. * @Date 2023/4/11 21:47
  10. * @Description 这是一个工作线程(相当于之前的消费者)
  11. */
  12. public class Worker01 {
  13. //队列名称
  14. public static final String QUEUE_NAME = "hello";
  15. //接收消息
  16. public static void main(String[] args) throws Exception{
  17. //获取通道
  18. Channel channel = RabbitMqUtils.getChannel();
  19. //接收消息回调
  20. DeliverCallback callback = (String var1, Delivery var2) ->{
  21. System.out.println("接收到的消息:" + new String(var2.getBody()));
  22. };
  23. //消息被中断接收回调
  24. CancelCallback cancelCallback = (String var1) ->{
  25. System.out.println(var1 + "消息被消费者取消消费接口回调逻辑");
  26. };
  27. System.out.println("C2等待接收消息...");
  28. //消息的接收
  29. channel.basicConsume(QUEUE_NAME,true,callback,cancelCallback);
  30. }
  31. }

 三、创建生产者队列

  1. package com.zzuli.rabbitmq.two;
  2. import com.rabbitmq.client.Channel;
  3. import com.zzuli.rabbitmq.utils.RabbitMqUtils;
  4. import java.util.Scanner;
  5. /**
  6. * @Author WangXiaoSong
  7. * @Date 2023/4/11 22:16
  8. * @Description 生产者 发送大量消息
  9. */
  10. public class Task01 {
  11. //队列名称
  12. public static final String QUEUE_NAME = "hello";
  13. public static void main(String[] args) throws Exception{
  14. //获取channel
  15. Channel channel = RabbitMqUtils.getChannel();
  16. /**
  17. * 生成一个队列
  18. * 1.队列名称
  19. * 2.队列中消息是否持久化(磁盘) 默认存储在内存中
  20. * 3.队列是否只供多个消费者消费(是否可进行消息共享),true多个,false一个
  21. * 4.是否自动销毁 最后一个消费者断开连接后,该队列是否自动销毁 true销毁,false不销毁
  22. * 5.关于延迟消息、死信消息等设置
  23. */
  24. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  25. //从控制台输入要发送的消息
  26. Scanner scanner = new Scanner(System.in);
  27. while (scanner.hasNext()){
  28. String message = scanner.next();
  29. /**
  30. * 1.发送到哪个交换机上(使用哪个交换机)
  31. * 2.路由key(本次队列名)
  32. * 3.某些参数
  33. * 4.二进制形式的消息
  34. */
  35. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  36. System.out.println(message + "消息已发送");
  37. }
  38. }
  39. }

四、idea设置进程多开模式

 

 

五、为了便于区分,启动Worker01中的main方法之后,将 print中的C1改为C2再次启动

六、启动Task01,依次发送AA  BB  CC  DD四个消息,查看Worker01的两个进程的运行情况

 

 

 

结论:通过此次测试,得出rabbitmq的工作队列是轮询发送消息的,多个消费者之间是竞争(抢占)关系。 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/911106
推荐阅读
相关标签
  

闽ICP备14008679号