赞
踩
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest
按照RocketMQ官网的异步消息生产者中的《2.2 Send Messages Asynchronously》
创建了一个异步消息的生成者。如下:
public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 100; i++) { final int index = i; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
在我启动了namesrv
、broker
和consumer
之后,运行该示例代码。
结果就出现了,无法找到Topic
的异常。
No route info of this topic, TopicTest
通过
mqadmin clusterList -n 127.0.0.1:9876
我发现broker
与namesrv
的连接是没有问题的。
然后我又在通过在mqbroker
的启动参数上增加autoCreateTopicEnable=true
mqbroker -n localhost:9876 autoCreateTopicEnable=true
同样的,问题没有解决。
默认情况下
broker
会自动创建话题
由于是异步消息,所以我猜测是否是由于producer.shutdown()
提前关闭,导致其他异步的线程也受到影响。于是我在producer.shutdown();
的上方使主线程睡眠。
感谢 @wang489687009 斧正
...
}
//Shut down once the producer instance is not longer in use.
Thread.sleep(3000);
producer.shutdown();
}
}
再次运行,发现问题解决。
6 OK AC1619665D2818B4AAC283E0C4500002
4 OK AC1619665D2818B4AAC283E0C4500004
...
通过在RocketMQ官方提供 AsyncProducer.java中我发现,它使用到了CountDownLatch
来解决这个异步问题。
CountDownLatch
类的三个方法
void await()
调用这个方法的线程会被挂起,直到count值为0才继续执行。boolean await(long timeout, TimeUnit unit)
与await()
类似,该方法需要设置超时时间,如果等待超过超时时间,则继续执行。countDown()
将count值减1。通过CountDownLatch
的构造器就可以创建
final CountDownLatch cntLatch = new CountDownLatch(4);
官方代码示例如下
public class AsyncProducer { public static void main( String[] args) throws MQClientException , InterruptedException , UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。