NIO详解(四):NIO编程
1. NIO类库简介
1.1 缓冲区Buffer
Buffer是一个对象,它包含了一些要写入或者要读出的数据。在NIO类库中加入Buffer对象,体现了新库和原来I/O的一个重要区别。在NIO库中,所有的数据都是用缓冲区处理的。在读取数据时,它是直接读取到缓冲区中的;在写入到缓冲区时。任何时候访问NIO中的数据,都是通过缓冲区进行操作。
缓冲区实质上是一个数组。通常它是一个字节数据(ByteBuffer),也可以使用其他种类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区还提供了对数据的结构化访问以及维护读写位置(limit)等信息。
- ByteBuffer:字节缓冲区
- CharBuffer:字符缓冲区
- ShortBuffer:短整形缓冲区
- IntBuffer:整形缓冲区
- LongBuffer:长整形缓冲区
- FloatBuffer:浮点型缓冲区
- DoubleBuffer:双精度浮点型缓冲区
缓冲区的继承关系如下:
1.2 通道Channel
Channel是一个通道,它就像自来水管道一样,网络数据通过Channel读取和写入。通道与流不同之处在于通道它是双向的,流只是在一个方向上移动(一个流必须是InputStream或者OutputStream的子类),而通道可以用于读、写或者二者同时进行。因为Channel是全双工的,所以它可以比流更加映射底层操作系统地API。从类图中可以看出,实际上Channel可以分为两大类:用于网络读写的SelectabaleChannel和用于文件操作的FileChannel。ServerSocketChannel是一个可以监听新进来的TCP连接的通道,就像标准IO中的ServerSocket一样。
1.3 多路复用器Selector
Select会不断地轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel集合,进行后续的I/O操作。一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的轮询,就可以接入成千上万的客户端。
2. NIO服务端序列图
一。 打开ServerSocketChannel,用于监听客户端的连接。
ServerSocketChannel servChannel=ServerSocketChannel.open();二。绑定监听端口,设置连接为非阻塞状态。
servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024);三。创建Reactor线程,创建多路复用器并启动线程。
Selector selector = Selector.open();四。将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件
servChannel.register(selector, SelectionKey.OP_ACCEPT);五。多路复用器在线程run方法的无线循环体内轮询准备就绪的Key。
while (!stop) {try {selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Throwable t) {t.printStackTrace();}}六。多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路。
if (key.isAcceptable()) {// Accept the new connectionServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel sc = ssc.accept(); }七。设置客户端链路为非阻塞模式
sc.configureBlocking(false);八。将接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,读取客户端发送的网络消息。
sc.register(selector, SelectionKey.OP_READ);九。异步读取客户端请求消息到缓冲区。
if (key.isReadable()) {// Read the dataSocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);.....}十。对ByteBuffer进行编码解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排。
Object message=null; while(buffer.hasRemain()){bytebuffer.mark();Object message=decode(byteBuffer);if(message==null){byteBufer.reset();break;}messageList.add(message); } if(!bytebuffer.hasRemain()){byteBuffer.clear(); }elsebyteBuffer.compact();if(messageList!=null & !messageList.isEmpty()){for(Obbject messageE:messagList){handlerTask(messageE)} }十一。将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端。
socketChannel.write(buffer).3. NIO客户端序列图
一。打开SocketChannel,绑定客户端本地地址。
SocketChannel clientChannel = SocketChannel.open();二。设置SocketChannel为非阻塞模式,同时设置客户端连接的TCP参数。
socketChannel.configureBlocking(false); socketChannel.socket().setReuseAddress(true);三。异步连接服务器。判断是否连接成功,如果连接成功,则直接注册读取状态到多路复用器中,如果当前没有连接成功,则向Reactor的多路复用器注册OP_CONNECT状态为,监听服务器端的TCP ACK应答。
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答if (socketChannel.connect(new InetSocketAddress(host, port))) {socketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel);} elsesocketChannel.register(selector, SelectionKey.OP_CONNECT);四。创建Reactor线程,创建多路复用器并启动线程。
Selector selector = Selector.open();五。多路复用器在线程run方法的无线循环体内轮询准备就绪的Key。
while (!stop) {try {selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Throwable t) {t.printStackTrace();}}六。接受connect事件处理。判断连接结果,如果连接成功,注册连接事件到多路复用器。注册读事件到多路复用器中。 ```java if (key.isConnectable()) {if (sc.finishConnect()) {sc.register(selector, SelectionKey.OP_READ);doWrite(sc);} elseSystem.exit(1);// 连接失败,进程退出}七。异步读取客户端请求消息到缓冲区。
if (key.isReadable()) {// Read the dataSocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);.....}八。对ByteBuffer进行编码解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排。
Object message=null; while(buffer.hasRemain()){bytebuffer.mark();Object message=decode(byteBuffer);if(message==null){byteBufer.reset();break;}messageList.add(message); } if(!bytebuffer.hasRemain()){byteBuffer.clear(); }elsebyteBuffer.compact();if(messageList!=null & !messageList.isEmpty()){for(Obbject messageE:messagList){handlerTask(messageE)} }九。将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端。
socketChannel.write(buffer).4. 总结
通过源码分析,我们发现NIO编程的难度确实比同步阻塞BIO的大很多,我们的NIO程序中还没有考虑“半包读”和“半包写”,如果加上这些,代码会更加复杂。使用NIO编程的优点如下:
- 客户端发起连接的操作是异步的,可以通过多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。
- SocketChannel的读写操作是异步的,如果没有可读写的数据它不会等待,直接返回,这样I/O通信线程就可以处理其他链路,不需要同步等待这个链路可用。
- 线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄的限制。
总结
以上是生活随笔为你收集整理的NIO详解(四):NIO编程的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: Netty详解(二)Linux 网络IO
- 下一篇: NIO详解(五):Buffer详解