消费端启动初始化过程

  消费端的代码解析也是从配置文件解析开始的,服务发布对应的<dubbo:service,解析xml的时候解析了一个ServiceBean,并且调用ServiceConfig进行服务的发布。服务的消费对应的<dubbo:reference,在初始化的过程中也解析了一个 ReferenceBean类去做处理。在bean加载后会调用里面的 afterPropertiesSet()这个方法。也就是把配置文件解析到各个对应bean里面。然后调用了getObject():

public Object getObject() throws Exception { 
        return get(); 
    }

  然后也有一个 ReferenceConfig这个类来一起执行,进入他的get()方法:

public synchronized T get() { 
        if (destroyed) { 
            throw new IllegalStateException("Already destroyed!"); 
        } 
        if (ref == null) { 
            init();//初始化 
        } 
        return ref; 
    } 

  然后进入初始化操作。init() 方法里的操作跟服务发布的时候很类似,还是检查各种配置,解析各种标签,最后封装到一个 map 里面。然后调用 ref = createProxy(map); 返回一个代理对象,对于该方法体内前面很多代码都是执行初始化。判断虚拟机情况,组装URL什么的。主要关于服务端调用的的部分代码如下:

} else { //从注册中心上获得相应的协议url地址 
                List<URL> us = loadRegistries(false); 
                if (us != null && us.size() > 0) { 
                    for (URL u : us) { 
                        URL monitorUrl = loadMonitor(u); 
                        if (monitorUrl != null) { 
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); 
                        } 
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 
                    } 
                } 
                if (urls == null || urls.size() == 0) { 
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); 
                } 
            } 
 
            if (urls.size() == 1) { 
                // 获得invoker代理对象MockClusterInvoker 
                invoker = refprotocol.refer(interfaceClass, urls.get(0)); 
            } else { 
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); 
                URL registryURL = null; 
                for (URL url : urls) { 
                    invokers.add(refprotocol.refer(interfaceClass, url)); 
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 
                        registryURL = url; // 用了最后一个registry url                    } 
                } 
                if (registryURL != null) { //有 注册中心协议的URL 
                    //对有注册中心的Cluster 只用 AvailableCluster 
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                    invoker = cluster.join(new StaticDirectory(u, invokers)); 
                } else { //不是  注册中心的URL 
                    invoker = cluster.join(new StaticDirectory(invokers)); 
                } 
            } 
        } 
        Boolean c = check; 
        if (c == null && consumer != null) { 
            c = consumer.isCheck(); 
        } 
        if (c == null) { 
            c = true; // default true 
        } 
        if (c && !invoker.isAvailable()) { 
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); 
        } 
        if (logger.isInfoEnabled()) { 
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); 
        } 
        // 
        return (T) proxyFactory.getProxy(invoker); 

  其中 List<URL> us = loadRegistries(false); 是从配置文件解析,最后组装了这么一个地址,基于前面的案例,debug以后发现该地址如下:

registry://192.168.254.135:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-client&dubbo=2.5.4&owner=wuzz&pid=53420&registry=zookeeper&timestamp=1543905535962 

  我们可以看到以上代码有一些步骤是关于  Monitor 的操作,暂时不管。由于我们这里注册中心为1个,最后会进入以下操作:

if (urls.size() == 1) { 
           // 获得invoker代理对象 
           invoker = refprotocol.refer(interfaceClass, urls.get(0)); 
} 

  可以从代码中发现这个 refprotocol 又是一个自适应适配器 Protocol$Adaptive ,他会调用里面的refer() 方法:

