手撕 RPC 1
rpc要素
rpc 最 low 的描述:调用远程服务像调用本地方法一样,也就是面向 interface 开发,最基本的,要做到像本地方法一样调用,意味着 consumer 要知道 provider 有什么服务( interface 名字)是什么,方法( 接口里的方法)是什么,参数(方法的传参)是什么,返回(方法的返回类型)什么。要解决这些问题,需要的知识点有:
- provider 和 consumer 的通信
- provider(至少一个)和 consumer(一般是多个)之间连接的数量,管理(多个 consumer 不能相互打扰),
- 拆包:连接建立后,怎么拆成正确的对象以获取正确的信息
- 动态代理:服务要用代理通过数据包分发
- 协议封装:不同的服务可以自己封装不同的协议
- 连接池
- 其他 ....
摘要
这是第一版,所有代码写在一个类里。单机模拟客户端和服务器,只有一个 consumer 和一个 provider。线程池管理只有一个线程,没有服务分发,注册和发现,通信用netty,代理用jdk的动态代理,序列化用jdk的Serializable,用header模拟协议,拆包写死。
本来想用门闩模拟线程阻塞,进而控制客户端的连接,但是 netty 的事件是异步的,无论在获取客户端之前还是之后声明门闩,程序总能正确解开门闩,模拟失败,不知道原因,所以啊,在写多线程的程序时,线程的调度要千万千万小心啊,程序很可能不按自己预想的运行.
package rpc;import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.junit.Test;import java.io.*; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch;/*** 1.假设一个需求,写一个 rpc* 2.来回通信,连接数量,拆包* 3.动态代理,序列化,协议封装* 4.连接池*/ public class MyRpcTest {public void startServer() {NioEventLoopGroup eventExecutors = new NioEventLoopGroup(1);NioEventLoopGroup worker = eventExecutors;ServerBootstrap serverBootstrap = new ServerBootstrap();ChannelFuture localhost = serverBootstrap.group(eventExecutors, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new ServerRequestHandler());}}).bind(new InetSocketAddress("localhost", 9090));try {localhost.sync().channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void get() {// 开启一个线程模拟服务器new Thread(() -> {startServer();}).start();// 线程等待一下,让服务端启动起来先try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("server start ....");// 获取服务Car car = ProxyGet(Car.class);// 使用服务car.race("BMW 530Li");}private static <T> T ProxyGet(Class<T> interfaceInfo) {// 实现各个版本的动态代理ClassLoader classLoader = interfaceInfo.getClassLoader();Class<?>[] interfaces = {interfaceInfo};// 用 jdk 的动态代理实现return (T) Proxy.newProxyInstance(classLoader, interfaces, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 1.调用,服务、方法、参数,封装成 contentString name = interfaceInfo.getName(); // 服务名String methodName = method.getName(); // 方法名Class<?>[] parameterTypes = method.getParameterTypes(); // 方法的返回类型// 2.把调用服务的信息封装成一个可以序列化的对象// 先封装 bodyMyContent content = new MyContent();content.setName(name);content.setMethodName(methodName);content.setParameterTypes(parameterTypes);content.setArgs(args);// 把 content 做成字节数组准备写出去ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(content);byte[] msgBody = bos.toByteArray();// 再封装 header,header 需要 body 信息,MyHeader myHeader = createHeader(msgBody);bos.reset();oos = new ObjectOutputStream(bos);oos.writeObject(myHeader);byte[] msgHeader = bos.toByteArray();// 3.服务准备好了,接下来准备连接,模拟一个 size 是1的连接池ClientFactory factory = ClientFactory.getFactory();// 想模拟主线程不指令冲排序执行,但是 netty 的时间是异步的,试了好多次,无论门闩在获取客户端之前创建还是之后创建,不知道为什么每次都能正确解开门闩,模拟不成功synchronized (this) {NioSocketChannel client = factory.getClient(new InetSocketAddress("localhost", 9090));// 加个门闩,阻塞住,一个 client 一个 client 的处理,在处理返回的后再打开门闩,使程序继续运行,门闩和创建CountDownLatch countDownLatch = new CountDownLatch(1);// 4.发送,走 IO,走 nettylong requestId = myHeader.getRequestId();ResponseHandler.addCallBack(requestId, () -> {countDownLatch.countDown();});System.out.println("shuan zhu ....");ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);byteBuf.writeBytes(msgHeader);byteBuf.writeBytes(msgBody);ChannelFuture channelFuture = client.writeAndFlush(byteBuf);channelFuture.sync();System.out.println("before await ....");countDownLatch.await();System.out.println("after await ....");}return null;}});}static MyHeader createHeader(byte[] msgBytes) {MyHeader header = new MyHeader();int size = msgBytes.length;// 用16进制的,32 位可以做很多事情int f = 0x14141414;long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());header.setFlag(f);header.setRequestId(requestId);header.setDataLen(size);return header;} }/*** 模拟服务*/ interface Car {void race(String msg); }/*** 头部定义三个标志* 1.方法的标记,用32位的位标记* 2.请求的 id* 3.请求体的长度*/ class MyHeader implements Serializable {int flag;long requestId;long dataLen;public int getFlag() {return flag;}public void setFlag(int flag) {this.flag = flag;}public long getRequestId() {return requestId;}public void setRequestId(long requestId) {this.requestId = requestId;}public long getDataLen() {return dataLen;}public void setDataLen(long dataLen) {this.dataLen = dataLen;} }/*** 模拟请求体*/ class MyContent implements Serializable {// 服务名String name;// 方法名String methodName;// 返回值类型Class<?>[] parameterTypes;// 参数Object[] args;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getParameterTypes() {return parameterTypes;}public void setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;}public Object[] getArgs() {return args;}public void setArgs(Object[] args) {this.args = args;} }/*** 模拟客户端的创建,用单例*/ class ClientFactory {int pollSize = 1;NioEventLoopGroup clientWorker;Random random = new Random();private static final ClientFactory factory;private ClientFactory() {}static {factory = new ClientFactory();}public static ClientFactory getFactory() {return factory;}ConcurrentHashMap<InetSocketAddress, ClientPool> outboxes = new ConcurrentHashMap<InetSocketAddress, ClientPool>();public synchronized NioSocketChannel getClient(InetSocketAddress address) {ClientPool clientPool = outboxes.get(address);if (clientPool == null) {outboxes.putIfAbsent(address, new ClientPool(pollSize));clientPool = outboxes.get(address);}int i = random.nextInt(pollSize);// 如果有就返回if (clientPool.clients[i] != null && clientPool.clients[i].isActive()) {return clientPool.clients[i];}// 没有就创建synchronized (clientPool.locks[i]) {return clientPool.clients[i] = create(address);}}private NioSocketChannel create(InetSocketAddress address) {// 基于 netty 的客户端创建方式clientWorker = new NioEventLoopGroup(1);Bootstrap bs = new Bootstrap();ChannelFuture connect = bs.group(clientWorker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new ClientResponses()); // 根据 requestId 找到给谁}}).connect(address);try {NioSocketChannel client = (NioSocketChannel) connect.sync().channel();return client;} catch (InterruptedException e) {e.printStackTrace();}return null;} }/*** 模拟线连接池*/ class ClientPool {NioSocketChannel[] clients;Object[] locks;ClientPool(int size) {clients = new NioSocketChannel[size];locks = new Object[size];for (int i = 0; i < size; i++) {locks[i] = new Object();}} }/*** 客户端注册,连接成功后放开门闩,让主线程继续运行*/ class ClientResponses extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;if (byteBuf.readableBytes() >= 82) {byte[] bytes = new byte[82];byteBuf.readBytes(bytes); // 指针偏移到 msgHeader 的末尾ByteArrayInputStream bos = new ByteArrayInputStream(bytes);ObjectInputStream oin = new ObjectInputStream(bos);MyHeader header = (MyHeader) oin.readObject();System.out.println("client reponse id : " + header.getRequestId());System.out.println("channelRead ....");// 然后放开门闩ResponseHandler.runCallBack(header.getRequestId());}} }/*** 用于主线程的阻塞的控制*/ class ResponseHandler {static ConcurrentHashMap<Long, Runnable> mapping = new ConcurrentHashMap<>();public static void addCallBack(long requestId, Runnable cb) {mapping.putIfAbsent(requestId, cb);}public static void runCallBack(long requestId) {Runnable runnable = mapping.get(requestId);runnable.run();removeCallBack(requestId);}public static void removeCallBack(long requestId) {mapping.remove(requestId);} }/*** 服务端注册的事件* 没有具体的业务逻辑,只接收客户端连接并打印出客户端要请求的服务(方法都不打印)*/ class ServerRequestHandler extends ChannelInboundHandlerAdapter {// provider@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf b = (ByteBuf) msg;ByteBuf sendBuf = b.copy();// 82 是打断点跟踪得出来的 header 的长度if (sendBuf.readableBytes() >= 82) {byte[] bytes = new byte[82];// 指针移动到 header 的末尾b.readBytes(bytes);ByteArrayInputStream bis = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bis);MyHeader myHeader = (MyHeader) objectInputStream.readObject();System.out.println("server response id : " + myHeader.getRequestId());// 读取请求体,并答应服务名if (b.readableBytes() >= myHeader.getDataLen()) {byte[] data = new byte[(int) myHeader.getDataLen()];b.readBytes(data);ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);MyContent myContent = (MyContent) ois.readObject();System.out.println("server response name : " + myContent.getName());}}// 把数据包打印回客户端ChannelFuture channelFuture = ctx.writeAndFlush(sendBuf);channelFuture.sync();} }很明显这是不能用的,第一,客户端只有一个,那么客户端变成多个呢?紧接着手撕 RPC 2
总结
- 上一篇: 系统调用回答为什么要用buffer写
- 下一篇: 手撕 RPC 2