主页 > 软件开发  > 

直播弹幕系统(二)-整合RabbitMQ进行消息广播和异步处理

直播弹幕系统(二)-整合RabbitMQ进行消息广播和异步处理

直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理 前言一. Socket服务整合RabbitMQ二. 弹幕服务创建2.1 创建一个公共maven项目2.2 弹幕服务项目创建2.2.1 创建队列和广播型交换机2.2.2 生产者发送最终弹幕数据2.2.3 消费者监听原始弹幕数据 2.3 Socket服务监听弹幕数据并返回前端2.3.1 配置类2.3.2 消费者 2.4 测试

前言

上一篇文章 SpringCloud网关对WebSocket链接进行负载均衡 中把主要的架子搭建好了,这一篇文章就要开始写业务逻辑了。在分布式系统下,如何达到SpringBoot - WebSocket的使用和聊天室练习的效果。

一. Socket服务整合RabbitMQ

我们页面上,通过WebSocket发送弹幕信息的时候,后端通过@OnMessage注解修饰的函数进行接收。这里我们统一将原始的弹幕消息丢给MQ。让另一个专业的弹幕服务去消费处理。目的也是希望WebSocket服务它只负责消息的传递和WebSocket信息的维护,业务逻辑啥也不做。

1.添加pom依赖:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2.配置文件bootstrap.yml,添加RabbitMQ相关配置

server: port: 81 spring: application: name: tv-service-socket cloud: nacos: discovery: server-addr: 你的Nacos地址:8848 rabbitmq: username: guest password: guest # 虚拟主机,默认是/ virtual-host: / # 超时时间 connection-timeout: 30000 listener: simple: # 消费模式,手动 acknowledge-mode: manual # 并发数 concurrency: 5 # 最大并发数 max-concurrency: 10 # 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。 # 因此数值越大,内存占用越大,还需要考虑消费的速度 prefetch: 10 addresses: 你的RabbitMQ地址:5672

3.RabbitMQ配置类:

import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author Zong0915 * @date 2022/12/15 下午1:29 */ @Configuration public class RabbitMQConfig { @Bean public Queue initDirectQueue() { return new Queue("originBullet-queue", true); } @Bean DirectExchange initDirectExchange() { return new DirectExchange("bulletPreProcessor-exchange", true, false); } @Bean Binding initBindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("bullet.originMessage"); } }

4.写一个简单的消息体OriginMessage,发送到MQ的:

import lombok.Data; /** * @author Zong0915 * @date 2022/12/15 下午1:30 */ @Data public class OriginMessage { private String sessionId; private String userId; private String roomId; private String message; }

5.MQ生产者OriginMessageSender:

/** * @author Zong0915 * @date 2022/12/15 下午1:29 */ @Component public class OriginMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(OriginMessage originMessage) { CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString());// 唯一ID Map<String, Object> map = new HashMap<>(); map.put("message", JSONObject.toJSONString(originMessage)); // 发送给消息预处理队列 rabbitTemplate.convertAndSend("bulletPreProcessor-exchange",// 交换机名称 "bullet.originMessage",// 路由Key map, correlationData); } }

6.我们再对WebSocket的监听类做一下小改动,将收到的消息,封装一下,然后调用生产者的API即可。只需要注意一下多例下属性的注入方式是怎么写的即可。

