RPC实现Consumer 远程调用
生活随笔
收集整理的这篇文章主要介绍了
RPC实现Consumer 远程调用
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
梳理一下基本的实现思路,主要完成一个这样的功能:API 模块中的接口功能在服务端实现(并没有在客户端实现)。因此,客户端调用API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一样。具体调用过程如下图所示:
下面来看代码实现,创建RpcProxy 类:
import java.lang.reflect.Proxy; public class RpcProxy {public static <T> T create(Class<?> clazz){//clazz 传进来本身就是interfaceMethodProxy proxy = new MethodProxy(clazz);Class<?> [] interfaces = clazz.isInterface() ?new Class[]{clazz} :clazz.getInterfaces();T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);return result;} }在RpcProxy 类的内部实现远程方法调用的代理类,即由Netty 发送网络请求,具体代码如下:
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class RpcProxy {public static <T> T create(Class<?> clazz){}private static class MethodProxy implements InvocationHandler {private Class<?> clazz;public MethodProxy(Class<?> clazz){this.clazz = clazz;}public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//如果传进来是一个已实现的具体类(本次演示略过此逻辑)if (Object.class.equals(method.getDeclaringClass())) {try {return method.invoke(this, args);} catch (Throwable t) {t.printStackTrace();}//如果传进来的是一个接口(核心)} else {return rpcInvoke(proxy,method, args);}return null;}/*** 实现接口的核心方法* @param method* @param args* @return*/public Object rpcInvoke(Object proxy,Method method,Object[] args){//传输协议封装InvokerProtocol msg = new InvokerProtocol();msg.setClassName(this.clazz.getName());msg.setMethodName(method.getName());msg.setValues(args);msg.setParames(method.getParameterTypes());final RpcProxyHandler consumerHandler = new RpcProxyHandler();EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//自定义协议解码器/** 入参有5 个,分别解释如下maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置lengthFieldLength:长度字段的长度:如:长度字段是int 型表示,那么这个值就是4(long 型就是8)lengthAdjustment:要添加到长度字段值的补偿值initialBytesToStrip:从解码帧中去除的第一个字节数*/pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//自定义协议编码器pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));//对象参数类型编码器pipeline.addLast("encoder", new ObjectEncoder());//对象参数类型解码器pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));pipeline.addLast("handler",consumerHandler);}});ChannelFuture future = b.connect("localhost", 8080).sync();future.channel().writeAndFlush(msg).sync();future.channel().closeFuture().sync();} catch(Exception e){e.printStackTrace();}finally {group.shutdownGracefully();}return consumerHandler.getResponse();}} }接收网络调用的返回值
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class RpcProxyHandler extends ChannelInboundHandlerAdapter {private Object response;public Object getResponse() {return response;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {response = msg;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("client exception is general");} }完成客户端调用代码:
public class RpcConsumer {public static void main(String [] args){IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class);System.out.println(rpcHello.hello("Tom 老师"));IRpcService service = RpcProxy.create(IRpcService.class);System.out.println("8 + 2 = " + service.add(8, 2));System.out.println("8 - 2 = " + service.sub(8, 2));System.out.println("8 * 2 = " + service.mult(8, 2));System.out.println("8 / 2 = " + service.div(8, 2));} }
总结
以上是生活随笔为你收集整理的RPC实现Consumer 远程调用的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: RPC实现Provider服务端业务逻辑
- 下一篇: Channel 与ChannelPipe