赞
踩
//配置 spring: rabbitmq: host: localhost port: 5672 username: xxx password: xxx virtual-host: xxx # publisher-confirms: true publisher-confirm-type: correlated # 发送方确认 publisher-returns: true # 交换机找不到队列回调 listener: simple: acknowledge-mode: manual # 开启手动ack prefetch: 100 # 消费端同时处理的请求个数限制默认250个 //交换机队列等 @Configuration public class RabbitConfig { @Autowired private CachingConnectionFactory connectionFactory; private final static String DELAY_EXCHANGE = "delay_exchange"; private final static String DELAY_QUEUE = "delay_queue"; private final static String DLX_EXCHANGE = "dlx_exchange"; private final static String DLX_QUEUE = "dlx_queue"; @Bean public RabbitAdmin rabbitadmin(CachingConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //消息转换器 rabbitTemplate.setMessageConverter(converter()); //生产这confirm rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if(b){ log.info("消息发送成功 ->{}",correlationData.getReturnedMessage()); }else{ log.error("消息->{} 发送失败原因-> {}",correlationData.getReturnedMessage(),s); } } }); //消息回调 //开启强制回调 rabbitTemplate.setMandatory(true); //设置消息回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { log.error("消息->{} 路由失败",message); } }); return rabbitTemplate; } @Bean public MessageConverter converter(){ return new Jackson2JsonMessageConverter(); } @Bean public DirectExchange delayExchange(RabbitAdmin rabbitAdmin){ DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE); rabbitAdmin.declareExchange(exchange); return exchange; } @Bean public DirectExchange dlxExchange(RabbitAdmin rabbitAdmin){ DirectExchange exchange = new DirectExchange(DLX_EXCHANGE); rabbitAdmin.declareExchange(exchange); return exchange; } @Bean public Queue delayQueue(RabbitAdmin rabbitAdmin){ HashMap<String, Object> args = new HashMap<>(); //死信 args.put("x-dead-letter-exchange",DLX_EXCHANGE); //死信queue args.put("x-dead-letter-routing-key","dlx_key"); //队列的ttl,队列中所有消息的过期时间 args.put("x-message-ttl",10000); //持久化,xx,自动删除,参数 Queue queue = new Queue(DELAY_QUEUE,true,false,false,args); rabbitAdmin.declareQueue(queue); return queue; } @Bean public Queue dlxQueue(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(DLX_QUEUE); rabbitAdmin.declareQueue(queue); return queue; } @Bean public Binding delayBind(RabbitAdmin rabbitAdmin){ Binding delay_key = BindingBuilder .bind(delayQueue(rabbitAdmin)) .to(delayExchange(rabbitAdmin)) .with("delay_key"); rabbitAdmin.declareBinding(delay_key); return delay_key; } @Bean public Binding dlxBind(RabbitAdmin rabbitAdmin){ Binding dlx_key = BindingBuilder .bind(dlxQueue(rabbitAdmin)) .to(dlxExchange(rabbitAdmin)) .with("dlx_key"); rabbitAdmin.declareBinding(dlx_key); return dlx_key; } } //生产 @Component public class DelaySendMessage { @Autowired private RabbitTemplate rabbitTemplate; private final static String DELAY_EXCHANGE = "delay_exchange"; private final static String DELAY_QUEUE = "delay_queue"; private final static String DLX_EXCHANGE = "dlx_exchange"; private final static String DLX_QUEUE = "dlx_queue"; public void sendMsg() { Message message = MessageBuilder.withBody( JSONObject.toJSONString(MessageModel.builder().id(1).context("提现成功!!").build()).getBytes()).build(); message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON); CorrelationData correlationData = new CorrelationData(String.valueOf(1)); rabbitTemplate.convertAndSend(DELAY_EXCHANGE,"delay_key",message,correlationData); } } //消费 @Component public class ReceiveMessage { private final static String DELAY_EXCHANGE = "delay_exchange"; private final static String DELAY_QUEUE = "delay_queue"; private final static String DLX_EXCHANGE = "dlx_exchange"; private final static String DLX_QUEUE = "dlx_queue"; //延时队列 @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = DLX_EXCHANGE, type = ExchangeTypes.DIRECT), key = "delay_key", value = @Queue(value = DLX_QUEUE, autoDelete = "false"), ignoreDeclarationExceptions = "true"), concurrency = "1", // 指定监听该队列的消费者个数 ackMode = "MANUAL"// 手动ack ) public void receiveCode(Channel channel, Message msg, @Headers Map<String, Object> headers) throws IOException, InterruptedException { log.info(msg.toString()); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。