import kz.cache.SocketCache; import kz.entity.OriginMessage; import kz.producer.OriginMessageSender; import lombok.extern.slf4j.Slf4j; import org.apache mons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.concurrent.atomic.AtomicLong; import lombok.Getter; /** * @author Zong0915 * @date 2022/12/9 下午3:45 */ @Component @ServerEndpoint("/websocket/live/{roomId}/{userId}") @Slf4j @Getter public class BulletScreenServer { /** * 多例模式下的赋值方式 */ private static OriginMessageSender originMessageSender; /** * 多例模式下的赋值方式 */ @Autowired private void setOriginMessageSender(OriginMessageSender originMessageSender) { BulletScreenServer.originMessageSender = originMessageSender; } private static final AtomicLong count = new AtomicLong(0); private Session session; private String sessionId; private String userId; private String roomId; /** * 打开连接 * @param session * @OnOpen 连接成功后会自动调用该方法 */ @OnOpen public void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) { // 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用try count.incrementAndGet(); log.info("*************WebSocket连接次数: {} *************", count.longValue()); this.userId = userId; this.roomId = roomId; // 保存session相关信息到本地 this.sessionId = session.getId(); this.session = session; SocketCache.put(sessionId, this); } /** * 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接 */ @OnClose public void closeConnection() { SocketCache.remove(sessionId); } /** * 客户端发送消息给服务端 * @param message */ @OnMessage public void onMessage(String message) { if (StringUtils.isBlank(message)) { return; } // 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的 originMessageSender.send(buildMessage(message)); } private OriginMessage buildMessage(String message) { OriginMessage originMessage = new OriginMessage(); originMessage.setMessage(message); originMessage.setRoomId(roomId); originMessage.setSessionId(sessionId); originMessage.setUserId(userId); return originMessage; } }

备注:记得将另一个Socket项目也改造成同样的代码。

二. 弹幕服务创建 2.1 创建一个公共maven项目

我们创建一个maven项目:service-bulletcommon。先看下最终的项目架构:

1.pom依赖添加一些常用的工具:

<groupId>bullet-service</groupId> <artifactId>service-bulletcommon</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache mons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.79</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.24</version> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.2</version> </dependency> </dependencies>

2.创建一个常量定义类SocketConstants:

/** * @author Zong0915 * @date 2022/12/15 下午3:59 */ public class SocketConstants { /** * 这条消息是否处理过 */ public static final String CORRELATION_SET_PRE = "Correlation_Set_"; /** * 同一个房间里面有哪些SessionID */ public static final String ROOM_LIVE_USER_SET_PRE = "ROOM_LIVE_USER_Set_"; public static final String MESSAGE = "message"; public static final String ID = "id"; /** * 原始消息所在队列 */ public static final String ORIGIN_BULLET_QUEUE = "originBullet-queue"; /** * 广播队列A */ public static final String BULLET_SOCKET_QUEUE_A = "bulletSocket-queueA"; /** * 广播队列B */ public static final String BULLET_SOCKET_QUEUE_B = "bulletSocket-queueB"; /** * 弹幕预处理交换机 */ public static final String BULLET_PRE_PROCESSOR_EXCHANGE = "bulletPreProcessor-exchange"; /** * 弹幕广播交换机 */ public static final String BULLET_FANOUT_EXCHANGE = "bulletFanOut-exchange"; /** * 弹幕预处理路由Key */ public static final String BULLET_ORIGIN_MESSAGE_ROUTE_KEY = "bullet.originMessage"; }

3.创建一个消息传输体OriginMessage:

import lombok.Data; /** * @author Zong0915 * @date 2022/12/15 下午2:07 */ @Data public class OriginMessage { private String sessionId; private String userId; private String roomId; private String message; } 2.2 弹幕服务项目创建

1.我们创建一个maven项目:service-bulletscreen。先看下最终的项目架构:

1.pom文件:

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2.2.1.RELEASE</version> <exclusions> <exclusion> <artifactId>archaius-core</artifactId> <groupId>com.netflix.archaius</groupId> </exclusion> <exclusion> <artifactId>commons-io</artifactId> <groupId>commons-io</groupId> </exclusion> <exclusion> <artifactId>commons-lang3</artifactId> <groupId>org.apache mons</groupId> </exclusion> <exclusion> <artifactId>fastjson</artifactId> <groupId>com.alibaba</groupId> </exclusion> <exclusion> <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion> <exclusion> <artifactId>httpclient</artifactId> <groupId>org.apache.httpcomponents</groupId> </exclusion> <exclusion> <artifactId>servo-core</artifactId> <groupId>com.netflix.servo</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.6.7</version> <exclusions> <exclusion> <artifactId>log4j-api</artifactId> <groupId>org.apache.logging.log4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>bullet-service</groupId> <artifactId>service-bulletcommon</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies>

2.application.properties:

spring.application.name=tv-service-bulletscreen spring.cloud.nacos.discovery.server-addr=你的Nacos地址:8848

3.bootstrap.yml文件:

server: port: 83 spring: application: name: tv-service-bulletscreen redis: database: 0 # Redis数据库索引(默认为0) host: 你的Redis地址 # Redis的服务地址 port: 6379 # Redis的服务端口 password: 密码 jedis: pool: max-active: 8 # 连接池最大连接数(使用负值表示没有限制) max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制) max-idle: 8 # 连接池中的最大空闲连接 min-idle: 0 # 连接池中的最小空闲链接 timeout: 30000 # 连接池的超时时间(毫秒) cloud: nacos: discovery: server-addr: 你的Nacos地址:8848 rabbitmq: username: guest password: guest # 虚拟主机,默认是/ virtual-host: / # 超时时间 connection-timeout: 30000 listener: simple: # 消费模式,手动 acknowledge-mode: manual # 并发数 concurrency: 5 # 最大并发数 max-concurrency: 10 # 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。 # 因此数值越大,内存占用越大,还需要考虑消费的速度 prefetch: 10 addresses: 你的RabbitMQ地址:5672

4.Redis配置类RedisConfig:

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { /** * 实例化 RedisTemplate 对象 * * @return */ @Bean public RedisTemplate<String, Object> functionDomainRedisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); initDomainRedisTemplate(redisTemplate, redisConnectionFactory); return redisTemplate; } /** * 设置数据存入 redis 的序列化方式,并开启事务 * * @param redisTemplate * @param factory */ private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) { //如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to String! redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // 开启事务 redisTemplate.setEnableTransactionSupport(true); redisTemplate.setConnectionFactory(factory); } @Bean @ConditionalOnMissingBean(StringRedisTemplate.class) public StringRedisTemplate stringRedisTemplate( RedisConnectionFactory redisConnectionFactory) { StringRedisTemplate template = new StringRedisTemplate(); template.setConnectionFactory(redisConnectionFactory); return template; } } 2.2.1 创建队列和广播型交换机

创建一个广播模式的交换机bulletFanOut-exchange:其实用direct也可以,因为我只要监听的队列用同一个即可,这里只是进行一个模拟。

分别为两个Socket服务创建个队列,用来接收处理好的消息(练习下广播模式):

bulletSocket-queueAbulletSocket-queueB

再分别为他们和上述创建好的交换机进行绑定。

我们的弹幕服务主要做两件事:

监听预处理队列,数据来自:originBullet-queue。将处理完的消息通过广播,发送给bulletSocket-queueA/B两个队列。

RabbitMQ配置类如下:

import kz mon.SocketConstants; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author Zong0915 * @date 2022/12/15 下午1:29 */ @Configuration public class RabbitMQConfig { @Bean public Queue initDirectQueue() { return new Queue(SocketConstants.ORIGIN_BULLET_QUEUE, true); } @Bean public Queue initFanoutSocketQueueA() { return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true); } @Bean public Queue initFanoutSocketQueueB() { return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true); } @Bean DirectExchange initDirectExchange() { return new DirectExchange(SocketConstants.BULLET_PRE_PROCESSOR_EXCHANGE, true, false); } @Bean("fanoutExchange") FanoutExchange initFanoutExchange() { return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false); } @Bean Binding initBindingDirect() { return BindingBuilder.bind(initDirectQueue()).to(initDirectExchange()).with(SocketConstants.BULLET_ORIGIN_MESSAGE_ROUTE_KEY); } @Bean Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange); } @Bean Binding initBindingFanoutB(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange); } } 2.2.2 生产者发送最终弹幕数据

