志宇-dubbo源码分析
dubbo源码分析
- 文档
- dubbo加载配置文件
- dubboSPI
- dubbo服务提供
- 1.校验配置信息
- 2.创建URL
- 3.本地注册
- 4.远程注册
- 4.1 开启netty服务端
- 4.2 连接注册中心zk,写入信息
- 4.3监听注册中心
- dubbo引用服务
- 1.引用服务入口
- 2.服务引用的invoker创建
- 2.1本地服务引用invoker创建
- 2.2远程服务引用invoker创建
- DubboProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法
- RegistryProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法
- 3.通过invoker 创建代理对象
- 集群容错
- 服务目录 Directory
- 服务路由 Router
- 总结
- dubbo生产者初始化过程
- 消费者初始化过程
- 服务调用过程
- 消费者向生产者发送请求
- 消费者向生产者发送参数的编码
- 生产者接收请求解码过程
- 生产者根据请求信息调用指定方法
- 消费者对生产者发送的参数解码
- 消费者接收生产者发送的参数
- 消费者接收生产者响应,如何找到消费者发起请求的线程
- 经典问题
- dubbo 怎么如何追踪dubbo id的
文档
官方文档
gitbook 文档
dubbo加载配置文件
dubbo-2.6.4.jar 中spring.handlers 文件中指明解析xml的类
文件内容如下
DubboNamespaceHandler 的init 方法会
创建多个DubboBeanDefinitionParser 在其parse方法中会解析xml
parse方法会将xml中配置的每个对象放在一个BeanDefinition中
上面代码将xml配置信息解析成BeanDefinition 放到了spring容器中
dubboSPI
目的,用来在启动时候不初始化实现类,再调用时候选择初始化实现类
spi中可以通过spring进行属性自动注入
dubbo服务提供
dubbo提供服务主要有如下步骤
1.将dubbo的配置信息在spring容器中获得,同时校验配置信息
2.配置信息参数校验,根据配置信息创建url对象
-------(服务端和客户端相互调用的主要信息都存储在URL中)
3.创建Invoker对象
-------(Invoker对象使用Javassist对指定接口进行包装,对其进行参数校验 同时调用时才加载实现类)
4.本地注册 (将服务根据名字和Invoker放到一个map中)
5.远程注册
5.1 开启netty服务端
5.2 连接注册中心zk,写入信息
5.3 监听注册中心
1.校验配置信息
dubbo提供服务的入口在ServiceBean类中,先观察下ServiceBean类吧
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {}先分析下 InitializingBean 接口的方法 afterPropertiesSet()
这个方法会在项目启动时spring调用初始化方法时候执行
接下来分析 ApplicationContextAware 接口中的方法 setApplicationContext()
这个方法会在spring初始化回调注入ApplicationContext对象时调用
方法中 通过反射将ServiceBean(自己)添加到了spring的监听器中,
serviceBean是ApplicationListener 接口实现类,也就是一个监听器
部分代码如下
接下来分析ApplicationListener 类中的方法 onApplicationEvent(ContextRefreshedEvent event)
spring初始化完成后会调用下面方法
2.创建URL
我们通常会做如下配置,这个配置用对应存储到ServiceConfig中
<dubbo:service interface="com.alibaba.dubbo.config.spring.api.DemoService" ref="demoService" />接下来继续研究 ServiceBean 类中的 export() 方法
//首先进行一系列校验 //interfaceName 变量是xml 中dubbo:service 中配置的interface属性 //加载下类,判断这个类是否存在interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());//导出服务doExportUrls();接下来继续研究 doExportUrls()方法
private void doExportUrls() {//获得所有的注册中心地址,dubbo可以配置多个注册中心List<URL> registryURLs = loadRegistries(true);//每个注册中心可以配置多个协议for (ProtocolConfig protocolConfig : protocols) {//根据协议和注册中心 提供服务doExportUrlsFor1Protocol(protocolConfig, registryURLs);} }接下来继续研究 doExportUrlsFor1Protocol(protocolConfig,registryURLs) 方法
//将配置的信息取出来放到一个map中,然后用这个map去创建URL对象 URL url = new URL(name, host, port,provider.getContextpath() + path, map); //url中的方法 getParameter(key,defaultValue)用来获得map中的信息这时URL对象就创建好了
3.本地注册
doExportUrlsFor1Protocol() 方法继续往下走
if (!"remote".equalsIgnoreCase(scope)){//本地服务注册exportLocal(url);}接下来研究 exportLocal(url)方法
private void exportLocal(URL url) {if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {URL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL) //Constants.LOCAL_PROTOCOL这个值是injvm,所以会调用InjvmProtocol.setHost(LOCALHOST).setPort(0);ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));//在这里会调用Protocol.class 的实现类 InjvmProtocol中的export方法Exporter<?> exporter = protocol.export(//这个ref是<dubbo:service> 中的ref 属性(引用一个bean的id名字)//这里会创建一个InvokerproxyFactory.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");}}接下来研究proxyFactory 的 getInvoker(ref,interfaceClass,local) 方法,这个方法会创建一个Invoker对象
因ProxyFactory接口上注解为@SPI(“javassist”),所以会调用JavassistProxyFactory类
getInvoker方法如下
Wrapper 对象是怎么创建的呢,咱们接着分析 Wrapper.getWrapper(ref)
public static Wrapper getWrapper(Class<?> c) {while (ClassGenerator.isDynamicClass(c)) c = c.getSuperclass();//如果是object类型直接返回OBJECT_WRAPPER一个简单的对象if (c == Object.class)return OBJECT_WRAPPER;//看缓存中有没有Wrapper ret = WRAPPER_MAP.get(c);if (ret == null) {//缓存中没有则直接创建这个ref类的代理对象ret = makeWrapper(c);//将对象放到缓存WRAPPER_MAP.put(c, ret);}return ret; }接下来我们分析 makeWrapper(ref) 方法,它到底是如何创建Wrapper 对象的呢
//这个类中的代码很多,它主要创建一个对象的代理,这个代理对象会根据被调用时根据参数加载要调用的实现类 //首先将所有的字段都创建for (Field f : c.getFields()) {String fn = f.getName();Class<?> ft = f.getType();if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))continue;c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");pts.put(fn, ft);} //然后将所有的方法都一次创建for (Method m : methods) {if (m.getDeclaringClass() == Object.class) //ignore Object's method.continue;String mn = m.getName();c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");int len = m.getParameterTypes().length;c3.append(" && ").append(" $3.length == ").append(len);boolean override = false;for (Method m2 : methods) {if (m != m2 && m.getName().equals(m2.getName())) {override = true;break;}}if (override) {if (len > 0) {for (int l = 0; l < len; l++) {c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"").append(m.getParameterTypes()[l].getName()).append("\")");}}}c3.append(" ) { ");if (m.getReturnType() == Void.TYPE)c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");elsec3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");c3.append(" }");mns.add(mn);if (m.getDeclaringClass() == c)dmns.add(mn);ms.put(ReflectUtils.getDesc(m), m);} //get set is 开头方法的通过正则匹配再进行处理for (Map.Entry<String, Method> entry : ms.entrySet()) {String md = entry.getKey();Method method = (Method) entry.getValue();if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {String pn = propertyName(matcher.group(1));c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {String pn = propertyName(matcher.group(1));c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {Class<?> pt = method.getParameterTypes()[0];String pn = propertyName(matcher.group(1));c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");pts.put(pn, pt);}}//根据信息生成class文件Class<?> wc = cc.toClass();// setup static field.wc.getField("pts").set(null, pts);wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));wc.getField("mns").set(null, mns.toArray(new String[0]));wc.getField("dmns").set(null, dmns.toArray(new String[0]));int ix = 0;for (Method m : ms.values())wc.getField("mts" + ix++).set(null, m.getParameterTypes());return (Wrapper) wc.newInstance();这时Invoke 对象创建好了,这个对象会对传入对象进行包装,解决调用时才加载实现类
因为是本地注册服务,所以该将创建的对象存起来了
接下来研究 exportLocal(url)方法中 InjvmProtocol 类执的 expor 方法
4.远程注册
接着研究doExportUrlsFor1Protocol(protocolConfig,registryURLs)的后续代码
代码如下
4.1 开启netty服务端
接下来研究 protocol.export(wrapperInvoker) 方法
首先protocol是怎么创建的呢
接下来我们研究下 getAdaptiveExtension() 方法
这个方法会动态创建 Protocol的代理类,代理类代码如下
根据上面代码得出是根据传入参数 url 中的 protocol 参数调用指定的实现类
这时 protocol 的参数是 registry 所以调用 RegistryProtocol 类中的 export() 方法
接下来分析export()中开启Netty服务的代码
接下来分析 doLocalExport(originInvoker) 方法,代码如下
String key = getCacheKey(originInvoker);ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) {synchronized (bounds) {exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) {final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);bounds.put(key, exporter);}}}return exporter;上面代码最主要就 protocol.export(invokerDelegete) 方法
接下来分析这个方法
首先这个 protocol.export() 会调用哪个实现类,这里protocol对象是这个类的成员变量
这个类的实现类一般是 DubboProtocol 那么接下来看 DubboProtocol 的方法 exprot()
接下来分析 openServer(url) 方法
代码如下
接下分析 createServer(url) 方法
主要代码如下
接下来就分析 Exchangers.bind(url, requestHandler); 方法吧
调用过程就不细说了,最后会调用 HeaderExchanger 类的 bind方法
代码如下
接下来分析 Transporters.bind(url,handler) 方法,经过spi调用会调用到实现类NettyTransporter
代码如下,我们分析Netty3.x版本的吧
接下来我们看下 NettyServer(url, listener); 都做了什么
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {//这里对handler进行了包装//包装成了如下类型 //new MultiMessageHandler(new HeartbeatHandler(//new AllChannelHandler(new DecodeHandler(//new HeaderExchangeHandler(new ExchangeHandlerAdapter())))))super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }接下来分析 new NettyServer(url, listener);的父类构造方法做了什么
跳过简单调用,直接看主要代码
这样就开启NettyServer 服务端,那么服务端是如何处理请求的呢
我们先将处理请求的类记下,用于后续分析
adapter.getDecoder() //编码器 使用 NettyCodecAdapter 类
adapter.getEncoder() //解码器 使用 NettyCodecAdapter 类
nettyHandler //处理请求的handler 使用如下类进行了包装
new NettyHandler(new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter()))))))
4.2 连接注册中心zk,写入信息
接下来接着分析RegistryProtocol 类的 export(Invoker) 方法
export部分代码如下
我们先分析下getRegistry(originInvoker);方法
然后再分析register(registryUrl, registeredProviderUrl);方法
在 getRegistry(originInvoker); 方法中 如果注册中心以zk为例子,跳过调用代码最后会调用到CuratorZookeeperClient 类的CuratorZookeeperClient(URL url)方法,
下面代码为连接zk,代码如下
接下来分析 register(registryUrl, registeredProviderUrl);方法
去掉调用代码,代码如下
接下来我们分析 doRegister(url) 方法
代码如下,下面代码会在zk中创建一个持久化节点
4.3监听注册中心
接下来接着分析RegistryProtocol 类的 export(Invoker) 方法,export部分代码如下
//创建urlfinal URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);//创建一个监听器final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);//放到缓存中overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);//订阅消息registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);dubbo引用服务
1.引用服务入口
服务引用要在xml中配置如下信息
<!-- 增加引用远程服务配置 --> <dubbo:reference id=“xxxService” interface=“com.xxx.XxxService” /> <!-- 和本地服务一样使用远程服务 --> <bean id=“xxxAction” class=“com.xxx.XxxAction”> <property name=“xxxService” ref=“xxxService” /> </bean>上面的配置信息会被解析到 ReferenceConfig 类中
服务引用的入口类是 ReferenceBean,接下来我们分析它,先看下它的实现
先看 InitializingBean 接口的 afterPropertiesSet() 方法
方法中获得配置信息存储到 ReferenceConfig 的成员变量中
如果不是懒加载则直接调用getObject()方法创建对象
接下来看 FactoryBean 的接口实现,既然是FactoryBean类型那么就看下它能创建什么对象吧
代码如下
接下来分析 get() 吧
代码如下
接下来分析 init() 方法吧
主要代码如下
上面代码加载文件是为了
通过系统变量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段
2.服务引用的invoker创建
接下来分析createProxy(map); 中的方法
代码如下(省略非主要代码)
2.1本地服务引用invoker创建
我们先看一下 InjvmProtocol 中的 refprotocol.refer(interfaceClass, url)方法
代码如下
直接创建 InjvmInvoker 对象,挺简单的
2.2远程服务引用invoker创建
DubboProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法
接下来我们来看DubboProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法
代码如下
下面最主要的方法是getClients(url),代码如下
private ExchangeClient[] getClients(URL url) {// whether to share connectionboolean service_share_connect = false;int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);// if not configured, connection is shared, otherwise, one connection for one serviceif (connections == 0) {service_share_connect = true;connections = 1;}ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {if (service_share_connect) {// 获取共享客户端clients[i] = getSharedClient(url);} else {clients[i] = initClient(url);}}return clients;}接下来分析getSharedClient(url); 方法
代码如下
接下来分析 initClient(url) 方法
代码如下
接下来研究 Exchangers.connect(url, requestHandler) 方法,
跳过调用代码,直接进入HeaderExchanger 类的connect(url,hanlder)方法
接下来分析Transporters.connect(url,hanlder)方法
跳过调用代码进入到 NettyTransporter 类的connect(url, listener)方法
代码如下
接下来new NettyClient(url, listener)都做了什么
代码如下
我们先看下wrapChannelHandler(url,handler) 对handler做了哪些包装,一会再看 NettyClient 父类的构造方法,wrapChannelHandler(url,handler) 代码如下
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);return ChannelHandlers.wrap(handler, url); }接下来分析ChannelHandlers.wrap(handler, url)方法
代码如下
现在的handler类型就是这样的
new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter())))))
看上去比较恶心,但是他就是要用这样的handler,我们要记住他,以后分析
看完handler的包装再看NettyClient 父类的构造方法吧
主要代码如下
下面来看下doOpen()方法
protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory();bootstrap = new ClientBootstrap(channelFactory);// config// @see org.jboss.netty.channel.socket.SocketChannelConfigbootstrap.setOption("keepAlive", true);bootstrap.setOption("tcpNoDelay", true);bootstrap.setOption("connectTimeoutMillis", getTimeout());//再一次对handler进行包装现在handler包装的信息如下//new NettyHandler(new MultiMessageHandler(new HeartbeatHandler(new //AllChannelHandler(new DecodeHandler(new //HeaderExchangeHandler(new ExchangeHandlerAdapter())))))final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);ChannelPipeline pipeline = Channels.pipeline();pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});}上面代码只创建对象不进行连接
下面来看下connect()方法,代码如下
这样客户端的服务器起来了
我们要记住客户端的handler包装信息(和服务端的一样)有
new NettyHandler(new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter())))))
RegistryProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法
接下来分析 RegistryProtocol 类的 refer 方法
代码如下
接下来看doRefer方法
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {// 创建 RegistryDirectory 实例RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);// 设置注册中心和协议directory.setRegistry(registry);directory.setProtocol(protocol);Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());// 生成urlURL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);// 注册服务消费者,在 consumers 目录下新节点if (!Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));}// 订阅 providers、configurators、routers 等节点数据directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY+ "," + Constants.CONFIGURATORS_CATEGORY+ "," + Constants.ROUTERS_CATEGORY));// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个Invoker invoker = cluster.join(directory);ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker; }如上,doRefer 方法创建一个 RegistryDirectory 实例(RegistryDirectory 会同步生产者的服务信息,根据可以调用的服务创建对应的DubboInvoker对象存储到RegistryDirectory 中),然后生成消费者url,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。
由于一个服务可能部署在多台服务器上,这样生产者就会在注册中心上注册多个服务,这个时候就需要 Cluster 在多个服务中选择一个invoker(负载均衡)返回
3.通过invoker 创建代理对象
接下来研究 ReferenceConfig 的 proxyFactory.getProxy(invoker) 方法,就是对invoker进行下代理
接下来看JavassistProxyFactory的 getProxy 方法
最后生成的代理类是这样的
package org.apache.dubbo.common.bytecode;public class proxy0 implements org.apache.dubbo.demo.DemoService {public static java.lang.reflect.Method[] methods;private java.lang.reflect.InvocationHandler handler;public proxy0() {}public proxy0(java.lang.reflect.InvocationHandler arg0) {handler = $1;}public java.lang.String sayHello(java.lang.String arg0) {Object[] args = new Object[1];args[0] = ($w) $1;Object ret = handler.invoke(this, methods[0], args);return (java.lang.String) ret;} }集群容错
官方文档上说集群容错分为
服务目录 Directory、服务路由 Router、集群 Cluster、负载均衡 LoadBalance
服务目录是什么: 生产者启动时会将可以调用的服务存储到zk上。消费者启动后拉取zk上可以调用的服务,根据每个服务信息创建一个Invoker对象,将所有的invoker存储到服务目录 Directory中,Directory中存储的invoker信息会随着生产者提供服务信息变化。
服务路由 Router的作用: 服务路由规定了服务消费者可调用哪些服务提供者。
服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,条件路由规则信息如下
| [服务消费者ip] => [服务提供者ip] |
说明服务消费者ip 可以调用此服务提供者的ip
集群 Cluste: 在dubbo中 Cluste 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理打印日志还是抛异常),现在都交给集群模块去处理
负载均衡 LoadBalance:调用时在多个invoker中通过权重和策略的参数选择一个invoker
服务目录 Directory
通常使用RegistryDirectory,类图如下
Directory 继承自 Node 接口,这个接口包含了一个获取配置信息的方法 getUrl,实现该接口的类可以向外提供配置信息。
AbstractDirectory 实现了 Directory 接口,这个接口包含了一个重要的方法定义,list(Invocation),用于获取所有 Invoke对象,这里的invoke对象可以调用生产者的服务。
RegistryDirectory 实现了 NotifyListener 接口,当注册中心节点信息发生变化后,可以通过此接口方法得到变更invoke信息
RegistryDirectory是在哪里初始化的呢,肯定是消费者使用到它了
在RegistryProtocol 的 doRefer 方法进行的初始化
接下来分析 RegistryDirectory 是如何将zk中的生产者服务信息同步到每个invoker对象中的
先看父类的 list 方法,它可以返回所有的invoker对象,代码如下
接下来看下 RegistryDirectory 的 notify方法,如果zk中生产者提供的服务信息发生变化则会调用此方法,在方法中根据变化的信息同步到对应的invoker对象,代码如下
public synchronized void notify(List<URL> urls) {List<URL> invokerUrls = new ArrayList<URL>();List<URL> routerUrls = new ArrayList<URL>();List<URL> configuratorUrls = new ArrayList<URL>();for (URL url : urls) {String protocol = url.getProtocol();String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);if (Constants.ROUTERS_CATEGORY.equals(category)|| Constants.ROUTE_PROTOCOL.equals(protocol)) {routerUrls.add(url);} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {configuratorUrls.add(url);} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {invokerUrls.add(url);} else {logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());}}// configuratorsif (configuratorUrls != null && !configuratorUrls.isEmpty()) {this.configurators = toConfigurators(configuratorUrls);}// routersif (routerUrls != null && !routerUrls.isEmpty()) {List<Router> routers = toRouters(routerUrls);if (routers != null) { // null - do nothingsetRouters(routers);}}List<Configurator> localConfigurators = this.configurators; // local reference// merge override parametersthis.overrideDirectoryUrl = directoryUrl;if (localConfigurators != null && !localConfigurators.isEmpty()) {for (Configurator configurator : localConfigurators) {this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);}}// providersrefreshInvoker(invokerUrls);}上面代码最主要方法是 refreshInvoker(invokerUrls)
代码如下
将上面代码是将url信息转换成成invoker存储到 methodInvokerMap 和 urlInvokerMap 成员变量中
服务路由 Router
我们什么时候要用到Router,当我们要配置dubbo服务的白名单黑名单 或一个服务提供给不同的消费者。在服务字典获所有invoker的时候,会通过路由过滤掉不能调用的invoker
先看下Router 的关系图
有两个实现类
条件路由 ConditionRouter 和 脚本路由 ScriptRouter
先看下条件路由 ConditionRouter 吧
ConditionRouter 的构造方法代码如下
根据上面代码得出,根据传入的 url 对象,通过 parseRule 方法解析,将解析的信息赋值给成员变量下面我们来看下 parseRula方法吧,代码如下
private static Map<String, MatchPair> parseRule(String rule)throws ParseException {// 定义条件映射集合Map<String, MatchPair> condition = new HashMap<String, MatchPair>();if (StringUtils.isBlank(rule)) {return condition;}MatchPair pair = null;Set<String> values = null;// 通过正则表达式匹配路由规则,ROUTE_PATTERN = ([&!=,]*)\s*([^&!=,\s]+)// 这个表达式看起来不是很好理解,第一个括号内的表达式用于匹配"&", "!", "=" 和 "," 等符号。// 第二括号内的用于匹配英文字母,数字等字符。举个例子说明一下:// host = 2.2.2.2 & host != 1.1.1.1 & method = hello// 匹配结果如下:// 括号一 括号二// 1. null host// 2. = 2.2.2.2// 3. & host// 4. != 1.1.1.1 // 5. & method// 6. = hellofinal Matcher matcher = ROUTE_PATTERN.matcher(rule);while (matcher.find()) {// 获取括号一内的匹配结果String separator = matcher.group(1);// 获取括号二内的匹配结果String content = matcher.group(2);// 分隔符为空,表示匹配的是表达式的开始部分if (separator == null || separator.length() == 0) {// 创建 MatchPair 对象pair = new MatchPair();// 存储 <匹配项, MatchPair> 键值对,比如 <host, MatchPair>condition.put(content, pair); } // 如果分隔符为 &,表明接下来也是一个条件else if ("&".equals(separator)) {// 尝试从 condition 获取 MatchPairif (condition.get(content) == null) {// 未获取到 MatchPair,重新创建一个,并放入 condition 中pair = new MatchPair();condition.put(content, pair);} else {pair = condition.get(content);}} // 分隔符为 =else if ("=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule ...");values = pair.matches;// 将 content 存入到 MatchPair 的 matches 集合中values.add(content);} // 分隔符为 != else if ("!=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule ...");values = pair.mismatches;// 将 content 存入到 MatchPair 的 mismatches 集合中values.add(content);}// 分隔符为 ,else if (",".equals(separator)) {if (values == null || values.isEmpty())throw new ParseException("Illegal route rule ...");// 将 content 存入到上一步获取到的 values 中,可能是 matches,也可能是 mismatchesvalues.add(content);} else {throw new ParseException("Illegal route rule ...");}}return condition; }上面代码做了什么呢,它就是将 url对象中的信息进行解析,然后将解析的数据返回
接下来看ConditionRouter 的 route 方法
上面代码主要通过 matchWhen、matchThen 方法进行匹配路由,看传递过来的参数判断是否可以访问
总结
dubbo生产者初始化过程
首先通过 DubboNamespaceHandler 解析配置文件,每有一个dubbo:service标签即创建一个ServiceBean 类
每个servicebean会在项目启动的时候通过 setApplicationContext 方法 向spring添加一个监听器
监听器会初始化生产者的服务
监听器是怎么初始化生产者的服务的下面继续分析
首先循环遍历所有注册中心,然后遍历所有协议,
根据注册中心和协议 对每个服务进行导出
根据配置信息创建URL对象
进行导出服务,如果要导出服务到本地
–》根据实现类ref对象和URL对象创建 invoker对象,然后缓存起来
进行导出服务,如果要导出服务到远程
–》根据实现类ref对象和URL对象创建 invoker对象,invoker就是对ref对象进行简单的代理
–》根据invoker创建DubboExporter对象(用于处理请求),DubboExporter对invoker对象进行包装
–》开启netty服务端
–》获得注册中心地址,连接注册中心,检测注册中心开启情况,如果变化给所有监听器进行回调
–》每有一个服务则在注册中心上创建一条信息
–》然后订阅 override 数据
消费者初始化过程
首先通过 DubboNamespaceHandler 解析配置文件, 每有一个 dubbo:reference 创建一个ReferenceBean 类
ReferenceBean 是一个factorybean, 可以通过它来创建一个单例的代理类(ref 要调用接口的实现类)
这个factorybean会创建一个什么样的代理对象呢,下面进行分析
如果是本地引用则 根据实现类(ref) 创建一个invoker对象
如果是远程引用则 根据实现类(ref) 创建一个DubboInvoker对象
–> 每有一个服务提供者则创建一个 DubboInvoker,DubboInvoker对象的invoker方法会调用netty的client的send方法(DubboInvoker可以向服务端发起调用请求)
–> 一个服务有多个提供者的话,会创建多个DubboInvoker放到集合中,然后创建AbstractClusterInvoker实现类下的对象进行集群管理
–> 集群管理包括(调用服务失败后抛异常还是调用别的生产者、负载均衡策略选择)
开启netty客户端,连接注册中心,向注册中心中写入服务,初始化服务字典RegistryDirectory
在服务字典(服务端每有一个服务则创建一个DubboInvoker存储到服务字典中)中添加监听。
根据invoker创建对应的代理proxy对象返回。 proxy是对invoker对象进行了包装。
服务调用过程
消费者向生产者发送请求
DubboProductInterface obj=(DubboProductInterface)application.getBean("DubboProductInterface") obj.sayHello();上面代码会通过getBean() 方法 创建 DubboProductInterface 接口的代理对象,每次调用这个代理对象的方法都会发起远程调用,这个代理对象是什么样子呢?
在spring容器中通过单例的factorybean会创建一个DubboProductInterface 类型的代理对象 如果生产者非集群部署(一个服务的提供者只有一个) --->也就是生产者只有一个则直接根据 生产者提供的服务信息 返回一个DubboInvoker对象。 如果生产者是集群部署的话 (一个服务的提供者有多个) --->会通过AbstractClusterInvoker 将多个DubboInvoker进行包装,包装后对多个DubboInvoker进行集群管理 --->(一个服有几个生产者提供就有几个DubboInvoker) --->然后返回AbstractClusterInvoker 对象AbstractClusterInvoker 的 invoke 方法做了什么?
假如调用的AbstractClusterInvoker的实现类是 FailoverClusterInvoker 他的作用是 调用失败自动切换生产者,也就是调用生产者失败后调用别的生产者 FailoverClusterInvoker 的doinvoke方法会先获得重试次数(失败后最多调用几次) 通过LoadBalance 的实现类进行负责均衡(从多个DubboInvoker中选择一个返回), 调用AbstractInvoker中的invoke方法,也就是DubboInvoker中的doinvoke方法 如果调用失败则根据重试次数进行多次调用,调用了重试次数次 都失败则抛异常。在DubboInvoker中调用分为很多种包括,异步有返回值调用、异步没有返回值调用、同步调用(同步调用可定有返回值,空也算返回值)
在这就分析同步调用吧,在DubboInvoker 的doinvoke 方法中
currentClient.request(inv, timeout).get(); 在这里的get() 会阻塞等待返回的数据
调用流程如下
消费者向生产者发送参数的编码
接下来分析编码调用流程
在 NettyClient中设置编码器, pipeline.addLast("encoder", adapter.getEncoder()); adapter.getEncoder() 会创建一个内部类 InternalEncoder() InternalEncoder() 的encode()方法 调用codec.encode(channel, buffer, msg); 也就是 ExchangeCodec 的encode方法 的 encodeRequest()方法 encodeRequest() 方法中通过channel将对象信息发送给服务端生产者接收请求解码过程
解码调用过程如下
ExchangeCodec --》decodeExchangeCodec --》decode(channel, buffer, readable, header)ExchangeCodec --》decodeBody(channel, is, header)DubboCodec --》decodeBody() 创建一个new DecodeableRpcInvocation() 对象返回DecodeableRpcInvocation ---》 decode() 最终会返回一个DecodeableRpcInvocation 对象 解码流程结束,将发送的信息封装到一个request对象返回, 这个request对象的类型是ecodeableRpcInvocation生产者根据请求信息调用指定方法
接下来就是将这个request对象交给hanlder进行处理,处理request的handler经过一系列包装,包装类层次如下
new NettyHandler(new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter()))))))接下来看调用栈吧
NettyHandler -》 messageReceived() netty框架用来接收请求的入口MultiMessageHandler -》 receivedHeartbeatHandler -》 receivedAllChannelHandler -》 received 开启线程ChannelEventRunnable -》 run() 根据通道类型进行调用不同方法DecodeHandler -》receivedHeaderExchangeHandler -》 received 调用方法,将调用方法的结果返回的结果发送回去channel.send(response);HeaderExchangeHandler -》 handleRequest(exchangeChannel, request); 处理请求ExchangeHandlerAdapter -》reply 也就是 DubboProtocol -》reply 根据传递的参数 获得对应的DubboExporterDubboExporter 中包含着对应的invokerAbstractProxyInvoker -》invoke JavassistProxyFactory -》getInvoker 即可调用指定方法消费者对生产者发送的参数解码
消费者接收生产者响应解码
NettyClient 的 adapter.getEncoder() 方法进行解码, 最后会到 ExchangeCodec 的 encodeResponse(channel, buffer, (Response) msg); 方法 然后DubboCodec 的 decodeBody 方法 返回一个new DecodeableRpcResult 请求对象 DecodeableRpcResult ==》decode() 方法,将参数存储到DecodeableRpcResult对象中消费者接收生产者发送的参数
解码完成后,接收使用handler解析DecodeableRpcResult
还是那个包装了很多次的handler 跳过部分handler直接进入主要的逻辑 HeaderExchangeHandler received HeaderExchangeHandler handleResponse(channel, (Response) message); 在这里移除message 同时将请求传递的response参数放到DefaultFuture的成员变量中然后唤醒用户线程消费者接收生产者响应,如何找到消费者发起请求的线程
消费者调用生产者
生产者没返回调用结果时,消费者会阻塞,在哪里阻塞的呢
先看下DefaultFuture 中都做了什么
DefaultFuture 中有个静态map,存储着返回信息请求id 和 DefaultFuture对象 DefaultFuture 静态代码块会开启一个守护线程,如果相应超过固定时间则将DefaultFuture从静态map中移除DefaultFuture 的get方法都做了什么,将代码简写
private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();private volatile Response response;public Object get(int timeout) throws RemotingException {//如果返回值为空if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {//在lock锁中进行阻塞,相当于在synchronized中使用wait,要用notify进行唤醒//synchronized 中的notify是随机唤醒,不能指定唤醒//lock锁中的 await可以被指定唤醒done.await(timeout, TimeUnit.MILLISECONDS);//是否超过响应时间if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} finally {//释放锁lock.unlock();}}return returnFromResponse(); }在生产者返回响应是如何唤醒线程的呢
在解码后交给handler进行处理,直接看核心代码
HeaderExchangeHandler —》 handleResponse(channel, (Response) message);
接下来看 DefaultFuture.received(channel, response)
public static void received(Channel channel, Response response) {try {//根据id获得对应的DefaultFuture //通过id将静态map中的DefaultFuture 移除DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {//在方法中调用 done.signal(); 进行唤醒future.doReceived(response);} }} finally {CHANNELS.remove(response.getId());} }接下来看 future.doReceived(response);
private void doReceived(Response res) {lock.lock();try {response = res;if (done != null) {//进行唤醒done.signal();}} finally {lock.unlock();}if (callback != null) {invokeCallback(callback);} }经典问题
Dubbo 中注册中心可以使用哪些,协议可以使用哪些
Dubbo 中失败重试次数怎么设置
Dubbo 请求是否异步,异步是否需要返回值,怎么配置
Douuo 消费者调用生产者,消费者阻塞生产者返回结果,如何根据结果找到消费者上正在阻塞的线程
dubbo 怎么如何追踪dubbo id的
如果非集群服务则会调用 DubboInvoker 的 doInvoke方法,如果是异步有返回值则调用 ResponseFuture future = currentClient.request(inv, timeout); 方法 //HeaderExchangeChannel中创建Future 首先 DefaultFuture中有一个静态 Map,map的key为id,value为本次请求的DefaultFuture,同时会开一个守护线程来将超时的DefaultFuture移除 假如是异步请求,每当一次请求,就创建一个 本次请求的DefaultFuture还维护着一个lock锁 用于阻塞和释放当前线程(使用wait和notify的话不能唤醒指定线程), 为了防止静态 Map中本次请求的DefaultFuture被移除导致找不到DefaultFuture, RpcContext将本次请求的DefaultFuture存储到在threadlocal中,DefaultFuture中也有id)总结
以上是生活随笔为你收集整理的志宇-dubbo源码分析的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: ESD失效怎么办
- 下一篇: 虚拟化运维中:为什么对网络流量监控这么重