SpringBoot+RabbitMQ 实现延迟队列
2,094 total views, 3 views today
RabbitMQ 实现延迟队列,方法有两种。
第一种是安装延迟队列插件;
第二种就是利用死信队列的方式;
这里采用第二种方式。
rabbitmq 自身的一些概念,可以去网上或者书上获得。rabbitmq 延迟队列的实现原理,网上资料很多,简单盗图一张。

简单说明一下原理。
将消息发送到一个队列中去,消息自身有一个 TTL,即失效时间,如果到期还是为消费该消息,那么该消息就成为死信,将死信移到专门的死性队列,然后消费者只需要消费死信队列中的消息,变相的实现了延迟消息的功能。
基于 Springboot 的具体代码实现:
POM
1 2 3 4 5 |
<!-- rabbitmq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> |
配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
spring: rabbitmq: host: 111.22.1.2 port: 5672 username: user password: passw template: mandatory: true #支持发布确认与返回 publisher-confirms: true publisher-returns: true listener: simple: #是否自动开始监听消息队列 auto-startup: false #手动应答 acknowledge-mode: manual #监听容器数及最大数 concurrency: 1 max-concurrency: 1 #是否支持重试 retry: enabled: true |
配置文件 TopicRabbitConfig.Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
package com.mine.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class TopicRabbitConfig { //延迟队列的名字 public static final String delayQueueName = "delay_machine_check_queue"; //延迟队列的exchange名字 public static final String delayExchangeName = "delay_machine_check_exchange"; //死信exchange的名称 public static final String deadLetterProcessQueueName = "dead_letter_process_queue"; //死信exchange的名称 public static final String deadLetterProcessExchangeName = "dead_letter_machine_check_exchange"; public static final String routingKey = "machine_check"; //延迟队列的exchange @Bean public TopicExchange delayExchange() { return new TopicExchange(delayExchangeName, true, false); } //死信队列的exchange @Bean public TopicExchange deadLetterProcessExchange() { return new TopicExchange(deadLetterProcessExchangeName, true, false); } //延迟队列 @Bean Queue delayQueue() { Map<String, Object> args = new HashMap<>(); //args.put("x-message-ttl", 20000); args.put("x-dead-letter-exchange", deadLetterProcessExchangeName); //DLX,dead letter发送到的exchange args.put("x-dead-letter-routing-key", routingKey); return new Queue(delayQueueName, true, false, false, args); } //死信队列 @Bean public Queue deadLetterProcessQueue() { return new Queue(deadLetterProcessQueueName, true, false, false); } //绑定延迟队列,延迟Exchange,routing关系 @Bean Binding bindingDelayExchange(Queue delayQueue, TopicExchange delayExchange) { return BindingBuilder.bind(delayQueue).to(delayExchange).with(routingKey); } //绑定死信队列,死信Exchange,routing关系 //参数根据Spring命名约定的方式,会将上面的Queue实例和exchange实例注入进来,形成绑定关系 @Bean Binding bindingDeadLetterProcessExchange(Queue deadLetterProcessQueue, TopicExchange deadLetterProcessExchange) { return BindingBuilder.bind(deadLetterProcessQueue).to(deadLetterProcessExchange).with(routingKey); } } |
监听代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
//只监听听死信队列即可 @RabbitListener(bindings = @QueueBinding(value = @Queue(value = TopicRabbitConfig.deadLetterProcessQueueName, durable = "true", autoDelete = "false"), exchange = @Exchange(name = TopicRabbitConfig.deadLetterProcessExchangeName, durable = "true", type = "topic", autoDelete = "false"), key = TopicRabbitConfig.routingKey), autoStartup = "true", id = "myconsumer") public void receiveRabbitMQ(Message message, Channel channel) throws Exception { try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); LogHelper.log4j2.info("收到消息:" + new String(message.getBody())); } catch (Exception ex) { LogHelper.log4j2.error("receive", ex); } } |
上面 2 段代码,实现的效果就是如下:
发送消息的代码:RabbitMQTopicSender.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
package com.mine.utils; import com.mine.config.TopicRabbitConfig; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; @Component public class RabbitMQTopicSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String message, Long delaySecond) { /*** * 方法参数说明 * https://docs.spring.io/spring-amqp/docs/latest_ga/api/org/springframework/amqp/rabbit/core/RabbitTemplate.html convertAndSend(String exchange, String routingKey, Object object) Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. ***/ message = message + "【" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "】"; LogHelper.log4j2.info("message:" + message); MessageProperties props = new MessageProperties(); props.setExpiration(Long.toString(delaySecond * 1000));//消息的延迟时间 Message ttlMessage = new Message(message.getBytes(), props); rabbitTemplate.convertAndSend(TopicRabbitConfig.delayExchangeName, TopicRabbitConfig.routingKey, ttlMessage); LogHelper.log4j2.info("消息发送成功"); } } |
至此调用 send 方法,即可发送延迟队列。
以上代码亲测有效。
原创文章,转载请注明出处!http://www.javathings.top/springbootrabbitmq实现延迟队列/