欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

志宇-dubbo源码分析

发布时间:2024/3/26 编程问答 53 豆豆
生活随笔 收集整理的这篇文章主要介绍了 志宇-dubbo源码分析 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

dubbo源码分析

  • 文档
  • dubbo加载配置文件
  • dubboSPI
  • dubbo服务提供
    • 1.校验配置信息
    • 2.创建URL
    • 3.本地注册
    • 4.远程注册
    • 4.1 开启netty服务端
    • 4.2 连接注册中心zk,写入信息
    • 4.3监听注册中心
  • dubbo引用服务
    • 1.引用服务入口
    • 2.服务引用的invoker创建
      • 2.1本地服务引用invoker创建
      • 2.2远程服务引用invoker创建
        • DubboProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法
        • RegistryProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法
    • 3.通过invoker 创建代理对象
  • 集群容错
    • 服务目录 Directory
    • 服务路由 Router
  • 总结
    • dubbo生产者初始化过程
    • 消费者初始化过程
  • 服务调用过程
    • 消费者向生产者发送请求
    • 消费者向生产者发送参数的编码
    • 生产者接收请求解码过程
    • 生产者根据请求信息调用指定方法
    • 消费者对生产者发送的参数解码
    • 消费者接收生产者发送的参数
      • 消费者接收生产者响应,如何找到消费者发起请求的线程
    • 经典问题
      • dubbo 怎么如何追踪dubbo id的

文档

官方文档
gitbook 文档

dubbo加载配置文件

dubbo-2.6.4.jar 中spring.handlers 文件中指明解析xml的类
文件内容如下

http\://dubbo.apache.org/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

DubboNamespaceHandler 的init 方法会
创建多个DubboBeanDefinitionParser 在其parse方法中会解析xml
parse方法会将xml中配置的每个对象放在一个BeanDefinition中

上面代码将xml配置信息解析成BeanDefinition 放到了spring容器中

dubboSPI

目的,用来在启动时候不初始化实现类,再调用时候选择初始化实现类
spi中可以通过spring进行属性自动注入

dubbo服务提供

dubbo提供服务主要有如下步骤
1.将dubbo的配置信息在spring容器中获得,同时校验配置信息
2.配置信息参数校验,根据配置信息创建url对象
-------(服务端和客户端相互调用的主要信息都存储在URL中)
3.创建Invoker对象
-------(Invoker对象使用Javassist对指定接口进行包装,对其进行参数校验 同时调用时才加载实现类)
4.本地注册 (将服务根据名字和Invoker放到一个map中)
5.远程注册
5.1 开启netty服务端
5.2 连接注册中心zk,写入信息
5.3 监听注册中心

1.校验配置信息

dubbo提供服务的入口在ServiceBean类中,先观察下ServiceBean类吧

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {}

先分析下 InitializingBean 接口的方法 afterPropertiesSet()
这个方法会在项目启动时spring调用初始化方法时候执行

方法中 将spring容器中 ModuleConfig.class、RegistryConfig.class、 MonitorConfig.class、 ProtocolConfig.class 类型的 BeanDefinition 对象全部获得,同时校验对象信息

接下来分析 ApplicationContextAware 接口中的方法 setApplicationContext()
这个方法会在spring初始化回调注入ApplicationContext对象时调用
方法中 通过反射将ServiceBean(自己)添加到了spring的监听器中,
serviceBean是ApplicationListener 接口实现类,也就是一个监听器
部分代码如下

//这样在spring启动完成会触发ApplicationListener 类中的 onApplicationEvent() Method method = applicationContext.getClass().getMethod("addApplicationListener", new Class<?>[]{ApplicationListener.class}); method.invoke(applicationContext, new Object[]{this});

接下来分析ApplicationListener 类中的方法 onApplicationEvent(ContextRefreshedEvent event)
spring初始化完成后会调用下面方法