创建FanoutMessageProducer类:记得向我们上面绑定的广播交换机发送数据。

import com.alibaba.fastjson.JSONObject; import kz.entity.OriginMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * @author Zong0915 * @date 2022/12/15 下午2:51 */ @Component public class FanoutMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(OriginMessage originMessage) { CorrelationData correlationData = new CorrelationData(); correlationData.setId(UUID.randomUUID().toString());// 唯一ID Map<String, Object> map = new HashMap<>(); map.put("message", JSONObject.toJSONString(originMessage)); rabbitTemplate.convertAndSend("bulletFanOut-exchange",// 交换机名称 "",// 路由Key map, correlationData); } } 2.2.3 消费者监听原始弹幕数据

创建OriginMessageConsumer类:

import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import kz.service.BulletScreenService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; /** - @author Zong0915 - @date 2022/12/15 下午1:57 */ @Component @Slf4j public class OriginMessageConsumer { @Autowired private BulletScreenService bulletScreenService; /** * 处理原始消息 * * @param testMessage Map类型的消息体 * @param headers 消息头 * @param channel 消息所在的管道 */ @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "originBullet-queue", durable = "true"), // 默认的交换机类型就是direct exchange = @Exchange(name = "bulletPreProcessor-exchange", type = "direct"), key = "bullet.originMessage" ) ) @RabbitHandler public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException { log.info("***********消费开始*************"); log.info("消费体:{}", JSONObject.toJSONString(testMessage)); bulletScreenService.processMessage(testMessage, headers, channel); } }

2.创建BulletScreenService类用于原始弹幕的业务处理,主要考虑的几个点:

消息的合法性校验。消息的幂等性保证,这里用了Redis做个存储。将原始数据处理完后,在丢给MQ进行广播。 import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import kz mon.SocketConstants; import kz.entity.OriginMessage; import kz.producer.FanoutMessageProducer; import lombok.extern.slf4j.Slf4j; import org.apache mons.lang3.StringUtils; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @author Zong0915 * @date 2022/12/9 下午3:45 */ @Service @Slf4j public class BulletScreenService { @Autowired private StringRedisTemplate redisTemplate; @Autowired private FanoutMessageProducer fanoutMessageProducer; @Async public void processMessage(Map testMessage, Map<String, Object> headers, Channel channel) throws IOException { OriginMessage originMessage = getOriginMessage(testMessage); // 合法性校验 if (!validMessage(testMessage, headers, originMessage)) { return; } // 处理消息 log.info("***********业务处理,弹幕: {}***********", originMessage.getMessage()); String correlationId = headers.get(SocketConstants.ID).toString(); // 存入Redis并设置过期时间1天 redisTemplate.opsForSet().add(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId); redisTemplate.expire(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), 1, TimeUnit.DAYS); // 将处理好的消息发送给MQ,通过广播队列,将消息发送给所有的Socket服务,一般这里还会对originMessage进行一些二次封装 // 本案例就不做处理了,原样返回 fanoutMessageProducer.send(originMessage); // 确认消息 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } public OriginMessage getOriginMessage(Map testMessage) { String messageJson = (String) testMessage.get(SocketConstants.MESSAGE); if (StringUtils.isBlank(messageJson)) { return null; } OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class); return originMessage; } /** * 对消息进行合法性校验 */ public boolean validMessage(Map testMessage, Map<String, Object> headers, OriginMessage originMessage) { // 判空 if (testMessage == null || testMessage.size() == 0 || originMessage == null) { return false; } if (headers == null || headers.size() == 0) { return false; } // 幂等性校验,如果消息已经被消费过了,那么这个弹幕消息就不应该被二次消费,这个消息就直接把他处理掉 UUID correlationId = (UUID) headers.get(SocketConstants.ID); Boolean exist = redisTemplate.opsForSet().isMember(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId.toString()); return !Optional.ofNullable(exist).orElse(false); } }

最后就是启动类BulletScreenApplication:

import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.scheduling.annotation.EnableAsync; /** * @author Zong0915 * @date 2022/12/10 下午9:44 */ @SpringBootApplication @EnableDiscoveryClient @EnableAsync public class BulletScreenApplication { public static void main(String[] args) { SpringApplication.run(BulletScreenApplication.class, args); } } 2.3 Socket服务监听弹幕数据并返回前端

记得在pom依赖中引入上面的公共包:

<dependency> <groupId>bullet-service</groupId> <artifactId>service-bulletcommon</artifactId> <version>1.0-SNAPSHOT</version> </dependency> 2.3.1 配置类

RabbitMQ配置类增加下队列和交换机的配置信息:绑定bulletSocket-queueA

@Bean public Queue initFanoutSocketQueueA() { return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true); } @Bean("fanoutExchange") FanoutExchange initFanoutExchange() { return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false); } @Bean Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange); }

