赞
踩
ActiveMQ 是高性能消息中间件,主要针对JMS实现,当然其他语言也可以使用。其支持点对点、发布/订阅、推拉模式,具体看官网,这里略。
1、先下载ActiveMQ,并成功启动服务。
2、建立maven项目,添加依赖
activemq-all-5.6.0.jar
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-jms</artifactId>
- <version>3.1.1.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.16</version>
- </dependency>
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-3.0.xsd
- http://www.springframework.org/schema/aop
- http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"
- default-autowire="byName" default-lazy-init="false">
-
-
- <!-- 连接工厂 -->
- <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="tcp://127.0.0.1:61616?jms.useAsyncSend=true" />
- <!-- jms.useAsyncSend=true 指定异步方式,性能比同步方式提升5倍 -->
- </bean>
-
-
- <!-- 消息列队(目的地) -->
- <bean id="demoQueue" class="org.apache.activemq.command.ActiveMQQueue">
- <constructor-arg value="demo" />
- </bean>
-
- </beans>

- public class JmsMQ {
-
-
- private JmsTemplate jmsTemplate;
- private Queue demoQueue;
-
-
- public void setConnectionFactory(ConnectionFactory connectionFactory) {
- this.jmsTemplate = new JmsTemplate(connectionFactory);
- // this.jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//消息不持久化
- // this.jmsTemplate.setDeliveryPersistent(false);//消息不持久化
- }
-
-
- public void setDemoQueue(Queue demoQueue) {
- this.demoQueue = demoQueue;
- }
-
- //发送消息
- public void simpleSend(final Long size) {
- this.jmsTemplate.send(this.demoQueue, new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- ObjectMessage msg = session.createObjectMessage();
- msg.setObject(size);
- return msg;
- }
- });
- }
- //接收消息
- public void simpleReceive() {
- Message message = jmsTemplate.receive(demoQueue);
- if (message instanceof ObjectMessage) {
- ObjectMessage msg = (ObjectMessage) message;
- Long size;
- try {
- size = (Long) msg.getObject();
- System.out.println("onMessage [" + size + "]");
- } catch (JMSException e) {
- e.printStackTrace();
- }
- } else {
- throw new IllegalArgumentException("Message must be of type ObjectMessage.");
- }
- }
-
-
- }

5、jmsexample.xml
<bean id="jmsMQ" class="jmsexample.JmsMQ" />
- public class JmsMQTest {
-
-
- public static void main(String[] args) {
- ApplicationContext ctx = new ClassPathXmlApplicationContext("jms.xml", "jmsexample.xml");
- JmsMQ mq = (JmsMQ) ctx.getBean("jmsMQ");
-
- long start = System.currentTimeMillis();
- for(long i=0;i<1000;i++){
- mq.simpleSend(i);//发送消息,异步方式下1000次发送7000毫秒。
- }
- System.out.println("耗时:" + (System.currentTimeMillis() - start));
- //
- // long start1 = System.currentTimeMillis();
- // for(long i=0;i<10000;i++){
- // mq.simpleReceive();//获取消息
- // }
- // System.out.println("耗时:" + (System.currentTimeMillis() - start1));
- }
-
-
- }

上面已经完成了JMS开发过程,测试类包含发送消息,接收消息。但是接收消息是主动的,这种方式叫做“拉”模式。
下面是一个监听指定列队上消息的例子,也就是“推”模式:
1、编写监听消息的java类,需要实现接口
- public class ExampleListener implements MessageListener {
-
- public void onMessage(Message message) {
- if (message instanceof ObjectMessage) {
- ObjectMessage msg = (ObjectMessage) message;
- Long size;
- try {
- size = (Long) msg.getObject();
- System.out.println("onMessage [" + size + "]");
- } catch (JMSException e) {
- e.printStackTrace();
- }
- } else {
- throw new IllegalArgumentException(
- "Message must be of type ObjectMessage.");
- }
- }
-
- }