public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) 
			throws com.alibaba.dubbo.rpc.RpcException { 
		if (arg1 == null) 
			throw new IllegalArgumentException("url == null"); 
		com.alibaba.dubbo.common.URL url = arg1; 
		String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); 
		if (extName == null) 
			throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" 
					+ url.toString() + ") use keys([protocol])"); 
		com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader 
				.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); 
		return extension.refer(arg0, arg1); 
	} 

  又是熟悉的套路,这里代码中的 url.getProtocol() = registry ,所以 extension 就是 RegistryProtocol,调用他的 refer() ,跟发布服务的流程真的很像:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 
        // url本身是一个 registry://.... 
        // 替换成了<dubbo:registry address="zookeeper://192.168.254.135:2181" />中的zookeeper 
        // zookeeper://.... 
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); 
        // 跟服务注册的时候一模一样的代码,最后获得ZookeeperRegistry 获得跟zk的链接 
        Registry registry = registryFactory.getRegistry(url); 
        if (RegistryService.class.equals(type)) {// 这里的type是interface com.gupaoedu.dubbo.IGpHello 
            return proxyFactory.getInvoker((T) registry, type, url); 
        } 
 
        // group="a,b" or group="*" 
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); 
        String group = qs.get(Constants.GROUP_KEY); 
        if (group != null && group.length() > 0) { 
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 
                    || "*".equals(group)) { 
                return doRefer(getMergeableCluster(), registry, type, url); 
            } 
        } 
        // 最后调用这个 
        return doRefer(cluster, registry, type, url); 
    } 

  doRefer(cluster, registry, type, url):com.alibaba.dubbo.rpc.cluster.Cluster$Adpative,ZookeeperRigistry,interface com.gupaoedu.dubbo.IGpHello,zookeeper://192.168.254.135:2181/...:

  cluster : 这个是个依赖注入的拓展点:

@SPI(FailoverCluster.NAME) 
public interface Cluster { 
 
    /** 
     * Merge the directory invokers to a virtual invoker. 
     * 
     * @param <T> 
     * @param directory 
     * @return cluster invoker 
     * @throws RpcException 
     */ 
    @Adaptive 
    <T> Invoker<T> join(Directory<T> directory) throws RpcException; 
 
} 

  而且可以从源码中发现这cluster默认是 FailoverCluster ,而且@Adaptive是标注在方法上,所以会生成一个自适应的适配器 Cluster$Adaptive:

public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster { 
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException { 
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null"); 
        if (arg0.getUrl() == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null"); 
        com.alibaba.dubbo.common.URL url = arg0.getUrl(); 
        String extName = url.getParameter("cluster", "failover"); 
        if (extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])"); 
        com.alibaba.dubbo.rpc.cluster.Cluster extension =  
                           (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader 
                           .getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName); 
        return extension.join(arg0); 
    } 
} 

  很明显,这个时候我们可以知道这个适配器里面会生成一个拓展点,而这个 extension 默认他就是 FailoverCluster 而且在文件 com.alibaba.dubbo.rpc.cluster.Cluster中:

mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper 
failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster 
failfast=com.alibaba.dubbo.rpc.cluster.support.FailfastCluster 
failsafe=com.alibaba.dubbo.rpc.cluster.support.FailsafeCluster 
failback=com.alibaba.dubbo.rpc.cluster.support.FailbackCluster 
forking=com.alibaba.dubbo.rpc.cluster.support.ForkingCluster 
available=com.alibaba.dubbo.rpc.cluster.support.AvailableCluster 
mergeable=com.alibaba.dubbo.rpc.cluster.support.MergeableCluster 
broadcast=com.alibaba.dubbo.rpc.cluster.support.BroadcastCluster 

  我们发现了一个装饰器 Wrapper :mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper,经过查看其源码,发现内有一个携带了拓展点Cluster的构造方法,所以上面说到的 extension其实是MockClusterWrapper(FailoverCluster)。这里就是之前提到的服务降级里面的 Mock机制,在你 FailoverCluster失败之后调用Mock。

  了解了 Cluster 我们继续进入 doRefer(cluster, registry, type, url)::

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { 
        // 对多个invoker 进行组装,也就是做到服务的动态上下线的功能 
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); 
        directory.setRegistry(registry);//ZookeeperRegistry 
        directory.setProtocol(protocol);// Protocol$Adaptive 
        // 组装一个 url=comsumer://192.168.254.135/....... 
        // 就是ZK上面会注册一个消费的协议地址 
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); 
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) 
                && url.getParameter(Constants.REGISTER_KEY, true)) { 
            // 注册一个消费协议地址 最后是zkClient.create() 
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, 
                    Constants.CHECK_KEY, String.valueOf(false))); 
        } 
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                        + "," + Constants.CONFIGURATORS_CATEGORY 
                        + "," + Constants.ROUTERS_CATEGORY)); 
        return cluster.join(directory); 
    } 

  最后的两行代码我们从官网上能找到一张图:IT虾米网

  通过 Directory 传入cluster,返回一个 List<invoker>,传入路由,经过路由机制,继续传入负载均衡做一个负载,得到最终调用的 invoker。

  先来看一下 cluster.join(directory):实际会调用到 MockClusterWrapper 的 join方法:

