赞
踩
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。
在后台运行的工作进程将弹出任务并最终执行作业。
当有多个工作线程时,这些工作线程将一起处理这些任务。工作线程就相当于消费者
当生产者给队列发送大量消息(非常多,超级多)时,此时很多消息停留在队列当中无法被一个或多个线程(消费者)及时的处理,这时候会考虑增加线程(消费者),由原来的单一线程(消费者)调整为多个线程(消费者),但是又要遵循一个原则(一个消息只能被处理一次,避免重复消费)。
而rabbitmq的工作队列(又叫任务队列)就有一个特点叫轮询分发消息,采用轮询机制给各处理线程分发消息,保证每个消息都会被消费并且只被消费一次,从而达到避免重复分发消息的目的。各处理线程(消费者)之间是竞争关系,轮流抢夺消费信息。
一、创建工具类
- package com.zzuli.rabbitmq.utils;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- * @Author WangXiaoSong
- * @Date 2023/4/11 21:42
- * @Description 此类为连接工厂创建信道的工具类
- */
- public class RabbitMqUtils {
-
- public static Channel getChannel() throws Exception{
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.80.132");
- factory.setUsername("admin");
- factory.setPassword("root");
- factory.setVirtualHost("/");
-
- Connection connection = factory.newConnection();
- return connection.createChannel();
- }
- }

二、创建工作线程
- package com.zzuli.rabbitmq.two;
-
- import com.rabbitmq.client.CancelCallback;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
- import com.rabbitmq.client.Delivery;
- import com.zzuli.rabbitmq.utils.RabbitMqUtils;
-
- /**
- * @Author WangXiaoSong
- * @Date 2023/4/11 21:47
- * @Description 这是一个工作线程(相当于之前的消费者)
- */
- public class Worker01 {
-
- //队列名称
- public static final String QUEUE_NAME = "hello";
-
- //接收消息
- public static void main(String[] args) throws Exception{
- //获取通道
- Channel channel = RabbitMqUtils.getChannel();
-
- //接收消息回调
- DeliverCallback callback = (String var1, Delivery var2) ->{
- System.out.println("接收到的消息:" + new String(var2.getBody()));
- };
-
- //消息被中断接收回调
- CancelCallback cancelCallback = (String var1) ->{
- System.out.println(var1 + "消息被消费者取消消费接口回调逻辑");
- };
- System.out.println("C2等待接收消息...");
- //消息的接收
- channel.basicConsume(QUEUE_NAME,true,callback,cancelCallback);
- }
- }

三、创建生产者队列
- package com.zzuli.rabbitmq.two;
-
- import com.rabbitmq.client.Channel;
- import com.zzuli.rabbitmq.utils.RabbitMqUtils;
-
- import java.util.Scanner;
-
- /**
- * @Author WangXiaoSong
- * @Date 2023/4/11 22:16
- * @Description 生产者 发送大量消息
- */
- public class Task01 {
-
- //队列名称
- public static final String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws Exception{
- //获取channel
- Channel channel = RabbitMqUtils.getChannel();
- /**
- * 生成一个队列
- * 1.队列名称
- * 2.队列中消息是否持久化(磁盘) 默认存储在内存中
- * 3.队列是否只供多个消费者消费(是否可进行消息共享),true多个,false一个
- * 4.是否自动销毁 最后一个消费者断开连接后,该队列是否自动销毁 true销毁,false不销毁
- * 5.关于延迟消息、死信消息等设置
- */
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- //从控制台输入要发送的消息
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String message = scanner.next();
- /**
- * 1.发送到哪个交换机上(使用哪个交换机)
- * 2.路由key(本次队列名)
- * 3.某些参数
- * 4.二进制形式的消息
- */
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
- System.out.println(message + "消息已发送");
- }
-
- }
-
-
- }

四、idea设置进程多开模式
五、为了便于区分,启动Worker01中的main方法之后,将 print中的C1改为C2再次启动
六、启动Task01,依次发送AA BB CC DD四个消息,查看Worker01的两个进程的运行情况
结论:通过此次测试,得出rabbitmq的工作队列是轮询发送消息的,多个消费者之间是竞争(抢占)关系。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。