【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )
文章目录
- 一、 Netty 模型代码解析
- 二、 Netty 案例服务器端代码
- 1 . 服务器主程序
- 2 . 服务器自定义 Handler 处理者
- 三、 Netty 案例客户端代码
- 1 . 客户端主程序
- 2 . 客户端自定义 Handler 处理者
- 四、 Netty 案例运行
一、 Netty 模型代码解析
1 . 线程池 NioEventLoopGroup :
① NioEventLoopGroup 线程池使用场景 : Netty 模型中的 BossGroup 和 WorkerGroup 都是 NioEventLoopGroup 类型的线程池 ;
② NioEventLoopGroup 默认线程个数 : 系统默认每个线程池中的 NioEventLoop 线程数是 CPU 核数 ×\times× 2 , 下面的代码可以获取运行 Netty 程序的设备的 CPU 核数 ;
// 获取设备的 CPU 核数 NettyRuntime.availableProcessors()③ 指定 NioEventLoopGroup 线程个数 : 如果不想使用 Netty 线程池的默认线程个数 , 可以在 NioEventLoopGroup 构造函数中子星设定线程数 ;
// BossGroup 线程池 : 负责客户端的连接 // 指定线程个数 : 客户端个数很少, 不用很多线程维护, 这里指定线程池中线程个数为 1 EventLoopGroup bossGroup = new NioEventLoopGroup(1);2 . NioEventLoopGroup 线程池线程分配 :
以客户端连接完成后 , 数据读写场景举例 ;
在 双核 CPU 的服务器上 , NioEventLoopGroup 默认有 444 个线程 ; 按照顺序循环分配 , 为第 nnn 个客户端分配第 n%4n \% 4n%4 个 NioEventLoop 线程 ;
- 客户端 000 与服务器进行数据交互在 NioEventLoop 000 线程中 ;
- 客户端 111 与服务器进行数据交互在 NioEventLoop 111 线程中 ;
- 客户端 222 与服务器进行数据交互在 NioEventLoop 222 线程中 ;
- 客户端 333 与服务器进行数据交互在 NioEventLoop 333 线程中 ;
- 客户端 444 与服务器进行数据交互在 NioEventLoop 000 线程中 ;
- 客户端 555 与服务器进行数据交互在 NioEventLoop 111 线程中 ;
- 客户端 666 与服务器进行数据交互在 NioEventLoop 222 线程中 ;
- 客户端 777 与服务器进行数据交互在 NioEventLoop 333 线程中 ;
3 . NioEventLoopGroup 线程池封装内容 :
① NioEventLoopGroup 中的若干个 NioEventLoop 线程都封装在 children 中 , 线程个数是 CPU 核数 2 倍 ;
② 每个 NioEventLoop 线程中封装了如下内容 :
- 选择器 ( Selector ) , 用于监听客户端的读写 IO 事件 ;
- 任务队列 ( taskQueue ) , 用于存储事件对应的业务逻辑任务 ;
- 线程执行器 ( executor ) , 用于执行线程 ;
4 . ChannelHandlerContext 通道处理者上下文对象封装内容 :
① 用户自定义的 处理者 ( Handler ) , 这里指的是 服务器端的 ServerHandr ( 自定义 ) , 客户端的 ClientHandler ( 自定义 ) ;
② 管道 ( ChannelPipeline ) : 其本质是双向链表 , 该 ChannelHandlerContext 可以获取该链表的前一个 ( prev ) , 后一个管道对象 ( next ) ;
③ 管道 与 通道 :
- 二者都可以通过 通道处理者上下文 ( ChannelHandlerContext ) 获取 ;
- 管道 与 通道 都可以互相从对方获取 ;
④ 管道 ( Pipeline ) 与 通道 ( Channel ) 关联 : 通过管道可以获取通道 , 通过通道也可以获取其对应的管道 ;
5 . 处理者 ( Handler ) :
① 设置 Handler : 给 WorkerGroup 线程池中的 EventLoop 线程对应的管道设置 处理器 ( Handler ) ;
② 自定义 Handler : 一般这个 Handler 都是用户自定义的类 , 继承 ChannelInboundHandlerAdapter 类 ;
③ 运行机制 : 在 BossGroup 中连接客户端成功后 , 将 NioSocketChannel 注册给 WorkerGroup 中的 EventLoop 中的 选择器 ( Selector ) , 如果监听到客户端数据 IO 事件 , 就会调用 管道 ( Pipeline ) 处理该事件 , 管道 ( Pipeline ) 中调用 处理器 ( Handler ) 处理相应的事件 , 该 处理器 ( Handler ) 可以是 Netty 提供的 , 也可以是开发者自定义的 ;
特别注意 : 自定义 Handler 中 , 重写的 ChannelInboundHandlerAdapter 方法 , 将 super() 语句都删除 ;
二、 Netty 案例服务器端代码
1 . 服务器主程序
package kim.hsl.netty;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;/*** Netty 案例服务器端*/ public class Server {public static void main(String[] args) {// 1. 创建 BossGroup 线程池 和 WorkerGroup 线程池, 其中维护 NioEventLoop 线程// NioEventLoop 线程中执行无限循环操作// BossGroup 线程池 : 负责客户端的连接// 指定线程个数 : 客户端个数很少, 不用很多线程维护, 这里指定线程池中线程个数为 1EventLoopGroup bossGroup = new NioEventLoopGroup(1);// WorkerGroup 线程池 : 负责客户端连接的数据读写EventLoopGroup workerGroup = new NioEventLoopGroup();// 2. 服务器启动对象, 需要为该对象配置各种参数ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup) // 设置 主从 线程组 , 分别对应 主 Reactor 和 从 Reactor.channel(NioServerSocketChannel.class) // 设置 NIO 网络套接字通道类型.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列维护的连接个数.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置连接状态行为, 保持连接状态.childHandler( // 为 WorkerGroup 线程池对应的 NioEventLoop 设置对应的事件 处理器 Handlernew ChannelInitializer<SocketChannel>() {// 创建通道初始化对象@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 为 管道 Pipeline 设置处理器 Hanedlerch.pipeline().addLast(new ServerHandr());}});System.out.println("服务器准备完毕 ...");ChannelFuture cf = null;try {// 绑定本地端口, 进行同步操作 , 并返回 ChannelFuturecf = bootstrap.bind(8888).sync();System.out.println("服务器开始监听 8888 端口 ...");// 关闭通道 , 开始监听操作cf.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 出现异常后, 优雅的关闭bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }2 . 服务器自定义 Handler 处理者
package kim.hsl.netty;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil;/*** Handler 处理者, 是 NioEventLoop 线程中处理业务逻辑的类** 继承 : 该业务逻辑处理者 ( Handler ) 必须继承 Netty 中的 ChannelInboundHandlerAdapter 类* 才可以设置给 NioEventLoop 线程** 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发*/ public class ServerHandr extends ChannelInboundHandlerAdapter {/*** 读取数据 : 在服务器端读取客户端发送的数据* @param ctx* 通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息* 管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler* 通道 ( Channel ) : 注重数据读写* @param msg* 客户端上传的数据* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 查看 ChannelHandlerContext 中封装的内容System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);// 将客户端上传的数据转为 ByteBuffer// 这里注意该类是 Netty 中的 io.netty.buffer.ByteBuf 类// 不是 NIO 中的 ByteBuffer// io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBufferByteBuf byteBuf = (ByteBuf) msg;// 将 ByteBuf 缓冲区数据转为字符串, 打印出来System.out.println(ctx.channel().remoteAddress() + " 接收到客户端发送的数据 : " + byteBuf.toString(CharsetUtil.UTF_8));}/*** 服务器端读取数据完毕后回调的方法* @param ctx* 通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息* * 管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler* * 通道 ( Channel ) : 注重数据读写* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 数据编码 : 将字符串编码, 存储到 io.netty.buffer.ByteBuf 缓冲区中ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Client", CharsetUtil.UTF_8);// 写出并刷新操作 : 写出数据到通道的缓冲区 ( write ), 并执行刷新操作 ( flush )ctx.writeAndFlush(byteBuf);}/*** 异常处理 , 上面的方法中都抛出了 Exception 异常, 在该方法中进行异常处理* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("通道异常, 关闭通道");//如果出现异常, 就关闭该通道ctx.close();} }三、 Netty 案例客户端代码
1 . 客户端主程序
package kim.hsl.netty;import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;public class Client {public static void main(String[] args) {// 客户端只需要一个 时间循环组 , 即 NioEventLoopGroup 线程池EventLoopGroup eventLoopGroup = new NioEventLoopGroup();// 客户端启动对象Bootstrap bootstrap = new Bootstrap();// 设置相关参数bootstrap.group(eventLoopGroup) // 设置客户端的线程池.channel(NioSocketChannel.class) // 设置客户端网络套接字通道类型.handler( // 设置客户端的线程池对应的 NioEventLoop 设置对应的事件处理器 Handlernew ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ClientHandr());}});try {// 开始连接服务器, 并进行同步操作// ChannelFuture 类分析 , Netty 异步模型// sync 作用是该方法不会再次阻塞ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();System.out.println("客户端连接服务器成功 ...");// 关闭通道, 开始监听channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {// 优雅的关闭eventLoopGroup.shutdownGracefully();}} }2 . 客户端自定义 Handler 处理者
package kim.hsl.netty;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;/*** Handler 处理者, 是 NioEventLoop 线程中处理业务逻辑的类** 继承 : 该业务逻辑处理者 ( Handler ) 必须继承 Netty 中的 ChannelInboundHandlerAdapter 类* 才可以设置给 NioEventLoop 线程** 规范 : 该 Handler 类中需要按照业务逻辑处理规范进行开发*/ public class ClientHandr extends ChannelInboundHandlerAdapter {/*** 通道就绪后触发该方法* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 查看 ChannelHandlerContext 中封装的内容System.out.println("channelActive : ChannelHandlerContext ctx = " + ctx);// 数据编码 : 将字符串编码, 存储到 io.netty.buffer.ByteBuf 缓冲区中ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Server", CharsetUtil.UTF_8);// 写出并刷新操作 : 写出数据到通道的缓冲区 ( write ), 并执行刷新操作 ( flush )ctx.writeAndFlush(byteBuf);System.out.println("客户端向服务器端发送 Hello Server 成功");}/*** 读取数据 : 在服务器端读取客户端发送的数据* @param ctx* 通道处理者上下文对象 : 封装了 管道 ( Pipeline ) , 通道 ( Channel ), 客户端地址信息* 管道 ( Pipeline ) : 注重业务逻辑处理 , 可以关联很多 Handler* 通道 ( Channel ) : 注重数据读写* @param msg* 服务器返回的数据* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 查看 ChannelHandlerContext 中封装的内容System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);// 将服务器下发的数据转为 ByteBuffer// 这里注意该类是 Netty 中的 io.netty.buffer.ByteBuf 类// 不是 NIO 中的 ByteBuffer// io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBufferByteBuf byteBuf = (ByteBuf) msg;// 将 ByteBuf 缓冲区数据转为字符串, 打印出来System.out.println(ctx.channel().remoteAddress() + " 服务器返回的数据 : " + byteBuf.toString(CharsetUtil.UTF_8));}/*** 异常处理 , 上面的方法中都抛出了 Exception 异常, 在该方法中进行异常处理* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("通道异常, 关闭通道");//如果出现异常, 就关闭该通道ctx.close();} }四、 Netty 案例运行
1 . 运行服务器端 : 服务器启动 , 监听 8888 端口 ;
2 . 运行客户端 : 客户端连接服务器的 8888 端口 , 并向服务器端写出 Hello Server 字符串 , 之后便接到服务器端回送的 Hello Client 字符串信息 ;
3 . 查看客户端 : 服务器端接收到客户端信息 , 向客户端写出 Hello Client 字符串 ;
总结
以上是生活随笔为你收集整理的【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 【Netty】Netty 入门案例分析
- 下一篇: 【Netty】 异步任务调度 ( Tas