public <T> Invoker<T> join(Directory<T> directory) throws RpcException { 
        return new MockClusterInvoker<T>(directory, 
                this.cluster.join(directory)); 
    } 

  到这里new 出来一个 MockClusterInvoker 。到此为止 invoker = refprotocol.refer(interfaceClass, urls.get(0));所得到的对象就是  MockClusterInvoker 。 

  接着流程来到了 proxyFactory.getProxy(invoker) :

  proxyFactory 这个东西是否是曾相识呢?? 在服务的发布之前也调用了proxyFactory去获得一个服务的代理对象去发布,而且这个proxyFactory对应的是 :先是生成一个ProxyFactory$Adpative,再调用里面的getProxy,通过默认实现extName=Javassist来获得一个Extension,即JavassistProxyFactory,然后去调用他的getProxy方法,但是在拓展点文件 com.alibaba.dubbo.rpc.ProxyFactory文件中:

stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper 
jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory 
javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory 

  说明有个装饰的Wapper,所以这里需要包装 ,所以proxyFactory其实就是StubProxyFactoryWrapper(JavassistProxyFactory):做了一系列装饰以后生成本地stub存根,还是会进入 JavassistProxyFactory 的getProxy方法:

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { 
     return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); 
} 

  该方法内基于javassist 机制生成一个动态的代理类,跟之前手写RPC时候获取代理类有点像了,跟之前服务端发布服务获取invoker类似,这个里面最后组装了一个 Class<?> pc = ccm.toClass(); debug看到里面有个 mMethods是一个数组:

public java.lang.String sayHello(java.lang.String arg0){ 
       Object[] args = new Object[1];  
       args[0] = ($w)$1;  
       Object ret = handler.invoke(this, methods[0], args);  
       return (java.lang.String)ret; 
} 
 
public java.lang.Object $echo(java.lang.Object arg0) { 
	Object[] args = new Object[1]; 
	args[0] = ($w) $1; 
	Object ret = handler.invoke(this, methods[1], args); 
	return (java.lang.Object) ret; 
} 

  最后调用方法的时候通过 InvokerInvocationHandler 的invoke去进行远程调用。最后生成的代理类当中会有一个 InvokerInvocationHandler 属性,并且要通过构造进行赋值,也就是上面的Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));这段代码,然后再通过哪个生成的pc这个代理类的 sayHello去调用方法 。至此,消费端去消费获得引用的流程就结束了。

什么时候建立和服务端的连接:

  消费端的初始化过程,但是似乎没有看到客户端和服务端建立NIO连接。实际上,建立连接的过程在消费端初始化的时候就建立好的,只是前面我们没有分析,代码在RegistryProtocol.doRefer方法内directory.subscribe方法中:做订阅

directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                        + "," + Constants.CONFIGURATORS_CATEGORY 
                        + "," + Constants.ROUTERS_CATEGORY)); 

  调用RegistryDirectory<T>里面的subscribe(URL url):

public void subscribe(URL url) { 
        // url=consumer://192.168.254.1/com.gupaoedu.dubbo.IGpHello?.... 
        setConsumerUrl(url);//设置消费端URL 
        registry.subscribe(url, this); 
    }

  这里的registry就是之前所设置的ZookeeperRegistry ,url 就是comsumer://....,this就是 RegistryDirectory,由于ZookeeperRegistry没有实现 subscribe方法,所以去他的父类中去找,父类是 FailbackRegistry:

public void subscribe(URL url, NotifyListener listener) { 
        if (destroyed.get()){ 
            return; 
        } 
        super.subscribe(url, listener);
//移除失败的订阅 removeFailedSubscribed(url, listener); try { // 向服务器端发送订阅请求 doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; List<URL> urls = getCacheUrls(url); if (urls != null && urls.size() > 0) { notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { // 如果开启了启动检测,则直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // 将失败的订阅请求记录到失败列表,定时重试 addFailedSubscribed(url, listener); } }

  doSubscribe(url, listener);是个模板方法,会在子类中实现,即 ZookeeperRegistry:

protected void doSubscribe(final URL url, final NotifyListener listener) { 
        try {//这里的Interface是com.gupaoedu.dubbo.IGpHello,明显不相等,走else 
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { 
                String root = toRootPath(); 
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); 
                if (listeners == null) { 
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); 
                    listeners = zkListeners.get(url); 
                } 
                ChildListener zkListener = listeners.get(listener); 
                if (zkListener == null) { 
                    listeners.putIfAbsent(listener, new ChildListener() { 
                        public void childChanged(String parentPath, List<String> currentChilds) { 
                            for (String child : currentChilds) { 
                                child = URL.decode(child); 
                                if (!anyServices.contains(child)) { 
                                    anyServices.add(child); 
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, 
                                            Constants.CHECK_KEY, String.valueOf(false)), listener); 
                                } 
                            } 
                        } 
                    }); 
                    zkListener = listeners.get(listener); 
                } 
                zkClient.create(root, false); 
                List<String> services = zkClient.addChildListener(root, zkListener); 
                if (services != null && services.size() > 0) { 
                    for (String service : services) { 
                        service = URL.decode(service); 
                        anyServices.add(service); 
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, 
                                Constants.CHECK_KEY, String.valueOf(false)), listener); 
                    } 
                } 
            } else {// 进入这里 
                List<URL> urls = new ArrayList<URL>(); 
//toCategoriesPath(url) 会得到一个数组:[/dubbo/com.gupaoedu.dubbo.IGpHello/providers, 
// /dubbo/com.gupaoedu.dubbo.IGpHello/configurators, /dubbo/com.gupaoedu.dubbo.IGpHello/routers] 
                for (String path : toCategoriesPath(url)) { 
                    //第一次进来是空的,存放监听器 
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); 
                    if (listeners == null) { 
                        // 将url做key 新建一个map做value,放入监听器 
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); 
                        // listeners ={} 
                        listeners = zkListeners.get(url); 
                    } 
                    // null 
                    ChildListener zkListener = listeners.get(listener); 
                    if (zkListener == null) { 
                    // 这里新建了一个监听器该监听器为子节点变化监听并且变化后调用notify做事件处理 
                        listeners.putIfAbsent(listener, new ChildListener() { 
                            public void childChanged(String parentPath, List<String> currentChilds) { 
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); 
                            } 
                        }); 
                        // 得到该监听器 
                        zkListener = listeners.get(listener); 
                    } 
                    // 创建节点,并且将上面创建的监听器放入 
                    zkClient.create(path, false); 
                    List<String> children = zkClient.addChildListener(path, zkListener); 
                    if (children != null) { 
                        urls.addAll(toUrlsWithEmpty(url, path, children)); 
                    } 
                } 
                notify(url, listener, urls); 
            } 
        } catch (Throwable e) { 
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 
        } 
    }

  notify(url, listener, urls):将刚刚的消费者的地址,监听器,以及自己组装的URLS传入:

