赞
踩





[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
#其中-ivh中i代表install,v代表visual(视觉的) ,h代表进度,可用安装时,看到进度
[root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
[root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management
#关闭防火墙
[root@localhost opt]# systemctl stop firewalld.service
#启动rabbitmq服务(要等待一会,因为启动需要时间)
[root@localhost opt]# systemctl start rabbitmq-server.service
#查看rabbitmq状态
[root@localhost opt]# systemctl status rabbitmq-server.service
#重启rabbitmq服务,因为当我们直接操作启动,并不会执行(没有redis的继承的说明介绍,因为没有等待的时间)
[root@localhost opt]# systemctl restart rabbitmq-server.service
#关闭rabbitmq服务
[root@localhost opt]# systemctl stop rabbitmq-server.service
[root@localhost opt]# ps -ef | grep rabbitmq
#-ef,显示所有的进程和命令行
#-aux,列出目前所有的正在内存当中的程序
#具体的可以去这个网址看介绍:https://blog.csdn.net/m0_38121874/article/details/120374176
[root@localhost opt]# rabbitmqctl add_user laosun 123456
[root@localhost opt]# rabbitmqctl set_user_tags laosun administrator
#administrator超级管理员,可查看所有的信息,操作的权限,设置none就不能登录成功了,其他都可以
#但不能不设置(否则会提示非管理用户,也就是说默认是none)
[root@localhost opt]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"
#设置对虚拟主机/的是资源访问权限(后面三个.*,分别是资源的配置权限、写权限、读权限)
#他们是对信息的操作,当然,已经存在的还是会显示的,只是不能操作信息的写入,读取,和一些配置等等
#要可以访问,才能进行对应操作,即优先于角色,但与登录无关
[root@localhost opt]# rabbitmqctl list_users
[root@localhost opt]# rabbitmqctl change_password laosun 123123
#修改成了123123,即需要123123登录了,修改后,那么对应的密码若不是原来的(因为可以修改同一个)
#那么当你操作界面时,会提示密码错误,即不能操纵,这时就需要登出了(不会提示)


<dependencies> <dependency> <!--有对应的连接类,如连接工厂类ConnectionFactory--> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency> <dependency> <!-- 操作日志的,不可能你写了日志配置文件,就会读取吧 这里就是用来读取的,基本上所有的log4j都会读取相同的配置文件 --> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> <dependency> <!--这里可以删除,有很多的工具类可以操作,只是这里并没有操作而已--> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency> </dependencies>
# 输出方式 log4j.appender.stdout=org.apache.log4j.ConsoleAppender # 表示输出信息为out级别,即输出到控制台的信息(即位置) log4j.appender.stdout.Target=System.out # 表示输出格式 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout # 打印信息格式 log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n # log4j.appender.file = 表示文件输出方式 log4j.appender.file=org.apache.log4j.FileAppender # log4j.appender.file.File = 表示文件输出位置 log4j.appender.file.File=rebbitmq.log # log4j.appender.file.layout = 表示输出格式 log4j.appender.file.layout=org.apache.log4j.PatternLayout # log4j.appender.file.layout.ConversionPattern = 表示打印格式 log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n # log4j.rootLogger = 表示根日志级别 log4j.rootLogger=debug, stdout,file


package util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * */ public class ConnectionUtil { public static Connection getConnection() throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //在工厂对象中设置MQ的连接信息(ip,端口,虚拟主机,账号,密码) connectionFactory.setHost("192.168.164.128"); //虽然与前面的图片不符合,但是这里认为改变了就行 connectionFactory.setPort(5672); //连接对应信息,就如操作sql界面类似 connectionFactory.setVirtualHost("/lagou"); //若没有,则会报错,默认是/(不设置默认是/) //对应的/lagou虽然看起来是目录,实际上可以说是虚拟主机的存放地址 connectionFactory.setUsername("laosun"); connectionFactory.setPassword("123123"); //我们的连接工厂已经操作完毕,接下来我们可以通过这个工厂得到对应的连接了(类似于连接池) Connection connection = connectionFactory.newConnection(); return connection; } public static void main(String[] args) throws Exception { Connection connection = getConnection(); System.out.println(connection); //amqp://laosun@192.168.164.128:5672//lagou //连接的确成功 connection.close(); } }





package simplest; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import util.ConnectionUtil; /** * */ public class Sender { public static void main(String[] args) throws Exception { //定义对应的消息信息 String msg = "Hello,RabbitMq"; //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //通过信道可以创建队列,使得队列可以找到这个信道,那么为什么要通过信道创建队列,而不是连接来创建呢 //这样的创建是这个依赖的操作的,主要是为了更好的代码 //因为信道的个数基本一定,而队列的个数基本不确定 //所以我们不可能先操作不确定的,再根据这些不确定的一个一个的连接创建的信道,这样太麻烦 //所以我们直接通过信道创建队列,这时可以顺便进行连接,要不然总不能先创建队列,然后再连接信道吧 //因为不在信道内部,不好操作他里面的内容,所以明面上基本都会有一段代码进行连接,即太麻烦,不好封装 //所以虽然都是创建队列连接,但通过信道创建队列,明面上的代码更少了 //即他这个依赖就主要操作信道创建队列了 /* 有五个参数(1,2,3,4,5): 参数1:队列的名称 参数2:队列的数据是否持久化 即当我们对应的服务器挂掉,对应的队列删除后,下次的服务器的重启会不会再次还原持久化的数据 参数3:是否排外,或者说是否支持扩展,即当前队列是否只能自己用,而不给别人用 可以说只有同一连接共享此队列,且连接断开时队列删除 但是连接基本上是分开的,即连接基本会不相同,所有这个通常设置false 参数4:是否自动删除,即至少经过一次连接,这是肯定的,否则这个也基本没有创建 这时当没有连接队列时,也就是连接数为0时,队列会销毁,不管队列是否还保存数据) 参数5:队列的参数(没有参数为null),可以设置很多内容,如过期时间(有默认),通常操作不持久化 */ //通过信道创建队列 channel.queueDeclare("queue1",false,false,false,null); //一般都默认为false或者null //向指定的队列发送消息 /* 有四个参数(1,2,3,4): 参数1:交换机名称,当前是简单模式,也就是P2P模式(点对点),没有交换机,所以名称为"" 参数2:需要发送的指定队列名称,也就是目标队列名称 参数3:设置消息的属性(没有属性就是null) 参数4:消息的内容(只接收字节数组) */ //对队列发送消息 channel.basicPublish("","queue1",null,msg.getBytes()); //""不能写null,否则报错 System.out.println("已发送"); //释放资源 channel.close(); //对应的图中,虽然只显示了队列,但是实际上都操作了信道 //而在更前面的图中,包含交换机的,实际上那个信道都是先操作的(整体),他们指向的其实是整体 //只是图中的箭头看起来是一路指向的,实际上我们可以看成三个部分的整体,中间看成两个部分整体即可 //所以实际上我们都是先操作(创建)信道的,所以这里我们先关闭信道 connection.close(); //关闭连接 } }
package simplest; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer { public static void main(String[] args) throws Exception { //java执行结束,一般都会释放掉对应的class内存 //我们的web项目一般都是持续执行的,只有关闭才会真正的释放内存 //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override //交付处理 //handleDelivery方法的参数: /* 参数1(consumerTag):收件人信息,或者说这个消费者的信息,实际上可以说是关于线程的信息 可能是前一个线程信息,进行的保留信息 每次的消费都会是一个新的结果,你可以打印试试看,当你关闭消费者,再次打开时,他就是不同的值 当然,若生产者再次生产,打印的还是同一个,因为是同一个消费者,好像是根据线程来判断的 同一个线程,那么这个值就是一样的 参数2(envelope):包裹上的快递标签,包括对应的信息 如所在的对应的队列名称,和该消息的唯一编号,比如某某快递站等等 参数3(properties):协议的配置,我们操作对应协议时有那些配置 比如对应来回的数据编码等等,可能编码操作不了中文,有可能) 参数4(body):消息 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的消息 String s = new String(body); //当有异常时,这里并不会出现异常信息打印,只是不会执行后面的操作 //也就是该方法不执行了,虽然会进行监听,但是生产者再次发送消息时,这个方法并不会执行 //因为该方法的执行完毕,也会导致下一次监听到的执行方法会执行(比如取反) //因为是下次监听到的消息方法,所以已经得到的监听方法还是会执行的 //而异常,那么这个导致也就不会出现了,当然,若中途删除队列,也会使得报错 //即也会使得下次监听到的方法执行不了(中途创建也是没用的,可能会有唯一的队列标志) //比如无论是之间结束还是异常结束,都会改变某个地方,使得继续监听(可能) //可能需要日志,因为异常信息是可以自定义的,这里就是 System.out.println(s); //Hello,RabbitMq //当然再次启动时,虽然有队列,但是队列却没有信息,即返回空的 //相当于没有打印,因为已经取出对应的信息了,所以对应的数据并不是直接赋值 //而是可以说是赋值后删除该数据 //可以理解为从集合取得一个数据赋值后时,删掉对应下标的集合数据 //实际上相当于直接就是给集合一个空的赋值(因为是全部取出来) } }; //监听队列(程序不会结束,一直监听,除非手动结束,或者程序结束),且被监听后,其他就不能再次监听了 //比如新创建类执行监听,并没有进行监听 /* 参数1:监听的队列名称,每当有这个队列出现信息时,就会操作信息处理 每次处理都会全部处理完毕(每个下标都取出来一个不放过,因为只有一个监听) 这时你可以再次进行发送,然后在消费方这里进行查看,发现,又打印信息了,即又消费掉了 参数2:是否自动消息确认,比如7天的自动确认收获,true得到消息就确认(在执行方法之前) false需要手动确认 参数3:信息处理的地方,我们总要有一个地方需要进行操作信息吧,即会自动执行handleDelivery方法 即将队列的信息,交给defaultConsumer操作,这里就是打印信息 */ channel.basicConsume("queue1",true,defaultConsumer); //执行这行代码,且在调用对应的方法之前,就会取出数据 //虽然会监听,但后面的代码还是会执行的,即并不会阻塞(因为不是同一个线程) } }


// false:手动消息确认
channel.basicConsume("queue1",false,defaultConsumer);

package simplest; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class RecerByACK { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override //交付处理 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的消息 String s = new String(body); System.out.println(s); //envelope.getDeliveryTag()对应的数据在队列中的编号,我们发送消息时有先后顺序的,从1开始 //当然这个队列的编号并不是创建的那个队列,而是对应队列信息的第几个 //对于一起来说,可以说是编号,若是一个一个来,那么就可以说是个数 //因为一个一个的来,对应的编号都是1,若编号不一致,那么就会报错 //所以我们需要envelope.getDeliveryTag()来当成参数 channel.basicAck(envelope.getDeliveryTag(),false); // 手动确认 //参数1:收件人的信息,一般我们需要给出消息的唯一值,最好是单调递增的 //使得操作该消息,一般是编号 //参数2:是否同时确认多个消息,若为true,则一次性确认小于等于编号的对应消息 //false默认确认当前消息 //但是由于true这种情况基本很难出现,因为无论你设置true还是false //基本是一路确认过去的,即true这种情况就很难出现了(因为基本不会处理判断) //在没有判断的情况下,true和false基本一样,因为都只操作了本消息,若有判断的情况下 //即true可以通过判断了进行实现这种情况,如经过两次发送,进行确认一下(true的确认) //一次性确认当前以及小于自己的,如这里的两次发送(确认两次) //而false只能确认自己,即只确认了一次发送 //这种情况,在某些时刻,如可以高效的处理未确认的信息,而不用手动的一个一个编号的处理了 //但是false的性能要高一点,因为他们的确认本质上也是一样的,但true需要判断 //加起来代码变多,但某些时刻方便 //可以自己试验一下 //所以对应的false和true在底层的判断基本上都是false //所以你会感觉设置false和true是一样的结果 //应该在某些时候会有不同的作用,有时间你可以看看源码 //我们可以注意:这行代码是放在最后的,也就是说只有操作完毕,我们才会执行确认 //假设我们操作完毕了,但没有确认并不会影响真实情况,比如库存,因为库存的确减少的(操作了) //这时我们通常会手动的进行处理(但这是非常小的情况),直接改 //或者重新处理程序(一般是这个,从源头去解决) } }; //监听队列 channel.basicConsume("queue1",false,defaultConsumer); } }


package work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import util.ConnectionUtil; /** * */ public class Sender { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //通过信道创建队列 channel.queueDeclare("test_work_queue", false, false, false, null); for(int i =1 ;i<=100; i++) { String msg = "羊肉串 ==> "+ i; channel.basicPublish("", "test_work_queue", null, msg.getBytes()); System.out.println("新鲜出炉:" + msg); } //释放资源 channel.close(); connection.close(); } }
package work; import com.rabbitmq.client.*; import simplest.Sender; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer1 { private static int i = 1; //统计吃掉羊肉串的数量 public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //通过信道创建队列,queueDeclare方法操作时有双重作用 //若队列不存在,则创建并连接,存在,则获取并连接(或者说直接连接) channel.queueDeclare("test_work_queue", false, false, false, null); //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override //交付处理 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的消息 String s = new String(body); System.out.println("【顾客1】吃掉" + s + "! 总共吃掉【" + i +"】串!"); i++; //实际上可以在输出中操作i++,但是不好观察,因为+叠一起了 //模拟网络延迟 try { Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); } }; //实际上匿名内部类相当于内部的类,继承了当前类,编译时单独出来一个class //有个无参构造方法,{}里写的就是对应的方法,没有其他 //其中{}可以访问当前所在方法(只能是当前方法)的局部变量 //因为访问时,就会默认该变量是final变量 //方法的内部类只能操作final变量,final变量不能操作,只能访问 //若其他地方(不是内部类)操作了该变量,那么就不会默认(优先本方法决定) //所以要操作,一般需要是静态的,反正共享同一个内存地方,所以私有的也可,即这里就使用了静态变量 //比如这里的匿名内部类只有一个方法handleDelivery方法,加上无参构造方法 //名称是:当前类名(如这里就是Recer1)$编号(从1开始,然后依次加1) //比如Recer1$1(第一个匿名内部类),Recer1$2等等 //上面是匿名内部类的具体构造 //监听队列 channel.basicConsume("test_work_queue",false,defaultConsumer); } }
package work; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer2 { private static int i = 1; //统计吃掉羊肉串的数量 public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //通过信道创建队列,queueDeclare方法操作时有双重作用 //若队列不存在,则创建并连接,存在,则获取并连接(或者说直接连接) channel.queueDeclare("test_work_queue", false, false, false, null); //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override //交付处理 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的消息 String s = new String(body); System.out.println("【顾客2】吃掉" + s + "! 总共吃掉【" + i +"】串!"); i++; //实际上可以在输出中操作i++,但是不好观察,因为+叠一起了 //模拟网络延迟 try { Thread.sleep(900); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); } }; //监听队列 channel.basicConsume("test_work_queue",false,defaultConsumer); } }
/* 消费者1:看成[1] 消费者2:看成[2] 这里有一个地方: ============ ============ 他们中间是空的 当我们开启多个监听时,他们会先排队,然后队列会依次确定对应的监听,从而发送信息 其中先监听的排在前面,然后依次排队 假设消费者1先启动监听,消费者2后启动监听 ============ [1][2] ============ 他们会这样排列,由于数量是平均分发的(分发:每个消费者都发送一次),前面我们也说过,会全部取出来处理完毕 而正是因为平均分发,那么假设有100个消息 那么我们启动生产者时,创建队列并获取后或者直接获取队列后会确定监听(先启动的消费者,监听了对应队列的) 然后会将该100个消息进行有排列的给对应消费者,或者说给消费者发送消息时,因为已经确定了对应消费者了 由于是[1][2]排列的,那么第一个信息就是给[1],然后给[2],再次给[1],再然后给[2] 以此类推,当全部给完后,那么也就说明生产者发送的对应的队列信息全部发送完毕 实际上这些信息只是确定要发给谁,在队列里,还是一样的存在,只是有消费者标记而已 这时我们可以看到消费者的打印日志中就出现了消费者1,处理1,3,5...等等 消费者2,处理2,4,6...等等,可能会更少 而之所以是有顺序的,那是因为发送的消息到队列中是从头到尾的获取(每次) 所以无论是先发送,还是一般发送一遍获取,都是有顺序的 所以当消费者1先运行时,那么就是1开始,然后3,因为排队 所以说,每次的启动,都会有顺序的分发 最后要注意一下:我们的监听是在分发完毕后,才会进行执行方法,监听只是用来确认消费者的 这时,并不是监听只监听一个消息 而是方法只能由一个消息来执行,所以监听后的状态是可以知道的,如设置了false,那么发送10个消息 那么对应就有10个未处理的消息,在没有操作确认的情况,可以在页面看到,极少数在操作了确认的情况会在未确定的地方出现(页面),因为确定也是需要时间的,但基本是不会的 */


// 可以理解为:快递一个一个送,送完一个再送下一个,速度快的(通知多的)送件就多
//在程序上可以这样理解:我们前面说过,队列的获取会进行监听,当我们写下下面的程序
//那么原来的平均分发变成了监听自动获取,意思是:
//他虽然会先进行分发,然后执行,但这个分发只是确认他平均时要访问的顺序
//实际上由监听者,即消费者,自己去抢消息,不管这个顺序或者说消费者标记,每个消息相当于是单线程的
//因为队列在rabbitmq中是单线程的
//当有消费者获取后,其他消费者不能获取,只能是下一个了
channel.basicQos(1);





package ps; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import util.ConnectionUtil; /** * */ public class Sender { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //创建路由(路由名,路由类型,有4种类型) /* fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上) */ channel.exchangeDeclare("test_exchange_fanout","fanout"); String msg = "hello,哈哈哈"; channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes()); //""不能写null,否则报错 System.out.println("生产者:" + msg); //释放资源 channel.close(); connection.close(); } }
package ps; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer1 { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_fanout_queue_1", false, false, false, null); //绑定路由(关注),参数列表(队列名,路由名,一般设置为"") channel.queueBind("test_exchange_fanout_queue_1","test_exchange_fanout",""); //""不能写null,否则报错 //实际上这个绑定也可以在生产者那里操作,虽然我们前面说过,连接交给生产者 //但是这里是一对多,队列需要确定消费者 //然后绑定,所以生产者只需要创建路由即可(因为消费者会执行多次,即不能一直创建路由或者执行创建路由的代码) //而正是因为会创建多次,所以创建队列有两个意思 //所以这样说,绑定的操作一般要消费者进行 //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " +s); } }; channel.basicConsume("test_exchange_fanout_queue_1",true,defaultConsumer); } }
package ps; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer2 { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_fanout_queue_2", false, false, false, null); //绑定路由(关注),参数列表(队列名,路由名,一般设置为"") channel.queueBind("test_exchange_fanout_queue_2","test_exchange_fanout",""); //实际上这个绑定也可以在生产者那里操作,虽然我们前面说过,连接交给生产者 //但是这里是一对多,队列需要确定消费者 //然后绑定,所以生产者只需要创建路由即可(因为消费者会执行多次,即不能一直创建路由或者执行创建路由的代码) //而正是因为会创建多次,所以创建队列有两个意思 //所以这样说,绑定的操作一般要消费者进行 //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者2】 = " +s); } }; channel.basicConsume("test_exchange_fanout_queue_2",true,defaultConsumer); } }


package direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import util.ConnectionUtil; /** * */ public class Sender { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //创建路由(路由名,路由类型,有4种类型) /* direct:根据路由键,进行定向的分发消息,处理了路由键 */ channel.exchangeDeclare("test_exchange_direct","direct"); String msg = "用户注册,【userid=S101】"; //当有路由时,那么第二个参数就是路由键,而不是队列名称,用来确认对应键的队列 //即找到设置了该路由键的队列,然后他发送信息给该队列,而不是全部发送了 channel.basicPublish("test_exchange_direct", "insert", null, msg.getBytes()); System.out.println("[用户系统]:" + msg); //释放资源 channel.close(); connection.close(); } }
package direct; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer1 { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_direct_queue_1", false, false, false, null); //绑定路由(关注) //参数列表(队列名,路由名,一般设置为"",这个参数使得该队列只接收该参数路由键名称的消息) //当然,若第三个参数是"",那么他也只能得到""的路由键信息 //所有说发送消息的路由键会判断绑定队列的路由键是否相等,然后决定发送消息 //若相等,那么就会将该信息,发送给绑定的队列里面去 //这样就实现了指定队列的发送 channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","insert"); //channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","insert"); 覆盖上一个 //如果是两个一样的,那么就是覆盖 channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","update"); channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","delete"); //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " +s); } }; channel.basicConsume("test_exchange_direct_queue_1",true,defaultConsumer); } }
package direct; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer2 { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_direct_queue_2", false, false, false, null); //绑定路由(关注) //参数列表(队列名,路由名,一般设置为"",这个参数使得该队列只接收该参数路由键名称的消息) //当然,若第三个参数是"",那么他也只能得到""的路由键信息 //所有说发送消息的路由键会判断绑定队列的路由键是否相等,然后决定发送消息 //若相等,那么就会将该信息,发送给绑定的队列里面去 //这样就实现了指定队列的发送 channel.queueBind("test_exchange_direct_queue_2","test_exchange_direct","select"); //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者2】 = " +s); } }; channel.basicConsume("test_exchange_direct_queue_2",true,defaultConsumer); } }

# Q1绑定了路由键*.orange.*
# Q2绑定了路由键*.*.rabbit和lazy.#
# 下面生产者的消息会被发送给哪个队列:
quick.orange.rabbit # Q1 Q2
lazy.orange.elephant # Q1 Q2
quick.orange.fox # Q1
lazy.brown.fox # Q2
lazy.pink.rabbit # Q2
quick.brown.fox # 无
orange # 无
quick.orange.male.rabbit # 无

package topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import util.ConnectionUtil; /** * */ public class Sender { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //创建路由(路由名,路由类型,有4种类型) /* topic:根据路由键,进行定向的分发消息,处理了路由键,只是他用来进行模糊的比较(或者说模糊匹配) */ channel.exchangeDeclare("test_exchange_topic","topic"); String msg = "用户注册,【userid=S102】"; //当有路由时,那么第二个参数就是路由键,而不是队列名称,用来确认对应键的队列 //即找到设置了该路由键的队列,然后他发送信息给该队列,而不是全部发送了 channel.basicPublish("test_exchange_topic", "user.register", null, msg.getBytes()); System.out.println("[用户系统]:" + msg); //释放资源 channel.close(); connection.close(); } }
package topic; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer1 { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_topic_queue_1", false, false, false, null); //绑定路由 channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#"); //其中对应的模糊查询符号,需要在.后面才会起作用,可能吧,你可以自己测试一下 //注意:这个符号只操作这里,即其他地方的符号并不会起作用,所有你在发送中操作这个是无效的 //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " +s); } }; channel.basicConsume("test_exchange_topic_queue_1",true,defaultConsumer); } }
package topic; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer2 { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); channel.queueDeclare("test_exchange_topic_queue_2", false, false, false, null); //绑定路由 channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","product.#"); channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","order.#"); //其中对应的模糊查询符号,需要在.后面才会起作用,可能吧,你可以自己测试一下 //注意:这个符号只操作这里,即其他地方的符号并不会起作用,所有你在发送中操作这个是无效的 //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者2】 = " +s); } }; channel.basicConsume("test_exchange_topic_queue_2",true,defaultConsumer); } }
package topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import util.ConnectionUtil; /** * */ public class Sender { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //创建路由(路由名,路由类型,有4种类型) /* topic:根据路由键,进行定向的分发消息,处理了路由键,只是他用来进行模糊的比较 true代表持久化,但是要注意:同样的那么也可以说是覆盖,或者说是获取路由,不同的不会 即对应的参数不是true时,那么会造成冲突,即会报错,不会覆盖 默认不写第三个参数的话,就相当于第三个参数写上false */ channel.exchangeDeclare("test_exchange_topic","topic",true); String msg = "用户注册,【userid=S102】"; //当有路由时,那么第二个参数就是路由键,而不是队列名称,用来确认对应键的队列 //即找到设置了该路由键的队列,然后他发送信息给该队列,而不是全部发送了 //第三个参数:消息的配置,即设置消息的属性(没有属性就是null) //若设置了MessageProperties.PERSISTENT_TEXT_PLAIN属性,那么对应的消息就是持久化的 channel.basicPublish("test_exchange_topic", "user.register", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); System.out.println("[用户系统]:" + msg); //释放资源 channel.close(); connection.close(); } }
package topic; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer1 { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //参数2:队列的数据是否持久化 //自然对应不同的配置也是不能覆盖的,即true后,再次false启动会报错,反之也是如此 channel.queueDeclare("test_exchange_topic_queue_1", true, false, false, null); //再次创建该队列,若是一样的配置,那么就是得到连接(相当于重新赋值连接),但是不同的配置就会报错了 //绑定路由 channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#"); //其中对应的模糊查询符号,需要在.后面才会起作用,可能吧 //注意:这个符号只操作这里,即其他地方的符号并不会起作用,所有你在发送中操作这个是无效的 //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " +s); } }; channel.basicConsume("test_exchange_topic_queue_1",true,defaultConsumer); } }
#重启rabbitmq服务
systemctl restart rabbitmq-server.service

<dependency> <!--有对应的类,如json转换类,ObjectMapper类(序列化和反序列化,以及json操作的),命名空间等 等着被识别,因为整合的,也包括了前面的rabbitmq的依赖,即有对应的连接类,如连接工厂类ConnectionFactory--> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> <dependency> <!--操作日志的,当然需要对应的日志文件配置(加不加无所谓)--> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> <dependency> <!--这里可以删除,有很多的工具类可以操作,只是这里并没有操作而已--> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--配置连接工厂--> <rabbit:connection-factory id="connectionFactory" host="192.168.164.128" port="5672" username="laosun" password="123123" virtual-host="/lagou"/> <!--配置(创建)队列,队列名是test_spring_queue_1--> <rabbit:queue name="test_spring_queue_1"/> <!-- 配置rabbitAdmin:主要是用于在java代码中对队列的管理,用来创建,绑定,删除队列与交换机,发送消息等操作 根据连接进行操作形成的(通过工厂可以得到连接) --> <rabbit:admin connection-factory="connectionFactory"></rabbit:admin> <!--不需要的话,可以删掉--> <!--配置(创建)交换机,topic类型,交换机名称是spring_topic_exchange--> <rabbit:topic-exchange name="spring_topic_exchange"> <!--rabbit:topic-exchange标签可以使用durable属性,来创建是否持久化的交换机,默认为true,即默认持久 若不需要持久的,可以设置成false--> <rabbit:bindings> <!--绑定队列,自然可以绑定多个,即这个标签可以是多个--> <rabbit:binding pattern="msg.#" queue="test_spring_queue_1"></rabbit:binding> <!-- 进行队列绑定,绑定在该交换机上,相当于 channel.queueBind("test_spring_queue_1","spring_topic_exchange","msg.#"); 我们发现,我们其实可以手动的先绑定,后操作 --> </rabbit:bindings> </rabbit:topic-exchange> <!--配置json转换的工具--> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!-- 配置rabbitmq模板,通过这个模板,可以向对应的绑定的这个交换机队列发送消息 相当于 channel.basicPublish("spring_topic_exchange", "(对应路由键)", null(对应配置,如持久化), msg.getBytes());,这样的发送消息,其中消息msg.getBytes() 可以支持json的变换,因为jsonMessageConverter这个json转换的工具 而这个模板,直接封装了上面的操作(对应方法),连接工厂肯定是需要的 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter"></rabbit:template> </beans>
package test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.HashMap; import java.util.Map; /** * */ public class Sender { public static void main(String[] args) { //创建spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); //从容器获取rabbit模板对象,来发送消息 RabbitTemplate bean = context.getBean(RabbitTemplate.class); //发消息 Map<String,String> map = new HashMap<>(); map.put("name","叶凡"); map.put("email","6666666666@qq.com"); //参数1,路由键,参数2,消息 bean.convertAndSend("msg.user",map); //转换并发送 context.close(); } }

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <rabbit:connection-factory id="connectionFactory" host="192.168.164.128" port="5672" username="laosun" password="123123" virtual- host="/lagou"/> <rabbit:queue name="test_spring_queue_1"/> <!--创建队列在生产者中创建了,但这是消费者,所以一般写上 虽然可以不写,但实际上创建队列通常都是消费者写的 但由于生产者是进行初始化的一方,操作了绑定 所以生产者这个必须要有(在程序上可以没有,但是配置文件上有约束,那么必须有,否则报错) --> <rabbit:admin connection-factory="connectionFactory"></rabbit:admin> <!--没有使用的话,可以删掉--> <!-- 我们发现,与交换机的配置,并没有写,就如前面说的一样,消费者是多的一方 所以交换机基本只需要创建一次,交给生产者来创建,而由于交换机也顺便绑定了,所以导致绑定也在生产这里面 而绑定又要队列,所以导致队列和绑定都在生产者里面 虽然生产者初始化了最基本的队列,消费者也是,但是后续的扩展,若没有配置文件的重新读取的话 那么基本是需要rabbit:admin来创建队列或者绑定的 生产者也可以来删除队列或者交换机,消费者虽然也可,但交换机基本只能在生产者里操作,所有生产者也要有这个配置 --> <!-- 在这里我们发现,还有一个没有配置,那么就是监听的配置,所有接下来我们来操作监听的配置 实际上就是对应的监听类,这里进行了扫描 --> <context:component-scan base-package="listener"></context:component-scan> <!--配置监听,自然需要指定连接的,因为需要指定地方--> <rabbit:listener-container connection-factory="connectionFactory"> <!--对应注解的创建bean,是操作对应的类的首字母小写得到 这里就是将监听的队列的执行地方,放在指定的类中操作--> <rabbit:listener ref="cousumerListener" queue-names="test_spring_queue_1"></rabbit:listener> </rabbit:listener-container> </beans>
package listener; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * */ @Component //只有实现了这个MessageListener接口(有个类也可以) //才会将监听的信息给这个类的onMessage方法,否则相当于没有类操作监听的结果 //也就没有什么打印信息了 public class CousumerListener implements MessageListener { //jackson提供序列化和反序列化中使用最多的类,也是用来转换json的 private static final ObjectMapper mapper = new ObjectMapper(); @Override public void onMessage(Message message) { //这个message,保存了我们传递的消息,由于有对应的json工具类,所有会解析map集合后的数据存放在这里面 try { //getBody()就是对应得到消息的方法,将对应处理后的map集合数据(处理过的,好像是byte数组)变成了json对象 JsonNode jsonNode = mapper.readTree(message.getBody()); String name = jsonNode.get("name").asText(); //通过json对象,得到对应的键的值 String email = jsonNode.get("email").asText(); //通过json对象,得到对应的键的值 System.out.println(name); System.out.println(email); } catch (Exception e) { e.printStackTrace(); } } }
package test; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * */ public class TestRunner { public static void main(String[] args) { //获得容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml"); //由于是监听的,那么对应的就会一直等待,所以执行后,不会结束 } }
/* 比如说 对应的map集合没有处理之前: Body:'{name=萧炎, email=6666666666@qq.com}' json转换的工具处理之后: Body:'{"name":"萧炎","email":"6666666666@qq.com"}' //我们发现,对应的处理后的数据,的确是可以被解析成json的,而不处理的基本会报错 //注意:对应工具,基本只会对map集合处理,其他的可能也会处理,但主要用来处理map集合 //当然,由于他们是byte数组,所以,若不转发为json,也是可以的,比如这样: /* String str = new String(message.getBody()); System.out.println(str); 通过String类,使得byte数组变成字符串,全部打印 比如打印Body:'{"name":"萧炎","email":"6666666666@qq.com"}'这个整体 若不是map集合,那么自然打印的就是对应的字符串了,可以说是万能的,但是json转换的工具可以更好的解决map集合的数据 */ //最后:我们要注意一点:消费者运行时,监听也是需要对应队列的 //虽然不会立即报出错误(等待一下,便可以出现错误,因为程序运行也是需要时间的)

package transaction; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import util.ConnectionUtil; /** * */ public class Sender { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); channel.exchangeDeclare("test_transaction","topic"); channel.txSelect(); //开启事务,开启事务后,对应交换机的发送信息必须等待提交后才会发送,否则不发生(送) try { channel.basicPublish("test_transaction", "product.register", null, "商品1降 价".getBytes()); System.out.println(1 / 0); //前面我们说过,解决了消费者的确认消息问题,那么如何保证生产者确认发送了呢 //假设有一键订单确认,但是却由于中间出现了问题,导致有些订单没有发送 //使得好不容易找到的商品又要去重新找了,这是对客户是不友好的 //所以我们需要一个需要可以保证不出错 //若出错就会回滚的操作(即一键订单的不发生,一般会返回提示信息) channel.basicPublish("test_transaction", "product.register", null, "商品2降 价".getBytes()); channel.txCommit(); //提交事务(一起成功),需要先开启事务,否则报错 //与mysql不同,mysql在没有开启事务之前,提交事务的操作相当于没有执行 System.out.println("生产者:消息已发送"); }catch (Exception e){ e.printStackTrace(); channel.txRollback(); //事务回滚(一起失败),也要先开启事务,否则报错(mysql的回滚相当于没有执行) //对与java程序来说,是需要一环套一环的,即必须先开启事务,否则报错 //而mysql没有这么严谨,无论是先提交还是先回滚,他们都相当于没有操作(没有严谨的程序来产生错误) //或者说,有判断停止该程序执行 }finally { //释放资源 channel.close(); connection.close(); } } }
package transaction; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * */ public class Recer { public static void main(String[] args) throws Exception { //获得连接 Connection connection = ConnectionUtil.getConnection(); //通过连接可以创建通道(信道) Channel channel = connection.createChannel(); //参数2:队列的数据是否持久化,自然对应不同的配置也是不能覆盖的 //即true后,再次false启动会报错,反之也是如此,你测试的时候,最好不要持久化 channel.queueDeclare("test_transaction_queue_1", true, false, false, null); //绑定路由 channel.queueBind("test_transaction_queue_1","test_transaction","product.#"); //通过信道得到队列的消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " +s); } }; channel.basicConsume("test_transaction_queue_1",true,defaultConsumer); } }

<!--配置连接工厂--> <rabbit:connection-factory id="connectionFactory" host="192.168.164.128" port="5672" username="laosun" password="123123" virtual-host="/lagou" publisher-confirms="true"/> <!--启动生产者确认机制: publisher-confirms="true",需要启动 否则不使用对应处理类(实际上只是不使用对应的方法而已)--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter" confirm-callback="messageConfirm"/> <!--添加确认回调处理类:confirm-callback="messageConfirm"--> <!--确认机制的处理类--> <bean id="messageConfirm" class="confirm.MessageConfirm"></bean> <!--无论消息是否发送成功,这个处理类都会进行处理-->
package confirm; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; /** * */ //必须要实现RabbitTemplate.ConfirmCallback接口,否则不会当成处理类来操作,即相当于没有配置处理类 //这个处理类正好是配置在发送消息那里,即配置好后,对应的异常信息就不会直接打印了 //而是将信息交给该类来处理,然后将结果给对应的参数,如下面的s参数,因为配置好后 //会有判断,若有对应处理类,那么将异常信息给该类操作(或者说就是该类处理,也可以说是赋值异常信息) public class MessageConfirm implements RabbitTemplate.ConfirmCallback { //一般他是异步的,所以容易导致顺序的问题出现,异步好像可以修改成同步,具体操作可以百度,实际上操作某些条件实现同步也行 @Override //参数1:消息相关的数据对象,封装了消息的唯一id,类似于前面我们手动确认时,对应的消息编号 //参数2:代表消息是否发送成功的结果,即true,代表发送成功,为false,代表发送失败 //参数3:异常信息,如果发送失败,即有错误导致,一般是异常信息,否则则是空串(一般是null,而不是"") public void confirm(CorrelationData correlationData, boolean b, String s) { //每一次的发送消息,无论是否成功,都会执行这个方法 if(b){ System.out.println("消息确认成功"); }else{ System.out.println("消息确认失败"); System.out.println(correlationData); //失败时,这个返回异常信息,成功时,则是null,如果失败时也是null,那么可能需要某些条件才会有信息,虽然基本不会出现,如果出现,可以百度,或者自行寻找问题,但一般情况下不会出现这种情况 System.out.println(s); // 如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用消息补发 // 采用递归(固定次数,不可无限),如可以保存对应的那一条信息参数,并改变信息,然后发送 // 也可以多次执行自己的那一个发送程序等等,即递归的操作 // 也可以使用redis+定时任务,将对应的信息放到redis里面,然后取出来,在根据信息进行改动,然后发送 // 当然是有时间限制的,即定时 // 实际上这里的定时是有方便操作的,如(jdk的timer,或者定时任务框架Quartz等等),这些可以百度查看 //由此看来,的确是可以解决对应的发送是否成功的操作问题(保证发送的操作) } } }
# 输出方式 log4j.appender.stdout=org.apache.log4j.ConsoleAppender # 表示输出信息为out级别,即输出到控制台的信息(即位置) log4j.appender.stdout.Target=System.out # 表示输出格式 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout # 打印信息格式 log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n # log4j.appender.file = 表示文件输出方式 log4j.appender.file=org.apache.log4j.FileAppender # log4j.appender.file.File = 表示文件输出位置 log4j.appender.file.File=rebbitmq.log # log4j.appender.file.layout = 表示输出格式 log4j.appender.file.layout=org.apache.log4j.PatternLayout # log4j.appender.file.layout.ConversionPattern = 表示打印格式 log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n # log4j.rootLogger = 表示根日志级别 log4j.rootLogger=debug, stdout,file
package test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.HashMap; import java.util.Map; /** * */ public class Sender { public static void main(String[] args) { //创建spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); //从容器获取rabbit模板对象,来发送消息 RabbitTemplate bean = context.getBean(RabbitTemplate.class); //发消息 Map<String,String> map = new HashMap<>(); map.put("name","萧炎"); map.put("email","6666666666@qq.com"); //参数2,路由键,参数3,消息 bean.convertAndSend("lalala","msg.user",map); //转换并发送 //新加上一个参数,原来默认是配置文件的路由(交换机),现在发送指定的路由,进行测试 context.close(); } }
package test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.HashMap; import java.util.Map; /** * */ public class Sender { public static void main(String[] args) { //创建spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); //从容器获取rabbit模板对象,来发送消息 RabbitTemplate bean = context.getBean(RabbitTemplate.class); //发消息 Map<String,String> map = new HashMap<>(); map.put("name","萧炎"); map.put("email","6666666666@qq.com"); for(int i = 0;i<10;i++) { bean.convertAndSend("msg.user", map); System.out.println("发送成功"); } context.close(); } }

<!--配置监听,自然需要指定连接的,因为需要指定地方--> <!-- prefetch="3" 一次性消费的消息数量 会告诉 RabbitMQ 不要同时给一个消费者推送多于3个消息(没有确认的消息) 一旦有 3 个消息还没有ack(确认),则该 consumer 将阻塞,即监听的方法不执行,且等待打上未确认标签 直到消息被ack 因为没有确认的消息,相当于还没有处理完毕的消息(虽然得到了,但并没有删除该消息) 所以说,这里只会允许你存在3个及其以下的消息没有被确认,当然,确认了的,自然不会算作 或者可以说,他设置的就是未确认消息的最多是多少,而由于这个的存在,使得我们一次性的多个消息中 由原来的全部未确认(设置的上限,且设置了手动确认),只显示三个 实际上还有很多个等待被打上未确认标签,再执行方法 --> <!-- acknowledge="manual" 手动确认,原来是默认确认的,这下我们需要手动确认了--> <rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual"> <!--对应注解的创建bean,是操作对应的类的首字母小写得到 这里就是将监听的队列的执行地方,放在指定的类中操作--> <rabbit:listener ref="cousumerListener" queue-names="test_spring_queue_1"></rabbit:listener> </rabbit:listener-container>
package listener; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * */ @Component //将原来的实现接口MessageListener变成了继承AbstractAdaptableMessageListener类 //在保持可以监听的情况下(即原来实现了MessageListener接口的作用下),可以再次进行其他操作 //这是因为原来的接口操作MessageListener只能操作信息(虽然与匿名内部类的那个一样是操作消息或信息的) //但是他们却不能重新的操作原来的发送,创建队列,交换机等操作,只是得到信息 //而使用这个AbstractAdaptableMessageListener类,会给你管道(我们前面创建的对应的管道) //所以我们可以在操作消息时,还可以再进一步操作,提高的扩展性,如确认消息 public class CousumerListener extends AbstractAdaptableMessageListener { //jackson提供序列化和反序列化中使用最多的类,也是用来转换json的 private static final ObjectMapper mapper = new ObjectMapper(); @Override public void onMessage(Message message, Channel channel) throws Exception { //这个message,保存了我们传递的消息,由于有对应的json工具类,所有会解析map集合后的数据存放在这里面 try { System.out.println(message); //message里包含了数据的编号(对于队列来说是唯一的),但也由于分发,所以基本上对于自己就是唯一的 //getBody()就是对应得到消息的方法,将对应处理后的map集合数据(处理过的,好像是byte数组)变成了json对象 JsonNode jsonNode = mapper.readTree(message.getBody()); String name = jsonNode.get("name").asText(); //通过json对象,得到对应的键的值 String email = jsonNode.get("email").asText(); //通过json对象,得到对应的键的值 System.out.println(name); System.out.println(email); //手动确认消息 /* 参数1:RabbitMQ向该channel投递的这条消息的唯一标识ID,此ID一般是一个单调递增的正整数 参数2:是否同时确认多个消息,前面已经说过了,这里也可以这样的解释 为了减少网络的流量,手动确认可以被批量处理, */ long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag,true); Thread.sleep(3000); System.out.println("休息3秒后,在继续接收消息"); } catch (Exception e) { e.printStackTrace(); } } }

<!--auto-declare="true"当队列不存在时则自动创建,虽然这里并没有体现出这个配置的作用
但是若你启动后,在中途删除这个队列,那么他在使用这个队列时,就会自动创建,而不会报错了-->
<rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="5000" value-type="long"></entry>
<!--long类型的5000毫秒(代表5秒),key中基本是固定的,代表过期时间(ttl)到了删除(x)消息(message)-->
</rabbit:queue-arguments>
</rabbit:queue>



<rabbit:queue name="test_spring_queue_tt2">
package test; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.HashMap; import java.util.Map; /** * */ public class Sender2 { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); RabbitTemplate bean = context.getBean(RabbitTemplate.class); //发消息 Map<String,String> map = new HashMap<>(); //创建消息配置对象 //之前有个使用持久化的那个属性是import com.rabbitmq.client包下的MessageProperties类 //这里是import org.springframework.amqp.core包下的MessageProperties类,是不同的类 //这个类可以操作过期时间,另外一个不可以 MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("3000"); //设置过期时间3秒 //创建消息,第二个参数是上面的import org.springframework.amqp.core包下的MessageProperties类型 //即他们是一起的,可以使用ctrl+左键进入看导入的包,就知道了 Message message = new Message("测试过期时间".getBytes(),messageProperties); //使用了这个配置,即过期配置 //使用的是import org.springframework.amqp.core.Message;包的类、 bean.convertAndSend("msg.user", message); //对应基本不会被json转换工具(也可以说是json工具类)处理 System.out.println("发送成功"); context.close(); } }


<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--配置连接工厂--> <rabbit:connection-factory id="connectionFactory" host="192.168.164.128" port="5672" username="laosun" password="123123" virtual-host="/lagou"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="my_exchange"/> <!--接下来我们操作死信的配置--> <!--创建死信队列--> <rabbit:queue name="dlx_queue"/> <!--创建死信交换机,根据路由键来确定的交换机,即定向交换机,路由模式--> <rabbit:direct-exchange name="dlx_exchange"> <rabbit:bindings> <rabbit:binding key="dlx_ttl" queue="dlx_queue"/> <rabbit:binding key="dlx_max" queue="dlx_queue"/> </rabbit:bindings> </rabbit:direct-exchange> <!--创建测试队列--> <!--过期的测试队列--> <rabbit:queue name="test_ttl_queue"> <rabbit:queue-arguments> <!--设置队列的过期时间--> <!--当key属性是x-message-ttl时,对应的value就是过期的时间(毫秒),这里就是6秒--> <!--value-type就是数据类型,这里就是long类型--> <entry key="x-message-ttl" value="6000" value-type="long"/> <!--消息如果超时,将超时的信息投递给死信交换机--> <!--当key属性是x-dead-letter-exchange时,对应的value就是超时的信息要发送的交换机名称--> <entry key="x-dead-letter-exchange" value="dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> <!--超出长度的消息队列--> <rabbit:queue name="test_max_queue"> <rabbit:queue-arguments> <!--设置队列的最大长度--> <!--当key属性是x-max-length时,对应的value就是队列最大长度,这里就是2--> <!--当然对应的类型也是long--> <entry key="x-max-length" value="2" value-type="long"/> <!--消息如果超出长度,这里就是2,将超过长度的信息投递给死信交换机--> <!--当key属性是x-dead-letter-exchange时,对应的value就是超时的信息要发送的交换机名称--> <entry key="x-dead-letter-exchange" value="dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> <!--即key属性是x-dead-letter-exchange时,该配置操作的即是超出队列配置范围的指向交换机的配置--> <!--他不只是进行确认位置,实际上也顺便充当发送消息的角色--> <!--所以我们可以知道,我们只需要发送一次消息,对应的队列中消息就在移动了--> <!--创建路由模式的测试交换机,即定向的测试交换机--> <rabbit:direct-exchange name="my_exchange"> <rabbit:bindings> <rabbit:binding key="dlx_ttl" queue="test_ttl_queue"/> <rabbit:binding key="dlx_max" queue="test_max_queue"/> </rabbit:bindings> </rabbit:direct-exchange> </beans>
package test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.HashMap; import java.util.Map; /** * */ public class SenderDLX { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml"); RabbitTemplate bean = context.getBean(RabbitTemplate.class); //bean.convertAndSend("dlx_ttl", "测试超时".getBytes()); bean.convertAndSend("dlx_max", "测试长度1".getBytes()); bean.convertAndSend("dlx_max", "测试长度2".getBytes()); bean.convertAndSend("dlx_max", "测试长度3".getBytes()); context.close(); } }





<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
<rabbit:listener ref="cousumerListener" queue-names="dlx_queue"></rabbit:listener>
<!--监听的队列改变了-->
</rabbit:listener-container>

127.0.0.1 A localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 A localhost localhost.localdomain localhost6 localhost6.localdomain6
#上面的基本只能重启才会生效,使得出现[root@A ~]#
#但是要先设置下面的A才可,因为不能是单纯的A,A要是具体地址
#但是实际上却不是上面这样说的,而是如下:具体实现却是下面的设置别名
#而不是上面的引导,所以只需要下面即可(即上面的A可以不写),而下面才是真的需要使得hosts文件生效的主要地方
192.168.164.128 A
192.168.164.129 B
#上面两个实际上需要使得hosts文件生效,才会生效
#当然,他们之间的空格基本是可以随便加的,但为了美观,通常只会加一个空格
127.0.0.1 B localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 B localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.164.128 A
192.168.164.129 B
reboot #重新服务器,有些作用基本只有重启才可以出现效果,如对应的[root@A(B) ~]#的改变
#我们在对应服务器里,进入如下目录
cd /var/lib/rabbitmq/
#查看隐藏文件,使用 ls -all 显示,-al也可
#发现了.erlang.cookie文件
#使用cat命令查看各个服务器的内容
cat .erlang.cookie
#发现,各个服务的这个内容,基本是不一样的
#可能是根据时间,或者ip,或者唯一的mac,或者对应唯一文件造成的本机不同标志,反正基本不会相同
[root@A opt]# scp /var/lib/rabbitmq/.erlang.cookie 192.168.164.129:/var/lib/rabbitmq
#scp:s代表服务器,cp拷贝,服务器的拷贝,拷贝过去时,一般是会提示覆盖对应相同文件的(可能有些命令不会)
#如scp(可能会跳过,可能),一路yes或y即可
#注意:对应的ip要正确,否则操作不了,报错(找不到对应主机)
[root@A ~]# systemctl stop firewalld #防火墙关闭
[root@A ~]# systemctl start rabbitmq-server #启动rabbitmq服务
[root@B ~]# rabbitmqctl stop_app #关闭应用 #可以说是关闭界面或者对应的具体节点,也可以说是除进程外 #其他的服务或者内容都基本关闭或者删除(因为他们基本都是操作rabbitmqctl命令的,即该命令相关的不可用了) #那么其他的操作基本不能进行了,如添加用户等等,需要启动后才可操作 #这个节点并不是说服务器,而是该服务的内容节点(几乎所有内容) #数据也会删除,当然持久化的下次还会有 #所以服务还是启动的,只是对应内容或者其他的操作没有了 [root@B ~]# rabbitmqctl join_cluster rabbit@A #需要先关闭节点才可进行操作,相当于开启双方通道 #否则这个操作执行不了(这是规定的) #将当前节点,加入到rabbit@A节点里(A前面有说明,实际上就是对应ip的简写) #对应的A的对应节点应用需要存在,即不能关闭,否则加入不了 #加入后可以在A服务器的界面里找到Overview里面往下滑,找到Nodes,点击(一般都会自动打开的) #可以看到多出来了rabbit@B,相对应的在B服务器里面也多出来了rabbit@A,即他们之间互相可以访问 #反过来加也可以(因为都是同步),但是当有相同的时候,主要看加入对方,如用户 #当加上后,会同步对应的服务器的消息,队列,交换机,用户等等,有些并不是真正的同步,后面会说明(如用户,被隐藏) #对应的队列会显示服务器来源,可能会与自身相同用户发生冲突,使得不会显示来源 #比如我们先登录进去,然后加入后,刷新界面,就不会显示来源,若出现这种情况 #我们可以退出登录重新进入,或者再次刷新,即可解决 #若不是持久的,那么对应的服务器停止后,同步的自然也是删除的,即也会导致同步的删除 #若是持久的,那么对应的服务器停止后,同步的并不会删除对应的消息,队列,交换机,只是不能使用了而已 [root@B ~]# rabbitmqctl start_app #启动应用(界面恢复,不是持久的信息被删除) #和上述关闭命令配合使用,达到清空队列的目的(持久化的还是会在) #若需要删除加入的节点,可以执行如下命令: [root@A ~]# rabbitmqctl forget_cluster_node rabbit@B #或者 [root@B ~]# rabbitmqctl forget_cluster_node rabbit@A #注意:这虽然删除了节点服务器(相当于不要他的同步),离开了集群,但是只是当前的信息删除而已 #对方的连接信息没有删除,所以要注意 #且不能是删除自身,否则报错,且需要先将对应的服务器的节点关闭才可 #因为是互相的,那么在对应的服务器里都可以进行删除节点,当删除其中一个时 #相对应的其他有关系的服务器也没有该服务器的连接了,但是由于被删除对象还存在与操作的对象的信息 #所以他启动时会自动的连接对应服务器,但是已经没有加入了(被删除了连接) #所以启动不了(因为不能连接了,不是互相的了,相当于关闭了他的通道) #为了解决这样的情况,主要是将对应的信息删除,比如: [root@B rabbitmq]# ls mnesia #这个文件基本包含了rabbitmq的对应连接信息,用户名信息等等 #当然的,持久的队列,消息,交换机不会删除,不持久的就会) #删除后,虽然我们可以启动了,但是对应的用户名等等需要再次进行创建,这一点我们要注意 #所以我们需要删除局部的连接信息,而不是删除全部 #实际上只要删除mnesia/rabbit@B(我这里是B)/目录下面的如下三个文件 #执行命令:rm cluster_nodes.config nodes_running_at_shutdown schema.DAT #那么就可以启动了,但是用户名也是需要创建,等待一下,然后可以看到自己的持久队列了 #最后注意:不持久的,一般是绑定服务器的启动和关闭,所以每次的关闭,不持久的就会删除,如rabbitmqctl stop_app
[root@B ~]# rabbitmqctl cluster_status
[root@A ~]# rabbitmqctl add_user laosun 123123
[root@A ~]# rabbitmqctl set_user_tags laosun administrator
[root@A ~]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"

[root@A ~]# rabbitmqctl set_policy xall "^" '{"ha-mode":"all"}'
#随便一个服务器里使用即可,因为作用与集群



[root@localhost opt]# tar -zxvf haproxy-1.8.12.tar.gz
[root@localhost opt]# uname -r
3.10.0-514.6.2.el7.x86_64 #你的可能会有所不同

[root@localhost opt]# cd haproxy-1.8.12
[root@localhost haproxy-1.8.12]# make TARGET=linux2628 PREFIX=/usr/local/haproxy
#使用对应的版本(TARGET),指定操作位置(PREFIX)
[root@localhost haproxy-1.8.12]# make install PREFIX=/usr/local/haproxy
#指定安装位置(PREFIX)
[root@localhost haproxy-1.8.12]# /usr/local/haproxy/sbin/haproxy -v
[root@localhost haproxy-1.8.12]# cp /usr/local/haproxy/sbin/haproxy /usr/sbin/
[root@localhost haproxy-1.8.12]# cp ./examples/haproxy.init /etc/init.d/haproxy #在解压的那个文件下
[root@localhost haproxy-1.8.12]# chmod 755 /etc/init.d/haproxy
[root@localhost haproxy-1.8.12]# useradd -r haproxy
[root@localhost haproxy-1.8.12]# mkdir /etc/haproxy
[root@localhost haproxy-1.8.12]# vim /etc/haproxy/haproxy.cfg
#全局配置 global #设置日志 log 127.0.0.1 local0 info #当前工作目录 chroot /usr/local/haproxy #用户与用户组 user haproxy group haproxy #运行进程ID uid 99 gid 99 #守护进程启动 daemon #最大连接数 maxconn 4096 #默认配置 defaults #应用全局的日志配置 log global #默认的模式mode {tcp|http|health},TCP是4层,HTTP是7层,health只返回OK mode tcp #日志类别tcplog option tcplog #不记录健康检查日志信息 option dontlognull #3次失败则认为服务不可用 retries 3 #每个进程可用的最大连接数 maxconn 2000 #连接超时 timeout connect 5s #客户端超时30秒,ha就会发起重新连接 timeout client 30s #服务端超时15秒,ha就会发起重新连接 timeout server 15s #绑定配置 listen rabbitmq_cluster bind 192.168.164.130:5672 #注意修改成自己的ip #配置TCP模式 mode tcp #简单的轮询 balance roundrobin #RabbitMQ集群节点配置,每隔5秒对mq集群做检查,连续2次正确证明服务可用,连续3次失败证明服务不可用 #否则一直检查,直到可用和不可用 server A 192.168.164.128:5672 check inter 5000 rise 2 fall 3 #注意修改成自己的ip,A前面设置的地址,相当于192.168.164.128 server B 192.168.164.129:5672 check inter 5000 rise 2 fall 3 #注意修改成自己的ip,B前面设置的地址,相当于192.168.164.129 #haproxy监控页面地址 listen monitor bind 192.168.164.130:8100 #注意修改成自己的ip mode http option httplog stats enable # 监控页面地址 http://192.168.164.130:8100/monitor 注意修改成自己的ip stats uri /monitor stats refresh 5s
[root@localhost haproxy]# service haproxy start #start修改为stop就是关闭
#注意:复制粘贴时可能有隐藏的符号,记得删除(可以将对应信息放在文件里,通过编码的改变,使得显示出来)
#其中,启动前不要占用当前服务器的对应端口(如5672)
#因为bind绑定,即bind 192.168.164.130:5672
#但这些只是常见的错误,若要看自己有什么错误,可用执行如下:
#systemctl status haproxy(后面可用加上.service)
#查看状态,包括一些错误信息(好像端口的占用信息并没有显示),然后根据对应出现的信息进行百度吧(●ˇ∀ˇ●)


| ip | 用途 | 主机名 |
|---|---|---|
| 192.168.164.130 | KeepAlived 和 HAProxy | C |
| 192.168.164.131 | KeepAlived 和 HAProxy | D |
[root@C ~]# yum install -y keepalived
[root@C ~]# rm -rf /etc/keepalived/keepalived.conf
[root@C ~]# vim /etc/keepalived/keepalived.conf
! Configuration File for keepalived global_defs { router_id C ## 非常重要,标识本机的hostname } vrrp_script chk_haproxy{ script "/etc/keepalived/haproxy_check.sh" ## 执行的脚本位置 interval 2 ## 检测时间间隔 weight -20 ## 如果条件成立则权重减20,使得容易被抢(抢到的就是主机) } vrrp_instance VI_1 { state MASTER ## 非常重要,标识主机,备用机改为 BACKUP,这里的主机就是130 interface ens33 ## 非常重要,网卡名(ifconfig查看),飘移到的目标网卡 virtual_router_id 66 ## 非常重要,自定义,虚拟路由ID号(主备节点要相同,使得可以连接,从而飘移) priority 100 ## 优先级(0-254),一般主机的大于备机 advert_int 1 ## 主备信息发送间隔,两个节点必须一致,默认1秒 authentication { ## 认证匹配,设置认证类型和密码,MASTER和BACKUP必须使用相同的密码才能正常通信 auth_type PASS auth_pass 1111 } track_script { chk_haproxy ## 检查haproxy健康状况的脚本 } virtual_ipaddress { ## 简称"VIP" 192.168.164.66/24 ## 非常重要,虚拟ip,可以指定多个,以后连接mq就用这个虚拟ip,即查询的ip中显示的就是这个 } } virtual_server 192.168.164.66 5672 { ## 虚拟ip的详细配置 delay_loop 6 # 健康检查间隔,单位为秒 lb_algo rr # lvs调度算法rr|wrr|lc|wlc|lblc|sh|dh lb_kind NAT # 负载均衡转发规则,一般包括DR,NAT,TUN 3种 protocol TCP # 转发协议,有TCP和UDP两种,一般用TCP real_server 192.168.164.130 5672 { ## 本机的真实ip,好像并没有特别的作用,可用自己设置没有的ip进行测试 weight 1 # 默认为1,0为失效 } } #复制时,记得看看没有隐藏的符号,下面的操作以该配置文件为例子(说明也是)
#!/bin/bash
COUNT=`ps -C haproxy --no-header |wc -l`
if [ $COUNT -eq 0 ];then #检查是否工作
#没有工作执行下面代码
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg #重启HAproxy
sleep 2 #等待两秒
if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then #再次检测
#没有工作执行下面代码
killall keepalived #关掉主 Keepalived ,使得备用机上位
fi
fi
#复制时,记得看看没有隐藏的符号
[root@C etc]# chmod +x /etc/keepalived/haproxy_check.sh
[root@C etc]# systemctl stop firewalld #关闭防火墙
[root@C etc]# service keepalived start(启动)
#start可用写成stop(关闭,停止),status(查看状态),restart(重启)
#给一个小命令:ip a,可用直接查看对应虚拟机的网卡信息(包括ip),而不用使用ifconfig命令来查看了,省事一点
#虽然少一点信息,但那些信息,基本可用忽略
[root@C etc]# ps -ef | grep haproxy
[root@C etc]# ps -ef | grep keepalived
[root@C etc]# ip a

[root@A ~]# curl 192.168.164.66:5672
AMQP ## 正常提供AMQP服务,表示通过vip访问mq服务正常
#因为192.168.164.66也就相当于访问对应虚拟机,而192.168.164.130(飘移到这里),也是访问对应虚拟机
#都是一种访问的方式,或者都是访问同一个网卡(MAC地址),所以效果是一样的,即你也可用操作对应真实ip进行测试
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。