生活随笔收集整理的这篇文章主要介绍了「手撕 RPC 2」,小编觉得挺不错的,现在分享给大家,给大家做个参考。
把单个客户端改成多个呢?
在成功连接几个 client 之后报错了,错误出在拆包解码的时候。为什么呢?先看一下多个客户端连接时 netty 的模型:
因为服务程序什么时候从内核的 buffer 里读走数据,和客户端什么时候往 buffer 里写数据,是两个相互独立的动作,所以在多客户端(多个 socket)的场景下,服务端一次读到的数据不一定恰好是一个完整、对齐的包,于是报了解码错误。IO 是双向的,服务端遇到的解码错误在客户端也同样存在。怎么处理?
处理通信流的解码问题
出现这种问题的底层原因是:多个请求的数据会被连续写进同一个 buffer,所以要保证程序每次读取时都能和包的头部(header 和 body 的起始位置)对齐,并且在每次 readChannel() 处理之前,该次请求的包已经完整到达。强大的 netty 给我们提供了 ByteToMessageDecoder,在 pipeline 的业务 handler 之前加上解码 handler 就可以了。
另外解决一些问题
1.服务器使用20个线程处理 listen socket 连接和 IO socket。
2.多个 client 连接,维护一个连接池(ClientPool)来管理这些连接。
3.使用 netty 的 ByteToMessageDecoder 解决解码不正确问题。
4.使用 CompletableFuture 来获取客户端调用方法的返回。
package rpc
;import io
.netty
.bootstrap
.Bootstrap
;
import io
.netty
.bootstrap
.ServerBootstrap
;
import io
.netty
.buffer
.ByteBuf
;
import io
.netty
.buffer
.PooledByteBufAllocator
;
import io
.netty
.channel
.*
;
import io
.netty
.channel
.nio
.NioEventLoopGroup
;
import io
.netty
.channel
.socket
.nio
.NioServerSocketChannel
;
import io
.netty
.channel
.socket
.nio
.NioSocketChannel
;
import io
.netty
.handler
.codec
.ByteToMessageDecoder
;
import org
.junit
.Test
;import java
.io
.*
;
import java
.lang
.reflect
.InvocationHandler
;
import java
.lang
.reflect
.Method
;
import java
.lang
.reflect
.Proxy
;
import java
.net
.InetSocketAddress
;
import java
.util
.List
;
import java
.util
.Random
;
import java
.util
.UUID
;
import java
.util
.concurrent
.CompletableFuture
;
import java
.util
.concurrent
.ConcurrentHashMap
;
import java
.util
.concurrent
.atomic
.AtomicInteger
;
public class MyRpcTest {@Testpublic void startServer() {
NioEventLoopGroup eventExecutors
= new NioEventLoopGroup(20);NioEventLoopGroup worker
= eventExecutors
;ServerBootstrap serverBootstrap
= new ServerBootstrap();ChannelFuture localhost
= serverBootstrap
.group(eventExecutors
, worker
).channel(NioServerSocketChannel
.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel
) {ChannelPipeline pipeline
= nioSocketChannel
.pipeline();pipeline
.addLast(new MyDecoder());pipeline
.addLast(new ServerRequestHandler());}}).bind(new InetSocketAddress("localhost", 9090));try {localhost
.sync().channel().closeFuture().sync();} catch (InterruptedException e
) {e
.printStackTrace();}}@Testpublic void get() {new Thread(() -> {startServer();}).start();try {Thread
.sleep(2000);} catch (InterruptedException e
) {e
.printStackTrace();}System
.out
.println("server start ....");AtomicInteger atomicInteger
= new AtomicInteger();int size
= 30;Thread
[] threads
= new Thread[size
];for (int i
= 0; i
< size
; i
++) {threads
[i
] = new Thread(() -> {Car car
= proxyGet(Car
.class);int args
= atomicInteger
.incrementAndGet();String s
= car
.race("client: " + args
);System
.out
.println("client got args: " + s
+ " --- " + args
);});}for (Thread thread
: threads
) {thread
.start();}try {System
.in
.read();} catch (IOException e
) {e
.printStackTrace();}}private static <T> T
proxyGet(Class
<T> interfaceInfo
) {ClassLoader classLoader
= interfaceInfo
.getClassLoader();Class
<?>[] interfaces
= {interfaceInfo
};return (T
) Proxy
.newProxyInstance(classLoader
, interfaces
, new InvocationHandler() {@Overridepublic Object
invoke(Object proxy
, Method method
, Object
[] args
) throws Throwable
{String name
= interfaceInfo
.getName(); String methodName
= method
.getName(); Class
<?>[] parameterTypes
= method
.getParameterTypes(); MyContent content
= new MyContent();content
.setName(name
);content
.setMethodName(methodName
);content
.setParameterTypes(parameterTypes
);content
.setArgs(args
);ByteArrayOutputStream bos
= new ByteArrayOutputStream();ObjectOutputStream oos
= new ObjectOutputStream(bos
);oos
.writeObject(content
);byte[] msgBody
= bos
.toByteArray();MyHeader myHeader
= createHeader(msgBody
);bos
.reset();oos
= new ObjectOutputStream(bos
);oos
.writeObject(myHeader
);byte[] msgHeader
= bos
.toByteArray();ClientFactory factory
= ClientFactory
.getFactory();NioSocketChannel client
= factory
.getClient(new InetSocketAddress("localhost", 9090));ByteBuf byteBuf
= PooledByteBufAllocator
.DEFAULT
.directBuffer(msgHeader
.length
+ msgBody
.length
);CompletableFuture
<String> future
= new CompletableFuture<>();ResponseMappingHandler
.addCallBack(myHeader
.getRequestId(), future
);byteBuf
.writeBytes(msgHeader
);byteBuf
.writeBytes(msgBody
);ChannelFuture channelFuture
= client
.writeAndFlush(byteBuf
);channelFuture
.sync();return future
.get();}});}static MyHeader
createHeader(byte[] msgBytes
) {MyHeader header
= new MyHeader();int size
= msgBytes
.length
;int f
= 0x14141414;long requestId
= Math
.abs(UUID
.randomUUID().getLeastSignificantBits());header
.setFlag(f
);header
.setRequestId(requestId
);header
.setDataLen(size
);return header
;}
}
/**
 * Sample service contract used by the demo: one method that takes a message and returns a
 * string.
 */
