当前位置:   article > 正文

SpringBoot集成RabbitMQ的三种方式发送接收消息_springboot两个微服务之间通过rabbitmq收发消息

springboot两个微服务之间通过rabbitmq收发消息

SpringBoot集成RabbitMQ的三种方式发送接收消息

一 引入依赖

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

二 发送端

  1. 配置文件
#配置RabbitMQ的相关连接信息(单机版)
spring.rabbitmq.host=192.168.2.2
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 配置类
package com.springboot.rabbitmq.send.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * ClassName:RabbitMQConfig
 * Package:com.springboot.rabbitmq.send.config
 * Description:
 *
 * @Date:2021/6/18 10:20
 * @Author:hm
 */

@Configuration
public class RabbitMQConfig {


    //配置direct交换机
    @Bean
    public DirectExchange directExchange(){

        return new DirectExchange("directExchange");
    }

    //配置direct队列
    @Bean
    public Queue directQueue(){
        return new Queue("directQueue");
    }

    //将direct队列绑定到交换机上
    @Bean
    public Binding directBinding(Queue directQueue,DirectExchange directExchange){

        // 参数 1 为需要绑定的队列
        // 参数 2 为需要绑定的交换机
        // 参数 3绑定时的RoutingKey
        return BindingBuilder.bind(directQueue).to(directExchange).with("directRouting");
    }


    //配置Fanout交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    //配置Topic交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  1. 实现代码
package com.springboot.rabbitmq.send.service.impl;

import com.springboot.rabbitmq.send.service.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * ClassName:SendMessageImpl
 * Package:com.springboot.rabbitmq.send.service.impl
 * Description:
 *
 * @Date:2021/6/18 10:17
 * @Author:hm
 */
@Service("sendService")
public class SendServiceImpl implements SendService{

    @Resource
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendDirectMessage(String message) {
        /**
         * 发送direct消息
         * 参数一为交换机名称
         * 参数二为路由键名称
         * 参数三为消息
         */
        amqpTemplate.convertAndSend("directExchange","directRouting",message);
    }

    @Override
    public void sendFanoutMessage(String message) {
        amqpTemplate.convertAndSend("fanoutExchange","",message);
    }

    @Override
    public void sendTopicMessage(String message) {
        //amqpTemplate.convertAndSend("topicExchange","aa",message);
        amqpTemplate.convertAndSend("topicExchange","aa.bb",message);
        //amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  1. 启动类
package com.springboot.rabbitmq.send;

import com.springboot.rabbitmq.send.service.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class SendApplication {

	public static void main(String[] args) {
		ApplicationContext ac = SpringApplication.run(SendApplication.class, args);
		SendService sendService = (SendService) ac.getBean("sendService");
		//sendService.sendDirectMessage("directMessage测试消息");
		//sendService.sendFanoutMessage("fanoutMessage测试消息");
		sendService.sendTopicMessage("topicMessage测试消息");
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

三 接收端

  1. 配置文件
#配置RabbitMQ的相关连接信息(单机版)
spring.rabbitmq.host=192.168.2.2
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 配置类
package com.springboot.rabbitmq.receive.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * ClassName:RabbitMQConfig
 * Package:com.springboot.rabbitmq.send.config
 * Description:
 *
 * @Date:2021/6/18 10:20
 * @Author:hm
 */

@Configuration
public class RabbitMQConfig {


    //配置direct交换机
    @Bean
    public DirectExchange directExchange(){

        return new DirectExchange("directExchange");
    }

    //配置direct队列
    @Bean
    public Queue directQueue(){
        return new Queue("directQueue");
    }

    //将direct队列绑定到交换机上
    @Bean
    public Binding directBinding(Queue directQueue,DirectExchange directExchange){

        // 参数 1 为需要绑定的队列
        // 参数 2 为需要绑定的交换机
        // 参数 3绑定时的RoutingKey
        return BindingBuilder.bind(directQueue).to(directExchange).with("directRouting");
    }


    //配置一个 Fanout类型的交换
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  1. 实现代码
package com.springboot.rabbitmq.receive.service.impl;

import com.springboot.rabbitmq.receive.service.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * ClassName:ReceiveImpl
 * Package:com.springboot.rabbitmq.receive.service.impl
 * Description:
 *
 * @Date:2021/6/18 10:38
 * @Author:hm
 */

@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService{

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * @RabbitListener 注解用于标记当前方法是一个RabbitMQ的消息监听方法,作用是持续性的自动接收消息
     * 这个方法不需要手动调用Spring会自动运行这个监听
     * 属性
     *   queues 用于指定一个已经存在的队列名,用于进行队列的监听
     * @param message  接收到的具体的消息数据
     *
     * 注意:如果当前监听方法正常结束Spring就会自动确认消息,如果出现异常则不会确认消息
     *      因此在消息处理时我们需要做好消息的防止重复处理工作
     */
    @Override
    @RabbitListener(queues = {"directQueue"})
    public void directReceive(String message) {

        System.out.println("direct监听器 接收消息:" + message);

    }


    /**
     * @QueueBinding注解要完成队列和交换机的
     * @Queue创建一个队列(没有指定参数则表示创建一个随机队列)
     * @Exchange创建一个交换机
     * @param message
     *
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = "fanoutExchange",type = "fanout"))})
    public void fanoutReceive1(String message){

        System.out.println("fanout1监听器 接收消息:" + message);

    }
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = "fanoutExchange",type = "fanout"))})
    public void fanoutReceive2(String message){

        System.out.println("fanout2监听器 接收消息:" + message);

    }

        @RabbitListener(bindings = {@QueueBinding(value=@Queue("topic01"),key = {"aa"},exchange =@Exchange(name = "topicExchange",type = "topic"))})
    public void  topicReceive01(String message){
        System.out.println("topic01消费者 ---aa---"+message );
    }
    @RabbitListener(bindings = {@QueueBinding(value=@Queue("topic02"),key = {"aa.*"},exchange =@Exchange(name = "topicExchange",type = "topic"))})
    public void  topicReceive02(String message){
        System.out.println("topic02消费者 ---aa.*---"+message );
    }
    @RabbitListener(bindings = {@QueueBinding(value=@Queue("topic03"),key = {"aa.#"},exchange =@Exchange(name = "topicExchange",type = "topic"))})
    public void  topicReceive03(String message){
        System.out.println("topic03消费者 ---aa.#---"+message );
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

注意:需要先启动接收端,这样再启动发送端,就会实时监听到发送的消息并被接收端接收到

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/399199
推荐阅读
相关标签
  

闽ICP备14008679号