当前位置:   article > 正文

Spring JMS 和 ActiveMQ 开发消息服务_?jms.useAsyncSend=true 设置后发不出去消息

?jms.useAsyncSend=true 设置后发不出去消息

ActiveMQ 是高性能消息中间件,主要针对JMS实现,当然其他语言也可以使用。其支持点对点、发布/订阅、推拉模式,具体看官网,这里略。


1、先下载ActiveMQ,并成功启动服务。

2、建立maven项目,添加依赖

activemq-all-5.6.0.jar

  <!-- ActiveMQ classes (the activemq-all-5.6.0.jar named in this article) -->
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.6.0</version>
  </dependency>
  <!-- Spring JMS: provides the org.springframework.jms.* classes used below -->
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>3.1.1.RELEASE</version>
  </dependency>
  <!-- log4j -->
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
  </dependency>


3、jms.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:aop="http://www.springframework.org/schema/aop"
         xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
             http://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context-3.0.xsd
             http://www.springframework.org/schema/aop
             http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"
         default-autowire="byName" default-lazy-init="false">

    <!-- Connection factory pointing at the broker on 127.0.0.1:61616. -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
      <!--
        jms.useAsyncSend=true selects asynchronous sending. The original author
        reports roughly 5x the throughput of synchronous sending (figure not
        verified here).
        NOTE(review): async sends presumably do not wait for a broker
        acknowledgement, so a failed send may go unnoticed; confirm against the
        ActiveMQ documentation before relying on it.
      -->
      <property name="brokerURL" value="tcp://127.0.0.1:61616?jms.useAsyncSend=true" />
    </bean>

    <!-- Message queue (destination) named "demo". -->
    <bean id="demoQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg value="demo" />
    </bean>
  </beans>

4、编写发送消息和接收消息的简单java类

  1. public class JmsMQ {
  2. private JmsTemplate jmsTemplate;
  3. private Queue demoQueue;
  4. public void setConnectionFactory(ConnectionFactory connectionFactory) {
  5. this.jmsTemplate = new JmsTemplate(connectionFactory);
  6. // this.jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//消息不持久化
  7. // this.jmsTemplate.setDeliveryPersistent(false);//消息不持久化
  8. }
  9. public void setDemoQueue(Queue demoQueue) {
  10. this.demoQueue = demoQueue;
  11. }
  12. //发送消息
  13. public void simpleSend(final Long size) {
  14. this.jmsTemplate.send(this.demoQueue, new MessageCreator() {
  15. public Message createMessage(Session session) throws JMSException {
  16. ObjectMessage msg = session.createObjectMessage();
  17. msg.setObject(size);
  18. return msg;
  19. }
  20. });
  21. }
  22. //接收消息
  23. public void simpleReceive() {
  24. Message message = jmsTemplate.receive(demoQueue);
  25. if (message instanceof ObjectMessage) {
  26. ObjectMessage msg = (ObjectMessage) message;
  27. Long size;
  28. try {
  29. size = (Long) msg.getObject();
  30. System.out.println("onMessage [" + size + "]");
  31. } catch (JMSException e) {
  32. e.printStackTrace();
  33. }
  34. } else {
  35. throw new IllegalArgumentException("Message must be of type ObjectMessage.");
  36. }
  37. }
  38. }


5、jmsexample.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
       default-autowire="byName" default-lazy-init="false">

  <!--
    Sender/receiver bean. connectionFactory and demoQueue are defined in
    jms.xml, which is loaded into the same application context; they are wired
    explicitly here so the bean does not depend on autowiring being enabled.
  -->
  <bean id="jmsMQ" class="jmsexample.JmsMQ">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="demoQueue" ref="demoQueue" />
  </bean>
</beans>

6、编写测试类

  1. public class JmsMQTest {
  2. public static void main(String[] args) {
  3. ApplicationContext ctx = new ClassPathXmlApplicationContext("jms.xml", "jmsexample.xml");
  4. JmsMQ mq = (JmsMQ) ctx.getBean("jmsMQ");
  5. long start = System.currentTimeMillis();
  6. for(long i=0;i<1000;i++){
  7. mq.simpleSend(i);//发送消息,异步方式下1000次发送7000毫秒。
  8. }
  9. System.out.println("耗时:" + (System.currentTimeMillis() - start));
  10. //
  11. // long start1 = System.currentTimeMillis();
  12. // for(long i=0;i<10000;i++){
  13. // mq.simpleReceive();//获取消息
  14. // }
  15. // System.out.println("耗时:" + (System.currentTimeMillis() - start1));
  16. }
  17. }

上面已经完成了JMS开发过程,测试类包含发送消息,接收消息。但是接收消息是主动的,这种方式叫做“拉”模式。


下面是一个监听指定队列上消息的例子,也就是“推”模式:

1、编写监听消息的 Java 类,需要实现 MessageListener 接口

  1. public class ExampleListener implements MessageListener {
  2. public void onMessage(Message message) {
  3. if (message instanceof ObjectMessage) {
  4. ObjectMessage msg = (ObjectMessage) message;
  5. Long size;
  6. try {
  7. size = (Long) msg.getObject();
  8. System.out.println("onMessage [" + size + "]");
  9. } catch (JMSException e) {
  10. e.printStackTrace();
  11. }
  12. } else {
  13. throw new IllegalArgumentException(
  14. "Message must be of type ObjectMessage.");
  15. }
  16. }
  17. }

