Bio->Nio->Selector->Epoll->Netty
c10k问题
2000年左右提出的,BIO模型下的10K个socket处理客户端和服务端数据传输慢的问题。
单线程模拟10k个客户端
服务端和客户端通信,在内核里会有两个socket,一是服务器内核listen客户端的socket,二是客户端进来后相互之间通信的socket(先netstat -natp 看java的pid,然后lsof -p pid 就可以看到了)
BIO模型
package io.bio;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.ServerSocket; import java.net.Socket;/*** bio 慢的原因:* 1.accept阻塞, 后 new thread 那一步会发生系统调用 clone,这里用户态系统太切换* 2.客户端连进来后的io流也是阻塞的。* Author: ljf* CreatedAt: 2021/3/31 下午1:38*/ public class SocketBIO {public static void main(String[] args) {try {ServerSocket server = new ServerSocket(9090, 5);System.out.println("step1: new ServerSocket(9090,5)");while (true) {Socket client = server.accept();System.out.println("step2:client \t" + client.getPort());new Thread(new Runnable() {@Overridepublic void run() {InputStream inputStream = null;try {inputStream = client.getInputStream();BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));String s = reader.readLine();while (true) {if (s != null) {System.out.println(s);} else {inputStream.close();break;}}} catch (IOException e) {e.printStackTrace();}}}).start();}} catch (IOException e) {e.printStackTrace();}} }阻塞发生在服务端accept客户端和服务端等待客户端数据(内核RECV(6)两处。所以慢。
NIO模型
package io.nio;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.LinkedList; import java.util.List;/*** nio 的n 有两个意思:* 1.是accept(5 的时候 是 NON_BLOCKINg,和 RECV(6 的时候非阻塞* 2.java 的new io ,即新io的意思* <p>* Author: ljf* CreatedAt: 2021/3/31 下午3:16*/ public class SocketNIO {public static void main(String[] args) {List<SocketChannel> clients = new LinkedList<>();try {ServerSocketChannel ss = ServerSocketChannel.open();// 服务端开启监听,接收客户端ss.bind(new InetSocketAddress(9090)); // 绑定本地的9090端口ss.configureBlocking(false); // 重点,这里是用非阻塞的方式接收客户端while (true) {// 接收客户端连接 // Thread.sleep(1000);// accept 调用了内核的accept,没有客户端连进来返回值,在BIO的时候一直卡着,NIO不看着,返回-1,java 返回null// 有客户端连进来,accept 返回这个客户端的FD5,client Object// NONBLOCKING 就是代码能往下走了,但是往下走的情况要根据客户端是否连进来有不同SocketChannel client = ss.accept();if (client == null) {System.out.println("null ...");} else {client.configureBlocking(false); // 重点,socket(服务端的listen// socket<连接请求三次握手后,往这里扔,我去通过accept得到连接的socket>,连接socket<往后的数据读写使用的>)int port = client.socket().getPort();System.out.println("client port : " + port);clients.add(client);}ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);// 这种是直接在服务器内存分配,还有一种是allocate(int capacity) // (这种是在jvm里分配,即堆内存)第一种会分配比较慢,第二种是发生系统内存到堆内存的复制,也不一定快,具体看运行情况// 遍历已经连进来的客户端能不能读写数据for (SocketChannel s : clients) {int num = s.read(byteBuffer); // >0 -1 0 不会阻塞if (num > 0) {byteBuffer.flip();byte[] bytes = new byte[byteBuffer.limit()];byteBuffer.get(bytes);String b = new String(bytes);System.out.println(client.socket().getPort() + " : " + b);byteBuffer.clear();}}}} catch (IOException e) {e.printStackTrace();}} }道理在代码注释里有了,目前这个代码写法有一个毛病是clients随着死循环的增多,遍历会慢,所以会越跑越慢,最后要么报文件描述符不够用错误。
同步异步阻塞非阻塞
同步异步是相对于IO流来说的,阻塞非阻塞是相对于线程是否等待来说的
linux没有异步模型,netty也只是优化了的同步模型,win有iocp异步模型。
内核怎么处理IO中断的
要聊IO模型,从BIO到NIO,再到select、poll,最后到epoll,需要了解内核怎么处理IO中断。
不管软中断IO还是硬中断IO(我也不知道什么是软中断IO和硬中断IO,但是不影响接下来要说的)。发生中断了,一定是发生了系统调用 int 中断号 回调(比如80中断就是 int 80 callback) 这种组合。假设IO的中断号是80(具体是什么我也知道,反正不影响接下来说的,不理它)
BIO的模型是 int 80 RECV(5,callback是RECV阻塞,所以BIO就阻塞等待IO流。
NIO 的模型是 int 80 在内核态和用户态之间切换循环每个FDS,有数据了就交给应用程序处理,继续循环遍历所有连接的FDS。
SELECT/POLL的模型是 int 80 内核态遍历循环里传入的FDS,这些FDS有数据了就交给应用程序处理,继续循环,select 和 poll 的区别是select 有文件描述符个数的限制(普通用户默认是1024,root用户可能是2048),poll没有文件描述符个数限制。
所以select/poll比NIO快是因为省去了遍历每个FDS时的用户态和内核态的切换
EPOLL的模型是 int 80 内核遍历所有FDS,把有数据的FDS放到一个集合里,应用程序直接从这个集合里取数据。
java的Epoll
java 在NIO用selector/poll还是epoll是可以在启动参数上配置
epoll:-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
selector/poll:-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider
单线程的读,可读了直接又写回去给客户端
package io.selector;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;/*** Author: ljf* CreatedAt: 2021/4/1 下午3:14*/ public class SocketMultiplexingSingleThread {public static void main(String[] args) {SocketMultiplexingSingleThread service = new SocketMultiplexingSingleThread();service.start();}private ServerSocketChannel server = null;private Selector selector = null;int port = 9090;public void start() {initServer();System.out.println("服务器启动了。。。");Set<SelectionKey> keys = selector.keys();System.out.println("keys size : " + keys.size());while (true) {try {/*** selector.select(timeUnit time):* 1.select/poll 其实内核调用 select(fd4) poll(fd4)* 2.epoll 其实内核调用 epoll_wait()* time 如果不传或者0,阻塞,如果设置,就是设置了一个阻塞时间** 其实可以调用 selector.wakeup(),因为调用的时候还没有有数据的fd,容易返回0,** 懒加载:其实在触碰到selector.select() 调用的时候出发了 epoll_ctl 的系统调用*/while ((selector.select(500) > 0)) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {/*** 无论是啥多路复用,key 只能返回状态,所以应用程序还得一个一个的取处理R/W,即同步*/SelectionKey key = iterator.next();/*** 如果是客户端连进来,语义上,accept 接收连接且返回新连接的FD,那这个FD怎么处理?* 1.select、poll,因为他们内核没有空间,在jvm保存和前边的fd4哪个listen的一起* 2.epoll,我们希望通过epoll_ctl把新的客户端FD注册到内核空间*/if (key.isAcceptable()) {acceptHandler(key);/*** 连接的R、W的处理,在当前线程,这个方法可能会阻塞,所以提出了 IO THREADS,比如 redis,tomcat8,9的异步处理*/} else if (key.isReadable()) {readHandler(key);}}}} catch (IOException e) {e.printStackTrace();}}}private void readHandler(SelectionKey key) {SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;while (true) {try {read = client.read(buffer);if (read > 0) {buffer.flip();while (buffer.hasRemaining()) {client.write(buffer);}buffer.clear();} else if (read == 0) {break;} else {client.close();break;}} catch (IOException e) {e.printStackTrace();}}}private void acceptHandler(SelectionKey key) {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();try {SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buf = ByteBuffer.allocateDirect(8192);client.register(selector, SelectionKey.OP_READ, buf);System.out.println("--------------------------------------");System.out.println("新客户端: " + client.getRemoteAddress());System.out.println("--------------------------------------");} catch (IOException e) {e.printStackTrace();}}private void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));// 如果在epoll模型下,open()-》 epoll_create() -> fd3// 优先选择epoll,select poll 可以通过 -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider 参数修正selector = Selector.open();// 约等于 listen状态的 fd4// register:// 如果是select、poll,jvm里开辟一个数组,把fd4放进去// 如果是epoll,调用 epoll_ctl(fd3,add,fd4,EPOOLINserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}} }单线程的读写
a、isReadable关注了客户端连进来,有客户端连进来注册一个写到多路复用器里。
b、isWriteable关注了send queue里是否为空,说明时候写决定与应用程序
多线程的读写
单线程明显的瓶颈是如果有一个客户端要读写10年,那别的客户端就拜拜了,所以主线程只关注连接,读写交给新的线程。
package io.selector;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator;/*** Author: ljf* CreatedAt: 2021/4/3 上午9:24*/ public class SocketMultiplexingSingleThread2 {private ServerSocketChannel server = null;private Selector selector = null;int port = 9090;public static void main(String[] args) {SocketMultiplexingSingleThread2 socketMultiplexingSingleThread2 = new SocketMultiplexingSingleThread2();socketMultiplexingSingleThread2.start();}private void start() {initServer();System.out.println("服务器启动了。。。");try {while (true) {while (selector.select(50) > 0) {Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {readHandler(key);} else if (key.isWritable()) {writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(() -> {System.out.println("write handler ...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {client.write(buffer);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}).start();}private void readHandler(SelectionKey key) {new Thread(() -> {System.out.println("read handler ...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;while (true) {try {read = client.read(buffer);if (read > 0) {client.register(key.selector(), SelectionKey.OP_WRITE, buffer);System.out.println(Thread.currentThread().getName() + " " + read);} else if (read == 0) {break;} else if (read < 0) {client.close();break;}} catch (IOException e) {e.printStackTrace();}}}).start();}private void acceptHandler(SelectionKey key) {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();try {SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocateDirect(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("---------------------------------");System.out.println("新客户端: " + client.getRemoteAddress());System.out.println("---------------------------------");} catch (IOException e) {e.printStackTrace();}}private void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open();server.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}}按道理读写的线程应该池化管理,不过这不重要,实际上没人会直接用java的epoll。
很明显,jdk的epoll只做到接收连接和读写这两个动作的异步,哪个复用器可读写了需要程序自己死循环去跟踪。而我们想要的是哪个复用器可读写交给内核,让内核产生系统调用自动读写,我们只需要关注怎么读写就行了,把读写的具体实现接上系统的读写调用就行。即响应式,即netty。
netty
引入netty依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.9.Final</version></dependency>模拟netty的客户端
@Testpublic void clientMode() throws Exception {// 相当于 java 的 selectorNioEventLoopGroup thread = new NioEventLoopGroup(1);// 相当于 java 的SocketChannelNioSocketChannel client = new NioSocketChannel();ChannelFuture register = thread.register(client);// 注册一个IO的handlerChannelPipeline p = client.pipeline();// 注册一个事件,netty会监听这个事件即时给出动作p.addLast(new MyInHandler());// 连接服务器ChannelFuture connect = client.connect(new InetSocketAddress("192.168.172.3", 9090));// 因为连接是异步的,所以要等待同步连接,否则连接之后的动作才能继续ChannelFuture sync = connect.sync();ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());ChannelFuture send = client.writeAndFlush(buf);// 因为发送是异步的,所以要同步等待发过去send.sync();// 连接要阻塞住,永远不从服务器断开sync.channel().closeFuture().sync(); } /*** ChannelInboundHandlerAdapter 只是 ChannelHandler 的一个抽象的实现,并非 adapter模式* 如果没有这个实现,要 MyInHandler 要实现接口里的所有方法,很多是没用的**/ class MyInHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("client registered ....");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;// buf有get、set和read、write两组字节的操作,前者不改变seek,后者改变seek // CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);CharSequence charSequence = buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);System.out.println(charSequence);// 把读到的字节再写回去ctx.writeAndFlush(buf);} }模拟netty的服务器
@Testpublic void serverMode() throws Exception {// 相当于 java 的 selectorNioEventLoopGroup thread = new NioEventLoopGroup(1);// 相当与 java 的 ServerSocketChannelNioServerSocketChannel server = new NioServerSocketChannel();// 绑定客户端的连接ChannelPipeline p = server.pipeline(); // p.addLast(new MyAcceptHandler(thread)); // 把selector 传进去了,接收到的客户端注册进来的读写,应该复用客户端的读写 // p.addLast(new MyAcceptHandler2(thread, new MyInHandler())); // 复用客户端的读写p.addLast(new MyAcceptHandler2(thread, new AcceptHandlerInitiator()));thread.register(server);ChannelFuture bind = server.bind(new InetSocketAddress("192.168.8.103", 9090));// 因为bind是异步的,要用同步方法锁住bind.channel().closeFuture().sync();System.out.println("server closed ....");} /*** 虽然可以加上@ChannelHandler.Sharable 解决不可复用问题,* 但是nio推荐加一层包装,请看 AcceptHandlerInitiator,这也是netty的做法*/ //@ChannelHandler.Sharable class MyInHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("client registered ....");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;// buf有get、set和read、write两组字节的操作,前者不改变seek,后者改变seek // CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);CharSequence charSequence = buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);System.out.println(charSequence);// 把读到的字节再写回去ctx.writeAndFlush(buf);} }class MyAcceptHandler extends ChannelInboundHandlerAdapter {private final NioEventLoopGroup selector;public MyAcceptHandler(NioEventLoopGroup selector) {this.selector = selector;}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("server registered ....");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {SocketChannel client = (SocketChannel) msg;ChannelPipeline p = client.pipeline();// 获得客户端了,要注册读写// 每次都 new MyInHandler()// 是新的对象,业务上是没问题的,但是如果有一万个客户端,要有一万个对象,这个对象处理连接和业务,应该分开连接和业务,连接公用一个实例,具体的业务根据需要要自己实现。// 解决办法办法请看2p.addLast(new MyInHandler());selector.register(client);} }class MyAcceptHandler2 extends ChannelInboundHandlerAdapter {private final NioEventLoopGroup selector;private final ChannelHandler handler;public MyAcceptHandler2(NioEventLoopGroup thread, ChannelHandler myInHandler) {this.selector = thread;this.handler = myInHandler;}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("server registered ....");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {SocketChannel client = (SocketChannel) msg;ChannelPipeline p = client.pipeline();// 获得客户端了,要注册读写// 所以复用 myInhandler,但是不同的客户端复用一个连接,nio不允许这么做,可以加上Sharable注解解决,// nio 的做法是加上一个包装p.addLast(handler);selector.register(client);} }@ChannelHandler.Sharable class AcceptHandlerInitiator extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {Channel client = ctx.channel();ChannelPipeline p = client.pipeline();p.addLast(new MyInHandler());// 在 serverMode new AcceptHandlerInitiator 的时候, AcceptHandlerInitiator,在 pipeline 加入了// AcceptHandlerInitiator 本身,在获取到 client 时,又加入了了 myInHandler,// 而我们的业务实际上只需要 MyInhandler,所以 removectx.pipeline().remove(this);} }结合以上的模拟,实际的netty的简单模拟可以写成
@Testpublic void nettyClient() throws Exception {NioEventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bs = new Bootstrap();ChannelFuture connect = bs.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline p = socketChannel.pipeline();p.addLast(new MyInHandler());}}).connect(new InetSocketAddress("192.168.172.3", 9090));Channel client = connect.sync().channel();ByteBuf byteBuf = Unpooled.copiedBuffer("hello server".getBytes());ChannelFuture send = client.writeAndFlush(byteBuf);send.sync();client.closeFuture().sync();}@Testpublic void nettyServer() throws Exception {NioEventLoopGroup group = new NioEventLoopGroup(1);ServerBootstrap bs = new ServerBootstrap();ChannelFuture bind = bs.group(group, group).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioServerSocketChannel) throws Exception {ChannelPipeline p = nioServerSocketChannel.pipeline();p.addLast(new MyInHandler());}}).bind(new InetSocketAddress("192.168.8.103", 9090));bind.sync().channel().closeFuture().sync();}ChannelInitializer 相当于之前模拟的那个包装 MyInHandler 的类 AcceptHandlerInitiator
总结
以上是生活随笔为你收集整理的Bio->Nio->Selector->Epoll->Netty的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 搭建LVS_DR模型
- 下一篇: linux内核管理pagecache的一