当前位置:   article > 正文

RabbitMq springboot版本与原生引用_spring-boot-starter-amqp 2.6.9 与springboot版本对应

spring-boot-starter-amqp 2.6.9 与springboot版本对应

springboot版本

一般使用MQ分成两种模式

  • workqueues 模式,Q消息只会发送到一个队列,该队列连接着多个消费者,单同一个Q消息只能被消费一次
  • exchange 模式,Q消息可以按照一定规则分发到多个队列,每个监听改队列的消费者都可以收到Q消息,类似与广播

--springboot

  1. 首先,启动MQ服务,默认的账号和密码是 guest guest
  2. 在你的项目中引入依赖
    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>
  3. 在配置文件中如下rabbitMq的配置,这里使用的yml
    1. spring:
    2. rabbitmq:
    3. host: 127.0.0.1
    4. username: guest
    5. password: guest

     

  4. 接下来是消费者,使用注解的方式声名消费者
    1. @Component
    2. public class Listener {
    3. @RabbitListener(
    4. bindings = @QueueBinding(
    5. value = @Queue(value = "spring.test.queue", durable = "false"),
    6. exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),
    7. key = "insert"
    8. )
    9. )
    10. public void listen(String msg){
    11. System.out.println("消费者接受到消息:" + msg);
    12. }
    13. @RabbitListener(
    14. bindings = @QueueBinding(
    15. value = @Queue(value = "bbb", durable = "false"),
    16. exchange = @Exchange(value = "my.exchange")
    17. )
    18. )
    19. public void listenQueue1(String msg){
    20. System.out.println("bbb:" + msg);
    21. }
    22. @RabbitListener(
    23. bindings = @QueueBinding(
    24. value = @Queue(value = "ccc", durable = "false"),
    25. exchange = @Exchange(value = "my.exchange")
    26. )
    27. )
    28. public void listenQueue2(String msg){
    29. System.out.println("ccc:" + msg);
    30. }
    31. @RabbitListener(
    32. queues = {"work"}
    33. )
    34. public void listenQueue3(String msg){
    35. System.out.println("work 1:" + msg);
    36. }
    37. @RabbitListener(
    38. queues = {"work"}
    39. )
    40. public void listenQueue4(String msg){
    41. System.out.println("work 2:" + msg);
    42. }
    43. }

     

  5. 生产者
    1. @Resource
    2. private AmqpTemplate template;
    3. @Test
    4. public void testSendMsg1() throws InterruptedException {
    5. String message = "hello spring bbbbbbbbbbbb";
    6. template.convertAndSend("my.exchange",null, message);
    7. System.out.println("生产者发送消息:" + message);
    8. Thread.sleep(10000);//等待10s,让测试方法延迟结束,防止消费者未来得及获取消息
    9. }
    10. @Test
    11. public void testSendMsg2() throws InterruptedException {
    12. String message = "hello spring aaaaaaaaaaaaaaaaaa work";
    13. //template.convertAndSend("my.exchange","bbb", message);
    14. for(int i=1;i<15;i++){
    15. template.convertAndSend("work",message+" "+i);
    16. }
    17. System.out.println("生产者发送消息:");
    18. Thread.sleep(1000);//等待10s,让测试方法延迟结束,防止消费者未来得及获取消息
    19. }

     

  6. 测试结论是:生产着向指定的exchange发送消息,消费者只要绑定了这个exchange,都可以收到消息!如果生产着不指定exchange(实际走的是默认的),只指定了rouingKey(路由规则,实际执行了queue),这里和消费者绑定的queue是一致的,生产者发送的消息默认会轮询的方式发送给各个消费者,每个消息只能被消费一次

