主页 > IT业界  > 

007订单支付超时自动取消订单(rabbitmq死信队列mybatis)

007订单支付超时自动取消订单(rabbitmq死信队列mybatis)

文章目录 死信队列RabbitMQ 配置类 RabbitMQConfig.java生产者 OrderTimeoutProducer.java消费者 OrderTimeoutConsumer.java应用配置 application.ymlpom.xml 依赖实体类 Order.java(不变)Mapper 接口 OrderMapper.java(不变)服务层 OrderService.java(不变)缓存配置 CacheConfig.java(不变)对账服务 ReconciliationTask.java(不变)控制器 OrderController.java(不变)

死信队列

在代码中,processedMsgCache的类型是Cache<String, Boolean>,键是消息ID,值是Boolean。当处理成功时,会执行processedMsgCache.put(msgId, true),将消息ID存入缓存,并设置有效期1小时,最大容量10000条。这样,在1小时内,如果同样的消息再次被处理,缓存中已经有记录,就会直接ACK并跳过处理。

不过,这里需要注意的是,消息ID是否唯一。在RabbitMQ中,消息ID默认可能不是唯一的,生产者发送消息时可以设置messageId。如果生产者没有明确设置,可能无法保证唯一性,这会导致幂等性检查失效。因此,需要确保生产者发送消息时设置了唯一的messageId,或者在消费者端使用其他唯一标识,比如消息内容中的orderId结合其他属性。

另外,缓存的过期时间设置为1小时,这可能与业务场景有关。如果消息的存活时间超过1小时,可能会有重复处理的风险。需要根据实际消息的存活时间来调整缓存的过期时间,确保覆盖消息可能被重新投递的时间窗口。

还有一个问题是,缓存是本地缓存,如果消费者有多个实例,每个实例的缓存是独立的。这可能导致不同的实例处理同一条消息,因为一个实例处理过,但另一个实例的缓存中没有记录。这种情况下,本地缓存的幂等性检查可能不够,需要考虑分布式缓存,比如Redis,来保证全局唯一性。但根据当前代码,在单实例或允许短暂重复的场景下使用本地缓存。

总结来说,幂等性检查的逻辑是通过缓存已处理消息的ID,在消息处理前检查是否已存在,存在则跳过处理,避免重复执行。这适用于消息队列保证至少一次投递,但业务需要确保幂等的场景。

+---------------------+ | RabbitMQ Message | | (携带唯一messageId) | +----------+----------+ | v +----------------+ +-------+-------+ +-----------------+ | 消息到达消费者 | ----> | 检查缓存是否存在 | ----> | 存在:直接ACK丢弃消息 | +----------------+ +-------+-------+ +-----------------+ | | 不存在 v +-------+-------+ +-----------------+ | 执行业务逻辑处理 | ----> | 成功:存入缓存并ACK | +---------------+ +-----------------+

缓存过期时间(1小时)> 消息最大存活时间(30分钟+重试时间) 计算公式:缓存过期时间 = 消息TTL + 最大重试时间 * 重试次数 + 缓冲时间

