赞
踩
消息队列的完整使用场景中至少包含三个角色:
可以看到,消息队列服务的核心是消息处理中心,它至少要具备消息发送、消息接收和消息暂存功能。所以,从消息处理中心开始逐步搭建一个消息队列。
消息处理中心类(Broker)的实现:
package org.study.mq.myMq; import java.util.concurrent.ArrayBlockingQueue; /** * 消息处理中心 */ public class Broker { // 队列存储消息的最大数量 private final static int MAX_SIZE = 3; // 保存消息数据的容器 private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE); // 生产消息 public static void produce(String msg) { if (messageQueue.offer(msg)) { System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size()); } else { System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!"); } System.out.println("======================="); } // 消费消息 public static String consume() { String msg = messageQueue.poll(); if (msg != null) { // 消费条件满足情况,从消息容器中取出一条消息 System.out.println("已经消费消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size()); } else { System.out.println("消息处理中心内没有消息可供消费!"); } System.out.println("======================="); return msg; } }
有了消息处理中心类之后,需要将该类的功能暴露出去,这样别人才能用它来发送和接收消息。所以,定义BrokerServer类用来对外提供Broker类的服务
package org.study.mq.myMq; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; /** * 用于启动消息处理中心 */ public class BrokerServer implements Runnable { public static int SERVICE_PORT = 9999; private final Socket socket; public BrokerServer(Socket socket) { this.socket = socket; } @Override public void run() { try ( BufferedReader in = new BufferedReader(new InputStreamReader( socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream()) ) { while (true) { String str = in.readLine(); if (str == null) { continue; } System.out.println("接收到原始数据:" + str); if (str.equals("CONSUME")) { //CONSUME 表示要消费一条消息 //从消息队列中消费一条消息 String message = Broker.consume(); out.println(message); out.flush(); } else { //其他情况都表示生产消息放到消息队列中 Broker.produce(str); } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { ServerSocket server = new ServerSocket(SERVICE_PORT); while (true) { BrokerServer brokerServer = new BrokerServer(server.accept()); new Thread(brokerServer).start(); } } }
客户端访问
有了消息处理中心后,自然需要有相应的客户端与之通信俩发送和接收消息。
package org.study.mq.myMq; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetAddress; import java.net.Socket; /** * 访问消息队列的客户端 */ public class MqClient { //生产消息 public static void produce(String message) throws Exception { Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT); try ( PrintWriter out = new PrintWriter(socket.getOutputStream()) ) { out.println(message); out.flush(); } } //消费消息 public static String consume() throws Exception { Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT); try ( BufferedReader in = new BufferedReader(new InputStreamReader( socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream()) ) { //先向消息队列发送命令 out.println("CONSUME"); out.flush(); //再从消息队列获取一条消息 String message = in.readLine(); return message; } } }
生产消息:
package org.study.mq.myMq;
public class ProduceClient {
public static void main(String[] args) throws Exception {
MqClient client = new MqClient();
client.produce("Hello World");
}
}
消费消息:
package org.study.mq.myMq;
public class ConsumeClient {
public static void main(String[] args) throws Exception {
MqClient client = new MqClient();
String message = client.consume();
System.out.println("获取的消息为:" + message);
}
}
RabbitMQ是一个由Erlang语言开发的基于AMQP标准的开源实现。
保证可靠性(Reliablity)
具有灵活的路由(Flexible Routing)功能
支持消息集群(Clustering)
具有高可用性(Highly Available)
支持多种协议(Multi-protocol)
支持多语言客户端(Many Client)。几乎支持所有常用的语言
提供管理界面(Management UI)
提供跟踪机制(Tracing)
提供插件机制(Plugin System)
RabbitMQ整体架构图
(1)AMQP 消息路由
AMQP中的消息路由过程和Java开发者熟悉的JMS存在一些差别,在AMQP中增加了Exchange和Binding的角色。生产者需要把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Binding决定交换器上的消息应该被发送到哪个队列中
(2)交换器类型
不同类型的交换器分发消息的策略也不同,目前交换器有4种类型:Direct、Fanout、Topic、Headers。其中Headers交换器匹配AMQP消息的Header而不是路由键。此外,Headers交换器和Direct交换器完全一致,但是性能相差很多,目前几乎不用了。
如果消息中的路由键(routing key)和Binding中的绑定键(binding key)一致,交换器就将消息发送到对应的队列中。路由键与队列名称要完全匹配,如果将一个队列绑定到交换机要求路由键为"dog",则只转发routing key 标记为"dog"的消息,不会转发"dog.puppy"消息,也不会转发"dog.guard"消息等。Direct交换器是完全匹配、单播的模式。
Fanout交换器不处理路由键,只是简单地将队列绑定到交换器,发送到交换器的每条消息都会被转发到与该交换器绑定的所有队列中。这很像子网广播,子网内的每个主机都获得了一份复制的消息。通过Fanou交换器转发消息是最快。
Topic交换器通过模式匹配分配消息的路由键属性,将路由键和某种模式进行匹配,此时队列需要绑定一种模式。Topic交换器将路由键和绑定键的字符串切分成单词,这些单词之间用点".“隔开。该交换器会识别两个通配符:“#”和”*",其中"#“匹配0个或多个单词,”*"匹配一个单词。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
package com.study.mq.rabbitMQ.java; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); factory.setVirtualHost("/"); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //创建信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "testRoutingKey"; //发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } }
首先创建一个连接工厂,再根据工厂创建连接,之后从连接中创建信道,接着声明一个交换器和指定路由键,然后才是发布消息,最后将所创建的信道、连接等资源关闭。代码中的ConnectionFactory、Connection、Channel都是RabbitMQ提供的API中最基本的类。ConnectionFactory是Connection的制造工厂,Connection代表RabbitMQ的Socket连接,它封装了Socket操作的相关逻辑。Channel是与RabbitMQ打交道的最重要的接口,大部分业务操作都是在Channel中完成的,比如定义队列、定义交换器、队列与交换器的绑定、发布消息等。
package com.study.mq.rabbitMQ.java; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); factory.setVirtualHost("/"); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //创建信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "testRoutingKey"; //绑定队列,通过键 testRoutingKey 将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey); while (true) { //消费消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } }
消息消费者通过不断循环等待服务器推送消息,一旦有消息过来,就在控制台输出消息的相关内容。一开始的创建连接、创建信道、声明交换器的代码和发布消息时是一样的,但在消费消息时需要指定队列名称,所以这里多了绑定队列这一步,接下来是循环等待消息过来并打印消息内容。
D:\rabbitmq_server3.8.3\rabbitmq_server-3.8.3\sbin\rabbitmq-server
可以访问http://localhost:15672/
先运行Consumer的main方法,这样当生产者发送消息时就能在消费者后端看到消息记录
接下来运行Producer的main方法,发布一条消息,在Consumer的控制台就能看到接收到的消息
从上面的Java访问RabbitMQ实例中可以看出,在消息消费者和消息生产者中有很多重复的代码,并且里面很多都是配置信息。使用Spring框架集成RabbitMQ,简化使用RabbitMQ时的操作。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <bean id="fooMessageListener" class="com.study.mq.rabbitMQ.spring.FooMessageListener"/> <!--配置连接--> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest" virtual-host="/" requested-heartbeat="60"/> <!--配置 RabbitTemplate --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange" routing-key="foo.bar"/> <!--配置 RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory"/> <!--配置队列名--> <rabbit:queue name="myQueue"/> <!--配置 topic 类型 Exchange --> <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="foo.*"/> </rabbit:bindings> </rabbit:topic-exchange> <!--配置监听--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="fooMessageListener" queue-names="myQueue" /> </rabbit:listener-container> </beans>
spring-rabbit的主要API如下:
所以,与RabbitMQ相关的配置也会包括配置连接、配置RabbitTemplate、配置RabbitAdmin、配置队列名称、配置交换器、配置监听器等。
package com.study.mq.rabbitMQ.spring; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class SendMessage { public static void main(final String... args) throws Exception { AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml"); RabbitTemplate template = ctx.getBean(RabbitTemplate.class); template.convertAndSend("Hello World"); ctx.close(); } }
在发送消息时先从配置文件中获取到RabbitTemplate对象,接着就调用convertAndSend发送消息。可以看到,这段代码比上面使用RabbitMQ的Java API简单了很多
package com.study.mq.rabbitMQ.spring;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class FooMessageListener implements MessageListener {
public void onMessage(Message message) {
String messageBody = new String(message.getBody());
System.out.println("接收到消息 '" + messageBody + "'");
}
}
通过实现MessageListener 接口来监听消息的方式消费消息。注意:在配置文件中将声明FooMessageListener的一个bean,然后在rabbit:listen的配置中引用该bean。
运行SendMessage类的main方法,在控制台将看到打印出接收到的消息’Hello World’
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <bean id="mailMessageListener" class="com.study.mq.rabbitMQ.async.MailMessageListener"/> <!--配置连接--> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest" virtual-host="/" requested-heartbeat="60"/> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!--配置 RabbitTemplate --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="mailExchange" routing-key="mail.test" message-converter="jsonMessageConverter"/> <!--配置 RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory"/> <!--配置队列名--> <rabbit:queue name="mailQueue"/> <!--配置 Exchange --> <rabbit:topic-exchange name="mailExchange"> <rabbit:bindings> <rabbit:binding queue="mailQueue" pattern="mail.*"/> </rabbit:bindings> </rabbit:topic-exchange> <!--配置监听--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="mailMessageListener" queue-names="mailQueue" /> </rabbit:listener-container> </beans>
与上节不同的是,这里增加了一个消息转换器的配置,因为需要发送一条自定义类型的对象消息,所以使用Jackson2JsonMessageConvert将对象转换为JSON格式来传递。
package com.study.mq.rabbitMQ.async; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * 业务处理 */ public class Business { //用户注册 public void userRegister(){ //校验用户填写的信息是否完整 //将用户及相关信息保存到数据库 //注册成功后发送一条消息表示要发送邮件 AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("async-context.xml"); RabbitTemplate template = ctx.getBean(RabbitTemplate.class); Mail mail = new Mail(); mail.setTo("12345678@qq.com"); mail.setSubject("我的一封邮件"); mail.setContent("我的邮件内容"); template.convertAndSend(mail); ctx.close(); } public static void main(final String... args) throws Exception { Business business = new Business(); business.userRegister(); } }
发送消息就是把上面注册成功后发送邮件的代码改成了发送一条消息的代码
package com.study.mq.rabbitMQ.async; public class Mail{ private String from;//发件人 private String to;//收件人 private String subject;//邮件标题 private String content;//邮件内容 public String getFrom() { return from; } public void setFrom(String from) { this.from = from; } public String getTo() { return to; } public void setTo(String to) { this.to = to; } public String getSubject() { return subject; } public void setSubject(String subject) { this.subject = subject; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } @Override public String toString() { return "Mail{" + "from='" + from + '\'' + ", to='" + to + '\'' + ", subject='" + subject + '\'' + ", content='" + content + '\'' + '}'; } }
package com.study.mq.rabbitMQ.async; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import java.io.IOException; public class MailMessageListener implements MessageListener { public void onMessage(Message message) { String body = new String(message.getBody()); ObjectMapper mapper = new ObjectMapper(); try { Mail mail = mapper.readValue(body, Mail.class); System.out.println("接收到邮件消息:" + mail); sendEmail(mail); } catch (IOException e) { e.printStackTrace(); } } public void sendEmail(Mail mail) { //调用 JavaMail API 发送邮件 } }
把上面实际发送邮件的代码挪到了消费消息这里,只有收到了邮件消息才会实际发送邮件
运行Business类的main方法,在控制台将看到打印出接收到的邮件消息:接收到邮件消息:Mail{from=‘null’, to=‘12345678@qq.com’, subject=‘我的一封邮件’, content=‘我的邮件内容’},这样就将原先同步调用的代码通过使用消息队列改成了异步处理的方式。
以前,浏览器中的推送功能都是通过轮询来实现的。所谓轮询是指以特定时间间隔(如每隔1s)由浏览器向服务器发出请求,然后服务器返回最新的数据给浏览器。但这种模式的缺点是浏览器需要不断向服务器发出请求,每次请求中的绝大多数数据都是相同的,里面包含的有效数据可能只是很小的一部分,这导致占用很多带宽,而且不断地连接将大量消耗服务器资源,为了改善这种状况,HTML5定义了WebSocket,它能够实现浏览器与服务器之间全双工通信。其优点有两个:一是服务器与客户端之间交换的标头信息很小;二是服务器可以主动传送数据给客户端。
本例属于一个生产者、多个消费者的模式,所以采用Topic交换器,这样一个由生产者指定了确定路由键的消息将会被推送给所有与之绑定的消费者。
安装RabbitMQ后,在sbin目录下执行以下命令:
./rabbitmq-plugins enable rabbitmq_web_stomp
为了模拟服务端产生消息,写一个main函数向指定的队列中发布消息
package com.study.mq.rabbitMQ.java; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class StompProducer { public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); factory.setVirtualHost("/"); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //创建信道 Channel channel = conn.createChannel(); String exchangeName = "exchange-stomp"; channel.exchangeDeclare(exchangeName, "topic"); String routingKey = "shopping.discount"; String message = "<a href=\"https://www.baidu.com\" target=\"_black\">微醺好时光,美酒3件7折,抢购猛戳</a>"; //发布消息 channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); channel.close(); conn.close(); } }
创建连接和创建信道与2.2.1节相同,不同的是声明交换器时指定的是Topic类型
<html> <head> <meta charset="UTF-8"> <title>rabbitMQ 消息提醒示例</title> <link rel="stylesheet" type="text/css" href="default.css"> <link rel="stylesheet" type="text/css" href="jquery.notify.css"> <script type="text/javascript" src="stomp.js"></script> <script type="text/javascript" src="jquery.min.js"></script> <script type="text/javascript" src="jquery.notify.js"></script> </head> <script type="text/javascript"> $(function () { //设置消息提醒声音 $.notifySetup({sound: 'jquery.notify.wav'}); //创建客户端 var client = Stomp.client("ws://localhost:15674/ws"); //定义连接成功回调函数 var onConnect = function () { //订阅商品折扣主题的消息 client.subscribe("/exchange/exchange-stomp/shopping.discount", function (message) { //弹出业务消息提醒,并停留10秒 $("<p>" + message.body + "</p>").notify({stay: 10000}); }); }; //定义错误时回调函数 var onError = function (msg) { $("<p>服务器错误:" + msg + "</p>").notify("error"); }; //连接服务器 client.connect("guest", "guest", onConnect, onError); client.heartbeat.incoming = 5000; client.heartbeat.outgoing = 5000; }); </script> <body> </body> </html>
为了展示消息提醒效果,使用jQuery的Notify插件。在JavaScript中与STOMP服务器通信,首先要创建一个STOMP服务器通信,这就需要调用Stomp.client(URL)函数,该函数的参数表示服务器的WebSocket Endpoint的 URI,在stomp.js中使用ws://URL格式。例子中的localhost是RabbitMQ服务器的地址,在实际使用时可以改成服务器的IP地址,Web STOMP插件默认监听15674端口。
有了客户端对象,接着就是连接服务器,在stomp.js中用connect函数连接服务器,该函数的前两个参数是登录RabbitMQ的用户名和密码,默认都是guest;后两个参数是回调函数前一个用于连接成功后回调,后一个用于连接服务器出错时回调。示例中将这两个回调函数变量都先定义好了,即onConnect和onError。在连接成功后的回调函数中用subscribe()订阅消息,这个方法有两个必需参数,即目的地(destination)和回调函数(callback),还有一个可选参数headers。这里订阅的消息队列格式是/queue/stomp-queue,stomp-queue就是上面发布消息时指定的队列名称。
如果STOMP Nroker支持STOMP1.1版本,则会默认启用心跳检测功能,其中incoming表示接受频率,outgoing表示发送频率,改变incoming 和 outgoing 可以更改客户端的心跳频率(默认为10000s)。
启动RabbitMQ服务器,在sbin目录下执行如下命令:
./rabbitmq-server
如果看到“completed with 7 plugins”信息,则表示启动成功。接着执行StompProducer的main函数向RabbitMQ中创建交换器、发送消息,然后打开页面,最后可以多次执行StompProducer类,将在页面中看到消息提醒框。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。