interface Car {

    /**
     * Sends {@code msg} to the service.
     *
     * @param msg the text passed to the service
     * @return the text produced by the implementation
     */
    String race(String msg);
}
/**
 * Header that precedes every message body on the wire.
 *
 * <p>It is transferred via Java serialization, so the field names and types below are part of
 * the serialized form.
 */
class MyHeader implements Serializable {

    /** Marker identifying the kind of message. */
    int flag;

    /** Identifier used to match a response to its request. */
    long requestId;

    /** Length in bytes of the body that follows this header. */
    long dataLen;

    // ----- accessors -----

    /** @return the message-kind marker */
    public int getFlag() {
        return this.flag;
    }

    /** @return the request identifier */
    public long getRequestId() {
        return this.requestId;
    }

    /** @return the body length in bytes */
    public long getDataLen() {
        return this.dataLen;
    }

    // ----- mutators -----

    /** @param flag the message-kind marker */
    public void setFlag(int flag) {
        this.flag = flag;
    }

    /** @param requestId the request identifier */
    public void setRequestId(long requestId) {
        this.requestId = requestId;
    }

    /** @param dataLen the body length in bytes */
    public void setDataLen(long dataLen) {
        this.dataLen = dataLen;
    }
}
/**
 * Message body exchanged between client and server.
 *
 * <p>A request fills in the target interface name, method name, parameter types and arguments;
 * a response fills in {@code res}. The object is transferred via Java serialization.
 */
class MyContent implements Serializable {

    /** Name of the service interface being called. */
    String name;

    /** Name of the method being called. */
    String methodName;

    /** Declared parameter types of the method being called. */
    Class<?>[] parameterTypes;

    /** Arguments of the call. */
    Object[] args;

    /** Result string carried by a response. */
    String res;

    // ----- accessors -----

    /** @return the result string */
    public String getRes() {
        return this.res;
    }

    /** @return the service interface name */
    public String getName() {
        return this.name;
    }

    /** @return the method name */
    public String getMethodName() {
        return this.methodName;
    }

    /** @return the declared parameter types */
    public Class<?>[] getParameterTypes() {
        return this.parameterTypes;
    }

    /** @return the call arguments */
    public Object[] getArgs() {
        return this.args;
    }

    // ----- mutators -----

    /** @param res the result string */
    public void setRes(String res) {
        this.res = res;
    }

    /** @param name the service interface name */
    public void setName(String name) {
        this.name = name;
    }

    /** @param methodName the method name */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @param parameterTypes the declared parameter types */
    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /** @param args the call arguments */
    public void setArgs(Object[] args) {
        this.args = args;
    }
}
class ClientFactory {int pollSize
= 50; NioEventLoopGroup clientWorker
;Random random
= new Random();private static final ClientFactory factory
;private ClientFactory() {}static {factory
= new ClientFactory();}public static ClientFactory
getFactory() {return factory
;}ConcurrentHashMap
<InetSocketAddress, ClientPool> outboxes
= new ConcurrentHashMap<InetSocketAddress, ClientPool>();public synchronized NioSocketChannel
getClient(InetSocketAddress address
) {ClientPool clientPool
= outboxes
.get(address
);if (clientPool
== null
) {outboxes
.putIfAbsent(address
, new ClientPool(pollSize
));clientPool
= outboxes
.get(address
);}int i
= random
.nextInt(pollSize
);if (clientPool
.clients
[i
] != null
&& clientPool
.clients
[i
].isActive()) {return clientPool
.clients
[i
];}synchronized (clientPool
.locks
[i
]) {return clientPool
.clients
[i
] = create(address
);}}private NioSocketChannel
create(InetSocketAddress address
) {clientWorker
= new NioEventLoopGroup(1);Bootstrap bs
= new Bootstrap();ChannelFuture connect
= bs
.group(clientWorker
).channel(NioSocketChannel
.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel
) {ChannelPipeline pipeline
= nioSocketChannel
.pipeline();pipeline
.addLast(new MyDecoder());pipeline
.addLast(new ClientResponses());}}).connect(address
);try {NioSocketChannel client
= (NioSocketChannel
) connect
.sync().channel();return client
;} catch (InterruptedException e
) {e
.printStackTrace();}return null
;}
}
class ClientPool {NioSocketChannel
[] clients
;Object
[] locks
;ClientPool(int size
) {clients
= new NioSocketChannel[size
];locks
= new Object[size
];for (int i
= 0; i
< size
; i
++) {在这里插入代码片locks
[i
] = new Object();}}
}class ClientResponses extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx
, Object msg
) throws Exception
{MyDataPackage data
= (MyDataPackage
) msg
;ResponseMappingHandler
.runCallBack(data
);}
}
/**
 * Registry of pending calls: maps a request id to the future that should receive the response.
 */