public void onApplicationEvent(ContextRefreshedEvent event) {//如果是非延迟导出 && !导出过 && !不用导出if (isDelay() && !isExported() && !isUnexported()) {if (logger.isInfoEnabled()) {logger.info("The service ready on spring started. service: " + getInterface());}//导出服务 就这一句话实现服务提供//接下来分析export()方法吧export();} }

2.创建URL

我们通常会做如下配置,这个配置用对应存储到ServiceConfig中

<dubbo:service interface="com.alibaba.dubbo.config.spring.api.DemoService" ref="demoService" />

接下来继续研究 ServiceBean 类中的 export() 方法

//首先进行一系列校验 //interfaceName 变量是xml 中dubbo:service 中配置的interface属性 //加载下类,判断这个类是否存在interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());//导出服务doExportUrls();

接下来继续研究 doExportUrls()方法

private void doExportUrls() {//获得所有的注册中心地址,dubbo可以配置多个注册中心List<URL> registryURLs = loadRegistries(true);//每个注册中心可以配置多个协议for (ProtocolConfig protocolConfig : protocols) {//根据协议和注册中心 提供服务doExportUrlsFor1Protocol(protocolConfig, registryURLs);} }

接下来继续研究 doExportUrlsFor1Protocol(protocolConfig,registryURLs) 方法

//将配置的信息取出来放到一个map中,然后用这个map去创建URL对象 URL url = new URL(name, host, port,provider.getContextpath() + path, map); //url中的方法 getParameter(key,defaultValue)用来获得map中的信息

这时URL对象就创建好了

3.本地注册

doExportUrlsFor1Protocol() 方法继续往下走

if (!"remote".equalsIgnoreCase(scope)){//本地服务注册exportLocal(url);}

接下来研究 exportLocal(url)方法

private void exportLocal(URL url) {if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {URL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL) //Constants.LOCAL_PROTOCOL这个值是injvm,所以会调用InjvmProtocol.setHost(LOCALHOST).setPort(0);ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));//在这里会调用Protocol.class 的实现类 InjvmProtocol中的export方法Exporter<?> exporter = protocol.export(//这个ref是<dubbo:service> 中的ref 属性(引用一个bean的id名字)//这里会创建一个InvokerproxyFactory.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");}}

接下来研究proxyFactory 的 getInvoker(ref,interfaceClass,local) 方法,这个方法会创建一个Invoker对象
因ProxyFactory接口上注解为@SPI(“javassist”),所以会调用JavassistProxyFactory类
getInvoker方法如下

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {// 创建一个包装类,返回的Invoker对象执行所有的方法都调用包装类中的invokeMethod方法final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};}

Wrapper 对象是怎么创建的呢,咱们接着分析 Wrapper.getWrapper(ref)

public static Wrapper getWrapper(Class<?> c) {while (ClassGenerator.isDynamicClass(c)) c = c.getSuperclass();//如果是object类型直接返回OBJECT_WRAPPER一个简单的对象if (c == Object.class)return OBJECT_WRAPPER;//看缓存中有没有Wrapper ret = WRAPPER_MAP.get(c);if (ret == null) {//缓存中没有则直接创建这个ref类的代理对象ret = makeWrapper(c);//将对象放到缓存WRAPPER_MAP.put(c, ret);}return ret; }

接下来我们分析 makeWrapper(ref) 方法,它到底是如何创建Wrapper 对象的呢

//这个类中的代码很多,它主要创建一个对象的代理,这个代理对象会根据被调用时根据参数加载要调用的实现类 //首先将所有的字段都创建for (Field f : c.getFields()) {String fn = f.getName();Class<?> ft = f.getType();if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))continue;c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");pts.put(fn, ft);} //然后将所有的方法都一次创建for (Method m : methods) {if (m.getDeclaringClass() == Object.class) //ignore Object's method.continue;String mn = m.getName();c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");int len = m.getParameterTypes().length;c3.append(" && ").append(" $3.length == ").append(len);boolean override = false;for (Method m2 : methods) {if (m != m2 && m.getName().equals(m2.getName())) {override = true;break;}}if (override) {if (len > 0) {for (int l = 0; l < len; l++) {c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"").append(m.getParameterTypes()[l].getName()).append("\")");}}}c3.append(" ) { ");if (m.getReturnType() == Void.TYPE)c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");elsec3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");c3.append(" }");mns.add(mn);if (m.getDeclaringClass() == c)dmns.add(mn);ms.put(ReflectUtils.getDesc(m), m);} //get set is 开头方法的通过正则匹配再进行处理for (Map.Entry<String, Method> entry : ms.entrySet()) {String md = entry.getKey();Method method = (Method) entry.getValue();if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {String pn = propertyName(matcher.group(1));c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {String pn = propertyName(matcher.group(1));c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {Class<?> pt = method.getParameterTypes()[0];String pn = propertyName(matcher.group(1));c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");pts.put(pn, pt);}}//根据信息生成class文件Class<?> wc = cc.toClass();// setup static field.wc.getField("pts").set(null, pts);wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));wc.getField("mns").set(null, mns.toArray(new String[0]));wc.getField("dmns").set(null, dmns.toArray(new String[0]));int ix = 0;for (Method m : ms.values())wc.getField("mts" + ix++).set(null, m.getParameterTypes());return (Wrapper) wc.newInstance();

这时Invoke 对象创建好了,这个对象会对传入对象进行包装,解决调用时才加载实现类
因为是本地注册服务,所以该将创建的对象存起来了
接下来研究 exportLocal(url)方法中 InjvmProtocol 类执的 expor 方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);}//InjvmExporter的构造方法如下,可以看到就是将Invoker放到了exporterMap中 InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {super(invoker);this.key = key;this.exporterMap = exporterMap;//存储invoker exporterMap.put(key, this);}

4.远程注册

接着研究doExportUrlsFor1Protocol(protocolConfig,registryURLs)的后续代码
代码如下

if (registryURLs != null && !registryURLs.isEmpty()) {//循环遍历所有注册中心的地址for (URL registryURL : registryURLs) {//将URL中的protocol 替换成registry 为了后面 protocol.export()调用RegistryProtocol实现类URL monitorUrl = loadMonitor(registryURL);//创建远程提供服务的invoker//这里逻辑和本地导出创建invoker的逻辑相同,都是调用JavassistProxyFactory中的getInvoker()Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));//对invoker和serviceConfig 对象进行包装DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);//通过invoker对象进行提供服务Exporter<?> exporter = protocol.export(wrapperInvoker);//添加到缓存exporters.add(exporter);}}

4.1 开启netty服务端

接下来研究 protocol.export(wrapperInvoker) 方法
首先protocol是怎么创建的呢

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

接下来我们研究下 getAdaptiveExtension() 方法
这个方法会动态创建 Protocol的代理类,代理类代码如下

public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");}public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");}public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {if (arg1 == null) throw new IllegalArgumentException("url == null");com.alibaba.dubbo.common.URL url = arg1;String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());if (extName == null)throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);return extension.refer(arg0, arg1);}public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");if (arg0.getUrl() == null)throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());if (extName == null)throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);return extension.export(arg0);}}

根据上面代码得出是根据传入参数 url 中的 protocol 参数调用指定的实现类
这时 protocol 的参数是 registry 所以调用 RegistryProtocol 类中的 export() 方法
接下来分析export()中开启Netty服务的代码

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {//开启netty服务final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);}

接下来分析 doLocalExport(originInvoker) 方法,代码如下

String key = getCacheKey(originInvoker);ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) {synchronized (bounds) {exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) {final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);bounds.put(key, exporter);}}}return exporter;

上面代码最主要就 protocol.export(invokerDelegete) 方法
接下来分析这个方法
首先这个 protocol.export() 会调用哪个实现类,这里protocol对象是这个类的成员变量
这个类的实现类一般是 DubboProtocol 那么接下来看 DubboProtocol 的方法 exprot()

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();String key = serviceKey(url);DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);exporterMap.put(key, exporter);Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +"], has set stubproxy support event ,but no stub methods founded."));}} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}//这个方法最主要,用来开启Netty服务openServer(url);optimizeSerialization(url);return exporter;}

接下来分析 openServer(url) 方法
代码如下

