Mina网络通信框架
认识 Mina
Apache Mina Server 是一个网络通信应用框架,与 Netty 出自同一作者,Netty 借鉴了部分 Mina 的设计思路。
Mina 主要是对基于 TCP/IP、UDP/IP 协议栈的通信框架,Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步操作的编程模型,Mina 的异步 IO 默认使用的是 JAVA NIO(New IO)作为底层支持,基于 Channel 的双向通道。Mina 主要有1.x 和2.x 两个分支。Mina 同时提供了网络通信的 Server 端、Client 端的封装,无论是哪端,Mina 在整个网通通信结构中提供了一系列接口 API,Mina 的 API 将真正的网络通信与我们的应用程序隔离开来。
Mina 的底层依赖的主要是 Java NIO 库,上层提供的是基于事件的异步接口。其整体的结构如下:
- IoService:最底层的是 IOService,负责具体的 IO 相关工作。这一层的典型代表有 IOSocketAcceptor 和 IOSocketChannel,分别对应 TCP 协议下的服务端和客户端的 IOService。IOService 的意义在于隐藏底层 IO 的细节,对上提供统一的基于事件的异步 IO 接口。每当有数据到达时,IOService 会先调用底层 IO 接口读取数据,封装成 IoBuffer,之后以事件的形式通知上层代码,从而将 Java NIO 的同步 IO 接口转化成了异步 IO。所以从图上看,进来的 low-level IO 经过 IOService 层后变成 IO Event。具体的代码可以参考 org.apache.mina.core.polling.AbstractPollingIoProcessor 的私有内部类 Processor。
- IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的 Selector,这是与我们使用 JAVA NIO 编码时的一个不同之处,通常在 JAVA NIO 编码中,我们都是使用一个 Selector,也就是不区分 IoService 与 IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在 IoService 上的过滤器,并在过滤器链之后调用 IoHandler。
- IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的 encode 与decode 是最为重要的、也是你在使用 Mina 时最主要关注的地方。
- IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。需要有开发者自己来实现这个接口。IoHandler 可以看成是 Mina 处理流程的终点,每个 IoService 都需要指定一个 IoHandler。
- IoSession:是对底层连接(服务器与客户端的特定连接,该连接由服务器地址、端口以及客户端地址、端口来决定)的封装,一个 IoSession 对应于一个底层的 IO 连接(在 Mina 中 UDP 也被抽象成了连接)。通过 IoSession,可以获取当前连接相关的上下文信息,以及向远程 peer 发送数据。发送数据其实也是个异步的过程。发送的操作首先会逆向穿过 IoFilterChain,到达 IoService。但 IoService 上并不会直接调用底层 IO 接口来将数据发送出去,而是会将该次调用封装成一个 WriteRequest,放入 session 的 writeRequestQueue 中,最后由 IoProcessor 线程统一调度 flush 出去。所以发送操作并不会引起上层调用线程的阻塞。具体代码可以参考 org.apache.mina.core.filterchain.DefaultIoFilterChain 的内部类 HeadFilter 的 filterWrite 方法。
服务端流程:
- 通过 SocketAcceptor 同客户端建立连接
- 连接建立之后 I/O 的读写交给了 I/O Processor 线程,I/O Processor 是多线程的
- 通过 I/O Processor 读取的数据经过 IoFilterChain 里所有配置的 IoFilter,IoFilter 进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议
- 最后 IoFilter 将数据交给 Handler 进行业务处理,完成了整个读取的过程
写入过程也是类似,只是刚好倒过来,通过 IoSession.write 写出数据,然后 Handler 进行写入的业务处理,处理完成后交给 IoFilterChain,进行消息过滤和协议的转换,最后通过 I/O Processor 将数据写出到 socket 通道。
简单的 TCPServer
第一步:编写 IoService
IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);acceptor.bind(new InetSocketAddress(9124));第二步:编写过滤器
// 编写过滤器 acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())) );第三步:编写 IoHandler
import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class TCPServerHandler extends IoHandlerAdapter {@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {}@Overridepublic void sessionCreated(IoSession session) throws Exception {super.sessionCreated(session);}@Overridepublic void sessionOpened(IoSession session) throws Exception {super.sessionOpened(session);}@Overridepublic void sessionClosed(IoSession session) throws Exception {super.sessionClosed(session);} }把这个 IoHandler 注册到 IoService:
acceptor.setHandler(new TCPServerHandler());当然这段代码也要在 acceptor.bind() 方法之前执行。完成的代码:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset;import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor;public class TCPServer {public static void main(String[] args) throws IOException {IoAcceptor acceptor = new NioSocketAcceptor();acceptor.getSessionConfig().setReadBufferSize(2048);acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 编写过滤器acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));//设置handleracceptor.setHandler(new TCPServerHandler());//绑定端口acceptor.bind(new InetSocketAddress(9124));} }简单的 TCPClient
第一步:编写 IoService 并注册过滤器
import java.net.InetSocketAddress; import java.nio.charset.Charset;import org.apache.mina.core.service.IoConnector; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector;public class TCPClient {public static void main(String[] args) {IoConnector connector = new NioSocketConnector();connector.setConnectTimeoutMillis(30000);connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));connector.setHandler(new TCPClientHandler("你好!\r\n 大家好!")); connector.connect(new InetSocketAddress("localhost", 9124));}}第三步:编写 IoHandler
import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class TCPClientHandler extends IoHandlerAdapter {private final String values;public TCPClientHandler(String values) {this.values = values;}@Overridepublic void sessionOpened(IoSession session) {session.write(values);} }注册 IoHandler:
connector.setHandler(new ClientHandler("你好!\r\n 大家好!"));Mina网络通信框架
由于传统的 Socket 网络编程基于一个线程对应一个客户端的实现方式,大量的线程创建和销毁导致性能下降,无法应对高并发量的访问,所以基于服务器端的网络通信开发,我们常用 Mina 网络通信框架,即常说的 Java NIO ( java non-blocking IO ) 开发。
首先,我们来看看 Mina 的几个重要接口:
IoServiece :这个接口在一个线程上负责套接字的建立,拥有自己的 Selector,监听是否有连接被建立。 IoProcessor :这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是说它也拥有自己的 Selector, 这是与我们使用 JAVA NIO 编码时的一个不同之处,通常在 JAVA NIO 编码中,我们都是使用一个 Selector, 也就是不区分 IoService与 IoProcessor 两个功能接口。 另外,IoProcessor 负责调用注册在 IoService 上的过滤器,并在过滤器链之后调用 IoHandler。 IoAccepter :相当于网络应用程序中的服务器端 IoConnector :相当于客户端 IoSession :当前客户端到服务器端的一个连接实例 IoHandler :这个接口负责编写业务逻辑,也就是接收、发送数据的地方。这也是实际开发过程中需要用户自己编写的部分代码。 IoFilter :过滤器用于悬接通讯层接口与业务层接口,这个接口定义一组拦截器,这些拦截器可以包括日志输 出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的 encode与 decode是 最为重要的、也是你在使用 Mina时最主要关注的地方。接着,我们来看看 Mina 的一个重要的类 IoHandlerAdapter,此类仅仅是实现了 IoHandler 接口,但并不做任何处理。
一个 IoHandler 接口中具有如下一些方法(摘自 MINA 的 API 文档):
下面我们来看看 Mina 服务器网络通信框架开发流程:
1、导入相关 jar 包: mina-core-2.0.13.jar、slf4j-api-1.7.14.jar
2、创建acceptor,绑定Handler,设置Filter,绑定端口
NioSocketAcceptor acceptor = new NioSocketAcceptor();acceptor.setHandler(new MyServerHandler());// 获取拦截器,拦截器作用:把字节转成对象// TextLineCodecFactory 把数据进行加解码acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory()));// 5秒钟服务器和客户端没有进行读写,则进入空闲状态Idleacceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 5);//绑定9898端口acceptor.bind(new InetSocketAddress(9898));3、创建自定义的Handler
package com.example.server;import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession;public class MyServerHandler extends IoHandlerAdapter {@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {System.out.println("exceptionCaught");}@Overridepublic void messageReceived(IoSession session, Object message)throws Exception {String s = (String) message;System.out.println("messageReceived: " + s);session.write("server replay: " + s);}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {System.out.println("messageSent");}@Overridepublic void sessionClosed(IoSession session) throws Exception {System.out.println("sessionClosed");}@Overridepublic void sessionCreated(IoSession session) throws Exception {System.out.println("sessionCreated");}/*** 会话空闲状态*/@Overridepublic void sessionIdle(IoSession session, IdleStatus status)throws Exception {System.out.println("sessionIdle");}@Overridepublic void sessionOpened(IoSession session) throws Exception {System.out.println("sessionOpened");}}4、由于默认的 Filter: TextLineCodeFactory 只能接收以换行符为结束的消息,有时为满足特定需求,需要自定义一个 Factory
package com.example.server;import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder;public class MyTextLineFactory implements ProtocolCodecFactory {private MyTextLineDecoder mDecoder;private MyTextLineEncoder mEncoder;private MyTextLineCumulativeDecoder mCumulativeDecoder;public MyTextLineFactory() {mDecoder = new MyTextLineDecoder();mEncoder = new MyTextLineEncoder();mCumulativeDecoder = new MyTextLineCumulativeDecoder();}// 加密@Overridepublic ProtocolDecoder getDecoder(IoSession arg0) throws Exception {// return mDecoder;// 解决没检测到\n时的数据丢失问题return mCumulativeDecoder;}// 解密@Overridepublic ProtocolEncoder getEncoder(IoSession arg0) throws Exception {return mEncoder;}} package com.example.server;import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder;import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.ProtocolEncoderOutput;public class MyTextLineEncoder implements ProtocolEncoder {@Overridepublic void dispose(IoSession session) throws Exception {}@Overridepublic void encode(IoSession session, Object message,ProtocolEncoderOutput output) throws Exception {String s = null;if (message instanceof String) {s = (String) message;}if (s != null) {// 系统默认的EncoderCharsetEncoder charsetEncoder = (CharsetEncoder) session.getAttribute("encoder");if (charsetEncoder == null) {charsetEncoder = Charset.defaultCharset().newEncoder();session.setAttribute("encoder", charsetEncoder);}IoBuffer ioBuffer = IoBuffer.allocate(s.length());ioBuffer.setAutoExpand(true);// 自动扩展ioBuffer.putString(s, charsetEncoder);ioBuffer.flip();output.write(ioBuffer);}} } package com.example.server;import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput;public class MyTextLineDecoder implements ProtocolDecoder {@Overridepublic void decode(IoSession session, IoBuffer ioBuffer,ProtocolDecoderOutput output) throws Exception {// 起始位置int startPosition = ioBuffer.position();// 是否还有数据while (ioBuffer.hasRemaining()) {byte b = ioBuffer.get();// 读取到\nif (b == '\n') {// 当前位置int currentPosition = ioBuffer.position();// 当前总长度,指向末尾int limit = ioBuffer.limit();// 截取行ioBuffer.position(startPosition);ioBuffer.limit(currentPosition);IoBuffer buf = ioBuffer.slice();// 把buf中的数据写入到destbyte[] dest = new byte[buf.limit()];buf.get(dest);String str = new String(dest);output.write(str);// 还原位置ioBuffer.position(currentPosition);ioBuffer.limit(limit);}}}@Overridepublic void dispose(IoSession arg0) throws Exception {}@Overridepublic void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1)throws Exception {} }5、为了解决数据丢失问题,我们常使用 CumulativeProtocolDecoder
package com.example.server;import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput;/*** 处理数据丢失*/ public class MyTextLineCumulativeDecoder extends CumulativeProtocolDecoder {@Overrideprotected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer,ProtocolDecoderOutput output) throws Exception {// 起始位置int startPosition = ioBuffer.position();// 是否还有数据while (ioBuffer.hasRemaining()) {byte b = ioBuffer.get();// 读取到\nif (b == '\n') {// 当前位置int currentPosition = ioBuffer.position();// 当前总长度,指向末尾int limit = ioBuffer.limit();// 截取行ioBuffer.position(startPosition);ioBuffer.limit(currentPosition);IoBuffer buf = ioBuffer.slice();// 把buf中的数据写入到destbyte[] dest = new byte[buf.limit()];buf.get(dest);String str = new String(dest);output.write(str);// 还原位置ioBuffer.position(currentPosition);ioBuffer.limit(limit);return true;// 读取完成}}ioBuffer.position(startPosition);return false;// 读取未完成} }至此,整个服务器搭建框架就完了,是否比 Socket 开发要简洁呢?下节笔者将为大家带来一个完整案例,并给出相应的 jar 包和源码。
Mina详解:https://www.cnblogs.com/duanxz/p/5143227.html
总结
以上是生活随笔为你收集整理的Mina网络通信框架的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 基因分子生物学~tRNA,mRNA,蛋白
- 下一篇: 分子生物学之蛋白质与氨基酸