赞
踩
官方安装指南:https://www.rabbitmq.com/install-rpm.html
我们将要安装的RabbitMQ的版本是3.8.2
https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.2-1.el7.noarch.rpm
不需要单独安装Erlang环境。
wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm
安装已下载的rpm包(可根据刚才自己选择的版本修改下面的版本号)
sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm
若报错:rabbitmq Unregistered Authentication Agent for unix- process:6485:746263
执行以下命令:rpm -ivh --nodeps rabbitmq-server-3.8.13-1.el7.noarch.rpm
再重试
相关博客:https://blog.csdn.net/m0_67402914/article/details/123972575
将账号密码都设置为 admin:
设置用户分配操作权限:
进入虚拟主机,添加允许访问的用户。
访问 http://IP地址:15672 ,输入新增的用户
amqp:5673:用于客户端连接
clustering:25672:用于集群
http:15672:用于http协议 登录管理后台
exchanges:交换机,默认会有 7 个
admin:用户管理,可以用户进行 CURD 操作等。
可以在右侧进入虚拟主机的管理页面
Maven:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.17.0</version>
</dependency>
<!-- rabbitmq要求的内部用于记录日志的 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>2.0.7</version>
</dependency>
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("收到消息:" + msg);
}
});
channel.close();
connection.close();
不会接收到 消费端启动前的消息。
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
- 注意:当已存在相同交换机名且类型不一致时,会报出 IOException。描述:received 'direct' but current is 'fanout'
// 获取临时队列名
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
public class EmitLog { private static String EXCHANGE_NAME = "direct-logs"; public static void main(String[] args) throws IOException, TimeoutException { // 创建工厂,建立连接,获取信道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.65.128"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String msg1 = "info: Hello World"; String msg2 = "warning: Hello World"; String msg3 = "error: Hello World"; channel.basicPublish(EXCHANGE_NAME, "info", null, msg1.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "warning", null, msg2.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "error", null, msg3.getBytes(StandardCharsets.UTF_8)); System.out.println("发送了消息"); channel.close(); connection.close(); } }
public class ReceiveLogs { private static String EXCHANGE_NAME = "direct-logs"; public static void main(String[] args) throws IOException, TimeoutException { // 创建工厂,建立连接,获取信道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.65.128"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 获取临时队列名 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); channel.queueBind(queueName, EXCHANGE_NAME, "error"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("收到消息:" + new String(body, "UTF-8")); } }); } }
干完了手头上的工作,再分配得到第二个工作
channel.basicQos(int prefetchCount):设置此通道最希望处理的消息数量。
关闭自动确认:
在 basicConsume() 的 callback 函数里,当消息处理完毕时使用 basicAck() 函数来确认。
channel.basicAck(long deliveryTag, boolean multiple):确认一个或多个接收到的消息。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 接收消息并消费
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("收到消息:" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
String queueName = channel.queueDeclare().getQueue();
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。