【学习笔记】RabbitMQ04:延迟队列的原理以及实现代码
- 人工智能
- 2025-08-16 17:15:04

参考资料
RabbitMQ官方网站RabbitMQ官方文档噼咔噼咔-动力节点教程 文章目录 七、延迟队列7.1 什么是延迟队列7.2 延迟队列的解决方案7.2.1 定时任务7.2.2 **被动取消**7.2.3 JDK的延迟队列7.2.3 采用消息中间件(rabbitMQ7.2.3.1 适用专门优化后的死信队列实现延迟队列7.2.3.2 :star:实例代码7.2.3.2 测试结果 7.2.4 使用rabbitmq_delayed_message_exchange插件.7.2.4.1 插件下载7.2.4.2 :star:如何在docker环境下安装插件7.2.4.3 :star: 代码示例:如何使用该插件7.2.4.4 测试结果 7.3 问题:多个消息的延迟时间不同该如何解决?7.3.1 解决方案一:用延迟队列区分7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange 七、延迟队列 7.1 什么是延迟队列正常的MQ应用场景中,我们希望消息可以快速稳定的传递。但是有一些场景中,希望在指定的延迟后再消费信息,比如订单支付场景(订单15部分内未支付则关闭订单)。
这类实现延迟任务的场景,就可以采用延迟队列来实现。
以下介绍一下其他的一些方法。
7.2 延迟队列的解决方案 7.2.1 定时任务每隔n秒扫描一次数据库,查询数据库装为过期的订单进行处理。
实现方式
spring schedule、quartz、xxljob等
优点
简单,容易实现;
缺点
存在延迟(受定时器延迟时间限制性能较差,每次扫描数据库,如果订单量交大,会给数据库造成较大压力。 7.2.2 被动取消当用户主动查询订单时,判断订单是否超时,超时则取消
优点:服务器压力小缺点:如果用户长时间不查询,则会造成统计异常;而且用户打开订单页面会变慢,严重的话会影响用户体验 7.2.3 JDK的延迟队列DelayedQueue:无界阻塞队列,该队列只有在延迟期满后,才能从中获取元素。
优点
实现简单,任务的延迟低。
缺点
服务器重启宕机,数据会丢失只适用于单机版订单量大时,可能会造成内存不足:OOM 7.2.3 采用消息中间件(rabbitMQRabbitMQ 本身不支持延迟队列,可以使用 TTL 结合 DLX 的方式来实现消息的延迟投递(前面提到的死信队列)。.
把 DLX 跟某个队列绑定,到了指定时间,消息过期后,就会从 DLX 路由到这个队列,消费者可以从DLX的队列中取走消息。
7.2.3.1 适用专门优化后的死信队列实现延迟队列在上面的mq方案中,存在两个不同的交换机,我们可以利用直连交换机的特性,将交换机优化成一个交换机,同时通过不同的routingKey指定普通队列和死信队列。
思路解释
生产者发送消息到交换机X,并指定ttl的key消息被交换机传递到ttl队列中(指定了消息过期时间的队列同时,ttl队列还指定的死信交换机DLX为自身的交换机X,但是指定的routingKey为死信队列的key这样,当消息在ttl队列中到期后,这条消息就会被传递到死信队列中,提供给消费者 7.2.3.2 ⭐️实例代码为了便于测试,将发送和接收写在同一个服务中
配置信息
@Configuration public class DelayExchangeConfig { public static String exchangeName = "order.ttl.exchange"; public static String orderQ = "order.ttl.queue"; public static String dlxQ = "order.dlx.queue"; @Bean public DirectExchange delayedExchange(){ return ExchangeBuilder.directExchange(exchangeName).build(); } @Bean public Queue orderQueue(){ // 指定该队列的过期时间和死信队列 Map<String , Object> properties = new HashMap<>(); properties.put("x-message-ttl" , 15000); properties.put("x-dead-letter-exchange" , exchangeName); properties.put("x-dead-letter-routing-key" , "dead-letter"); return QueueBuilder.durable(orderQ).withArguments(properties).build(); } @Bean public Queue dlxQueue(){ return QueueBuilder.durable(dlxQ).build(); } @Bean public Binding dlxBinding1(){ return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("dead-letter"); } @Bean public Binding ttlBinding1(){ return BindingBuilder.bind(dlxQueue()).to(this.delayedExchange()).with("order"); } }测试代码
@RestController @RequestMapping("/delay") @Slf4j public class DelayedController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/{msg}") public void sentErrorMsg(@PathVariable("msg") String msg) { log.info("(延迟队列)准备发送的信息:{} , 路由键 :{}", msg, "order"); // 发送到普通的延时列表中 rabbitTemplate.convertAndSend(exchangeName, "order", msg.getBytes(StandardCharsets.UTF_8)); log.info("(延迟队列)成功发送!发送时间{}" , LocalDateTimeUtil.now()); } @RabbitListener(queues = "order.dlx.queue") public void receiveDelayedMsg(Message message){ log.info("(延迟队列)接受到的消息是:{}" , new String(message.getBody())); } } 7.2.3.2 测试结果配置正确
控制台打印正确:15秒后接收到的了之前发送的信息
7.2.4 使用rabbitmq_delayed_message_exchange插件. 7.2.4.1 插件下载
插件下载地址
.rabbitmq /community-plugins.html github /rabbitmq/rabbitmq-delayed-message-exchange/releases 根据自己的rabbit版本,我这里用的是3.9 7.2.4.2 ⭐️如何在docker环境下安装插件参考文章: juejin /post/7138717546894589966
将下载到的文件,移动到容器内
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins进入容器bash指令,并启动插件
docker exec -it rabbitmq bash root@rabbit:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 使用下面的指令查看插件列表 rabbitmq-plugins list进入控制台新建交换机,能查看到新的交换机类型
7.2.4.3 ⭐️ 代码示例:如何使用该插件官方说明文档: github /rabbitmq/rabbitmq-delayed-message-exchange#usage
理解原理:delay exchange在接受到消息后,会先存在内部数据库中,检查x-delay延迟时间(头部
代码使用思路
要创建自定义的交换机类型,要使用CustomExchange()来创建。几个参数的解释如下:
name:rabbit中交换机的名称type:交换机类型 (x-delayed-message)durable:是否持久autoDelete:是否自动删除arguments:参数信息arguments:参数信息从官方文档中获取
// ... elided code ... Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); // ... more code ...交换机创建好后,只需要创建一条队列即可,并进行绑定
注意:消息发送需要在头部存放信息:headers.put("x-delay", 延迟时间)。不需要使用自带的expiration来控制延迟时间了
配置类
@Configuration public class DelayPluginConfig { public static String exchangeName = "delay-x-plugin.x"; public static String key = "demo"; @Bean public CustomExchange customExchange(){ // 参考官方文档,创建插件提供的自定义交换机 Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); // public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) return new CustomExchange(exchangeName, "x-delayed-message" , true , false , args); } @Bean public Queue delayDemoQueue(){ return QueueBuilder.durable("delay-x-plugin.queue.demo").build(); } @Bean public Binding delayPluginBinding(){ return BindingBuilder .bind(delayDemoQueue()) .to(customExchange()) .with(key) .noargs(); } }生产者
@RestController @RequestMapping("/delay/plugin") @Slf4j public class DelayedPluginController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/{delay}/{msg}") public void sentErrorMsg(@PathVariable("msg") String msg, @PathVariable("delay") Long delay) { log.info("(延迟插件队列)准备发送的信息:{} ,延迟时间:{} 路由键 :{}", msg, delay , "demo"); // 在头部设置过期时间 MessageProperties properties = new MessageProperties(); properties.setHeader("x-delay", delay); Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).andProperties(properties).build(); // 发送信息 rabbitTemplate.convertAndSend(exchangeName, "demo", message); log.info("(延迟插件队列)成功发送!发送时间:{}", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date())); } @RabbitListener(queues = "delay-x-plugin.queue.demo") public void receiveDelayedMsg(Message message) { log.info("(延迟插件队列)接受到的消息是:{}", new String(message.getBody())); } } 7.2.4.4 测试结果生成交换机和队列
访问路径/delay/plugin/25000/一条25秒过期的信息:查看日志打印:成功
7.3 问题:多个消息的延迟时间不同该如何解决?由于队列先进先出的特性,如果不同消息的延迟时间不同,一旦出现后进的消息延迟时间小于先进的队列,那么消息过期的时间就会出错。
7.3.1 解决方案一:用延迟队列区分要解决这个问题,就需要将队列的延迟时间统一,将不同的延迟的消息发送到对应延迟的队列中。
保证队列的延迟时间和消息的延迟时间是一样的即可。
如下
7.3.2 使用延迟队列插件rabbitmq_delayed_message_exchange由于该插件的原理并不是单纯的队列实现,而是使用rabbit内部数据库时间,所以可以很好的解决问题。
可以进行一个简单测试验证:
先发送一条25秒过期的信息,再发送3条5秒过期的信息
查看结果:正常消费,解决问题
【学习笔记】RabbitMQ04:延迟队列的原理以及实现代码由讯客互联人工智能栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【学习笔记】RabbitMQ04:延迟队列的原理以及实现代码”