赞
踩
队列的消费顺序一般是先进先出。但是在某些订单中业务中,我们需要给vip用户后下单,先出货的特殊权限,这时候就需要用到优先级队列。
原理,在原来先进先出的逻辑上,给队列备注优先级,最后的顺序如下:
优先级高–>优先级低–>没有备注优先级
ps:优先级的范围为0-255
-
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class RabbitMQUtils {
-
- // 创建连接mq的链接工厂对象
- private static ConnectionFactory connectionFactory;
-
- static {
-
- connectionFactory = new ConnectionFactory();
- // 设置rabbitmq服务器所在的ip port
- connectionFactory.setHost("192.168.1.100");
- connectionFactory.setPort(5672);
- // 设置rabbitmq所在的虚拟主机
- connectionFactory.setVirtualHost("/");
- // 设置rabbitmq所在服务的用户名 密码
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
-
- }
-
- // 定义提供链接对象的方法
- public static Connection getConnection(){
-
-
- try {
- return connectionFactory.newConnection();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
-
- return null;
- }
-
- // 关闭通道和链接的方法
- public static void closeChannelAndConnection(Channel channel, Connection connection){
- try {
- if(channel!=null) channel.close();
- if(connection!=null) connection.close();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }

-
- import cn.my.utils.RabbitMQUtils;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
-
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
-
- /**
- 优先队列 生产者
- **/
- public class Product {
- //队列名称
- public static final String ORDER_QUEUE = "ORDER_QUEUE";
-
- //发消息
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- //获取信道
- Connection connection = RabbitMQUtils.getConnection();
- Channel channel = connection.createChannel();
- //参数
- Map<String, Object> argument = new HashMap<>();
- argument.put("x-max-priority", 10);//设置优先级范围0-10,官方允许值是0-255。设置过大会浪费内存
-
- //生成队列,
- //不创建交换机,走默认的交换机
- //1.名称
- //2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)
- //3.队列是否只供一个消费者消费,默认否
- //4.最后一个消费者断开连接后,是否自动删除。
- //5.其他参数
- channel.queueDeclare(ORDER_QUEUE, true, false, false, argument);
- //发消息
- String message = "this is QUEUE_P";
- //持续发送消息
- for (int i = 0; i < 10; i++) {
- //1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)
- //2.路由key,直接写队列名即可
- //3.参数,忽略
- //4.消息体
-
- if (i == 5) {
- AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build(); // priority 优先级
- channel.basicPublish("", ORDER_QUEUE, properties, (message + i).getBytes());
- } else {
- channel.basicPublish("", ORDER_QUEUE, null, (message + i).getBytes());
- }
-
- }
-
- System.out.println("消息发送成功");
- }
- }

-
- /**
- 优先队列 消费者
- **/
- public class Consume {
- //队列名称
- public static final String ORDER_QUEUE = "ORDER_QUEUE";
-
- //发消息
- public static void main(String[] args) throws IOException, TimeoutException {
- //获取信道
- Connection connection = RabbitMQUtils.getConnection();
- Channel channel = connection.createChannel();
-
- //消费者未成功消费时候的回调方法
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));
- //手动应答
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
-
-
- };
- //消费者取消消费的回调方法
- CancelCallback cancelCallback = consumerTag -> {
- System.out.println("消费者取消消费的回调方法");
- };
-
- //消费消息
- //1.队列名
- //2.消费成功后是否自动应答
- //3.消费者成功消费时候的回调
- //4.消费者取消消费的回调方法
- Boolean actoAck = false;
- channel.basicConsume(ORDER_QUEUE, actoAck, deliverCallback, cancelCallback);
- }
- }

先启动生产者,再启动消费者
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。