赞
踩
pom
<!--boot web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--spring-cloud-starter-stream-rocketmq 2.2.5.RELEASE-->
<!--根据自己部署的mq版本来选择对应的客户端版本-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.8.0</version>
</dependency>
<!-- actuator 不引入会报错哦-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
生产者yaml
spring:
cloud:
stream:
#RocketMQ 通用配置
rocketmq:
binder:
#客户端接入点,必填 rocketMQ的连接地址,binder高度抽象
name-server: localhost:9876
bindings:
# bindings 具体生产消息、消费消息的桥梁
# destination: 指定发送的topic
# content-type: #默认是application/json
# group: 组
# gis_group_out: @Output @Input绑定 @Output绑定就是生产者 @Input绑就是消费者
# consumer:concurrency: 设置消费线程数
gis_group_out:
destination: godown_gis
content-type: application/plain
group: gis_group_out
#consumer:
# concurrency: 20
自定义Source
package com.dist.ytgz.approve.rocketmq;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* 自定义Source
*
* @author <a href="mailto:zhangxiao@dist.com.cn">Zhang Xiao</a>
* @since
*/
public interface MySource {
String GIS_GROUP_OUT = "gis_group_out";
@Output(MySource.GIS_GROUP_OUT)
MessageChannel gisGroupOut();
}
发送消息
// spring cloud stream里面发消息通过 Source 发送
@Autowired
private MySource source;
// 发送消息的方法 withPayload(T payload) 所以是传任意类型, 把对应的CONTENT_TYPE, APPLICATION_JSON设置好就行
source.gisGroupOut().send(
MessageBuilder
.withPayload(messagePayload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()
);
启动类
/**
* 记得添加@EnableBinding注解指定消息信道
* Sink.class 表示信道 input , Source.class 表示信道 output
* 这两个分别与application.yml中的spring.cloud.stream.bindings.input和spring.cloud.stream.bindings.output对应
* 如果信道名称换成是sender和receiver,就得配置spring.cloud.stream.bindings.sender和spring.cloud.stream.bindings.receiver
*
*
* 而我这里是gis_group_out 所以 spring.cloud.stream.bindings.gis_group_out
*/
// @EnableBinding(value = {Sink.class, Source.class})
@EnableBinding(value = Source.class)
@SpringBootApplication
public class ApproveApplication {
public static void main(String[] args) {
SpringApplication.run(ApproveApplication.class, args);
}
}
消费者yaml
spring:
cloud:
stream:
#RocketMQ 通用配置
rocketmq:
binder:
#客户端接入点,必填 rocketMQ的连接地址,binder高度抽象
name-server: localhost:9876
bindings:
gis_group_in:
consumer: # 消息失败直接放入死信队列, 消息不会被重复消费
delayLevelWhenNextConsume: -1
bindings:
# bindings 具体生产消息、消费消息的桥梁
# destination: 指定发送的topic
# content-type: #默认是application/json
# group: 组
# gis_group_out: @Output @Input绑定 @Output绑定就是生产者 @Input绑就是消费者
# consumer.maxAttempts: 1 #默认是3,1表示不重试
gis_group_in:
destination: godown_gis
content-type: application/plain
group: gis_group_in
consumer:
concurrency: 20
maxAttempts: 1
自定义Sink
package com.dist.ic.rocketmq;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* 自定义Sink
*
* @author <a href="mailto:zhangxiao@dist.com.cn">Zhang Xiao</a>
* @since
*/
public interface MySink {
String GIS_GROUP_IN = "gis_group_in";
@Input(MySink.GIS_GROUP_IN)
SubscribableChannel gis_group_in();
}
接受消息
package com.dist.ic.rocketmq;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.dist.common.security.vo.UserVO;
import com.dist.ic.service.ICService;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* TODO
*
* @author <a href="mailto:zhangxiao@dist.com.cn">Zhang Xiao</a>
* @since
*/
@Component
public class RocketMessageListener {
private final Logger log = LoggerFactory.getLogger(RocketMessageListener.class);
@Autowired
private ICService icService;
@StreamListener(MySink.GIS_GROUP_IN) // 指定对应的Input管道
public void handlerMessage(MessagePayload message) {
log.info("接收到rocketmq的消息========>{}", JSON.toJSONString(message));
}
}
启动类
/**
* 记得添加@EnableBinding注解指定消息信道
* Sink.class 表示信道 input , Source.class 表示信道 output
* 这两个分别与application.yml中的spring.cloud.stream.bindings.input和spring.cloud.stream.bindings.output对应
* 如果信道名称换成是sender和receiver,就得配置spring.cloud.stream.bindings.sender和spring.cloud.stream.bindings.receiver
*
*
* 而我这里是gis_group_out 所以 spring.cloud.stream.bindings.gis_group_out
*/
// @EnableBinding(value = {Sink.class, Source.class})
@EnableBinding(value = Sink.class)
@SpringBootApplication
public class ICApplication {
public static void main(String[] args) {
SpringApplication.run(ICApplication.class, args);
}
}
重点: RocketMQ相关配置看https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。