缓存击穿空值缓存对不存在的key也进行缓存(需设置较短过期时间)缓存穿透布隆过滤器在缓存前增加过滤层消费者重启持久化存储配合数据库记录处理状态网络分区最终一致性依赖对账服务修正状态 组件类型作用说明processedMsgCacheCaffeine缓存存储已处理消息的唯一标识messageId字符串消息唯一标识(需生产者保证唯一性)deliveryTag长整型RabbitMQ消息投递标识 sequenceDiagram participant RabbitMQ participant Consumer participant Cache participant DB RabbitMQ->>Consumer: 投递消息(messageId=123) Consumer->>Cache: 查询messageId=123 alt 存在缓存 Cache-->>Consumer: 返回true Consumer->>RabbitMQ: 发送ACK else 无缓存 Consumer->>DB: 执行取消操作 alt 操作成功 Consumer->>Cache: 写入messageId=123 Consumer->>RabbitMQ: 发送ACK else 操作失败 Consumer->>RabbitMQ: 发送NACK(requeue=true) end end RabbitMQ 配置类 RabbitMQConfig.java import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 订单超时相关配置 public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange"; public static final String ORDER_DELAY_QUEUE = "order.delay.queue"; public static final String ORDER_DELAY_ROUTING_KEY = "order.delay"; // 死信队列配置 public static final String ORDER_DEAD_LETTER_EXCHANGE = "order.dead.letter.exchange"; public static final String ORDER_DEAD_LETTER_QUEUE = "order.dead.letter.queue"; public static final String ORDER_DEAD_LETTER_ROUTING_KEY = "order.dead.letter"; // 声明延迟队列(设置死信参数) @Bean public Queue orderDelayQueue() { return QueueBuilder.durable(ORDER_DELAY_QUEUE) .withArgument("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE) .withArgument("x-dead-letter-routing-key", ORDER_DEAD_LETTER_ROUTING_KEY) .build(); } // 声明延迟交换机 @Bean public DirectExchange orderDelayExchange() { return new DirectExchange(ORDER_DELAY_EXCHANGE); } // 绑定延迟队列到交换机 @Bean public Binding delayBinding() { return BindingBuilder.bind(orderDelayQueue()) .to(orderDelayExchange()) .with(ORDER_DELAY_ROUTING_KEY); } // 声明死信队列 @Bean public Queue deadLetterQueue() { return new Queue(ORDER_DEAD_LETTER_QUEUE, true); } // 声明死信交换机 @Bean public DirectExchange deadLetterExchange() { return new DirectExchange(ORDER_DEAD_LETTER_EXCHANGE); } // 绑定死信队列到交换机 @Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(deadLetterExchange()) .with(ORDER_DEAD_LETTER_ROUTING_KEY); } // JSON 消息转换器 @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } } 生产者 OrderTimeoutProducer.java import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Component public class OrderTimeoutProducer { private final RabbitTemplate rabbitTemplate; public OrderTimeoutProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendTimeoutMessage(String orderId) { // 设置消息过期时间为30分钟(单位:毫秒) MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("1800000"); return message; } }; rabbitTemplate.convertAndSend( RabbitMQConfig.ORDER_DELAY_EXCHANGE, RabbitMQConfig.ORDER_DELAY_ROUTING_KEY, orderId, messagePostProcessor ); } } 消费者 OrderTimeoutConsumer.java import com.github.benmanes.caffeine.cache.Cache; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; import java.nio.charset.StandardCharsets; @Component public class OrderTimeoutConsumer { private final OrderService orderService; private final Cache<String, Boolean> processedMsgCache; public OrderTimeoutConsumer(OrderService orderService, Cache<String, Boolean> processedMsgCache) { this.orderService = orderService; this.processedMsgCache = Caffeine.newBuilder() .expireAfterWrite(1, TimeUnit.HOURS) .maximumSize(10000) .build(); } @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE) public void processMessage(Message message, Channel channel) throws IOException { String orderId = new String(message.getBody(), StandardCharsets.UTF_8); String messageId = message.getMessageProperties().getMessageId(); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 幂等性检查 if (processedMsgCache.getIfPresent(messageId) != null) { channel.basicAck(deliveryTag, false); return; } boolean success = orderService.safeCancel(orderId); if (success) { processedMsgCache.put(messageId, true); System.out.println("订单超时取消成功: " + orderId); } channel.basicAck(deliveryTag, false); } catch (Exception e) { // 记录错误日志,重新放回队列 channel.basicNack(deliveryTag, false, true); System.err.println("处理订单超时取消失败: " + orderId); e.printStackTrace(); } } } 应用配置 application.yml spring: rabbitmq: host: ${RABBITMQ_HOST:localhost} port: 5672 username: ${RABBITMQ_USER:guest} password: ${RABBITMQ_PASSWORD:guest} virtual-host: / connection-timeout: 5000 template: retry: enabled: true max-attempts: 3 initial-interval: 1000ms listener: simple: acknowledge-mode: manual # 手动确认模式 prefetch: 10 # 每次预取数量 retry: enabled: true max-attempts: 3 initial-interval: 1000ms pom.xml 依赖 <!-- 移除 RocketMQ 依赖 --> <!-- 添加 RabbitMQ 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 实体类 Order.java(不变) public class Order { // 保持原有实现 } Mapper 接口 OrderMapper.java(不变) @Mapper public interface OrderMapper { // 保持原有SQL操作 } 服务层 OrderService.java(不变) @Service public class OrderService { // 保持原有业务逻辑 } 缓存配置 CacheConfig.java(不变) @Configuration public class CacheConfig { // 保持原有缓存配置 } 对账服务 ReconciliationTask.java(不变) @Component public class ReconciliationTask { // 保持原有定时任务逻辑 } 控制器 OrderController.java(不变) @RestController @RequestMapping("/orders") public class OrderController { // 保持原有API接口 }