private void openServer(URL url) {// find server.String key = url.getAddress();//client can export a service which's only for server to invokeboolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);if (isServer) {//判断缓存中有没有 没有则开服务ExchangeServer server = serverMap.get(key);if (server == null) {serverMap.put(key, createServer(url));} else {// server supports reset, use together with overrideserver.reset(url);}}}

接下分析 createServer(url) 方法
主要代码如下

try {//这里传入了个requestHandler对象 这个对象是是在DubboProtocol声明的 ExchangeHandlerAdapter(){}//这个对象在最后会用来处理请求服务 先记下这个类server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}

接下来就分析 Exchangers.bind(url, requestHandler); 方法吧
调用过程就不细说了,最后会调用 HeaderExchanger 类的 bind方法
代码如下

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {//在这里对刚才传入的handler进行了两次包装//那么现在的handler包装成什么样子了呢//new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter()))//要记下这个handler,因这个handler最后会处理请求服务return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }

接下来分析 Transporters.bind(url,handler) 方法,经过spi调用会调用到实现类NettyTransporter
代码如下,我们分析Netty3.x版本的吧

public Server bind(URL url, ChannelHandler listener) throws RemotingException {//经过复杂的调用终于看到了开启Netty的代码return new NettyServer(url, listener); }

接下来我们看下 NettyServer(url, listener); 都做了什么

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {//这里对handler进行了包装//包装成了如下类型 //new MultiMessageHandler(new HeartbeatHandler(//new AllChannelHandler(new DecodeHandler(//new HeaderExchangeHandler(new ExchangeHandlerAdapter())))))super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }

接下来分析 new NettyServer(url, listener);的父类构造方法做了什么
跳过简单调用,直接看主要代码

protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory();//创建boss线程池ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));//创建worker线程池ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));//创建工厂ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));//创建核心配置类bootstrap = new ServerBootstrap(channelFactory);//创建一个hadlder,对NettyServer进行包装,NettyServer的父类成员变量中存储着传入的handlerfinal NettyHandler nettyHandler = new NettyHandler(getUrl(), this);channels = nettyHandler.getChannels();bootstrap.setOption("child.tcpNoDelay", true);bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ChannelPipeline pipeline = Channels.pipeline();//当超时执行/*int idleTimeout = getIdleTimeout();if (idleTimeout > 10000) {pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));}*/pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});// bindchannel = bootstrap.bind(getBindAddress()); }

这样就开启NettyServer 服务端,那么服务端是如何处理请求的呢
我们先将处理请求的类记下,用于后续分析
adapter.getDecoder() //编码器 使用 NettyCodecAdapter 类
adapter.getEncoder() //解码器 使用 NettyCodecAdapter 类
nettyHandler //处理请求的handler 使用如下类进行了包装
new NettyHandler(new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter()))))))

4.2 连接注册中心zk,写入信息

接下来接着分析RegistryProtocol 类的 export(Invoker) 方法
export部分代码如下

//获得注册中心地址 URL registryUrl = getRegistryUrl(originInvoker); //在这里根据注册中心地址链接注册中心 final Registry registry = getRegistry(originInvoker); //移除一些参数 final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registeredProviderUrl.getParameter("register", true); //包装下 然后存起来将 invoker ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);if (register) {//执行注册register(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); }

我们先分析下getRegistry(originInvoker);方法
然后再分析register(registryUrl, registeredProviderUrl);方法
在 getRegistry(originInvoker); 方法中 如果注册中心以zk为例子,跳过调用代码最后会调用到CuratorZookeeperClient 类的CuratorZookeeperClient(URL url)方法,
下面代码为连接zk,代码如下

public CuratorZookeeperClient(URL url) {super(url);try {//连接zk服务端CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000)).connectionTimeoutMs(5000);String authority = url.getAuthority();if (authority != null && authority.length() > 0) {builder = builder.authorization("digest", authority.getBytes());}client = builder.build();client.getConnectionStateListenable().addListener(new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState state) {if (state == ConnectionState.LOST) {CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);} else if (state == ConnectionState.CONNECTED) {CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);} else if (state == ConnectionState.RECONNECTED) {CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);}}});//连接client.start();} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}

接下来分析 register(registryUrl, registeredProviderUrl);方法
去掉调用代码,代码如下

public void register(URL url) {super.register(url);failedRegistered.remove(url);failedUnregistered.remove(url);try {// Sending a registration request to the server sidedoRegister(url);} catch (Exception e) {Throwable t = e;// If the startup detection is opened, the Exception is thrown directly.boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)&& url.getParameter(Constants.CHECK_KEY, true)&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);} else {logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);}// Record a failed registration request to a failed list, retry regularlyfailedRegistered.add(url);}}

接下来我们分析 doRegister(url) 方法
代码如下,下面代码会在zk中创建一个持久化节点

@Overrideprotected void doRegister(URL url) {try {zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));} catch (Throwable e) {throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}}

4.3监听注册中心

接下来接着分析RegistryProtocol 类的 export(Invoker) 方法,export部分代码如下

//创建urlfinal URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);//创建一个监听器final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);//放到缓存中overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);//订阅消息registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

dubbo引用服务

1.引用服务入口

服务引用要在xml中配置如下信息

<!-- 增加引用远程服务配置 --> <dubbo:reference id=“xxxService” interface=“com.xxx.XxxService” /> <!-- 和本地服务一样使用远程服务 --> <bean id=“xxxAction” class=“com.xxx.XxxAction”> <property name=“xxxService” ref=“xxxService” /> </bean>

上面的配置信息会被解析到 ReferenceConfig 类中
服务引用的入口类是 ReferenceBean,接下来我们分析它,先看下它的实现

public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {}

先看 InitializingBean 接口的 afterPropertiesSet() 方法
方法中获得配置信息存储到 ReferenceConfig 的成员变量中
如果不是懒加载则直接调用getObject()方法创建对象

Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);for (ConsumerConfig config : consumerConfigMap.values()) {setConsumer(consumerConfig);}Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);for (ApplicationConfig config : applicationConfigMap.values()) {setApplication(applicationConfig);}Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);for (ModuleConfig config : moduleConfigMap.values()) {setModule(moduleConfig);}Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);for (RegistryConfig config : registryConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {registryConfigs.add(config);}}if (registryConfigs != null && !registryConfigs.isEmpty()) {super.setRegistries(registryConfigs);}Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);for (MonitorConfig config : monitorConfigMap.values()) {setMonitor(monitorConfig);}Boolean b = isInit();if (b == null && getConsumer() != null) {b = getConsumer().isInit();}if (b != null && b.booleanValue()) {getObject();}

接下来看 FactoryBean 的接口实现,既然是FactoryBean类型那么就看下它能创建什么对象吧
代码如下

@Overridepublic Object getObject() throws Exception {return get();}//返回创建对象的类型@Overridepublic Class<?> getObjectType() {//这个对象的类型是ReferenceConfig 类中的 interfaceClass//<dubbo:reference id=“xxxService” interface=“com.xxx.XxxService” />//也就是xml中 dubbo:reference 的interface属性的值return getInterfaceClass();}@Override@Parameter(excluded = true)//创建的对象是否是单例的public boolean isSingleton() {return true;}

接下来分析 get() 吧
代码如下