protected void notify(URL url, NotifyListener listener, List<URL> urls) { 
        if (url == null) { 
            throw new IllegalArgumentException("notify url == null"); 
        } 
        if (listener == null) { 
            throw new IllegalArgumentException("notify listener == null"); 
        } 
        try { 
            doNotify(url, listener, urls); 
        } catch (Exception t) { 
            //  
            Map<NotifyListener, List<URL>> listeners = failedNotified.get(url); 
            if (listeners == null) { 
                failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>()); 
                listeners = failedNotified.get(url); 
            } 
            listeners.put(listener, urls); 
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); 
        } 
    } 

  doNotify(url, listener, urls);这个方法最终通过调用链进入 AbstactRegistry :

protected void notify(URL url, NotifyListener listener, List<URL> urls) { 
        if (url == null) { 
            throw new IllegalArgumentException("notify url == null"); 
        } 
        if (listener == null) { 
            throw new IllegalArgumentException("notify listener == null"); 
        } 
        if ((urls == null || urls.size() == 0) 
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) { 
            logger.warn("Ignore empty notify urls for subscribe url " + url); 
            return; 
        } 
        if (logger.isInfoEnabled()) { 
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); 
        }
// 通过判断 新建一个map Map<String, List<URL>> result = new HashMap<String, List<URL>>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } }// 判断 if (result.size() == 0) { return; }//对 notified做个赋值 以comsumer的url为key value为{} Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); }//循环 for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey();//获得key List<URL> categoryList = entry.getValue();//获得value categoryNotified.put(category, categoryList);//存值 saveProperties(url);//保存到本地配置 listener.notify(categoryList); } }

  上述代码中:

for (URL u : urls) { 
            if (UrlUtils.isMatch(url, u)) { 
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); 
                List<URL> categoryList = result.get(category); 
                if (categoryList == null) { 
                    categoryList = new ArrayList<URL>(); 
                    result.put(category, categoryList); 
                } 
                categoryList.add(u); 
            } 
        } 

  这段代码片段循环过后会将map做个初始化,结果为:其实就是我们在ZK上面看到的节点

{ 
configurators=[empty://192.168.254.1/com.gupaoedu.dubbo.IGpHello?application=dubbo-client&category=configurators&dubbo=2.5.4&interface=com.gupaoedu.dubbo.IGpHello&methods=sayHello 
                &mock=com.gupaoedu.dubbo.TestMock&owner=wuzz&pid=44524&protocol=dubbo&side=consumer&timeout=50000&timestamp=1543990978879],  
routers=[empty://192.168.254.1/com.gupaoedu.dubbo.IGpHello?application=dubbo-client&category=routers&dubbo=2.5.4&interface=com.gupaoedu.dubbo.IGpHello&methods=sayHello 
                &mock=com.gupaoedu.dubbo.TestMock&owner=wuzz&pid=44524&protocol=dubbo&side=consumer&timeout=50000&timestamp=1543990978879],  
providers=[dubbo://192.168.254.1:20880/com.gupaoedu.dubbo.IGpHello?anyhost=true&application=dubbo-server&dubbo=2.5.4&generic=false&interface=com.gupaoedu.dubbo.IGpHello&methods=sayHello 
                &owner=wuzz&pid=77904&side=provider&timestamp=1543990460726]}

  在最后循环里面的  listener.notify(categoryList); 进行一个ZK目录节点下的变化做一个更新:其实就是做到服务地址上下线的通知更新(缓存和变更缓存):该方法内前半部分主要是新建了3个关于providers,routers,configurations 3个目录下的地址集合,并进行赋值,然后判断处理,到最后调用refreshInvoker :

    public synchronized void notify(List<URL> urls) {
....... // providers refreshInvoker(invokerUrls); }

  其实RegistryDirectory<T>这个类的作用就是整合 多个invoker ,以及对于zk的指定的服务节点的变化进行监听并进行刷新。refreshInvoker(invokerUrls);这个方法主要的作用就是将传入的 invokerUrls转化成invoker列表,如果已经转换则不再重新引用,直接从缓存中获取,如果传入的invokerUrls不为空,则表示最新的invokerUrls,如果为空,则表示只是下发更新地址活路由router规则:

private void refreshInvoker(List<URL> invokerUrls) { 
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null 
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { 
            this.forbidden = true; // 禁止访问 
            this.methodInvokerMap = null; // 置空列表 
            destroyAllInvokers(); // 关闭所有的nvoker 
        } else { 
            this.forbidden = false; // 允许访问 
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference 
            if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) { 
                invokerUrls.addAll(this.cachedInvokerUrls); 
            } else { 
                this.cachedInvokerUrls = new HashSet<URL>(); 
                this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrl列表 
            } 
            if (invokerUrls.size() == 0) { 
                return; 
            } 
            // 将URL列表转换成invoker列表 
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); 
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 将方法名映射到invoker列表 
            // state change 
            //计算失误不处理 
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { 
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); 
                return; 
            } 
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; 
            this.urlInvokerMap = newUrlInvokerMap; 
            try { 
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); 
            } catch (Exception e) { 
                logger.warn("destroyUnusedInvokers error. ", e); 
            } 
        } 
    } 

  最核心的方法则是  Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);这个方法,就是转化的方法:比如一个地址是dubbo://192.168.254.135...这样的地址 转化成一个DubboInvoker,下面来看一下里面的部分代码:

private Map<String, Invoker<T>> toInvokers(List<URL> urls) { 
         ...... 
 
            if (invoker == null) { //  
                try { 
                    boolean enabled = true; 
                    if (url.hasParameter(Constants.DISABLED_KEY)) { 
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false); 
                    } else { 
                        enabled = url.getParameter(Constants.ENABLED_KEY, true); 
                    } 
                    if (enabled) { 
                        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); 
                    } 
                } catch (Throwable t) { 
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); 
                } 
                if (invoker != null) { //  
                    newUrlInvokerMap.put(key, invoker); 
                } 
            } else { 
                newUrlInvokerMap.put(key, invoker); 
            } 
       ....... 
} 

   这个里面有调了一个拓展点 invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); 这个protocol是 Protocol$Adaptive ,而Url里面拿到的protocol是dubbo,最后进入了 DubboProtocol的refer:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { 
        // create rpc invoker. 
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); 
        invokers.add(invoker); 
        return invoker; 
    } 

  这里新建了一个 DubboInvoker,里面getClient(url) 方法就是与服务端产生连接的代码,顺着这条主线下去,我们会在initClient(url) 这个方法里面找到如下代码:

client = Exchangers.connect(url, requestHandler); 

  期间判断我们的连接状态,接着进入getExchanger(url).connect(url, handler);这里跟服务端的原理是一样的,可以翻过去看看,然后也是通过NettyTransporter去创建一个连接。

  然后我们回到RegistryDirectory<T>里面的toInvokers(List<URL>) 方法里:

// 缓存key为没有合并消费端参数的URL,不管消费端如何合并,如果服务端URL发生变法,则重新refer 
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference 
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); 
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { 
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); 
                return; 
}//把 invoker 放入成员变量中,后续需要去获取 
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; 
this.urlInvokerMap = newUrlInvokerMap; 
try { 
    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 
} catch (Exception e) { 
     logger.warn("destroyUnusedInvokers error. ", e); 
}

  这里最后获得的 invoker是:

  而且这里是经过包装的 DubboInvoker .到这里,消费端的建立会话跟获取invoker的过程都结束了,下面应该进入数据交互,之前服务端发布的时候获取到的 invoker对象是 JavassistProxyFactory.在代理调用方法的时候会调用 代理的sayHello方法,在动态生成的代理类字节码上我们可以看到他调用的是 handler.invoke,也就是 InvocationHandler 里面的invoke 方法:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 
        String methodName = method.getName(); 
        Class<?>[] parameterTypes = method.getParameterTypes(); 
        if (method.getDeclaringClass() == Object.class) { 
            return method.invoke(invoker, args); 
        } 
        if ("toString".equals(methodName) && parameterTypes.length == 0) { 
            return invoker.toString(); 
        } 
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) { 
            return invoker.hashCode(); 
        } 
        if ("equals".equals(methodName) && parameterTypes.length == 1) { 
            return invoker.equals(args[0]); 
        } 
        return invoker.invoke(new RpcInvocation(method, args)).recreate(); 
    } 

  最后调用  invoker.invoke(new RpcInvocation(method, args)).recreate(); 这里需要知道 invoke是什么,其实就是之前在ReferenceConfig类里面的createProxy方法内:

invoker = refprotocol.refer(interfaceClass, urls.get(0)); 

  其实就是这个 InvokerInvocationHandler 对象,接着我们进入 MockClusterInvoker的invoke方法:

    public Result invoke(Invocation invocation) throws RpcException { 
        Result result = null; 
//*******关键,这个地方会从url里面回去 mock的key,也就是之前配置文件配置的moke 
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
        if (value.length() == 0 || value.equalsIgnoreCase("false")) { 
            //no mock 没有 
            result = this.invoker.invoke(invocation); 
        // force 表示强制调用,不仅仅是请求失败的情况采取调用 
        } else if (value.startsWith("force")) { 
            if (logger.isWarnEnabled()) { 
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); 
            } 
            //force:direct mock 
            result = doMockInvoke(invocation, null); 
        } else {//所以配置了mock一般会进来这里 
            //fail-mock 
            try { 
                result = this.invoker.invoke(invocation); 
            } catch (RpcException e) { 
                if (e.isBiz()) { 
                    throw e; 
                } else { 
                    if (logger.isWarnEnabled()) { 
                        logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); 
                    }//原来mock机制在这里 
                    result = doMockInvoke(invocation, e); 
                } 
            } 
        } 
        return result; 
    }

  由于我是配置了mock的,所以debug可以看到:这里拿到的value就是我们对应的类名

  所以到了 result = this.invoker.invoke(invocation);这个result 就是我们需要的结果,而这个invoker就是他的默认的服务降级的配置了:是他 FailoverClusterInvoker

  为什么是 Failover呢,因为 Cluster的默认拓展点就是这个类,而MockCluster仅仅是对其做了一个封装:进入他的invoke方法会发现 FailoverClusterInvoker对该方法并没有做实现,所以还是跟往常一样,进入父类的方法 AbstractClusterInvoker<T> 这个类的invoke:我们会发现里面有一个 LoadBalance的局部变量,而这个就是光网所提供的图里面的关于负载那块相关的,通过负载便会返回一个具体的invoker去调用

    public Result invoke(final Invocation invocation) throws RpcException { 
 
        checkWhetherDestroyed(); 
        // 做负载 
        LoadBalance loadbalance; 
 
        List<Invoker<T>> invokers = list(invocation); 
        if (invokers != null && invokers.size() > 0) { 
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() 
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); 
        } else { 
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); 
        } 
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); 
        return doInvoke(invocation, invokers, loadbalance); 
    } 

  通过 RegistryDiectory 去获取一个invoker的list。在 RegistryDiectory 里面的doList 就是通过之前的成员变量 methodInvokerMap 去拿的

   ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension,又发现拓展点,进 LoadBalance.class 去看可以看到默认实现是 @SPI(RandomLoadBalance.NAME) 随机负载算法。nice

  最后调用  doInvoke(invocation, invokers, loadbalance); 这里是继续进入 FailoverClusterInvoker 的doInvoke方法:该方法拿到了一个委托的Invoker(DelegateInvoker):

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { 
        List<Invoker<T>> copyinvokers = invokers; 
        checkInvokers(copyinvokers, invocation); 
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; 
        if (len <= 0) { 
            len = 1; 
        } 
        // retry loop. 
        RpcException le = null; // last exception. 
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. 
        Set<String> providers = new HashSet<String>(len); 
        for (int i = 0; i < len; i++) { 
            // 
            // 
            if (i > 0) { 
                checkWhetherDestroyed(); 
                copyinvokers = list(invocation); 
                // 
                checkInvokers(copyinvokers, invocation); 
            }//做负载 获得一个具体的 invoker 
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); 
            invoked.add(invoker); 
            RpcContext.getContext().setInvokers((List) invoked); 
            try {//调用该方法获取结果 
                Result result = invoker.invoke(invocation); 
                if (le != null && logger.isWarnEnabled()) { 
                    logger.warn("Although retry the method " + invocation.getMethodName() 
                            + " in the service " + getInterface().getName() 
                            + " was successful by the provider " + invoker.getUrl().getAddress() 
                            + ", but there have been failed providers " + providers 
                            + " (" + providers.size() + "/" + copyinvokers.size() 
                            + ") from the registry " + directory.getUrl().getAddress() 
                            + " on the consumer " + NetUtils.getLocalHost() 
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: " 
                            + le.getMessage(), le); 
                } 
                return result; 
            } catch (RpcException e) { 
                if (e.isBiz()) { // biz exception. 
                    throw e; 
                } 
                le = e; 
            } catch (Throwable e) { 
                le = new RpcException(e.getMessage(), e); 
            } finally { 
                providers.add(invoker.getUrl().getAddress()); 
            } 
        } 
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " 
                + invocation.getMethodName() + " in the service " + getInterface().getName() 
                + ". Tried " + len + " times of the providers " + providers 
                + " (" + providers.size() + "/" + copyinvokers.size() 
                + ") from the registry " + directory.getUrl().getAddress() 
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " 
                + Version.getVersion() + ". Last error is: " 
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); 
    } 

  因为之前在toInvokers方法中进行转换的时候 是一个DubboInvoker 进行了层层包装,我们又发现 里面 并没有实现这个方法,所以继续找父类 AbstractInvoker 这个里面可以看到invoke方法,该方法内最终是调用了一个 doInvoke 方法,而这个方法是具体调到 Dubboinvoker里面的方法:

protected Result doInvoke(final Invocation invocation) throws Throwable { 
        RpcInvocation inv = (RpcInvocation) invocation; 
        final String methodName = RpcUtils.getMethodName(invocation); 
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); 
        inv.setAttachment(Constants.VERSION_KEY, version); 
 
        ExchangeClient currentClient; 
        if (clients.length == 1) { 
            currentClient = clients[0]; 
        } else { 
            currentClient = clients[index.getAndIncrement() % clients.length]; 
        } 
        try {// 判断当前的通信方式,是单向啊异步啊什么的 
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); 
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); 
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 
            if (isOneway) { 
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); 
                currentClient.send(inv, isSent); 
                RpcContext.getContext().setFuture(null); 
                return new RpcResult(); 
            } else if (isAsync) { 
                ResponseFuture future = currentClient.request(inv, timeout); 
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); 
                return new RpcResult(); 
            } else { 
                RpcContext.getContext().setFuture(null); 
                return (Result) currentClient.request(inv, timeout).get(); 
            } 
        } catch (TimeoutException e) { 
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 
        } catch (RemotingException e) { 
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 
        } 
    } 

  最后调用 currentClient.request(inv, timeout).get(),获取结果,这个currentClient 就是创建invoker里面的NettyClient,

    public ResponseFuture request(Object request, int timeout) throws RemotingException { 
        if (closed) { 
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); 
        } 
        // create request.组装请求,跟手写RPC框架一样的 
        Request req = new Request(); 
        req.setVersion("2.0.0"); 
        req.setTwoWay(true); 
        req.setData(request); 
        DefaultFuture future = new DefaultFuture(channel, req, timeout); 
        try {//通过这个去发送请求 
            channel.send(req); 
        } catch (RemotingException e) { 
            future.cancel(); 
            throw e; 
        } 
        return future; 
    } 

  channel.send(req);这个就是最后的发送方法了,通过 HeaderExchangeChannel 类的 send方法 最后进入 NettyChannel,利用netty做一个发送。这就是整个通信过程

  需要异步调用的话在 reference里面配置 <dubbo:method name="goodbye" async="true"/>

上一张官网的时序图:

消费端调用过程流程图


发布评论
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

dubbo(2.5.3)源码之服务发布与注册知识解答
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。