基于Netty构建Websocket服务端
- 游戏开发
- 2025-07-21 19:02:45

除了构建TCP和UDP服务器和客户端,Netty还可以用于构建WebSocket服务器。WebSocket是一种基于TCP协议的双向通信协议,可以在Web浏览器和Web服务器之间建立实时通信通道。下面是一个简单的示例,演示如何使用Netty构建一个WebSocket服务器。 项目目录: 引入pom依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.69.Final</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>编写SocketServer:
package com.lzq.websocket.config; import com.lzq.websocket.handlers.WebSocketHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.context.annotation.Configuration; import java.util.concurrent.TimeUnit; @Slf4j @Configuration public class WebSocketConfig implements CommandLineRunner { private static final Integer PORT = 8888; @Override public void run(String... args) throws Exception { new WebSocketConfig().start(); } public void start() { // 创建EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new HttpServerCodec()); // 最大数据长度 pipeline.addLast(new HttpObjectAggregator(65536)); // 添加接收websocket请求的url匹配路径 pipeline.addLast(new WebSocketServerProtocolHandler("/websocket")); // 10秒内收不到消息强制断开连接 // pipeline.addLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS)); pipeline.addLast(new WebSocketHandler()); } }); ChannelFuture future = serverBootstrap.bind(PORT).sync(); log.info("websocket server started, port={}", PORT); // 处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭 // 阻塞 future.channel().closeFuture().sync(); } catch (Exception e) { log.error("websocket server exception", e); throw new RuntimeException(e); } finally { log.info("websocket server close"); // 关闭EventLoopGroup bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }编写WebSocketHandler:
package com.lzq.websocket.handlers; import com.lzq.websocket.config.NettyConfig; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; @Slf4j public class WebSocketHandler extends SimpleChannelInboundHandler<Object> { private WebSocketServerHandshaker webSocketServerHandshaker; private static final String WEB_SOCKET_URL = "ws://127.0.0.1:8888/websocket"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 创建连接时执行 NettyConfig.group.add(ctx.channel()); log.info("client channel active, id={}", ctx.channel().id().toString()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 关闭连接时执行 NettyConfig.group.remove(ctx.channel()); log.info("client channel disconnected, id={}", ctx.channel().id().toString()); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 服务端接收客户端发送过来的数据结束之后调用 ctx.flush(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { WebSocketServerProtocolHandler.HandshakeComplete handshake = (WebSocketServerProtocolHandler.HandshakeComplete) evt; log.info("client channel connected, id={}, url={}", ctx.channel().id().toString(), handshake.requestUri()); } } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { // 处理客户端http握手请求 handlerHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { // 处理websocket连接业务 handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } } /** * 处理websocket连接业务 * * @param ctx * @param frame */ private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { log.info("handlerWebSocketFrame>>>>class={}", frame.getClass().getName()); // 判断是否是关闭websocket的指令 if (frame instanceof CloseWebSocketFrame) { webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain()); return; } // 判断是否是ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if (!(frame instanceof TextWebSocketFrame)) { throw new RuntimeException("不支持消息类型:" + frame.getClass().getName()); } String text = ((TextWebSocketFrame) frame).text(); if ("ping".equals(text)) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } log.info("WebSocket message received: {}", text); /** * 可通过客户传输的text,设计处理策略: * 如:text={"type": "messageHandler", "userId": "111"} * 服务端根据type,采用策略模式,自行派发处理 * * 注意:这里不需要使用线程池,因为Netty 采用 Reactor线程模型(目前使用的是主从Reactor模型), * Handler已经是线程处理,每个用户的请求是线程隔离的 */ // 返回WebSocket响应 ctx.writeAndFlush(new TextWebSocketFrame("server return:" + text)); /*// 群发 TextWebSocketFrame twsf = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + " : " + text); NettyConfig.group.writeAndFlush(twsf);*/ } /** * 处理客户端http握手请求 * * @param ctx * @param request */ private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) { log.info("handlerHttpRequest>>>>class={}", request.getClass().getName()); // 判断是否采用WebSocket协议 if (!request.getDecoderResult().isSuccess() || !("websocket".equals(request.headers().get("Upgrade")))) { sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false); webSocketServerHandshaker = wsFactory.newHandshaker(request); if (webSocketServerHandshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { webSocketServerHandshaker.handshake(ctx.channel(), request); } } private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) { if (response.getStatus().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8); response.content().writeBytes(buf); buf.release(); } // 服务端向客户端发送数据 ChannelFuture f = ctx.channel().writeAndFlush(response); if (response.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 非正常断开时调用 log.error("client channel execute exception, id={}", ctx.channel().id().toString(), cause); ctx.close(); } }NettyConfig:
package com.lzq.websocket.config; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class NettyConfig { /** * 存储接入的客户端的channel对象 */ public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); }使用Apifox测试:
基于Netty构建Websocket服务端由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“基于Netty构建Websocket服务端”