public synchronized T get() {if (destroyed) {throw new IllegalStateException("Already destroyed!");}//这个ref 就是要返回的对象if (ref == null) {//这个对象没有就去创建它init();}return ref; }

接下来分析 init() 方法吧
主要代码如下

// 从系统变量中获取与接口名对应的属性值String resolve = System.getProperty(interfaceName);String resolveFile = null;if (resolve == null || resolve.length() == 0) {// 从系统属性中获取解析文件路径resolveFile = System.getProperty("dubbo.resolve.file");if (resolveFile == null || resolveFile.length() == 0) {// 从指定位置加载配置文件File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");if (userResolveFile.exists()) {// 获取文件绝对路径resolveFile = userResolveFile.getAbsolutePath();}}if (resolveFile != null && resolveFile.length() > 0) {Properties properties = new Properties();FileInputStream fis = null;try {fis = new FileInputStream(new File(resolveFile));// 从文件中加载配置properties.load(fis);} catch (IOException e) {throw new IllegalStateException("Unload ..., cause:...");} finally {try {if (null != fis) fis.close();} catch (IOException e) {logger.warn(e.getMessage(), e);}}// 获取与接口名对应的配置resolve = properties.getProperty(interfaceName);}}//创建interfaceClass接口对应的 invoker 对象缓存起来String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();//构建map 一系列参数存储其中appendParameters(map, application);appendParameters(map, module);appendParameters(map, consumer, Constants.DEFAULT_KEY);appendParameters(map, this);//通过map创建 最终想要的对象ref = createProxy(map);

上面代码加载文件是为了
通过系统变量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段

2.服务引用的invoker创建

接下来分析createProxy(map); 中的方法
代码如下(省略非主要代码)

private T createProxy(Map<String, String> map) {// 如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 trueboolean isJvmRefer=InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);//本地服务引用if (isJvmRefer) {//Constants.LOCAL_PROTOCOL的值是injvm//所以后面refprotocol.refer(interfaceClass, url) 调用的是InjvmProtocol类URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);//创建调用本地服务的invoker//调用 InjvmProtocol 的refer方法invoker = refprotocol.refer(interfaceClass, url);} else {// url 不为空,表明用户可能想进行点对点调用if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);if (us != null && us.length > 0) {for (String u : us) {URL url = URL.valueOf(u);if (url.getPath() == null || url.getPath().length() == 0) {url = url.setPath(interfaceName);}if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));} else {urls.add(ClusterUtils.mergeUrl(url, map));}}}} else { //加载注册中心//同时设置Registry为protocol 用于后面调用refprotocol.refer()的实现类是RegistryProtocol List<URL> us = loadRegistries(false);if (us != null && !us.isEmpty()) {for (URL u : us) {URL monitorUrl = loadMonitor(u);if (monitorUrl != null) {map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));}urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));}}}// 生产者的服务提供者只有一个if (urls.size() == 1) {//创建调用远程服务的invoker,invoker类型为dubboinvoker// 调用 RegistryProtocol 的 refer 构建 Invoker 实例invoker = refprotocol.refer(interfaceClass, urls.get(0));} else {// 多个服务提供者,存到list中List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();URL registryURL = null;for (URL url : urls) {invokers.add(refprotocol.refer(interfaceClass, url));if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {registryURL = url; // use last registry url}}if (registryURL != null) { URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);//根据集群管理选择一个一个dubboinvokerinvoker = cluster.join(new StaticDirectory(u, invokers));} else { // not a registry urlinvoker = cluster.join(new StaticDirectory(invokers));}}}// create service proxy//这里通过invoker创建代理对象return (T) proxyFactory.getProxy(invoker); }

2.1本地服务引用invoker创建

我们先看一下 InjvmProtocol 中的 refprotocol.refer(interfaceClass, url)方法
代码如下

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap); }

直接创建 InjvmInvoker 对象,挺简单的

2.2远程服务引用invoker创建

DubboProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法

接下来我们来看DubboProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法
代码如下

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {optimizeSerialization(url);//创建一个DubboInvoker类型的 invoker,这个对象的invoke 方法会向netty服务端发送调用信息DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);invokers.add(invoker);return invoker; }

下面最主要的方法是getClients(url),代码如下

private ExchangeClient[] getClients(URL url) {// whether to share connectionboolean service_share_connect = false;int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);// if not configured, connection is shared, otherwise, one connection for one serviceif (connections == 0) {service_share_connect = true;connections = 1;}ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {if (service_share_connect) {// 获取共享客户端clients[i] = getSharedClient(url);} else {clients[i] = initClient(url);}}return clients;}

接下来分析getSharedClient(url); 方法
代码如下

private ExchangeClient getSharedClient(URL url) {String key = url.getAddress();ReferenceCountExchangeClient client = referenceClientMap.get(key);if (client != null) {if (!client.isClosed()) {client.incrementAndGetCount();return client;} else {referenceClientMap.remove(key);}}locks.putIfAbsent(key, new Object());synchronized (locks.get(key)) {if (referenceClientMap.containsKey(key)) {return referenceClientMap.get(key);}//初始化客户端ExchangeClient exchangeClient = initClient(url);client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);referenceClientMap.put(key, client);ghostClientMap.remove(key);locks.remove(key);return client;}}

接下来分析 initClient(url) 方法
代码如下

if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {client = new LazyConnectExchangeClient(url, requestHandler);} else {//这里的 requestHandler是 DubboProtocol类中的 ExchangeHandlerAdapter()//这个handler用来调用服务client = Exchangers.connect(url, requestHandler);}

接下来研究 Exchangers.connect(url, requestHandler) 方法,
跳过调用代码,直接进入HeaderExchanger 类的connect(url,hanlder)方法

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {//这时handler被包装成//new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter()))return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);}

接下来分析Transporters.connect(url,hanlder)方法
跳过调用代码进入到 NettyTransporter 类的connect(url, listener)方法
代码如下

public Client connect(URL url, ChannelHandler listener) throws RemotingException {return new NettyClient(url, listener); }

接下来new NettyClient(url, listener)都做了什么
代码如下

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {//对handler进行了包装super(url, wrapChannelHandler(url, handler));}

我们先看下wrapChannelHandler(url,handler) 对handler做了哪些包装,一会再看 NettyClient 父类的构造方法,wrapChannelHandler(url,handler) 代码如下

protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);return ChannelHandlers.wrap(handler, url); }

接下来分析ChannelHandlers.wrap(handler, url)方法
代码如下

public class ChannelHandlers {private static ChannelHandlers INSTANCE = new ChannelHandlers();public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url);}protected static ChannelHandlers getInstance() {return INSTANCE;}protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {//ExtensionLoader.getExtensionLoader(Dispatcher.class)//.getAdaptiveExtension().dispatch(handler, url)的实现类暂且认为是AllDispatcher//AllDispatcher 中使用 new AllChannelHandler(handler)对handler进行了包装//这时handler已经被包装成了这样//new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(//new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter())))))return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));}

现在的handler类型就是这样的
new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter())))))
看上去比较恶心,但是他就是要用这样的handler,我们要记住他,以后分析

