赞
踩
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理,实现无感知中间件编程 使用的都是org.springframework包下的内容,却可以操作RabbitMQ 或者 Kafka。
注意
SubscribableChannel:定义输入通道时,需要返回 SubscribableChannel 接口对象,该接口集成自 MessageChannel 接口,它定义了维护消息通道订阅者的方法
MessageChannel:当定义输出通道的时候,需要返回 MessageChannel 接口对象,该接口定义了向消息通道发送消息的方法
pom文件
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.1.10.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.yinzi</groupId>
- <artifactId>rabbit-stream-receiver</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>rabbit-stream-receiver</name>
- <description>Demo project for Spring Boot</description>
-
- <properties>
- <java.version>1.8</java.version>
- <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
- <version>2.1.3.RELEASE</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.junit.vintage</groupId>
- <artifactId>junit-vintage-engine</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>${spring-cloud.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- </project>

接收消息的接口
@input:
- public interface ReceiverService {
- //@Input定义输入通道
- @Input("test-exchange")
- SubscribableChannel receive();
- }
实现类
-
- @Service
- @EnableBinding({ReceiverService.class})
- public class ReceiverServiceImpl {
- //@StreamListener,主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。
- @StreamListener(value = "test-exchange")
- public void onReceive(byte[] msg){
- //处理消息
- System.out.println("ReceiverServiceImpl.onReceive:"+new String(msg));
- }
- }
消息发送方:
- @Service
- public interface SenderService {
- // @Output定义输出通道
- @Output("test-exchange")
- MessageChannel send();
- }
启动类:
- @SpringBootApplication
- @EnableEurekaClient
- @EnableBinding(value = {SenderService.class})
- public class RabbitStreamSenderApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(RabbitStreamSenderApplication.class, args);
- }
-
- }
详细在码云:点我跳转到码云
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。