class ResponseMappingHandler {

    /** Pending futures keyed by request id. */
    static final ConcurrentHashMap<Long, CompletableFuture<? super String>> mapping =
            new ConcurrentHashMap<>();

    /**
     * Registers the future to complete when the response for {@code requestId} arrives.
     * An existing registration for the same id is kept.
     *
     * @param requestId the request identifier
     * @param cb the future to complete with the response's result string
     */
    public static void addCallBack(long requestId, CompletableFuture<? super String> cb) {
        mapping.putIfAbsent(requestId, cb);
    }

    /**
     * Completes and unregisters the future waiting for {@code data}'s request id.
     * A response with no registered future (unknown or already handled id) is ignored.
     *
     * @param data the decoded response
     */
    public static void runCallBack(MyDataPackage data) {
        long requestId = data.getHeader().getRequestId();
        // remove() looks up and unregisters in one atomic step, so a duplicate response cannot
        // find the same future twice.
        CompletableFuture<? super String> cb = mapping.remove(requestId);
        if (cb != null) {
            cb.complete(data.getContent().getRes());
        }
    }

    /**
     * Unregisters the future for {@code requestId}, if any.
     *
     * @param requestId the request identifier
     */
    public static void removeCallBack(long requestId) {
        mapping.remove(requestId);
    }
}
class ServerRequestHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx
, Object msg
) throws Exception
{MyDataPackage data
= (MyDataPackage
) msg
;String ioThreadName
= Thread
.currentThread().getName();ctx
.executor().parent().next().execute(() -> {
String execThreadName
= Thread
.currentThread().getName();MyContent content
= new MyContent();String s
= "io thread: " + ioThreadName
+ " exec thread: " + execThreadName
+ " from args: " + data
.getContent().getArgs()[0];content
.setRes(s
);byte[] contentByte
= MySerializeUtil
.serialize(content
);MyHeader header
= new MyHeader();header
.setFlag(0x14141424);header
.setRequestId(data
.getHeader().getRequestId());header
.setDataLen(contentByte
.length
);byte[] headerByte
= MySerializeUtil
.serialize(header
);ByteBuf byteBuf
= PooledByteBufAllocator
.DEFAULT
.directBuffer(headerByte
.length
+ contentByte
.length
);byteBuf
.writeBytes(headerByte
);byteBuf
.writeBytes(contentByte
);ctx
.writeAndFlush(byteBuf
);});}
}
class MyDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext
, ByteBuf byteBuf
, List
<Object> list
) throws Exception
{while (byteBuf
.readableBytes() >= 82) {byte[] bytes
= new byte[82];byteBuf
.getBytes(byteBuf
.readerIndex(), bytes
);ByteArrayInputStream bis
= new ByteArrayInputStream(bytes
);ObjectInputStream objectInputStream
= new ObjectInputStream(bis
);MyHeader myHeader
= (MyHeader
) objectInputStream
.readObject();if (byteBuf
.readableBytes() >= myHeader
.getDataLen()) {byteBuf
.readBytes(82); byte[] data
= new byte[(int) myHeader
.getDataLen()];byteBuf
.readBytes(data
);ByteArrayInputStream byteArrayInputStream
= new ByteArrayInputStream(data
);ObjectInputStream ois
= new ObjectInputStream(byteArrayInputStream
);if (myHeader
.getFlag() == 0x14141414) {MyContent myContent
= (MyContent
) ois
.readObject();list
.add(new MyDataPackage(myHeader
, myContent
));} else if (myHeader
.getFlag() == 0x14141424) {MyContent myContent
= (MyContent
) ois
.readObject();list
.add(new MyDataPackage(myHeader
, myContent
));}} else {break;}}}
}class MyDataPackage {private MyHeader header
;private MyContent content
;public MyDataPackage(MyHeader myHeader
, MyContent myContent
) {this.header
= myHeader
;this.content
= myContent
;}public MyHeader
getHeader() {return header
;}public void setHeader(MyHeader header
) {this.header
= header
;}public MyContent
getContent() {return content
;}public void setContent(MyContent content
) {this.content
= content
;}
}
总结
以上是生活随笔为你收集整理的手撕 RPC 2的全部内容,希望文章能够帮你解决所遇到的问题。
如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。