看完handler的包装再看NettyClient 父类的构造方法吧
主要代码如下

try {//创建bootstrap配置信息doOpen();} catch (Throwable t) {close();}try {// 根据创建的bootstrap配置信息来进行连接connect();} catch (RemotingException t) {if (url.getParameter(Constants.CHECK_KEY, true)) {close();throw t;}executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));

下面来看下doOpen()方法

protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory();bootstrap = new ClientBootstrap(channelFactory);// config// @see org.jboss.netty.channel.socket.SocketChannelConfigbootstrap.setOption("keepAlive", true);bootstrap.setOption("tcpNoDelay", true);bootstrap.setOption("connectTimeoutMillis", getTimeout());//再一次对handler进行包装现在handler包装的信息如下//new NettyHandler(new MultiMessageHandler(new HeartbeatHandler(new //AllChannelHandler(new DecodeHandler(new //HeaderExchangeHandler(new ExchangeHandlerAdapter())))))final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);bootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);ChannelPipeline pipeline = Channels.pipeline();pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});}

上面代码只创建对象不进行连接
下面来看下connect()方法,代码如下

protected void doConnect() throws Throwable {long start = System.currentTimeMillis();//连接服务端ChannelFuture future = bootstrap.connect(getConnectAddress());try {boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);if (ret && future.isSuccess()) {Channel newChannel = future.getChannel();newChannel.setInterestOps(Channel.OP_READ_WRITE);try {// Close old channelChannel oldChannel = NettyClient.this.channel; // copy referenceif (oldChannel != null) {try {oldChannel.close();} finally {NettyChannel.removeChannelIfDisconnected(oldChannel);}}} finally {if (NettyClient.this.isClosed()) {try {}newChannel.close();} finally {NettyClient.this.channel = null;NettyChannel.removeChannelIfDisconnected(newChannel);}} else {NettyClient.this.channel = newChannel;}}} } finally {if (!isConnected()) {future.cancel();}}}

这样客户端的服务器起来了
我们要记住客户端的handler包装信息(和服务端的一样)有
new NettyHandler(new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter())))))

RegistryProtocol 中的 refprotocol.refer(interfaceClass, urls) 方法

接下来分析 RegistryProtocol 类的 refer 方法
代码如下

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);Registry registry = registryFactory.getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));String group = qs.get(Constants.GROUP_KEY);if (group != null && group.length() > 0) {if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1|| "*".equals(group)) {return doRefer(getMergeableCluster(), registry, type, url);}}return doRefer(cluster, registry, type, url);}

接下来看doRefer方法

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {// 创建 RegistryDirectory 实例RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);// 设置注册中心和协议directory.setRegistry(registry);directory.setProtocol(protocol);Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());// 生成urlURL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);// 注册服务消费者,在 consumers 目录下新节点if (!Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));}// 订阅 providers、configurators、routers 等节点数据directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY+ "," + Constants.CONFIGURATORS_CATEGORY+ "," + Constants.ROUTERS_CATEGORY));// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个Invoker invoker = cluster.join(directory);ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker; }

如上,doRefer 方法创建一个 RegistryDirectory 实例(RegistryDirectory 会同步生产者的服务信息,根据可以调用的服务创建对应的DubboInvoker对象存储到RegistryDirectory 中),然后生成消费者url,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。
由于一个服务可能部署在多台服务器上,这样生产者就会在注册中心上注册多个服务,这个时候就需要 Cluster 在多个服务中选择一个invoker(负载均衡)返回

3.通过invoker 创建代理对象

接下来研究 ReferenceConfig 的 proxyFactory.getProxy(invoker) 方法,就是对invoker进行下代理
接下来看JavassistProxyFactory的 getProxy 方法

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }

最后生成的代理类是这样的

package org.apache.dubbo.common.bytecode;public class proxy0 implements org.apache.dubbo.demo.DemoService {public static java.lang.reflect.Method[] methods;private java.lang.reflect.InvocationHandler handler;public proxy0() {}public proxy0(java.lang.reflect.InvocationHandler arg0) {handler = $1;}public java.lang.String sayHello(java.lang.String arg0) {Object[] args = new Object[1];args[0] = ($w) $1;Object ret = handler.invoke(this, methods[0], args);return (java.lang.String) ret;} }

集群容错

官方文档上说集群容错分为
服务目录 Directory、服务路由 Router、集群 Cluster、负载均衡 LoadBalance

服务目录是什么: 生产者启动时会将可以调用的服务存储到zk上。消费者启动后拉取zk上可以调用的服务,根据每个服务信息创建一个Invoker对象,将所有的invoker存储到服务目录 Directory中,Directory中存储的invoker信息会随着生产者提供服务信息变化。

服务路由 Router的作用: 服务路由规定了服务消费者可调用哪些服务提供者。
服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,条件路由规则信息如下

host = 10.20.153.10 => host = 10.20.153.11
[服务消费者ip] => [服务提供者ip]

说明服务消费者ip 可以调用此服务提供者的ip

集群 Cluste: 在dubbo中 Cluste 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理打印日志还是抛异常),现在都交给集群模块去处理

负载均衡 LoadBalance:调用时在多个invoker中通过权重和策略的参数选择一个invoker

服务目录 Directory

通常使用RegistryDirectory,类图如下


Directory 继承自 Node 接口,这个接口包含了一个获取配置信息的方法 getUrl,实现该接口的类可以向外提供配置信息。
AbstractDirectory 实现了 Directory 接口,这个接口包含了一个重要的方法定义,list(Invocation),用于获取所有 Invoke对象,这里的invoke对象可以调用生产者的服务。
RegistryDirectory 实现了 NotifyListener 接口,当注册中心节点信息发生变化后,可以通过此接口方法得到变更invoke信息