关键差异对比

功能点RocketMQ 实现RabbitMQ 实现延迟机制内置延迟级别TTL+死信队列消息存储持久化到CommitLog内存+磁盘持久化消费确认自动ACK手动ACK+重试机制消息追踪原生支持消息轨迹需要额外实现集群方案主从复制镜像队列

部署注意事项 队列初始化:确保首次启动时自动创建所需交换机和队列 消息持久化:所有队列声明时设置durable=true 监控配置:需要监控以下指标: 死信队列消息堆积量 消费者处理耗时 消息TTL过期情况 集群配置:建议至少3节点RabbitMQ集群 故障转移:配置镜像队列实现高可用

性能优化建议 批量处理:

@RabbitListener(queues = QUEUE, containerFactory = "batchFactory") public void processMessages(List<Message> messages) { // 批量查询缓存 Map<String, Boolean> existMap = cache.getAllPresent(messageIds); // 过滤已处理消息 // 批量执行数据库操作 }

异步更新:

CompletableFuture.runAsync(() -> { processedMsgCache.put(messageId, true); });

分级缓存:

Cache<String, Boolean> L1 = Caffeine.newBuilder().maximumSize(1000).build(); Cache<String, Boolean> L2 = RedisClient.getCache();

首先看CacheConfig.java,里面有一个orderStatusCache的Bean,使用Caffeine配置,最大大小10000,过期时间10分钟。然后在OrderService.java中,有一个注入的Cache<String, OrderStatus> orderCache。看起来这里应该用的是这个Bean,因为在OrderService里用了@RequiredArgsConstructor,所以orderCache这个字段会被构造器注入,而对应的Bean应该就是CacheConfig里的orderStatusCache。所以虽然用户可能没有显式地用@Autowired,但是因为用了Lombok的@RequiredArgsConstructor,如果orderCache是final的话,Spring会自动注入同类型的Bean。这时候需要检查OrderService中的orderCache的类型是否匹配。 看OrderService的定义:private final Cache<String, OrderStatus> orderCache; 而CacheConfig中的Bean也是这个类型,所以应该会被正确注入。所以这个Bean是有被使用的,可能没注意到OrderService里的注入,导致认为没有用到。 另外,在OrderTimeoutConsumer.java中,还有一个processedMsgCache,这个是用Caffeine直接new的,并没有使用Spring管理的Bean。所以这个缓存是消费者自己创建的,和CacheConfig无关。而orderStatusCache是被OrderService使用的,所以这个配置是必须的,否则OrderService中的orderCache会找不到对应的Bean,导致启动失败。 所以可能在于误解了哪里用到了这个Bean。可能在代码中没有显式地看到@Autowired或者@Resource,但实际上因为Lombok的@RequiredArgsConstructor和Spring的自动注入机制,这个Bean是被OrderService使用的。如果没有CacheConfig中的Bean,Spring启动时会报错,提示找不到符合条件的Cache类型的Bean来注入到OrderService中。

标签:

007订单支付超时自动取消订单(rabbitmq死信队列mybatis)由讯客互联IT业界栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“007订单支付超时自动取消订单(rabbitmq死信队列mybatis)