欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

高性能IO -Reactor模式的实现

发布时间:2025/7/25 编程问答 44 豆豆
生活随笔 收集整理的这篇文章主要介绍了 高性能IO -Reactor模式的实现 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

2019独角兽企业重金招聘Python工程师标准>>>

在了解Reactor模式之前, 首先了解什么是NIO.

java.nio全称java non-blocking IO 即非阻塞IO.这个地方要明白,非阻塞不等于异步。

非阻塞:当一个线上读取数据,没有数据时该线程可以干其他的事情。就是调用了之后立马返回。

异步IO: 在一个IO操作中,用户态的线程完全不用考虑数据的读取过程,都交给操作系统完成,完成之后通知用户线程即可。这才是真正的异步操作。

同步IO  每个请求必须逐个的被处理,一个流程的处理会导致整个流程的暂时等待。

阻塞:  某个请求发出后,该请求操作需要的条件不满足,请求会一直阻塞,不会返回,直到条件满足。

 

 其中java NIO 中的 Select 在Linux中基于epoll实现。基于IO多路复用。就是一个线程来管理多个IO.

epoll全称eventpoll 是linux内核针对IO多路复用的实现。在linux中,和epoll类似的由select和poll。

其中epoll监听的fd集合是一直在内核存在的,有三个系统调用:epoll_create epoll_wait epoll_ctl 通过epoll_wait可以多次监听同一个fd结合,只返回可读写的那部分。

select只有一个系统调用,就是每次都需要将要监听的所有集合都传给操作系统,当有事件发生时。操作系统在返回给你整个集合。

 

NIO核心包含三个部分: Channels Buffers Selectors.

Channel: 在NIO中,所有的IO过程都是从建立一个Channel开始的,数据可以从channel中读取到Buffer中 也可以从Buffer中写入到channel中。channel就好像BIO中的流。但是channel时双向的,我感觉这样更贴近于现实,毕竟TCP连接是全双工的。

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

channel分为这四种,分别对应着文件,UDP TCP网络IO.

Buffer buffer即为缓冲区,也就是数据块。

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

java基础数据类型中除了boolean都有对应的buffer实现。

Selector (选择器): 他是NIO中的关键所在,我们在程序中可以通过它来实现一个线程同时处理多个Channel 也就是多个连接。

如上图,一个Selectot监听五个通道,在使用时首先需要将通道以及对应感兴趣的事件(Accept   read  writer等 )注册到Selector上 。当发生对应的事件时,操作系统回通知我们的程序。在Selector中可以读取到对应的Channel 根据事件类型做出相应的操作。

零拷贝

java NIO中提供的FileChannel拥有transferTo和transferFrom两个方法,可以直接把FileChannel中的数据拷贝到另一个Channel,或者把另一个Channel中的数据拷贝到FileChannel .在操作系统的支持下,通过这个方法传输数据不需要将原数据从内核态拷贝到用户态,再从用户态拷贝到内核态。

 

Reactor实现一个简单的Echo服务器  基于单个线程同时处理多个连接。这样一个Selector同时完成Accept  Read Write事件的监听,同时业逻辑也和Selector在同一个线程中执行。这里可以优化一下将业务逻辑在新的线程中执行。

public class EchoService {private final String ip;private final int port;public EchoService(String ip, int port) {this.ip = ip;this.port = port;}public void start(){try {Selector selector=Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(ip,port)).configureBlocking(false).register(selector, SelectionKey.OP_ACCEPT);while (true){selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iteratorKey=keys.iterator();while (iteratorKey.hasNext()){SelectionKey key=iteratorKey.next();if (key.isAcceptable()){ServerSocketChannel serverChannel= (ServerSocketChannel) key.channel();SocketChannel socketChannel=serverChannel.accept();socketChannel.configureBlocking(false).register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,ByteBuffer.allocate(1024));}if (key.isReadable()){SocketChannel sc= (SocketChannel) key.channel();ByteBuffer buffer= (ByteBuffer) key.attachment();buffer.clear();int readCount= sc.read(buffer);if (readCount<0){iteratorKey.remove();continue;}buffer.flip();sc.write(buffer);System.out.print(new String(buffer.array(),0,readCount));}iteratorKey.remove();}}} catch (Exception e) {e.printStackTrace();}finally {System.out.println("exit");}} }