RegistryDirectory是在哪里初始化的呢,肯定是消费者使用到它了
在RegistryProtocol 的 doRefer 方法进行的初始化

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {//在这里进行的初始化RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);directory.setRegistry(registry);directory.setProtocol(protocol);Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);if (!Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));}//订阅服务端提供的服务directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY+ "," + Constants.CONFIGURATORS_CATEGORY+ "," + Constants.ROUTERS_CATEGORY));Invoker invoker = cluster.join(directory);ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker;}

接下来分析 RegistryDirectory 是如何将zk中的生产者服务信息同步到每个invoker对象中的
先看父类的 list 方法,它可以返回所有的invoker对象,代码如下

//可以获得所有的invocation public List<Invoker<T>> list(Invocation invocation) throws RpcException {if (destroyed) {throw new RpcException("Directory already destroyed .url: " + getUrl());}//模板方法调用子类的 doList 方法//RegistryDirectory 的 doList方法中 如果服务没有禁用则从成员变量(Map<String, List<Invoker<T>>> methodInvokerMap)中获取List<Invoker<T>> invokers = doList(invocation);List<Router> localRouters = this.routers; // local referenceif (localRouters != null && !localRouters.isEmpty()) {for (Router router : localRouters) {try {if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {//进行路由invokers = router.route(invokers, getConsumerUrl(), invocation);}} catch (Throwable t) {logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);}}}return invokers; }

接下来看下 RegistryDirectory 的 notify方法,如果zk中生产者提供的服务信息发生变化则会调用此方法,在方法中根据变化的信息同步到对应的invoker对象,代码如下

public synchronized void notify(List<URL> urls) {List<URL> invokerUrls = new ArrayList<URL>();List<URL> routerUrls = new ArrayList<URL>();List<URL> configuratorUrls = new ArrayList<URL>();for (URL url : urls) {String protocol = url.getProtocol();String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);if (Constants.ROUTERS_CATEGORY.equals(category)|| Constants.ROUTE_PROTOCOL.equals(protocol)) {routerUrls.add(url);} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {configuratorUrls.add(url);} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {invokerUrls.add(url);} else {logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());}}// configuratorsif (configuratorUrls != null && !configuratorUrls.isEmpty()) {this.configurators = toConfigurators(configuratorUrls);}// routersif (routerUrls != null && !routerUrls.isEmpty()) {List<Router> routers = toRouters(routerUrls);if (routers != null) { // null - do nothingsetRouters(routers);}}List<Configurator> localConfigurators = this.configurators; // local reference// merge override parametersthis.overrideDirectoryUrl = directoryUrl;if (localConfigurators != null && !localConfigurators.isEmpty()) {for (Configurator configurator : localConfigurators) {this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);}}// providersrefreshInvoker(invokerUrls);}

上面代码最主要方法是 refreshInvoker(invokerUrls)
代码如下