2、jmsexample-listener.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-3.0.xsd
- http://www.springframework.org/schema/aop
- http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"
- default-autowire="byName" default-lazy-init="false">
-
- <!-- this is the Message Driven POJO (MDP) -->
- <bean id="exampleListener" class="jmsexample.ExampleListener" />
-
- <!-- and this is the message listener container -->
- <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="destination" ref="demoQueue" />
- <property name="messageListener" ref="exampleListener" />
- </bean>
-
- </beans>

3、启动消息监听
- /**
- * 监听消息列队
- * */
- public class ListenerTest {
-
- public static void main(String[] args) {
- new ClassPathXmlApplicationContext("jms.xml", "jmsexample-listener.xml");
- }
-
- }
到这里已经讲了推拉模式,发消息和接消息。
另外、Spring对JMS进行了包装,可以将远程方法调用(RPC)封装到JMS中,下面请看JMS实现消息方法调用的过程。
重点:MQ不仅可以缓解系统压力,还可以让系统与系统之间解耦。通过消息传递,可以实现两个系统之间交互。像观察者模式一样,减少了交互对象之间的耦合度。
1、编写接口
- public interface CheckingAccountService {
-
- public void cancelAccount(Long accountId);
-
- public void saveAccount(Long accountId);
- }
- public class SimpleCheckingAccountService implements CheckingAccountService {
-
- public void cancelAccount(Long accountId) {
- double result = 0D;
- for(int i=0; i<accountId; ){
- result = 31/++i;
- }
-
- System.out.println("Cancelling account [" + accountId + "]" + ",result=" + result);
- }
-
- public void saveAccount(Long accountId) {
- System.out.println("Saving account [" + accountId + "]");
- }
-
- }

- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-3.0.xsd
- http://www.springframework.org/schema/aop
- http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"
- default-autowire="byName" default-lazy-init="false">
-
- <bean id="checkingAccountService" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean"><!-- 这是个工厂,返回接口代理,将方法调用封装为消息,发送到指定列队 -->
- <property name="serviceInterface" value="com.foo.CheckingAccountService"/>
- <property name="connectionFactory" ref="connectionFactory"/>
- <property name="queue" ref="demoQueue"/>
- </bean>
-
- </beans>

- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-3.0.xsd
- http://www.springframework.org/schema/aop
- http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"
- default-autowire="byName" default-lazy-init="false">
-
- <bean id="checkingAccountService" class="org.springframework.jms.remoting.JmsInvokerServiceExporter"><!-- 这是个代理,将接到的消息转换为方法的调用 -->
- <property name="serviceInterface" value="com.foo.CheckingAccountService"/>
- <property name="service">
- <bean class="com.foo.SimpleCheckingAccountService"/>
- </property>
- </bean>
-
- <!-- 监听指定列队-->
- <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory"/>
- <property name="destination" ref="demoQueue"/>
- <property name="concurrentConsumers" value="10"/>
- <property name="messageListener" ref="checkingAccountService"/><!-- 当列队有消息,将触发指定方法,形成方法调用 -->
- </bean>
-
- </beans>

- /**使用Spring JMS调用服务器端方法*/
- public class Client {
-
- public static void main(String[] args) throws Exception {
- ApplicationContext ctx = new ClassPathXmlApplicationContext("jms.xml", "client.xml");
- CheckingAccountService service = (CheckingAccountService) ctx.getBean("checkingAccountService");
- service.cancelAccount(1000000000L);
- System.out.println("Invoke cancelAccount 1000000000L!");
-
- long start1 = System.currentTimeMillis();
- for(long i=0;i<1000;i++){
- service.saveAccount(i);
- }
- System.out.println("耗时:" + (System.currentTimeMillis() - start1));
- }
- }

- public class Server {
- //启动服务器,监听列队,获得消息,通过反射调用
- public static void main(String[] args) throws Exception {
- new ClassPathXmlApplicationContext("jms.xml","server.xml");
- }
- }
快下班了,细节就不写了,反正这个东西很简单,大家一看明了!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。