-- 不适用注解的方式,原生方式

  1. 消费者1
    1. package com.example.rabbit_product;
    2. import com.example.rabbit_product.util.RabbitMqUtil;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. /**
    6. * @author
    7. * @date 2021/2/18 18:26
    8. * @description 绑定exchange
    9. */
    10. public class Consumer3 {
    11. public static void main(String args[]) throws Exception {
    12. //通过公用方法获取rabbit连接
    13. Connection connection = RabbitMqUtil.getConnection();
    14. //创建通道连接
    15. Channel channel = connection.createChannel();
    16. //通道绑定队列
    17. channel.queueBind("workone","my.exchange","");
    18. //消费消息
    19. channel.basicConsume("workone",true,new DefaultConsumer(channel){
    20. @Override
    21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    22. System.out.println("消费者-workone:" + new String(body));
    23. }
    24. });
    25. }
    26. }

     

  2. 消费者2
    1. package com.example.rabbit_product;
    2. import com.example.rabbit_product.util.RabbitMqUtil;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. /**
    6. * @author
    7. * @date 2021/2/18 18:26
    8. * @description 只有queue
    9. */
    10. public class Consumer1 {
    11. public static void main(String args[]) throws Exception {
    12. //通过公用方法获取rabbit连接
    13. Connection connection = RabbitMqUtil.getConnection();
    14. //创建通道连接
    15. Channel channel = connection.createChannel();
    16. //通道绑定队列
    17. /**
    18. * 参数1 queue:队列名称,如果队列不存在则自动创建
    19. * 参数2 durable:用来定义队列是否需要持久化,true 需要持久化;false 不需要持久化
    20. * 参数3 exclusive:定义是否独占队列,true 是;false 否
    21. * 参数4 autodelete:消费完成后是否需要自动删除队列,true 是;false 否
    22. * 参数5:自定义的额外附加参数
    23. */
    24. channel.queueDeclare("work",true,false,false,null);
    25. //消费消息
    26. /**
    27. * 参数1:队列名称,即将要消费哪一个队列的消息
    28. * 参数2:开启消息的自动确认机制
    29. * 参数3:消费时的回调接口,常用来处理业务逻辑,一般都是Consumer接口的实现类,此处用默认的DefaultConsumer
    30. */
    31. channel.basicConsume("work",true,new DefaultConsumer(channel){
    32. @Override
    33. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    34. System.out.println("消费者-1:" + new String(body));
    35. }
    36. });
    37. }
    38. }

     

  3. 生产者
    1. @Test
    2. public void sendMsg() throws Exception {
    3. Connection connection = RabbitMqUtil.getConnection();
    4. Channel channel = connection.createChannel();
    5. channel.queueDeclare("work", true, false, false, null);
    6. for (int i = 0; i < 10; i++) {
    7. channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, ("第(" + i + ")条Work queues").getBytes());
    8. }
    9. RabbitMqUtil.getClose(connection, channel);
    10. }
    11. @Test
    12. public void sendMsg111() throws Exception {
    13. Connection connection = RabbitMqUtil.getConnection();
    14. Channel channel = connection.createChannel();
    15. channel.exchangeDeclare("my.exchange", BuiltinExchangeType.DIRECT,true);
    16. for (int i = 0; i < 10; i++) {
    17. channel.basicPublish("my.exchange","", MessageProperties.PERSISTENT_TEXT_PLAIN, ("第(" + i + ")条Work queues").getBytes());
    18. }
    19. RabbitMqUtil.getClose(connection, channel);
    20. }

     

    1. package com.example.rabbit_product.util;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import org.springframework.context.annotation.Configuration;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * @author
    10. * @date 2021/2/18 18:21
    11. * @description todo
    12. */
    13. @Configuration
    14. public class RabbitMqUtil {
    15. private static ConnectionFactory connectionFactory;
    16. static {
    17. //第一步:创建rabbitmq的连接工厂
    18. connectionFactory = new ConnectionFactory();
    19. //第二步:设置连接的rabbitmq服务器的连接ip
    20. connectionFactory.setHost("127.0.0.1");
    21. //第三步:连接端口号,默认的为5672
    22. connectionFactory.setPort(5672);
    23. //第四步:设置连接哪个虚拟主机
    24. //connectionFactory.setVirtualHost("/ems");
    25. //第五步:设置连接虚拟主机的用户和密码
    26. connectionFactory.setUsername("guest");
    27. connectionFactory.setPassword("guest");
    28. }
    29. //获取rabbitmq的连接
    30. public static Connection getConnection() {
    31. //第六步:获取连接对象
    32. Connection connection = null;
    33. try {
    34. connection = connectionFactory.newConnection();
    35. } catch (IOException | TimeoutException e) {
    36. e.printStackTrace();
    37. }
    38. return connection;
    39. }
    40. public static void getClose(Connection connection, Channel channel) {
    41. try {
    42. if (channel != null) {
    43. channel.close();
    44. }
    45. if (connection != null) {
    46. connection.close();
    47. }
    48. } catch (IOException e) {
    49. e.printStackTrace();
    50. } catch (TimeoutException e) {
    51. e.printStackTrace();
    52. }
    53. }
    54. }

     

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/956422
推荐阅读
相关标签
  

闽ICP备14008679号