
Integrating RocketMQ into a Spring Boot Project to Implement In-Site Messaging

1. Add dependencies

First, add the RocketMQ dependency, along with the web, JPA, database, and Redis dependencies, to pom.xml:

    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot Starter Data JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- H2 Database (or another database of your choice) -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- Redis Cache -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>

2. Configuration file

Configure Redis and RocketMQ in application.properties:

    # Redis Configuration
    spring.redis.host=localhost
    spring.redis.port=6379

    # RocketMQ Configuration
    rocketmq.name-server=localhost:9876
    rocketmq.producer.group=my-producer-group

3. Data model

Define the Message entity class:

    package com.example.inbox.model;

    import javax.persistence.*;
    import java.io.Serializable;
    import java.time.LocalDateTime;

    // Serializable so that cached List<Message> values can be written to Redis
    // by the default JDK serializer.
    @Entity
    public class Message implements Serializable {

        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;

        private String senderId;
        private String receiverId;
        private String subject;
        private String body;
        private LocalDateTime timestamp;
        private boolean read;

        // Getters and Setters
    }

4. Repository interface

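Create the MessageRepository interface:

    package com.example.inbox.repository;

    import com.example.inbox.model.Message;
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.stereotype.Repository;

    import java.util.List;

    @Repository
    public interface MessageRepository extends JpaRepository<Message, Long> {

        // Derived query used by the service layer: all messages for one receiver.
        List<Message> findByReceiverId(String receiverId);
    }

5. Service layer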

Integrate caching and RocketMQ message sending in the service layer. The service also exposes the saveMessage method that the listener calls when a message arrives:

    package com.example.inbox.service;

    import com.example.inbox.model.Message;
    import com.example.inbox.repository.MessageRepository;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cache.annotation.CacheEvict;
    import org.springframework.cache.annotation.Cacheable;
    import org.springframework.stereotype.Service;

    import java.util.List;

    @Service
    public class MessageService {

        @Autowired
        private MessageRepository messageRepository;

        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        // Read a receiver's inbox, cached per receiver id.
        @Cacheable(value = "messages", key = "#receiverId")
        public List<Message> getMessagesByReceiver(String receiverId) {
            return messageRepository.findByReceiverId(receiverId);
        }

        // Publish the message to RocketMQ; the listener persists it asynchronously.
        public void sendMessage(Message message) {
            rocketMQTemplate.convertAndSend("messageTopic", message);
        }

        // Called by the listener: persist the message, then evict the receiver's
        // cached inbox so the next read sees the new message.
        @CacheEvict(value = "messages", key = "#message.receiverId")
        public void saveMessage(Message message) {
            messageRepository.save(message);
        }
    }

6. Message listener
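The listener is where a message actually reaches the database: sending only publishes to the topic, so a receiver's cached inbox can only be refreshed correctly once the message has been consumed and saved. The following is a minimal plain-Java sketch of that timing, not the Spring implementation. The class `InboxFlowSketch` and its methods `send`, `consumeOne`, and `inbox` are hypothetical names, and the queue and maps are in-memory stand-ins for RocketMQ, the database, and the Redis cache.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of the send -> topic -> listener -> save flow.
// No Spring, Redis, or RocketMQ is involved here.
class InboxFlowSketch {
    private final Queue<String[]> topic = new ArrayDeque<>();         // stands in for "messageTopic"
    private final Map<String, List<String>> store = new HashMap<>();  // stands in for the database
    private final Map<String, List<String>> cache = new HashMap<>();  // stands in for the "messages" cache

    // Producer side: only enqueues; nothing is persisted yet.
    void send(String receiverId, String body) {
        topic.add(new String[] {receiverId, body});
    }

    // Consumer side: persist one queued message, then evict the receiver's cache entry.
    boolean consumeOne() {
        String[] next = topic.poll();
        if (next == null) {
            return false;
        }
        store.computeIfAbsent(next[0], k -> new ArrayList<>()).add(next[1]);
        cache.remove(next[0]);
        return true;
    }

    // Read side: return the cached inbox, loading it from the store on a miss.
    List<String> inbox(String receiverId) {
        List<String> cached = cache.get(receiverId);
        if (cached == null) {
            cached = new ArrayList<>(store.getOrDefault(receiverId, new ArrayList<>()));
            cache.put(receiverId, cached);
        }
        return cached;
    }

    public static void main(String[] args) {
        InboxFlowSketch app = new InboxFlowSketch();
        app.inbox("bob");                      // caches an empty inbox
        app.send("bob", "hello");
        System.out.println(app.inbox("bob"));  // prints [] because nothing was consumed yet
        app.consumeOne();
        System.out.println(app.inbox("bob"));  // prints [hello]
    }
}
```

In this sketch, evicting at send time would not help: a read between `send` and `consumeOne` would simply cache the old inbox again. That is the reason for tying eviction to the save step.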

Create a listener to handle messages from RocketMQ:

    package com.example.inbox.listener;

    import com.example.inbox.model.Message;
    import com.example.inbox.service.MessageService;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import java.time.LocalDateTime;

    // Typed as RocketMQListener<Message> so that rocketmq-spring converts the JSON
    // payload into a Message itself; no separate JSON library (such as fastjson,
    // which is not among the dependencies) is needed.
    @Component
    @RocketMQMessageListener(topic = "messageTopic", consumerGroup = "my-consumer-group")
    public class MessageListener implements RocketMQListener<Message> {

        @Autowired
        private MessageService messageService;

        @Override
        public void onMessage(Message message) {
            // Stamp the arrival time, then persist the message.
            message.setTimestamp(LocalDateTime.now());
            messageService.saveMessage(message);
        }
    }

7. Controller

Create a controller to handle HTTP requests:

    package com.example.inbox.controller;

    import com.example.inbox.model.Message;
    import com.example.inbox.service.MessageService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;

    import java.util.List;

    @RestController
    @RequestMapping("/messages")
    public class MessageController {

        @Autowired
        private MessageService messageService;

        // GET /messages/{receiverId}: list the messages for one receiver.
        @GetMapping("/{receiverId}")
        public List<Message> getMessages(@PathVariable String receiverId) {
            return messageService.getMessagesByReceiver(receiverId);
        }

        // POST /messages: queue a message for asynchronous delivery.
        @PostMapping
        public void sendMessage(@RequestBody Message message) {
            messageService.sendMessage(message);
        }
    }

8. Application class

Make sure your Spring Boot application class carries the required annotations, in particular @EnableCaching:

    package com.example.inbox;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cache.annotation.EnableCaching;

    @SpringBootApplication
    @EnableCaching
    public class InboxApplication {

        public static void main(String[] args) {
            SpringApplication.run(InboxApplication.class, args);
        }
    }

Summary

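With the steps above, we integrated RocketMQ into a Spring Boot project so that the in-site messaging system handles messages asynchronously. The main steps were:

Add the RocketMQ dependency: add the RocketMQ dependency to pom.xml.
Configure RocketMQ: set the RocketMQ parameters in application.properties.
Data model and repository: define the entity class and the repository interface.
Service layer: integrate RocketMQ message sending and the Redis cache in the service layer.
Message listener: create a listener with the @RocketMQMessageListener annotation to process received messages.
Controller: create a RESTful API to handle HTTP requests.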
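To try the two endpoints once the application is running, requests along the following lines could be used. This is only a sketch using the JDK's `java.net.http` API (Java 11 or later). The class `InboxApiRequests`, its method names, and the base URL `http://localhost:8080` (Spring Boot's default port) are assumptions and not part of the project above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds (but does not send) requests for the /messages API.
class InboxApiRequests {
    // Assumption: the application runs locally on Spring Boot's default port.
    static final String BASE_URL = "http://localhost:8080";

    // Build a POST /messages request with a JSON body matching the Message fields.
    // Note: values are inserted as-is, so they must not contain quotes or backslashes.
    static HttpRequest sendRequest(String senderId, String receiverId, String subject, String body) {
        String json = String.format(
                "{\"senderId\":\"%s\",\"receiverId\":\"%s\",\"subject\":\"%s\",\"body\":\"%s\"}",
                senderId, receiverId, subject, body);
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/messages"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Build a GET /messages/{receiverId} request.
    static HttpRequest listRequest(String receiverId) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/messages/" + receiverId))
                .GET()
                .build();
    }
}
```

A request built this way can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Because delivery is asynchronous, a GET issued immediately after a POST may not yet include the new message.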