013-2订单支付超时自动取消订单(rocketmqjpa)
- 开源代码
- 2025-09-12 22:45:01

文章目录 完整实现代码(基于Spring Boot + RocketMQ + MySQL)1. 项目依赖(pom.xml)2. 应用配置(application.yml)3. 实体类(Order.java)4. 数据库仓库(OrderRepository.java)5. 服务层(OrderService.java)6. RocketMQ生产者(OrderTimeoutProducer.java)7. RocketMQ消费者(OrderTimeoutConsumer.java)8. 缓存配置(CacheConfig.java)9. 订单控制器(OrderController.java)10. 对账服务(每日补偿)系统架构图关键特性说明使用说明 其他
实体类(Entity):定义订单的数据结构。
数据库访问层(Repository):使用JPA或JDBC进行数据库操作。
服务层(Service):处理业务逻辑,包括创建订单、支付订单、取消订单等。
消息生产者(Producer):发送延迟消息到RocketMQ。
消息消费者(Consumer):消费延迟消息,处理订单超时取消。
配置类(Configuration):配置RocketMQ、数据源等。
异常处理:处理可能出现的异常,保证系统稳定性。
测试类:验证各个功能是否正常。
订单实体需要包含订单ID、状态、创建时间等字段。服务层需要处理订单的创建、支付和取消,其中取消订单需要结合数据库的条件更新和内存检查。消息生产者需要在订单创建时发送延迟消息,消费者在消息到达时处理超时取消。
在并发处理方面,要确保在支付和取消同时发生时,数据库的条件更新能够保证原子性,避免状态覆盖。此外,消息消费者需要处理幂等性,防止重复消费导致错误。
还需要考虑性能优化,比如使用缓存(如Caffeine)来减少数据库查询的压力,同时保证缓存与数据库的一致性。可能还需要引入分布式锁(如Redisson)来处理高并发下的资源竞争问题。
异常处理方面,需要处理RocketMQ消息发送失败、数据库操作失败等情况,使用重试机制或事务管理来保证数据一致性。
测试部分需要模拟并发场景,验证在高并发下订单状态是否正确变更,延迟消息是否按时触发,以及系统的稳定性和性能表现。
最后,需要验证整个流程是否顺畅,各个组件是否协同工作,没有遗漏任何关键点。例如,订单创建后是否成功发送延迟消息,消息消费者是否能正确处理超时订单,支付和取消的并发操作是否不会导致状态错误等。
完整实现代码(基于Spring Boot + RocketMQ + MySQL) 1. 项目依赖(pom.xml) <dependencies> <!-- Spring Boot Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <!-- RocketMQ --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency> <!-- MySQL --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <!-- Caffeine Cache --> <dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId>caffeine</artifactId> </dependency> <!-- Redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.17.0</version> </dependency> </dependencies> 2. 应用配置(application.yml) spring: datasource: url: jdbc:mysql://localhost:3306/order_db?useSSL=false username: root password: root driver-class-name: com.mysql.cj.jdbc.Driver jpa: hibernate: ddl-auto: update show-sql: true rocketmq: name-server: localhost:9876 producer: group: order-producer-group caffeine: spec: maximumSize=10000,expireAfterWrite=5m redisson: config: classpath:redisson.yaml 3. 实体类(Order.java) @Entity @Table(name = "orders") public class Order { @Id private String orderId; @Enumerated(EnumType.STRING) private OrderStatus status; private Long createTime; @Version private Integer version; // Getters and Setters } public enum OrderStatus { UNPAID, PAID, CANCELLED } 4. 数据库仓库(OrderRepository.java) public interface OrderRepository extends JpaRepository<Order, String> { @Modifying @Query("UPDATE Order o SET o.status = 'CANCELLED', o.version = o.version + 1 " + "WHERE o.orderId = :orderId AND o.status = 'UNPAID'") int cancelUnpaidOrder(@Param("orderId") String orderId); @Modifying @Query("UPDATE Order o SET o.status = 'PAID', o.version = o.version + 1 " + "WHERE o.orderId = :orderId AND o.status = 'UNPAID'") int payOrder(@Param("orderId") String orderId); } 5. 服务层(OrderService.java) @Service @RequiredArgsConstructor public class OrderService { private final OrderRepository orderRepository; private final Cache<String, OrderStatus> orderCache; private final RedissonClient redisson; @Transactional public String createOrder() { String orderId = UUID.randomUUID().toString(); Order order = new Order(); order.setOrderId(orderId); order.setStatus(OrderStatus.UNPAID); order.setCreateTime(System.currentTimeMillis()); orderRepository.save(order); orderCache.put(orderId, OrderStatus.UNPAID); return orderId; } public boolean safePay(String orderId) { // 分布式锁保证同一订单的支付原子性 RLock lock = redisson.getLock("order:pay:" + orderId); try { if (lock.tryLock(1, 10, TimeUnit.SECONDS)) { // 双重检查 if (orderCache.getIfPresent(orderId) != OrderStatus.UNPAID) { return false; } int affected = orderRepository.payOrder(orderId); if (affected > 0) { orderCache.put(orderId, OrderStatus.PAID); return true; } } return false; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } finally { lock.unlock(); } } @Transactional public boolean safeCancel(String orderId) { // 内存缓存快速过滤 if (orderCache.getIfPresent(orderId) != OrderStatus.UNPAID) { return false; } // 数据库条件更新 int affected = orderRepository.cancelUnpaidOrder(orderId); if (affected > 0) { orderCache.put(orderId, OrderStatus.CANCELLED); return true; } return false; } } 6. RocketMQ生产者(OrderTimeoutProducer.java) @Component @RequiredArgsConstructor public class OrderTimeoutProducer { private final RocketMQTemplate rocketMQTemplate; public void sendTimeoutMessage(String orderId) { Message<String> message = MessageBuilder.withPayload(orderId) .setHeader(RocketMQHeaders.KEYS, orderId) .build(); // 使用延迟等级4对应30分钟(需提前配置Broker) rocketMQTemplate.syncSend("ORDER_TIMEOUT_TOPIC", message, 3000, 4); } } 7. RocketMQ消费者(OrderTimeoutConsumer.java) @Service @RocketMQMessageListener( topic = "ORDER_TIMEOUT_TOPIC", consumerGroup = "order-timeout-consumer", selectorType = SelectorType.TAG, messageModel = MessageModel.CLUSTERING ) public class OrderTimeoutConsumer implements RocketMQListener<MessageExt> { private final OrderService orderService; private final Cache<String, Boolean> processedMsgCache = Caffeine.newBuilder() .expireAfterWrite(1, TimeUnit.HOURS) .maximumSize(10000) .build(); @Override public void onMessage(MessageExt message) { String msgId = message.getMsgId(); if (processedMsgCache.getIfPresent(msgId) != null) { return; } String orderId = new String(message.getBody()); try { boolean success = orderService.safeCancel(orderId); if (success) { processedMsgCache.put(msgId, true); log.info("订单超时取消成功:{}", orderId); } } catch (Exception e) { log.error("订单取消处理失败:{}", orderId, e); throw new RuntimeException(e); } } } 8. 缓存配置(CacheConfig.java) @Configuration public class CacheConfig { @Bean public Cache<String, OrderStatus> orderStatusCache() { return Caffeine.newBuilder() .maximumSize(10_000) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); } } 9. 订单控制器(OrderController.java) @RestController @RequestMapping("/orders") @RequiredArgsConstructor public class OrderController { private final OrderService orderService; private final OrderTimeoutProducer timeoutProducer; @PostMapping public String createOrder() { String orderId = orderService.createOrder(); timeoutProducer.sendTimeoutMessage(orderId); return orderId; } @PostMapping("/{orderId}/pay") public ResponseEntity<?> payOrder(@PathVariable String orderId) { return orderService.safePay(orderId) ? ResponseEntity.ok().build() : ResponseEntity.status(HttpStatus.CONFLICT).build(); } } 10. 对账服务(每日补偿) @Component @RequiredArgsConstructor public class ReconciliationTask { private final OrderRepository orderRepository; private final RocketMQTemplate rocketMQTemplate; @Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行 public void checkUnpaidOrders() { LocalDateTime threshold = LocalDateTime.now().minusMinutes(30); orderRepository.findUnpaidExpiredOrders(threshold) .forEach(order -> { rocketMQTemplate.syncSend("ORDER_TIMEOUT_TOPIC", order.getOrderId()); }); } } 系统架构图 graph TD A[用户] --> B[创建订单] B --> C[发送延迟消息] C --> D[RocketMQ Broker] D -->|30分钟后| E[消费者处理] E --> F[条件更新取消订单] G[支付请求] --> H[分布式锁] H --> I[状态校验] I --> J[数据库更新] K[定时对账] --> L[补偿未处理订单] classDef service fill:#e1f5fe,stroke:#039be5; classDef mq fill:#f0f4c3,stroke:#afb42b; classDef db fill:#dcedc8,stroke:#689f38; class B,H,I,J,K,L service class C,D,E mq class F,J db 关键特性说明多层防御体系 内存缓存快速过滤(Caffeine) 数据库条件更新保证原子性 分布式锁(Redisson)协调支付操作 消息幂等处理(Caffeine缓存)
异常处理机制 消息消费失败自动重试 每日对账补偿异常订单 分布式锁自动释放保证
性能优化 缓存命中率监控(Micrometer) 批量消息处理优化 数据库连接池配置(HikariCP)
监控指标
@Bean public MeterRegistryCustomizer<MeterRegistry> metrics() { return registry -> { CaffeineCacheMetrics.monitor( registry, orderStatusCache, "order_status_cache" ); }; } 使用说明启动前准备
# 创建MySQL数据库 CREATE DATABASE order_db CHARACTER SET utf8mb4; # 配置RocketMQ延迟等级(broker.conf) messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hAPI调用示例
# 创建订单 POST /orders Response: {"orderId": "5d8f8c7e-1234-5678-90ab-cdef12345678"} # 支付订单 POST /orders/5d8f8c7e-1234-5678-90ab-cdef12345678/pay # 查询订单 GET /orders/5d8f8c7e-1234-5678-90ab-cdef12345678压力测试
# 使用wrk模拟并发创建订单 wrk -t4 -c100 -d30s http://localhost:8080/orders # 同时发起支付请求 ab -n 1000 -c 100 http://localhost:8080/orders/{orderId}/pay 其他JPA的全称是Java Persistence API,它是Java EE(现在Jakarta EE)的一部分,用于对象关系映射(ORM),让开发者可以通过面向对象的方式来操作数据库。JPA本身是一个规范,不是具体的实现,常见的实现有Hibernate(最流行的实现,功能丰富)、EclipseLink(Oracle 官方参考实现)、OpenJPA(Apache 项目)等。
考虑不同的场景和需求。可能的替代方案包括:
MyBatis:与JPA不同,MyBatis更偏向于SQL映射,允许开发者直接编写SQL语句,提供了更大的灵活性,特别是对于复杂查询或需要优化SQL性能的情况。
JDBC:这是最基础的数据库连接方式,直接使用SQL语句,没有ORM功能。虽然灵活,但需要大量样板代码,容易出错。
Spring Data JDBC:Spring提供的另一种数据访问方式,比纯JDBC更简单,支持一些简单的ORM功能,但不提供JPA的复杂特性如缓存、延迟加载等。
NoSQL数据库的驱动程序:如果项目使用MongoDB、Redis等NoSQL数据库,通常会使用对应的客户端库,如MongoTemplate、Lettuce等,这些不属于JPA的范畴。
其他ORM框架:比如JOOQ,它基于SQL的模式生成类型安全的SQL查询,适合喜欢SQL但希望有类型安全的开发者。
新兴技术:如Micronaut Data、Quarkus的Hibernate Panache,它们可能在简化数据访问层方面提供不同的方法。
MyBatis适合需要高度控制SQL的场景,而JOOQ适合喜欢类型安全SQL的开发者。 如果用户在处理复杂的领域模型,JPA的级联操作和声明式事务可能更有优势;如果项目需要高性能的定制SQL,MyBatis或JOOQ可能更合适。
JPA 的核心特点
特性说明对象关系映射将 Java 对象(Entity)映射到数据库表,自动处理增删改查操作JPQL类似 SQL 的查询语言,但面向对象(如 SELECT u FROM User u WHERE u.age > 18)事务管理通过 @Transactional 注解管理数据库事务延迟加载按需加载关联数据(如 @OneToMany(fetch = FetchType.LAZY))缓存机制一级缓存(Session 级别)和二级缓存(应用级别)提升性能JPA 的常见替代方案 MyBatis 类型:SQL 映射框架(半自动化 ORM) 特点: 开发者需手动编写 SQL,但通过 XML 或注解映射到 Java 对象。 灵活性强,适合复杂查询和高度定制的 SQL 优化。 学习曲线低于 JPA,但需维护更多 SQL 代码。
适用场景: 需要精细控制 SQL 语句。 遗留数据库结构复杂,难以通过 JPA 自动映射。
Spring Data JDBC 类型:轻量级 ORM(Spring 生态)
特点: 简化 JDBC 操作,支持基本的 CRUD 和简单查询。 无 JPA 的复杂特性(如延迟加载、缓存),性能更高。 适合对 SQL 有一定控制需求但不想写纯 JDBC 代码的场景。
JOOQ(Java Object Oriented Querying) 类型:类型安全的 SQL 构建工具
特点: 通过代码生成器将数据库表结构映射为 Java 类。 支持编写类型安全的 SQL 查询,避免 SQL 注入。 适合喜欢 SQL 但希望有编译时检查的开发者。
纯 JDBC 类型:Java 数据库连接基础 API
特点: 直接通过 SQL 操作数据库,无任何 ORM 抽象。 灵活性最高,但需要手动处理资源(Connection、ResultSet)和异常。 适合简单应用或需要极致性能的场景。
NoSQL 数据库驱动 类型:非关系型数据库专用客户端
示例: MongoDB:使用 MongoTemplate 或 Spring Data MongoDB。 Redis:使用 Lettuce 或 Jedis 客户端。 Cassandra:使用 Spring Data Cassandra。
如何选择替代方案?
场景推荐方案理由需要快速开发简单 CRUDJPA(Hibernate)自动化程度高,减少样板代码复杂 SQL 或性能优化需求MyBatis 或 JOOQ直接控制 SQL,灵活优化轻量级应用,避免 JPA 复杂特性Spring Data JDBC简单易用,无延迟加载等复杂机制非关系型数据库(如 MongoDB)Spring Data MongoDB专为 NoSQL 设计,语法与 Spring Data JPA 相似遗留系统或高度定制 SQL纯 JDBC完全控制 SQL 执行流程总结 JPA 适合:快速开发、对象关系映射需求明确、团队熟悉 ORM 概念。 替代方案适用场景: 需要精细控制 SQL → MyBatis/JOOQ 轻量级 ORM → Spring Data JDBC 非关系型数据库 → 专用 NoSQL 客户端 极致性能或遗留系统 → 纯 JDBC
013-2订单支付超时自动取消订单(rocketmqjpa)由讯客互联开源代码栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“013-2订单支付超时自动取消订单(rocketmqjpa)”
上一篇
c语言笔记数组篇