赞
踩
rocketmq-spring的consumer的相关属性配置有两种方式:
关于注解中的属性可以查看:org.apache.rocketmq.spring.annotation.RocketMQMessageListener,而在文件中可以配置的属性只有如下几个(并不遵守spring boot自动配置规范,所以在idea中不会有相关提示)

说明如下:
| 配置项 | 说明 |
|---|---|
| rocketmq.name-server | rocketmq的name server地址,格式:`主机:端口;主机:端口`,多个地址以英文分号分隔 |
| rocketmq.consumer.secret-key | ACL的secret-key属性 |
| rocketmq.consumer.access-key | ACL的access-key属性 |
| 自定义消费轨迹topic,不使用忽略 |
| 枚举类型,值为:【LOCAL, CLOUD】,值为CLOUD表示设置接入阿里云。忽略。 |
如果想要设置最大重试次数等一些相关初始化参数配置,很明显是不支持的。
同时,看一下构造consumer的源码,可以看到只配置了固定的几个属性:
- private void initRocketMQPushConsumer() throws MQClientException {
-
- RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
- this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
- boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
- if (Objects.nonNull(rpcHook)) {
- consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
- enableMsgTrace, this.applicationContext.getEnvironment().
- resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
- consumer.setVipChannelEnabled(false);
- consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
- } else {
- log.debug("Access-key or secret-key not configure in " + this + ".");
- consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
- this.applicationContext.getEnvironment().
- resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
- }
-
- String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
- if (customizedNameServer != null) {
- consumer.setNamesrvAddr(customizedNameServer);
- } else {
- consumer.setNamesrvAddr(nameServer);
- }
- if (accessChannel != null) {
- consumer.setAccessChannel(accessChannel);
- }
- consumer.setConsumeThreadMax(consumeThreadMax);
- if (consumeThreadMax < consumer.getConsumeThreadMin()) {
- consumer.setConsumeThreadMin(consumeThreadMax);
- }
- consumer.setConsumeTimeout(consumeTimeout);
-
- switch (messageModel) {
- case BROADCASTING:
- consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
- break;
- case CLUSTERING:
- consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
- break;
- default:
- throw new IllegalArgumentException("Property 'messageModel' was wrong.");
- }
-
- switch (selectorType) {
- case TAG:
- consumer.subscribe(topic, selectorExpression);
- break;
- case SQL92:
- consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
- break;
- default:
- throw new IllegalArgumentException("Property 'selectorType' was wrong.");
- }
-
- switch (consumeMode) {
- case ORDERLY:
- consumer.setMessageListener(new DefaultMessageListenerOrderly());
- break;
- case CONCURRENTLY:
- consumer.setMessageListener(new DefaultMessageListenerConcurrently());
- break;
- default:
- throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
- }
-
- if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
- ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
- } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
- ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
- }
-
- }

但是看代码的最后几行,rocketMQListener如果实现了RocketMQPushConsumerLifecycleListener接口,则会调用RocketMQPushConsumerLifecycleListener的prepareStart(consumer)方法,很明显,可以在这里设置consuemr的参数。
说明:rocketMQListener就是类上带有RocketMQMessageListener的bean。
- @RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_topic_consumer", selectorExpression = "*")
- class StringConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
-
- @Override
- public void onMessage(String message) {
- LOGGER.info("receive message: {}", message);
- }
-
- @Override public void prepareStart(DefaultMQPushConsumer consumer) {
- // 设置最大重试次数
- consumer.setMaxReconsumeTimes(5);
- // 如下,设置其它consumer相关属性
- consumer.setPullBatchSize(16);
- }
- }
我是在翻源码的才想到这个解决方案,我想既然提供有这个接口进行自定义配置,官方文档应该会有示例说明,然后翻了下github,是有类似的使用方式的,源码上还有其它示例,如果有其它问题,建议还是先看官方示例是否提供了相关解决方案。github地址:https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。