2、jmsexample-listener.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:aop="http://www.springframework.org/schema/aop"
         xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
             http://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context-3.0.xsd
             http://www.springframework.org/schema/aop
             http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"
         default-autowire="byName" default-lazy-init="false">

    <!-- The message-driven POJO that handles each delivered message. -->
    <bean id="exampleListener" class="jmsexample.ExampleListener" />

    <!-- Listener container: takes messages from demoQueue and passes them to exampleListener. -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="connectionFactory" />
      <property name="destination" ref="demoQueue" />
      <property name="messageListener" ref="exampleListener" />
    </bean>
  </beans>


3、启动消息监听

  1. /**
  2. * 监听消息列队
  3. * */
  4. public class ListenerTest {
  5. public static void main(String[] args) {
  6. new ClassPathXmlApplicationContext("jms.xml", "jmsexample-listener.xml");
  7. }
  8. }

到这里已经讲了推拉模式,发消息和接消息。



另外,Spring 对 JMS 进行了包装,可以将远程方法调用(RPC)封装到 JMS 消息中。下面请看通过 JMS 消息实现方法调用的过程。

重点:MQ不仅可以缓解系统压力,还可以让系统与系统之间解耦。通过消息传递,可以实现两个系统之间交互。像观察者模式一样,减少了交互对象之间的耦合度。

1、编写接口

  /**
   * Account operations exposed to remote callers in this example.
   */
  public interface CheckingAccountService {

      /**
       * Cancels the account with the given id.
       *
       * @param accountId id of the account to cancel
       */
      void cancelAccount(Long accountId);

      /**
       * Saves the account with the given id.
       *
       * @param accountId id of the account to save
       */
      void saveAccount(Long accountId);
  }

2、编写实现类

  1. public class SimpleCheckingAccountService implements CheckingAccountService {
  2. public void cancelAccount(Long accountId) {
  3. double result = 0D;
  4. for(int i=0; i<accountId; ){
  5. result = 31/++i;
  6. }
  7. System.out.println("Cancelling account [" + accountId + "]" + ",result=" + result);
  8. }
  9. public void saveAccount(Long accountId) {
  10. System.out.println("Saving account [" + accountId + "]");
  11. }
  12. }

3、编写客户端 client.xml
  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:aop="http://www.springframework.org/schema/aop"
         xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
             http://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context-3.0.xsd
             http://www.springframework.org/schema/aop
             http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"
         default-autowire="byName" default-lazy-init="false">

    <!--
      Factory bean that returns a proxy for the service interface. Method calls
      on the proxy are wrapped as messages and sent to the given queue.
    -->
    <bean id="checkingAccountService" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean">
      <property name="serviceInterface" value="com.foo.CheckingAccountService"/>
      <property name="connectionFactory" ref="connectionFactory"/>
      <property name="queue" ref="demoQueue"/>
    </bean>
  </beans>

4、编写服务端 server.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:aop="http://www.springframework.org/schema/aop"
         xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
             http://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context-3.0.xsd
             http://www.springframework.org/schema/aop
             http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"
         default-autowire="byName" default-lazy-init="false">

    <!-- Exporter: turns each received message into a call on the wrapped service. -->
    <bean id="checkingAccountService" class="org.springframework.jms.remoting.JmsInvokerServiceExporter">
      <property name="serviceInterface" value="com.foo.CheckingAccountService"/>
      <property name="service">
        <bean class="com.foo.SimpleCheckingAccountService"/>
      </property>
    </bean>

    <!-- Listens on the given queue with 10 concurrent consumers. -->
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
      <property name="connectionFactory" ref="connectionFactory"/>
      <property name="destination" ref="demoQueue"/>
      <property name="concurrentConsumers" value="10"/>
      <!-- Each message that arrives on the queue is handed to the exporter, which performs the method call. -->
      <property name="messageListener" ref="checkingAccountService"/>
    </bean>
  </beans>

5、编写测试类  Client.java
  1. /**使用Spring JMS调用服务器端方法*/
  2. public class Client {
  3. public static void main(String[] args) throws Exception {
  4. ApplicationContext ctx = new ClassPathXmlApplicationContext("jms.xml", "client.xml");
  5. CheckingAccountService service = (CheckingAccountService) ctx.getBean("checkingAccountService");
  6. service.cancelAccount(1000000000L);
  7. System.out.println("Invoke cancelAccount 1000000000L!");
  8. long start1 = System.currentTimeMillis();
  9. for(long i=0;i<1000;i++){
  10. service.saveAccount(i);
  11. }
  12. System.out.println("耗时:" + (System.currentTimeMillis() - start1));
  13. }
  14. }

6、编写测试类 Server.java

  1. public class Server {
  2. //启动服务器,监听列队,获得消息,通过反射调用
  3. public static void main(String[] args) throws Exception {
  4. new ClassPathXmlApplicationContext("jms.xml","server.xml");
  5. }
  6. }

快下班了,细节就不写了,反正这个东西很简单,大家一看就明了!






声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/58535
推荐阅读
相关标签
  

闽ICP备14008679号