现在计算机的核数越来越多,仅仅用一个核心来处理IO连接有点让费系统资源,因此我们可以多见几个Reactor  .其中住Reactor负责TCP的连接(Accept),连接之后分配到子Reactor来处理IO的读写事件。

并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作自始至终旨在一个线程处理。这样保证了同一个请求的所有状态和上下文在同一个线程中,方便监控请求相应状态。

具体代码实现 EchoService为例:

https://github.com/WJ1020/reactor

public class EchoService {private static final Logger logger= LoggerFactory.getLogger(EchoService.class);private final String ip;private final int port;public EchoService(String ip, int port) {this.ip = ip;this.port = port;}public void start(){logger.info("echo service start......");try {Selector selector=Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(ip,port)).configureBlocking(false).register(selector,SelectionKey.OP_ACCEPT);int coreNum = Runtime.getRuntime().availableProcessors();Processor[] processors = new Processor[coreNum];for (int i = 0; i < processors.length; i++) {logger.info("creat processor :{}",i+1);processors[i] = new Processor();}int index=0;while (Status.running){selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iterator=keys.iterator();while (iterator.hasNext()){SelectionKey selectionKey=iterator.next();iterator.remove();if (selectionKey.isAcceptable()){ServerSocketChannel currServerSocketChannel= (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel=currServerSocketChannel.accept();socketChannel.configureBlocking(false);logger.info("Accept request from {}",socketChannel.getRemoteAddress());Processor processor=processors[(++index)%coreNum];processor.addChannel(socketChannel);}}}} catch (IOException e) {logger.error("io exception {}",e.getMessage());}}} public class Processor {private static final Logger logger= LoggerFactory.getLogger(Processor.class);private static final ExecutorService service=Executors.newFixedThreadPool(2*Runtime.getRuntime().availableProcessors());private final Selector selector;private volatile boolean running=true;public Processor() throws IOException {this.selector= SelectorProvider.provider().openSelector();}public void addChannel(SocketChannel socketChannel){try {socketChannel.register(this.selector, SelectionKey.OP_READ);if (running){running=false;start();}wakeup();} catch (ClosedChannelException e) {logger.error("register channel error :{}",e.getMessage());}}private void wakeup(){this.selector.wakeup();}private void start(){service.submit(new ProcessorTask(selector));} } public class ProcessorTask implements Runnable {private final static Logger logger= LoggerFactory.getLogger(ProcessorTask.class);private Selector selector;ProcessorTask(Selector selector) {this.selector = selector;}@Overridepublic void run() {logger.info("{}\tsub reactor start listener",Thread.currentThread().getName());while (Status.running){try {selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iterator=keys.iterator();while (iterator.hasNext()){SelectionKey key=iterator.next();iterator.remove();if (key.isReadable()){ByteBuffer buffer= ByteBuffer.allocate(1024);SocketChannel socketChannel= (SocketChannel) key.channel();int count=socketChannel.read(buffer);if (count<0){socketChannel.close();key.cancel();logger.info("{}\t Read ended",socketChannel);}else if (count==0){logger.info("{}\t Message size is 0",socketChannel);}else {buffer.flip();socketChannel.write(buffer);logger.info("{}\t Read message{}",socketChannel,new String(buffer.array()));}}}} catch (IOException e) {logger.error("select error :{}",e.getMessage());}}} }

在EchoService中 ,主Reactor接受到新的连接后,将channel注册到subReactor的Selector中。每个子Reactor都有一个自己的Selector对象,并有独立的一个线程处理。

转载于:https://my.oschina.net/wang520/blog/3036562

总结

以上是生活随笔为你收集整理的高性能IO -Reactor模式的实现的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。