赞
踩
<dependencies>
    <!-- Spring Boot starter for AMQP messaging (used here with RabbitMQ). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Test-scoped dependencies. -->
    <!-- NOTE(review): no versions are declared here; presumably they are managed by a parent POM/BOM. Confirm. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
# RabbitMQ connection settings (single-node broker)
# NOTE(review): the credentials below are stored in plain text; consider externalising them.
spring.rabbitmq.host=192.168.2.2
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
package com.springboot.rabbitmq.send.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * ClassName:RabbitMQConfig * Package:com.springboot.rabbitmq.send.config * Description: * * @Date:2021/6/18 10:20 * @Author:hm */ @Configuration public class RabbitMQConfig { //配置direct交换机 @Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange"); } //配置direct队列 @Bean public Queue directQueue(){ return new Queue("directQueue"); } //将direct队列绑定到交换机上 @Bean public Binding directBinding(Queue directQueue,DirectExchange directExchange){ // 参数 1 为需要绑定的队列 // 参数 2 为需要绑定的交换机 // 参数 3绑定时的RoutingKey return BindingBuilder.bind(directQueue).to(directExchange).with("directRouting"); } //配置Fanout交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } //配置Topic交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } }
package com.springboot.rabbitmq.send.service.impl; import com.springboot.rabbitmq.send.service.SendService; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * ClassName:SendMessageImpl * Package:com.springboot.rabbitmq.send.service.impl * Description: * * @Date:2021/6/18 10:17 * @Author:hm */ @Service("sendService") public class SendServiceImpl implements SendService{ @Resource private AmqpTemplate amqpTemplate; @Override public void sendDirectMessage(String message) { /** * 发送direct消息 * 参数一为交换机名称 * 参数二为路由键名称 * 参数三为消息 */ amqpTemplate.convertAndSend("directExchange","directRouting",message); } @Override public void sendFanoutMessage(String message) { amqpTemplate.convertAndSend("fanoutExchange","",message); } @Override public void sendTopicMessage(String message) { //amqpTemplate.convertAndSend("topicExchange","aa",message); amqpTemplate.convertAndSend("topicExchange","aa.bb",message); //amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message); } }
package com.springboot.rabbitmq.send; import com.springboot.rabbitmq.send.service.SendService; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; @SpringBootApplication public class SendApplication { public static void main(String[] args) { ApplicationContext ac = SpringApplication.run(SendApplication.class, args); SendService sendService = (SendService) ac.getBean("sendService"); //sendService.sendDirectMessage("directMessage测试消息"); //sendService.sendFanoutMessage("fanoutMessage测试消息"); sendService.sendTopicMessage("topicMessage测试消息"); } }
# RabbitMQ connection settings (single-node broker)
# NOTE(review): the credentials below are stored in plain text; consider externalising them.
spring.rabbitmq.host=192.168.2.2
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
package com.springboot.rabbitmq.receive.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * ClassName:RabbitMQConfig * Package:com.springboot.rabbitmq.send.config * Description: * * @Date:2021/6/18 10:20 * @Author:hm */ @Configuration public class RabbitMQConfig { //配置direct交换机 @Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange"); } //配置direct队列 @Bean public Queue directQueue(){ return new Queue("directQueue"); } //将direct队列绑定到交换机上 @Bean public Binding directBinding(Queue directQueue,DirectExchange directExchange){ // 参数 1 为需要绑定的队列 // 参数 2 为需要绑定的交换机 // 参数 3绑定时的RoutingKey return BindingBuilder.bind(directQueue).to(directExchange).with("directRouting"); } //配置一个 Fanout类型的交换 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } }
package com.springboot.rabbitmq.receive.service.impl; import com.springboot.rabbitmq.receive.service.ReceiveService; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * ClassName:ReceiveImpl * Package:com.springboot.rabbitmq.receive.service.impl * Description: * * @Date:2021/6/18 10:38 * @Author:hm */ @Service("receiveService") public class ReceiveServiceImpl implements ReceiveService{ @Resource private AmqpTemplate amqpTemplate; /** * @RabbitListener 注解用于标记当前方法是一个RabbitMQ的消息监听方法,作用是持续性的自动接收消息 * 这个方法不需要手动调用Spring会自动运行这个监听 * 属性 * queues 用于指定一个已经存在的队列名,用于进行队列的监听 * @param message 接收到的具体的消息数据 * * 注意:如果当前监听方法正常结束Spring就会自动确认消息,如果出现异常则不会确认消息 * 因此在消息处理时我们需要做好消息的防止重复处理工作 */ @Override @RabbitListener(queues = {"directQueue"}) public void directReceive(String message) { System.out.println("direct监听器 接收消息:" + message); } /** * @QueueBinding注解要完成队列和交换机的 * @Queue创建一个队列(没有指定参数则表示创建一个随机队列) * @Exchange创建一个交换机 * @param message * */ @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = "fanoutExchange",type = "fanout"))}) public void fanoutReceive1(String message){ System.out.println("fanout1监听器 接收消息:" + message); } @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = "fanoutExchange",type = "fanout"))}) public void fanoutReceive2(String message){ System.out.println("fanout2监听器 接收消息:" + message); } @RabbitListener(bindings = {@QueueBinding(value=@Queue("topic01"),key = {"aa"},exchange =@Exchange(name = "topicExchange",type = "topic"))}) public void topicReceive01(String message){ System.out.println("topic01消费者 ---aa---"+message ); } @RabbitListener(bindings = 
{@QueueBinding(value=@Queue("topic02"),key = {"aa.*"},exchange =@Exchange(name = "topicExchange",type = "topic"))}) public void topicReceive02(String message){ System.out.println("topic02消费者 ---aa.*---"+message ); } @RabbitListener(bindings = {@QueueBinding(value=@Queue("topic03"),key = {"aa.#"},exchange =@Exchange(name = "topicExchange",type = "topic"))}) public void topicReceive03(String message){ System.out.println("topic03消费者 ---aa.#---"+message ); } }
注意:需要先启动接收端,再启动发送端。fanout 和 topic 示例的队列由接收端的监听器声明并绑定,只有接收端先运行,发送端发出的消息才能被路由到队列并被接收端实时接收。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。