另一个Socket项目,添加以下配置:绑定bulletSocket-queueB

@Bean public Queue initFanoutSocketQueueB() { return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true); } @Bean("fanoutExchange") FanoutExchange initFanoutExchange() { return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false); } @Bean Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange); }

再写一个缓存工具类,通过直播间号获得同一个直播间下的所有WebSocket信息:

public class SocketCache { public static List<BulletScreenServer> getSocketGroupByRoomId(String roomId) { ArrayList<BulletScreenServer> res = new ArrayList<>(); if (StringUtils.isBlank(roomId)) { return res; } for (Map.Entry<Integer, ConcurrentHashMap<String, BulletScreenServer>> hashMapEntry : CACHE_SEGMENT.entrySet()) { ConcurrentHashMap<String, BulletScreenServer> map = hashMapEntry.getValue(); if (map == null || map.size() == 0) { continue; } for (BulletScreenServer server : map.values()) { if (server.getSession().isOpen() && StringUtils.equals(roomId, server.getRoomId())) { res.add(server); } } } return res; } } 2.3.2 消费者

重点就是消费者的业务代码了,对最终的弹幕数据进行广播,创建FanOutMessageConsumer类:

import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import kz.cache.SocketCache; import kz mon.SocketConstants; import kz.entity.OriginMessage; import kz.service.BulletScreenServer; import lombok.extern.slf4j.Slf4j; import org.apache mons.lang3.StringUtils; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.List; import java.util.Map; /** * @author Zong0915 * @date 2022/12/15 下午1:57 */ @Component @Slf4j public class FanOutMessageConsumer { /** * 处理弹幕消息,开始广播 * * @param testMessage Map类型的消息体 * @param headers 消息头 * @param channel 消息所在的管道 */ @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "bulletSocket-queueA", durable = "true"), // 默认的交换机类型就是direct exchange = @Exchange(name = "bulletFanOut-exchange", type = "fanout") ) ) @RabbitHandler public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException { log.info("***********消费开始, Socket服务A接收到广播消息*************"); log.info("消费体:{}", JSONObject.toJSONString(testMessage)); OriginMessage originMessage = getOriginMessage(testMessage); if (originMessage == null) { return; } // 根据roomID去找到同一个直播间下的所有用户并广播消息 List<BulletScreenServer> socketGroupByRoomId = SocketCache.getSocketGroupByRoomId(originMessage.getRoomId()); for (BulletScreenServer bulletScreenServer : socketGroupByRoomId) { bulletScreenServer.getSession().getBasicRemote().sendText(JSONObject.toJSONString(originMessage)); } // 确认消息 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } public OriginMessage getOriginMessage(Map testMessage) { String messageJson = (String) testMessage.get(SocketConstants.MESSAGE); if (StringUtils.isBlank(messageJson)) { return null; } OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class); return originMessage; } }

另一个Socket服务则改一下消费者的监听队列和日志内容即可:

2.4 测试

打开同一个直播间的两个用户,让两个WebSocket正好建立到不同的服务器上: 此时Socket服务A:

Socket服务B: 页面A中随便发送一条弹幕: 页面B中随便发送一条弹幕:

1.前端发送一条弹幕,后端监听到,开始向预处理队列丢消息。 2.service-bulletscreen服务,监听到预处理队列数据,开始进行处理。

3.经过一系列校验和幂等性处理之后,将处理完的弹幕通过交换机发送给广播队列: 4.Socket服务B接收到消息:

Socket服务A接收到广播消息:

5.前端页面展示:

页面A:

页面B:

到这里,一个聊天服务就完成了。不过大家也看到在线人数这块咱没做。可以用Redis缓存来记录每个直播间的人数。这个功能放到下一篇文章来讲。

标签:

直播弹幕系统(二)-整合RabbitMQ进行消息广播和异步处理由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“直播弹幕系统(二)-整合RabbitMQ进行消息广播和异步处理