赞
踩
一般使用MQ分成两种模式
--springboot
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- spring:
- rabbitmq:
- host: 127.0.0.1
- username: guest
- password: guest
- @Component
- public class Listener {
-
-
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(value = "spring.test.queue", durable = "false"),
- exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),
- key = "insert"
- )
- )
- public void listen(String msg){
- System.out.println("消费者接受到消息:" + msg);
- }
-
-
-
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(value = "bbb", durable = "false"),
- exchange = @Exchange(value = "my.exchange")
- )
- )
- public void listenQueue1(String msg){
- System.out.println("bbb:" + msg);
- }
-
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(value = "ccc", durable = "false"),
- exchange = @Exchange(value = "my.exchange")
- )
- )
- public void listenQueue2(String msg){
- System.out.println("ccc:" + msg);
- }
-
- @RabbitListener(
- queues = {"work"}
- )
- public void listenQueue3(String msg){
- System.out.println("work 1:" + msg);
- }
-
- @RabbitListener(
- queues = {"work"}
- )
- public void listenQueue4(String msg){
- System.out.println("work 2:" + msg);
- }
-
- }

-
- @Resource
- private AmqpTemplate template;
- @Test
- public void testSendMsg1() throws InterruptedException {
- String message = "hello spring bbbbbbbbbbbb";
- template.convertAndSend("my.exchange",null, message);
- System.out.println("生产者发送消息:" + message);
- Thread.sleep(10000);//等待10s,让测试方法延迟结束,防止消费者未来得及获取消息
- }
-
- @Test
- public void testSendMsg2() throws InterruptedException {
- String message = "hello spring aaaaaaaaaaaaaaaaaa work";
- //template.convertAndSend("my.exchange","bbb", message);
- for(int i=1;i<15;i++){
- template.convertAndSend("work",message+" "+i);
- }
-
- System.out.println("生产者发送消息:");
- Thread.sleep(1000);//等待10s,让测试方法延迟结束,防止消费者未来得及获取消息
- }

测试结论是:生产着向指定的exchange发送消息,消费者只要绑定了这个exchange,都可以收到消息!如果生产着不指定exchange(实际走的是默认的),只指定了rouingKey(路由规则,实际执行了queue),这里和消费者绑定的queue是一致的,生产者发送的消息默认会轮询的方式发送给各个消费者,每个消息只能被消费一次
-- 不适用注解的方式,原生方式
- package com.example.rabbit_product;
-
- import com.example.rabbit_product.util.RabbitMqUtil;
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- /**
- * @author
- * @date 2021/2/18 18:26
- * @description 绑定exchange
- */
- public class Consumer3 {
- public static void main(String args[]) throws Exception {
- //通过公用方法获取rabbit连接
- Connection connection = RabbitMqUtil.getConnection();
- //创建通道连接
- Channel channel = connection.createChannel();
- //通道绑定队列
- channel.queueBind("workone","my.exchange","");
- //消费消息
-
- channel.basicConsume("workone",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消费者-workone:" + new String(body));
- }
- });
- }
-
- }

- package com.example.rabbit_product;
-
- import com.example.rabbit_product.util.RabbitMqUtil;
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- /**
- * @author
- * @date 2021/2/18 18:26
- * @description 只有queue
- */
- public class Consumer1 {
- public static void main(String args[]) throws Exception {
- //通过公用方法获取rabbit连接
- Connection connection = RabbitMqUtil.getConnection();
- //创建通道连接
- Channel channel = connection.createChannel();
- //通道绑定队列
- /**
- * 参数1 queue:队列名称,如果队列不存在则自动创建
- * 参数2 durable:用来定义队列是否需要持久化,true 需要持久化;false 不需要持久化
- * 参数3 exclusive:定义是否独占队列,true 是;false 否
- * 参数4 autodelete:消费完成后是否需要自动删除队列,true 是;false 否
- * 参数5:自定义的额外附加参数
- */
- channel.queueDeclare("work",true,false,false,null);
- //消费消息
- /**
- * 参数1:队列名称,即将要消费哪一个队列的消息
- * 参数2:开启消息的自动确认机制
- * 参数3:消费时的回调接口,常用来处理业务逻辑,一般都是Consumer接口的实现类,此处用默认的DefaultConsumer
- */
- channel.basicConsume("work",true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消费者-1:" + new String(body));
- }
- });
- }
-
- }

- @Test
- public void sendMsg() throws Exception {
- Connection connection = RabbitMqUtil.getConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare("work", true, false, false, null);
- for (int i = 0; i < 10; i++) {
- channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, ("第(" + i + ")条Work queues").getBytes());
- }
- RabbitMqUtil.getClose(connection, channel);
- }
-
- @Test
- public void sendMsg111() throws Exception {
- Connection connection = RabbitMqUtil.getConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare("my.exchange", BuiltinExchangeType.DIRECT,true);
- for (int i = 0; i < 10; i++) {
- channel.basicPublish("my.exchange","", MessageProperties.PERSISTENT_TEXT_PLAIN, ("第(" + i + ")条Work queues").getBytes());
- }
- RabbitMqUtil.getClose(connection, channel);
- }

- package com.example.rabbit_product.util;
-
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import org.springframework.context.annotation.Configuration;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * @author
- * @date 2021/2/18 18:21
- * @description todo
- */
- @Configuration
- public class RabbitMqUtil {
- private static ConnectionFactory connectionFactory;
-
- static {
- //第一步:创建rabbitmq的连接工厂
- connectionFactory = new ConnectionFactory();
- //第二步:设置连接的rabbitmq服务器的连接ip
- connectionFactory.setHost("127.0.0.1");
- //第三步:连接端口号,默认的为5672
- connectionFactory.setPort(5672);
- //第四步:设置连接哪个虚拟主机
- //connectionFactory.setVirtualHost("/ems");
- //第五步:设置连接虚拟主机的用户和密码
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- }
-
- //获取rabbitmq的连接
- public static Connection getConnection() {
- //第六步:获取连接对象
- Connection connection = null;
- try {
- connection = connectionFactory.newConnection();
- } catch (IOException | TimeoutException e) {
- e.printStackTrace();
- }
- return connection;
- }
-
- public static void getClose(Connection connection, Channel channel) {
- try {
- if (channel != null) {
- channel.close();
- }
- if (connection != null) {
- connection.close();
- }
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
-
- }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。