private void refreshInvoker(List<URL> invokerUrls) {if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {this.forbidden = true; // Forbid to accessthis.methodInvokerMap = null; // Set the method invoker map to nulldestroyAllInvokers(); // Close all invokers} else {this.forbidden = false; // Allow to accessMap<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local referenceif (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {invokerUrls.addAll(this.cachedInvokerUrls);} else {this.cachedInvokerUrls = new HashSet<URL>();this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison}if (invokerUrls.isEmpty()) {return;}Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker mapMap<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map// state change// If the calculation is wrong, it is not processed.if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;this.urlInvokerMap = newUrlInvokerMap;try {destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);}}}

将上面代码是将url信息转换成成invoker存储到 methodInvokerMap 和 urlInvokerMap 成员变量中

服务路由 Router

我们什么时候要用到Router,当我们要配置dubbo服务的白名单黑名单 或一个服务提供给不同的消费者。在服务字典获所有invoker的时候,会通过路由过滤掉不能调用的invoker
先看下Router 的关系图


有两个实现类
条件路由 ConditionRouter 和 脚本路由 ScriptRouter
先看下条件路由 ConditionRouter 吧
ConditionRouter 的构造方法代码如下

public ConditionRouter(URL url) {this.url = url;// 获取 priority 和 force 配置this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);this.force = url.getParameter(Constants.FORCE_KEY, false);try {// 获取路由规则String rule = url.getParameterAndDecoded(Constants.RULE_KEY);if (rule == null || rule.trim().length() == 0) {throw new IllegalArgumentException("Illegal route rule!");}rule = rule.replace("consumer.", "").replace("provider.", "");// 定位 => 分隔符int i = rule.indexOf("=>");// 分别获取服务消费者和提供者匹配规则String whenRule = i < 0 ? null : rule.substring(0, i).trim();String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();// 解析服务消费者匹配规则Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);// 解析服务提供者匹配规则Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);// 将解析出的匹配规则分别赋值给 whenCondition 和 thenCondition 成员变量this.whenCondition = when;this.thenCondition = then;} catch (ParseException e) {throw new IllegalStateException(e.getMessage(), e);} }

根据上面代码得出,根据传入的 url 对象,通过 parseRule 方法解析,将解析的信息赋值给成员变量下面我们来看下 parseRula方法吧,代码如下

private static Map<String, MatchPair> parseRule(String rule)throws ParseException {// 定义条件映射集合Map<String, MatchPair> condition = new HashMap<String, MatchPair>();if (StringUtils.isBlank(rule)) {return condition;}MatchPair pair = null;Set<String> values = null;// 通过正则表达式匹配路由规则,ROUTE_PATTERN = ([&!=,]*)\s*([^&!=,\s]+)// 这个表达式看起来不是很好理解,第一个括号内的表达式用于匹配"&", "!", "=" 和 "," 等符号。// 第二括号内的用于匹配英文字母,数字等字符。举个例子说明一下:// host = 2.2.2.2 & host != 1.1.1.1 & method = hello// 匹配结果如下:// 括号一 括号二// 1. null host// 2. = 2.2.2.2// 3. & host// 4. != 1.1.1.1 // 5. & method// 6. = hellofinal Matcher matcher = ROUTE_PATTERN.matcher(rule);while (matcher.find()) {// 获取括号一内的匹配结果String separator = matcher.group(1);// 获取括号二内的匹配结果String content = matcher.group(2);// 分隔符为空,表示匹配的是表达式的开始部分if (separator == null || separator.length() == 0) {// 创建 MatchPair 对象pair = new MatchPair();// 存储 <匹配项, MatchPair> 键值对,比如 <host, MatchPair>condition.put(content, pair); } // 如果分隔符为 &,表明接下来也是一个条件else if ("&".equals(separator)) {// 尝试从 condition 获取 MatchPairif (condition.get(content) == null) {// 未获取到 MatchPair,重新创建一个,并放入 condition 中pair = new MatchPair();condition.put(content, pair);} else {pair = condition.get(content);}} // 分隔符为 =else if ("=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule ...");values = pair.matches;// 将 content 存入到 MatchPair 的 matches 集合中values.add(content);} // 分隔符为 != else if ("!=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule ...");values = pair.mismatches;// 将 content 存入到 MatchPair 的 mismatches 集合中values.add(content);}// 分隔符为 ,else if (",".equals(separator)) {if (values == null || values.isEmpty())throw new ParseException("Illegal route rule ...");// 将 content 存入到上一步获取到的 values 中,可能是 matches,也可能是 mismatchesvalues.add(content);} else {throw new ParseException("Illegal route rule ...");}}return condition; }

上面代码做了什么呢,它就是将 url对象中的信息进行解析,然后将解析的数据返回
接下来看ConditionRouter 的 route 方法

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {if (invokers == null || invokers.isEmpty()) {return invokers;}try {// 先对服务消费者条件进行匹配,如果匹配失败,表明服务消费者 url 不符合匹配规则,// 无需进行后续匹配,直接返回 Invoker 列表即可。比如下面的规则:// host = 10.20.153.10 => host = 10.0.0.10// 这条路由规则希望 IP 为 10.20.153.10 的服务消费者调用 IP 为 10.0.0.10 机器上的服务。// 当消费者 ip 为 10.20.153.11 时,matchWhen 返回 false,表明当前这条路由规则不适用于// 当前的服务消费者,此时无需再进行后续匹配,直接返回即可。if (!matchWhen(url, invocation)) {return invokers;}List<Invoker<T>> result = new ArrayList<Invoker<T>>();// 服务提供者匹配条件未配置,表明对指定的服务消费者禁用服务,也就是服务消费者在黑名单中if (thenCondition == null) {logger.warn("The current consumer in the service blacklist...");return result;}// 这里可以简单的把 Invoker 理解为服务提供者,现在使用服务提供者匹配规则对 // Invoker 列表进行匹配for (Invoker<T> invoker : invokers) {// 若匹配成功,表明当前 Invoker 符合服务提供者匹配规则。// 此时将 Invoker 添加到 result 列表中if (matchThen(invoker.getUrl(), url)) {result.add(invoker);}}// 返回匹配结果,如果 result 为空列表,且 force = true,表示强制返回空列表,// 否则路由结果为空的路由规则将自动失效if (!result.isEmpty()) {return result;} else if (force) {logger.warn("The route result is empty and force execute ...");return result;}} catch (Throwable t) {logger.error("Failed to execute condition router rule: ...");}// 原样返回,此时 force = false,表示该条路由规则失效return invokers; }

上面代码主要通过 matchWhen、matchThen 方法进行匹配路由,看传递过来的参数判断是否可以访问

总结

dubbo生产者初始化过程

首先通过 DubboNamespaceHandler 解析配置文件,每有一个dubbo:service标签即创建一个ServiceBean 类
每个servicebean会在项目启动的时候通过 setApplicationContext 方法 向spring添加一个监听器
监听器会初始化生产者的服务
监听器是怎么初始化生产者的服务的下面继续分析
首先循环遍历所有注册中心,然后遍历所有协议,
根据注册中心和协议 对每个服务进行导出
根据配置信息创建URL对象
进行导出服务,如果要导出服务到本地
–》根据实现类ref对象和URL对象创建 invoker对象,然后缓存起来
进行导出服务,如果要导出服务到远程
–》根据实现类ref对象和URL对象创建 invoker对象,invoker就是对ref对象进行简单的代理
–》根据invoker创建DubboExporter对象(用于处理请求),DubboExporter对invoker对象进行包装
–》开启netty服务端
–》获得注册中心地址,连接注册中心,检测注册中心开启情况,如果变化给所有监听器进行回调
–》每有一个服务则在注册中心上创建一条信息
–》然后订阅 override 数据

消费者初始化过程

首先通过 DubboNamespaceHandler 解析配置文件, 每有一个 dubbo:reference 创建一个ReferenceBean 类
ReferenceBean 是一个factorybean, 可以通过它来创建一个单例的代理类(ref 要调用接口的实现类)
这个factorybean会创建一个什么样的代理对象呢,下面进行分析
如果是本地引用则 根据实现类(ref) 创建一个invoker对象
如果是远程引用则 根据实现类(ref) 创建一个DubboInvoker对象
–> 每有一个服务提供者则创建一个 DubboInvoker,DubboInvoker对象的invoker方法会调用netty的client的send方法(DubboInvoker可以向服务端发起调用请求)
–> 一个服务有多个提供者的话,会创建多个DubboInvoker放到集合中,然后创建AbstractClusterInvoker实现类下的对象进行集群管理
–> 集群管理包括(调用服务失败后抛异常还是调用别的生产者、负载均衡策略选择)
开启netty客户端,连接注册中心,向注册中心中写入服务,初始化服务字典RegistryDirectory
在服务字典(服务端每有一个服务则创建一个DubboInvoker存储到服务字典中)中添加监听。
根据invoker创建对应的代理proxy对象返回。 proxy是对invoker对象进行了包装。

服务调用过程

消费者向生产者发送请求

DubboProductInterface obj=(DubboProductInterface)application.getBean("DubboProductInterface") obj.sayHello();

上面代码会通过getBean() 方法 创建 DubboProductInterface 接口的代理对象,每次调用这个代理对象的方法都会发起远程调用,这个代理对象是什么样子呢?

在spring容器中通过单例的factorybean会创建一个DubboProductInterface 类型的代理对象 如果生产者非集群部署(一个服务的提供者只有一个) --->也就是生产者只有一个则直接根据 生产者提供的服务信息 返回一个DubboInvoker对象。 如果生产者是集群部署的话 (一个服务的提供者有多个) --->会通过AbstractClusterInvoker 将多个DubboInvoker进行包装,包装后对多个DubboInvoker进行集群管理 --->(一个服有几个生产者提供就有几个DubboInvoker) --->然后返回AbstractClusterInvoker 对象

AbstractClusterInvoker 的 invoke 方法做了什么?

假如调用的AbstractClusterInvoker的实现类是 FailoverClusterInvoker 他的作用是 调用失败自动切换生产者,也就是调用生产者失败后调用别的生产者 FailoverClusterInvoker 的doinvoke方法会先获得重试次数(失败后最多调用几次) 通过LoadBalance 的实现类进行负责均衡(从多个DubboInvoker中选择一个返回), 调用AbstractInvoker中的invoke方法,也就是DubboInvoker中的doinvoke方法 如果调用失败则根据重试次数进行多次调用,调用了重试次数次 都失败则抛异常。

在DubboInvoker中调用分为很多种包括,异步有返回值调用、异步没有返回值调用、同步调用(同步调用可定有返回值,空也算返回值)
在这就分析同步调用吧,在DubboInvoker 的doinvoke 方法中
currentClient.request(inv, timeout).get(); 在这里的get() 会阻塞等待返回的数据
调用流程如下

> DubboInvoker#doInvoke(Invocation)> ReferenceCountExchangeClient#request(Object, int) --> 里面有一个计数器—> HeaderExchangeClient#request(Object, int) --> 初始化this.channel = new HeaderExchangeChannel(client);--> 进行心跳检测—> HeaderExchangeChannel#request(Object, int) --> 将请求的信息包装成Request对象—> AbstractPeer#send(Object)> AbstractClient#send(Object, boolean) -->连接Netty服务端 将连接后的channel对象存储起来(例NioServerSocketChannel)在dubbo中是NettyChannel-->获得channel -->调用channel的send方法 —> NettyChannel#send(Object, boolean)> NioClientSocketChannel#write(Object)

消费者向生产者发送参数的编码

接下来分析编码调用流程

在 NettyClient中设置编码器, pipeline.addLast("encoder", adapter.getEncoder()); adapter.getEncoder() 会创建一个内部类 InternalEncoder() InternalEncoder()encode()方法 调用codec.encode(channel, buffer, msg); 也就是 ExchangeCodec 的encode方法 的 encodeRequest()方法 encodeRequest() 方法中通过channel将对象信息发送给服务端

生产者接收请求解码过程

解码调用过程如下

ExchangeCodec --》decodeExchangeCodec --decode(channel, buffer, readable, header)ExchangeCodec --decodeBody(channel, is, header)DubboCodec --decodeBody() 创建一个new DecodeableRpcInvocation() 对象返回DecodeableRpcInvocation ---decode() 最终会返回一个DecodeableRpcInvocation 对象 解码流程结束,将发送的信息封装到一个request对象返回, 这个request对象的类型是ecodeableRpcInvocation

生产者根据请求信息调用指定方法

接下来就是将这个request对象交给hanlder进行处理,处理request的handler经过一系列包装,包装类层次如下

new NettyHandler(new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(new DecodeHandler(new HeaderExchangeHandler(new ExchangeHandlerAdapter()))))))

接下来看调用栈吧

NettyHandler -messageReceived() netty框架用来接收请求的入口MultiMessageHandler -》 receivedHeartbeatHandler -》 receivedAllChannelHandler -》 received 开启线程ChannelEventRunnable -run() 根据通道类型进行调用不同方法DecodeHandler -》receivedHeaderExchangeHandler -》 received 调用方法,将调用方法的结果返回的结果发送回去channel.send(response);HeaderExchangeHandler -handleRequest(exchangeChannel, request); 处理请求ExchangeHandlerAdapter -》reply 也就是 DubboProtocol -》reply 根据传递的参数 获得对应的DubboExporterDubboExporter 中包含着对应的invokerAbstractProxyInvoker -》invoke JavassistProxyFactory -》getInvoker 即可调用指定方法

消费者对生产者发送的参数解码

消费者接收生产者响应解码

NettyClient 的 adapter.getEncoder() 方法进行解码, 最后会到 ExchangeCodec 的 encodeResponse(channel, buffer, (Response) msg); 方法 然后DubboCodec 的 decodeBody 方法 返回一个new DecodeableRpcResult 请求对象 DecodeableRpcResult ==decode() 方法,将参数存储到DecodeableRpcResult对象中

消费者接收生产者发送的参数

解码完成后,接收使用handler解析DecodeableRpcResult

还是那个包装了很多次的handler 跳过部分handler直接进入主要的逻辑 HeaderExchangeHandler received HeaderExchangeHandler handleResponse(channel, (Response) message); 在这里移除message 同时将请求传递的response参数放到DefaultFuture的成员变量中然后唤醒用户线程

消费者接收生产者响应,如何找到消费者发起请求的线程

消费者调用生产者
生产者没返回调用结果时,消费者会阻塞,在哪里阻塞的呢

DubboInvoker 的 doinvoke 方法 --》 currentClient.request(inv, timeout).get(); 在这里的get() 会阻塞 也就是DefaultFuture 的get方法

先看下DefaultFuture 中都做了什么

DefaultFuture 中有个静态map,存储着返回信息请求id 和 DefaultFuture对象 DefaultFuture 静态代码块会开启一个守护线程,如果相应超过固定时间则将DefaultFuture从静态map中移除

DefaultFuture 的get方法都做了什么,将代码简写

private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();private volatile Response response;public Object get(int timeout) throws RemotingException {//如果返回值为空if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {//在lock锁中进行阻塞,相当于在synchronized中使用wait,要用notify进行唤醒//synchronized 中的notify是随机唤醒,不能指定唤醒//lock锁中的 await可以被指定唤醒done.await(timeout, TimeUnit.MILLISECONDS);//是否超过响应时间if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} finally {//释放锁lock.unlock();}}return returnFromResponse(); }

在生产者返回响应是如何唤醒线程的呢
在解码后交给handler进行处理,直接看核心代码
HeaderExchangeHandler —》 handleResponse(channel, (Response) message);

static void handleResponse(Channel channel, Response response) throws RemotingException {if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);} }

接下来看 DefaultFuture.received(channel, response)

public static void received(Channel channel, Response response) {try {//根据id获得对应的DefaultFuture //通过id将静态map中的DefaultFuture 移除DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {//在方法中调用 done.signal(); 进行唤醒future.doReceived(response);} }} finally {CHANNELS.remove(response.getId());} }

接下来看 future.doReceived(response);

private void doReceived(Response res) {lock.lock();try {response = res;if (done != null) {//进行唤醒done.signal();}} finally {lock.unlock();}if (callback != null) {invokeCallback(callback);} }

经典问题

Dubbo 中注册中心可以使用哪些,协议可以使用哪些
Dubbo 中失败重试次数怎么设置
Dubbo 请求是否异步,异步是否需要返回值,怎么配置
Douuo 消费者调用生产者,消费者阻塞生产者返回结果,如何根据结果找到消费者上正在阻塞的线程

dubbo 怎么如何追踪dubbo id的

如果非集群服务则会调用 DubboInvoker 的 doInvoke方法,如果是异步有返回值则调用 ResponseFuture future = currentClient.request(inv, timeout); 方法 //HeaderExchangeChannel中创建Future 首先 DefaultFuture中有一个静态 Map,map的key为id,value为本次请求的DefaultFuture,同时会开一个守护线程来将超时的DefaultFuture移除 假如是异步请求,每当一次请求,就创建一个 本次请求的DefaultFuture还维护着一个lock锁 用于阻塞和释放当前线程(使用wait和notify的话不能唤醒指定线程), 为了防止静态 Map中本次请求的DefaultFuture被移除导致找不到DefaultFuture, RpcContext将本次请求的DefaultFuture存储到在threadlocal中,DefaultFuture中也有id)

总结

以上是生活随笔为你收集整理的志宇-dubbo源码分析的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。