主页 > 互联网  > 

Netty笔记3:NIO编程

Netty笔记3:NIO编程

Netty笔记1:线程模型

Netty笔记2:零拷贝

Netty笔记3:NIO编程

Netty笔记4:Epoll

Netty笔记5:Netty开发实例

Netty笔记6:Netty组件

Netty笔记7:ChannelPromise通知处理

Netty笔记8:ByteBuf使用介绍

Netty笔记9:粘包半包

Netty笔记10:LengthFieldBasedFrameDecoder很简单

Netty笔记11:编解码器

Netty笔记12:模拟Web服务器

Netty笔记13:序列化

文章目录 前言编程示例总结

前言

想要快速理解NIO编程,需要先理解上篇的零拷贝技术和线程模型,本篇是对这两个知识的实践,也是netty的过度。

编程示例

我们尝试写一个NIO程序:

需要注意的是:

进行网络传输时,涉及到的数据,必须要经过缓冲区,不管是发送还是接收,结合用户态和内核态的切换过程就可以明白;NIO中的缓冲可以使用堆内存缓存和直接内存缓冲,这个需要结合零拷贝技术可以理解;多路复用使用selector模式,需要循环遍历socket;

注:buf在堆上。在进行数据发送时,如果使用堆内存,在JVM之外创建一个DirectBuf,然后把堆上的数据拷贝的这个DirectBuf,再写到SendBuf中,因为JVM中存在GC机制,如果使用引用方式,在拷贝过程中出现GC,会重新分配地址,导致数据出现问题。

服务端:

public class ServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverSocketChannel; public ServerHandle(int port) { try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); // channel必须处于非阻塞模式下,不然会报错,所以不能同FileChannel一起使用 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); // 注册对应的事件,这里注册的是accept事件,只要监听到就会调用对应的处理器 // 注册还可以添加附加对象,也就是第三个参数,获取方式:key.attachment(); // 如果注册了事件,需要取消,需要调用channel.cancel() serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务端已准备好:" + port); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void run() { while (true) { try { // 阻塞直到通道就绪,这边设置了超时时间 // 返回值:有多少通道就绪 selector.select(1000); // 在通道就绪时,获取对应的键,也就是事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); // 根据事件的类型进行对应的处理 handlerInput(key); } } catch (Exception e) { throw new RuntimeException(e); } } } private void handlerInput(SelectionKey key) throws IOException { // 在这个循环中,可能存在key.cancel() 或者移除,所以这里需要判断是否有效 if (!key.isValid()) { return; } try { if (key.isAcceptable()) { // 这里处理客户端连接服务端的事件 ServerSocketChannel channel = (ServerSocketChannel)key.channel(); try { // 接收请求 SocketChannel sc = channel.accept(); System.out.println("-----建立连接------"); // 设置该通道非阻塞 sc.configureBlocking(false); // 并注册read事件,监听着 sc.register(selector, SelectionKey.OP_READ); } catch (IOException e) { System.out.println("连接客户端失败!"); key.cancel(); channel.close(); } } if (key.isReadable()) { SocketChannel sc = (SocketChannel)key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = sc.read(buffer); if (read > 0) { // 反转:将这个缓冲中的数据从现在的位置变成从0开始 buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg = new String(bytes, StandardCharsets.UTF_8); System.out.println("服务器收到消息:" + msg); doWrite(sc, "hello,收到消息:" + msg); } } } catch (IOException e) { System.out.println("数据处理失败!"); // 再处理失败,或异常时要退出通道,不然每次循环都会检查通道导致一致报错 key.channel().close(); key.cancel(); } } private void doWrite(SocketChannel sc, String msg) throws IOException { byte[] bytes = msg.getBytes(StandardCharsets.UTF_8); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); buffer.put(bytes); buffer.flip(); sc.write(buffer); } }

服务端启动:

public static void main(String[] args) { ServerHandle serverHandle = new ServerHandle(8080); new Thread(serverHandle).start(); }

客户端:

public class ClientHandle implements Runnable{ private String ip; private int port; private SocketChannel socketChannel; private Selector selector; public ClientHandle(String ip, int port) { try { this.ip = ip; this.port = port; selector = Selector.open(); socketChannel = SocketChannel.open(); // 非阻塞状态 socketChannel.configureBlocking(false); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void run() { try { // 启动后,执行连接操作 if (socketChannel.connect(new InetSocketAddress(ip, port))) { // 连接服务端成功后,注册读取事件 socketChannel.register(selector, SelectionKey.OP_READ); } else { // 如果连接失败,则再注册连接事件,之后再进行处理 socketChannel.register(selector, SelectionKey.OP_CONNECT); } } catch (IOException e) { throw new RuntimeException(e); } while (true) { try { // 阻塞1000秒直到有通道就绪 selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); handleInput(key); } } catch (IOException e) { throw new RuntimeException(e); } } } public void sendMsg(String msg) throws IOException { byte[] bytes = msg.getBytes(StandardCharsets.UTF_8); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); buffer.put(bytes); buffer.flip(); socketChannel.write(buffer); } private void handleInput(SelectionKey key) throws IOException { if (!key.isValid()) { return; } SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { socketChannel.register(selector, SelectionKey.OP_READ); } else { System.exit(1); } } if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(1024); int read = sc.read(buffer); if (read > 0) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg = new String(bytes, StandardCharsets.UTF_8); System.out.println("客户端收到消息:" + msg); } else if (read < 0) { key.cancel();; sc.close(); } } } }

客户端启动:

public static void main(String[] args) throws IOException { ClientHandle handle = new ClientHandle("localhost", 8080); new Thread(handle).start(); Scanner scanner = new Scanner(System.in); // 死循环保持监听 while (true) { // 每次控制台输入,就发送给服务端 handle.sendMsg(scanner.nextLine()); } }

总结

该示例对应于reactor单线程模型,服务端是一个单线程,通过selector单线程循环接收客户端的请求,并识别客户端请求事件类型,进行分发处理,相对于BIO很明显的区别,就是它不会等到上一个请求处理完成。在消息接收与发送的过程中,我们需要对缓冲数据进行处理,也就是对应于零拷贝知识点中提到的缓冲区概念,实例中用到了ByteBuffer对象,它是NIO中一个比较重要的对象,下一篇会进行说明。

标签:

Netty笔记3:NIO编程由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Netty笔记3:NIO编程