IT序号网

dubbo(2.5.3)源码之服务发布与注册

leader 2021年06月07日 大数据 443 0

服务端发布流程:

  dubbo 是基于 spring 配置来实现服务的发布的,对于dubbo 配置文件中看到的<dubbo:service>等标签都是服务发布的重要配置 ,对于这些提供可配置化的支持,spring功不可没,spring提供了可拓展的Schema的支持。也就是自定义标签的使用,这样 dubbo基于这样的规范实现自己的拓展,以至于我们在项目中可以使用dubbo所定义的标签。在实现这个拓展的前提是要把spring的Core包加入项目中。具体的加载在Spring源码深度解析一书中有详细介绍,这里简单提一下拓展自定义标签的大致步骤:

  1. NamespaceHandler: 注册一堆 BeanDefinitionParser,利用他们来进行解析,源码中 com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler extends NamespaceHandlerSupport,这里的DubboNamespaceHandler就是dubbo所实现的命名空间拓展:

public class DubboNamespaceHandler extends NamespaceHandlerSupport { 
 
	static { 
		Version.checkDuplicate(DubboNamespaceHandler.class); 
	} 
 
	public void init() { 
	    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); 
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); 
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); 
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); 
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); 
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); 
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); 
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); 
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); 
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); 
    } 
 
}

  可以看到在 init 里面 利用registerBeanDefinitionParser注册了一系列的标签,有我们很熟悉的protocol,service,registry等。这个里面主要做了一件事,把不同的配置分别转化成spring容器中的bean对象

  2. BeanDefinitionParser:用于解析每个 element 的内容,com.alibaba.dubbo.config.spring.schema.DubboBeanDefinitionParser implements BeanDefinitionParser 中DubboBeanDefinitionParser就充当了这么一个角色。

  3. Spring 默认会加载 jar 包下的 META-INF/spring.handlers 文件寻找对应的 NamespaceHandler。在dubbo源码中对应路径我们真的发现了这个文件,以下是里面的内容:

http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler 

  还需要一个 spring.schemas 文件:

http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd 

  在服务启动后会注册一个 ServiceBean ,该类实现了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware 五个接口,分别是初始化bean的时候提供一个机制,启动后会在bean加载完调用afterPropertiesSet(),bean销毁后的机制,上下文机制 加载AppicationContext,事件的监听,获取bean的当前属性。

  所以在解析 dubbo 配置文件的时候 会通过 ServiceBean 里的 afterPropertiesSet()这个方法 。 这个方法里面再次判断 哪些标签及属性是否解析,判断通过最后调用了一个:

if (! isDelay()) { 
      export(); 
} 

  判断是否配置了delay属性,延迟加载,在<dubbo:provider delay="10" /> 中配置。这个 export() 就是发布服务的入口。然后到 ServiceConfig<T> 类里面 export() 方法,继而进入doExportUrls() 方法进行下一步发布跟注册:

private void doExportUrls() { 
        List<URL> registryURLs = loadRegistries(true); // 获取注册中心的配置地址,可能有多个 
        for (ProtocolConfig protocolConfig : protocols) { // 是否支持多协议发布 
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);// 协议 注册中心地址 
        } 
    }

  其中protocols是:<dubbo:protocol name="dubbo" port="20880" id="dubbo" /> 地址就是配置的注册中心地址:[registry://192.168.254.135:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&owner=wuzz&pid=53004&registry=zookeeper&timestamp=1543566568738]。其中我在dubbo配置文件里的配置是这样的:

<!--提供方信息 -->
 <dubbo:application name="dubbo-server" owner="wuzz" />
<!--注册中心 --> <dubbo:registry id="zk1" address="zookeeper://192.168.254.135:2181" /> <!-- 协议 --> <dubbo:protocol port="20880" name="dubbo" />

  然后进入 doExportUrlsFor1Protocol 方法:先是通过各种判断获取一个合法的IP地址,也就是我们上篇博文中提到的主机绑定,然后通过protocolConfig.getPort();获取端口并进行一系列判断,最后创建一个map:

Map<String, String> map = new HashMap<String, String>(); 用这个map来存放所有的参数,就是dubbo中配置的所有参数属性。dubbo整个调用链路是基于URL驱动的,在绑定好所有的参数以后:

 URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); 

  把这个map转化成了一个URL地址:

dubbo://192.168.254.1:20880/com.gupaoedu.dubbo.IGpHello?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=com.gupaoedu.dubbo.IGpHello

&methods=sayHello&owner=wuzz&pid=50344&side=provider&timestamp=1543567388046

  这个地址是否似曾相识呢?  没错这个就是最后注册到注册中心节点的地址值。

  接下去就是服务的核心发布过程:

//判断注册中心是否为空 
if (registryURLs != null && registryURLs.size() > 0 
                        && url.getParameter("register", true)) { 
                    for (URL registryURL : registryURLs) { 
                        //封装了两个URL 
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); 
                        URL monitorUrl = loadMonitor(registryURL); 
                        if (monitorUrl != null) { 
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); 
                        } 
                        if (logger.isInfoEnabled()) { 
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); 
                        } 
                        // 执行远程调用过成 
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 
                        // 执行服务发布 
                        Exporter<?> exporter = protocol.export(invoker); 
                        // 暴露服务或取消暴露 
                        exporters.add(exporter); 
                    } 
                } else { 
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); 
 
                    Exporter<?> exporter = protocol.export(invoker); 
                    exporters.add(exporter); 
                } 

  proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));这段代码是获得一个远程服务的代理对象,可以发现,这个proxyFactory是一个拓展点:默认实现是@SPI("javassist")。而且这个拓展点会生成一个自适应适配器ProxyFactory$Adaptive

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); 

  看一下 ProxyFactory$Adaptive :

import com.alibaba.dubbo.common.extension.ExtensionLoader; 
 
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory { 
	public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, 
			com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException  { 
		if (arg2 == null) 
			throw new IllegalArgumentException("url == null"); 
		com.alibaba.dubbo.common.URL url = arg2; 
		String extName = url.getParameter("proxy", "javassist"); 
		if (extName == null) 
			throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" 
					+ url.toString() + ") use keys([proxy])"); 
		com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader 
				.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); 
		return extension.getInvoker(arg0, arg1, arg2); 
	} 
 
	public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException  { 
		if (arg0 == null) 
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); 
		if (arg0.getUrl() == null) 
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); 
		com.alibaba.dubbo.common.URL url = arg0.getUrl(); 
		String extName = url.getParameter("proxy", "javassist"); 
		if (extName == null) 
			throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" 
					+ url.toString() + ") use keys([proxy])"); 
		com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader 
				.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); 
		return extension.getProxy(arg0); 
	} 
} 

  程序会调用ProxyFactory$Adaptive.getInvoker 方法,也是跟之前一样  。从URL中获取 extName,然后获得指定的拓展点 ,默认使用 javassist实现,发现该拓展点有3个实现:

stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper 
jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory 
javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory 

  默认使用 com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory,最终调用 JavassistProxyFactory.getInvoker:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { 
        // TODO Wrapper类不能正确处理带$的类名 //这里主要做了一系列的封装 
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); 
        return new AbstractProxyInvoker<T>(proxy, type, url) { 
            @Override 
            protected Object doInvoke(T proxy, String methodName,  
                                      Class<?>[] parameterTypes,  
                                      Object[] arguments) throws Throwable { 
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 
            }//所以Invoker<?> invoker = proxyFactory.getInvoker();拿到的实例就是他,在处理消息的时候要调用 
        }; 
    } 

  我们进入 Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);然后会发现一个 makeWrapper(c); 在这个方法里面是创建 Wapper的一些装饰。里面主要是组装一个动态的代理类,最后返回一个实例。可以看到里面有个 ClassGenerator cc = ClassGenerator.newInstance(cl); 这个是字节码生成工具,里面组装了很多东西,有一个最核心的是invokeMethod 方法是非常重要的,单独拉出来看看:

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) 
			throws java.lang.reflect.InvocationTargetException { 
		com.gupaoedu.dubbo.IGpHello w; 
		try { 
			w = ((com.gupaoedu.dubbo.IGpHello) $1); 
		} catch (Throwable e) { 
			throw new IllegalArgumentException(e); 
		} 
		try { 
			if ("sayHello".equals($2) && $3.length == 1) { 
				return ($w) w.sayHello((java.lang.String) $4[0]); 
			} 
		} catch (Throwable e) { 
			throw new java.lang.reflect.InvocationTargetException(e); 
		} 
		throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException( 
				"Not found method \"" + $2 + "\" in class com.gupaoedu.dubbo.IGpHello."); 
	} 

  这是服务端返回的动态字节码 像$1这样的可能是 Object对象进行对IGpHello的转换。 最后 makeWrapper 返回 (Wrapper) cc.toClass().newInstance();

  接着:

return new AbstractProxyInvoker<T>(proxy, type, url) { 
            @Override 
            protected Object doInvoke(T proxy, String methodName,  
                                      Class<?>[] parameterTypes,  
                                      Object[] arguments) throws Throwable { 
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 
            } 
        }; 

  上上述代码最后返回的是wrapper.invokeMethod,也就是服务端返回的动态字节码里面的.invokeMethod 方法,最后触发 ($w) w.sayHello((java.lang.String) $4[0]); 其实Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));这段代码中 invoker:

  服务的发布---protocol.export(invoker)

  protocol 这个地方,其实并不是直接调用 DubboProtocol 协议的 export, 大家跟我看看 protocol 这个属性是在哪里实例化的?以及实例化的代码是什么?

private static final Protocol protocol =  
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); //Protocol$Adaptive 

  其实这个protocol 就是上篇博客中我们所提到的SPI拓展机制中关于 getAdaptiveExtension 获取到的一个动态自适应拓展点的类 :Protocol$Adaptive 调用的 export方法也是这个类中的方法:

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) 
			throws com.alibaba.dubbo.rpc.RpcException { 
		if (arg0 == null)// 判断参数是否为空 
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); 
		if (arg0.getUrl() == null) // 判断请求地址是否为空 
			throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); 
		com.alibaba.dubbo.common.URL url = arg0.getUrl(); 
                // 获取到URL,并且从URL中获取请求的协议,默认为dubbo 
		String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); 
		if (extName == null) 
			throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" 
					+ url.toString() + ") use keys([protocol])"); 
                // 获取指定的协议拓展点实现 
		com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader 
				.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); 
                //发布 
		return extension.export(arg0); 
	} 

  Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); 这段代码是获取制动名称的拓展点实现,而extName 在该场景下是  registry ,所以这里利用自使用的适配器去运行动态的加载指定拓展点,这也是使用适配器的目的。非常的灵活。可以从debug中看到:

  所以 extension 就是等于 RegistryProtocol.最后调用了  extension.export(arg0); 其实就是 RegistryProtocol 类的export 方法:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 
        //export invoker 本地发布服务(启动 netty) 
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 
        //registry provider 
        final Registry registry = getRegistry(originInvoker); 
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 
        registry.register(registedProviderUrl); 
        //  订阅 override 数据 
        //  FIXME 提供者订阅时,会影响同一 JVM 即暴露服务,又引用同一服务的的场景,因为subscribed 以服务名为缓存的 key,导致订阅信息覆盖。 
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 
        // 保证每次 export 都返回一个新的 exporter 实例 
        return new Exporter<T>() { 
            public Invoker<T> getInvoker() { 
                return exporter.getInvoker(); 
            } 
 
            public void unexport() { 
                try { 
                    exporter.unexport(); 
                } catch (Throwable t) { 
                    logger.warn(t.getMessage(), t); 
                } 
                try { 
                    registry.unregister(registedProviderUrl); 
                } catch (Throwable t) { 
                    logger.warn(t.getMessage(), t); 
                } 
                try { 
                    overrideListeners.remove(overrideSubscribeUrl); 
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); 
                } catch (Throwable t) { 
                    logger.warn(t.getMessage(), t); 
                } 
            } 
        }; 
    } 

  先来看一下这个本地服务发布的流程 : final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
// 从缓存中获取 key String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { // 双重检查锁 exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }

  这里检查完exporter 判断为 null 的时候会新建一个 exporter 并且会调用 protocol.export(invokerDelegete), originInvoker),在加载扩展点的时候,有一个 injectExtension 方法,针对已经加载的扩展点中的扩展点属性进行依赖注入。而这个protocol是通过set方法注入来的。所以这里会产生一个自适应的适配器,就是因此我们知道 protocol 是一个自适应扩展点,Protocol$Adaptive,然后调用这个自适应扩展点中的 export 方法,这个时候传入的协议地址应该是:

  可以看到这里的协议变成了dubbo,然后在获取Protocol 拓展点实现的时候找到了 com.alibaba.dubbo.rpc.Protocol 文件内有如下内容:

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper 
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper 

  紧接着在动态生成的 Protocol$Adaptive的export方法中的  ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);这个方法内createExtension会对DubboProtocol进行一个包装:

    private T createExtension(String name) { 
        Class<?> clazz = getExtensionClasses().get(name); 
        if (clazz == null) { 
            throw findException(name); 
        } 
        try { 
            T instance = (T) EXTENSION_INSTANCES.get(clazz); 
            if (instance == null) { 
                EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); 
                instance = (T) EXTENSION_INSTANCES.get(clazz); 
            } 
            injectExtension(instance); 
            Set<Class<?>> wrapperClasses = cachedWrapperClasses; 
            if (wrapperClasses != null && wrapperClasses.size() > 0) { 
                for (Class<?> wrapperClass : wrapperClasses) { 
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); 
                } 
            } 
            return instance; 
        } catch (Throwable t) { 
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " + 
                    type + ")  could not be instantiated: " + t.getMessage(), t); 
        } 
    } 

  cachedWrapperClasses 在loadFile方法内,也就是在寻找3哥指定文件目录下的拓展点的时候加载的。

  因此在 Protocol$Adaptive.export 方 法 中 ,ExtensionLoader.getExtension(Protocol.class).getExtension。应该就是基于 DubboProtocol 协议去发布服务了吗?如果是这样,那你们太单纯了。这里并不是获得一个单纯的 DubboProtocol 扩展点,而是会通过 Wrapper对 Protocol 进 行 装 饰 , 装 饰 器 分 别 为 : ProtocolFilterWrapper/ProtocolListenerWrapper; 至于 MockProtocol 为什么不在装饰器里面呢?大家再回想一下我们在看 ExtensionLoader.loadFile 这段代码的时候,有一个判断,装饰器必须要具备一个带有 Protocol 的构造方法,如下:

public ProtocolFilterWrapper(Protocol protocol){ 
     if (protocol == null) { 
         throw new IllegalArgumentException("protocol == null"); 
     } 
     this.protocol = protocol; 
} 

  截止到这里,我们已经知道,Protocol$Adaptive 里面的 export 方法,会调用 ProtocolFilterWrapper 以及 ProtocolListenerWrapper 类的方法

  看看 ProtocolFilterWrapper 和 ProtocolListenerWrapper

ProtocolFilterWrapper:

  这个类非常重要,dubbo 机制里面日志记录、超时等等功能都是在这一部分实现的这个类有 3 个特点,

    第一它有一个参数为 Protocol protocol 的构造函数;
    第二,它实现了 Protocol 接口;
    第三,它使用责任链模式,对 export 和 refer 函数进行了封装

  其中export:

    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 
            return protocol.export(invoker); 
        } 
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); 
    } 

  buildInvokerChain://buildInvokerChain 函数:它读取所有的 filter 类,利用这些类封装invoker

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { 
        Invoker<T> last = invoker;
// 自动激活扩展点,根据条件获取当前扩展可自动激活的实现 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }

  其中 fileter 的拓展点从jar包上来看 如下:

echo=com.alibaba.dubbo.rpc.filter.EchoFilter 
generic=com.alibaba.dubbo.rpc.filter.GenericFilter 
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter 
token=com.alibaba.dubbo.rpc.filter.TokenFilter 
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter 
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter 
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter 
context=com.alibaba.dubbo.rpc.filter.ContextFilter 
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter 
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter 
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter 
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter 
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter 
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter 
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter 
validation=com.alibaba.dubbo.validation.filter.ValidationFilter 
cache=com.alibaba.dubbo.cache.filter.CacheFilter 
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter 
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter 

  这其中涉及到很多功能,包括权限验证、异常、超时等等,当然可以预计计算调用时间等等应该也是在这其中的某个类实现的;这里我们可以看到 export 和 refer 过程都会被 filter 过滤

ProtocolListenerWrapper:

  在这里我们可以看到 export 和 refer 分别对应了不同的 Wrapper;export是对应的 ListenerExporterWrapper。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 
            return protocol.export(invoker); 
        } 
        return new ListenerExporterWrapper<T>(protocol.export(invoker),  
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) 
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); 
    } 

  上诉代码中的 protocol  所对应的就是 DubboProtocol,调用他的 export方法,继而调用 openServer开启服务:

private void openServer(URL url) { 
        // find server. 
        String key = url.getAddress(); 
        //client 也可以暴露一个只有 server 可以调用的服务 
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); 
        if (isServer) { 
        	ExchangeServer server = serverMap.get(key); 
        	if (server == null) { // 没有的话就是创建服务 
        		serverMap.put(key, createServer(url)); 
        	} else { 
        		//server 支持 reset,配合 override 功能使用 
        		server.reset(url); 
        	} 
        } 
    }

  createServer:创建服务,开启心跳检测,默认使用 netty。组装 url

private ExchangeServer createServer(URL url) { 
        //默认开启 server 关闭时发送 readonly 事件 
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); 
        // heartbeat 心跳连接 
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); 
 
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) 
            throw new RpcException("Unsupported server type: " + str + ", url: " + url); 
 
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); 
        ExchangeServer server; 
        try { 
            server = Exchangers.bind(url, requestHandler); 
        } catch (RemotingException e) { 
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); 
        } 
        str = url.getParameter(Constants.CLIENT_KEY); 
        if (str != null && str.length() > 0) { 
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); 
            if (!supportedTypes.contains(str)) { 
                throw new RpcException("Unsupported client type: " + str); 
            } 
        } 
        return server; 
    } 

  然后进入server = Exchangers.bind(url, requestHandler);

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 
        if (url == null) { 
            throw new IllegalArgumentException("url == null"); 
        } 
        if (handler == null) { 
            throw new IllegalArgumentException("handler == null"); 
        } 
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); 
        return getExchanger(url).bind(url, handler); 
 } 
 
public static Exchanger getExchanger(URL url) { 
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); 
        return getExchanger(type); 
} 
 
public static Exchanger getExchanger(String type) { 
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type) 
} 

  可以看出这里又是一个获取 Exchanger 拓展点,默认是 HeaderExchanger.NAME,调用他的 HeaderExchanger.bind方法,在调用 HeaderExchanger.bind 方 法 的 时 候 , 是 先 new 一 个HeaderExchangeServer. 这个 server 是干嘛呢? 是对当前这个连接去建立心跳机制

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 
       return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 
} 

  这里通过 Transporters 的bind 方法:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { 
        if (url == null) { 
            throw new IllegalArgumentException("url == null"); 
        } 
        if (handlers == null || handlers.length == 0) { 
            throw new IllegalArgumentException("handlers == null"); 
        } 
        ChannelHandler handler; 
        if (handlers.length == 1) { 
            handler = handlers[0]; 
        } else { 
            handler = new ChannelHandlerDispatcher(handlers); 
        } 
        return getTransporter().bind(url, handler); 
    } 
public static Transporter getTransporter() { 
      return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); 
}

  最后这里又是一个自适应适配器去选择合适的 getTransporter,这里Transporter的默认实现是 netty,可以在SPI注解上看出的:@SPI("netty")。通过 NettyTranport 创建基于 Netty 的 server 服务

   官网有个服务发布的流程图

 服务注册的过程:

  前面,我们已经知道,基于 spring 这个解析入口,到发布服务的过程,接着基于 DubboProtocol 去发布,最终调用 Netty 的 api 创建了一个NettyServer。服务注册的话就需要通过刚刚发布服务的时候通过 
RegistryProtocol.export方法去看看:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 
        //export invoker 本地发布服务(启动 netty) 
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 
        //registry provider 
        final Registry registry = getRegistry(originInvoker); 
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 
        registry.register(registedProviderUrl); 
        //  订阅 override 数据 
        //  FIXME 提供者订阅时,会影响同一 JVM 即暴露服务,又引用同一服务的的场景,因为subscribed 以服务名为缓存的 key,导致订阅信息覆盖。 
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 
        // 保证每次 export 都返回一个新的 exporter 实例 
        return new Exporter<T>() { 
            public Invoker<T> getInvoker() { 
                return exporter.getInvoker(); 
            } 
 
            public void unexport() { 
                try { 
                    exporter.unexport(); 
                } catch (Throwable t) { 
                    logger.warn(t.getMessage(), t); 
                } 
                try { 
                    registry.unregister(registedProviderUrl); 
                } catch (Throwable t) { 
                    logger.warn(t.getMessage(), t); 
                } 
                try { 
                    overrideListeners.remove(overrideSubscribeUrl); 
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); 
                } catch (Throwable t) { 
                    logger.warn(t.getMessage(), t); 
                } 
            } 
        }; 
    } 

  getRegistry:这个方法是 invoker 的地址获取 registry 实例

private Registry getRegistry(final Invoker<?> originInvoker){ 
        //获得registry://192.168.254.135:2181 的协议地址 
        URL registryUrl = originInvoker.getUrl(); 
        if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { 
            // 得到 zookeeper 的协议地址 
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); 
            // registryUrl 就会变成了 zookeeper://192.168.11.156 
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); 
        } 
        return registryFactory.getRegistry(registryUrl); 
    } 

  这段代码很明显了,通过前面这段代码的分析,其实就是把 registry 的协议头改成服务提供者配置的协议地址,也就是我们配置的<dubbo:registry  address="zookeeper://192.168.254.135:2181" />然后 registryFactory.getRegistry 的目的,就是通过协议地址匹配到对应的注册中心。

  registryFactory :这个又是一个拓展点,且方法层面有@Adaptive注解,所以会创建一个自适应的适配器类 RegistryFactory$Adaptive

@SPI("dubbo") 
public interface RegistryFactory { 
   /** 
  * 连接注册中心. 
  *  
  * 连接注册中心需处理契约: 
  * 1. 当设置 check=false 时表示不检查连接,否则在连接不上时抛出异常。 
  * 2. 支持 URL 上的 username:password 权限认证。 
  * 3. 支持 backup=10.20.153.10 备选注册中心集群地址。 
  * 4. 支持 file=registry.cache 本地磁盘文件缓存。 
  * 5. 支持 timeout=1000 请求超时设置。 
  * 6. 支持 session=60000 会话超时或过期设置。  
  * @param url 注册中心地址,不允许为空 
  * @return 注册中心引用,总不返回空 
  */ 
    @Adaptive({"protocol"}) 
    Registry getRegistry(URL url); 
} 

  这里生成了适配器类以后,会跟之前一样 得到一个 extName ,这里是zookeeper,在通过这个名称获取指定的拓展点实现,通过那3个指定路径可以发现一个文件 com.alibaba.dubbo.registry.redis.RegistryFactory里面有3个实现:

dubbo=com.alibaba.dubbo.registry.dubbo.DubboRegistryFactory 
multicast=com.alibaba.dubbo.registry.multicast.MulticastRegistryFactory 
zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory 
redis=com.alibaba.dubbo.registry.redis.RedisRegistryFactory 

  最后得到  ZookeeperRegistryFactory。看看ZookeeperRegistryFactory:

public class ZookeeperRegistryFactory extends AbstractRegistryFactory { 
	 
	private ZookeeperTransporter zookeeperTransporter; 
 
        public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { 
		this.zookeeperTransporter = zookeeperTransporter; 
	} 
 
	public Registry createRegistry(URL url) { 
        return new ZookeeperRegistry(url, zookeeperTransporter); 
        } 
 
} 

  这个方法中并没有 getRegistry 方法,而是在父类 AbstractRegistryFactory:

public Registry getRegistry(URL url) { 
    	url = url.setPath(RegistryService.class.getName()) 
    			.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) 
    			.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); 
    	String key = url.toServiceString(); 
        //锁定注册中心获取过程,保证注册中心单一实例 
        LOCK.lock(); 
        try { 
            Registry registry = REGISTRIES.get(key); 
            if (registry != null) { 
                return registry; 
            } 
            registry = createRegistry(url); 
            if (registry == null) { 
                throw new IllegalStateException("Can not create registry " + url); 
            } 
            REGISTRIES.put(key, registry); 
            return registry; 
        } finally { 
            //      释放锁 
            LOCK.unlock(); 
        } 
    } 

  createRegistry:创建一个注册中心,这个是一个抽象方法,具体的实现在对应的子类实例中实现的,在 ZookeeperRegistryFactory 中

public Registry createRegistry(URL url) { 
        return new ZookeeperRegistry(url, zookeeperTransporter); 
} 

  这里的 zookeeperTransporter 又是一个拓展点,默认是 ZkclientZookeeperTransporter

  这里调用 ZookeeperRegistry 去调用注册方法:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { 
        super(url); 
        if (url.isAnyHost()) { 
    		throw new IllegalStateException("registry address == null"); 
    	} 
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); 
        if (! group.startsWith(Constants.PATH_SEPARATOR)) { 
            group = Constants.PATH_SEPARATOR + group; 
        } 
        this.root = group; //设置根节点
// 建立连接 zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }

  我们对于 getRegistry 得出了一个结论,根据当前注册中心的配置信息,最后new 了一个ZookeeperRegistry ,通过 zkClient 获得一个匹配的注册中心,继续通过registry.register(registedProviderUrl);继续往下分析,会调用 registry.register 去将 dubbo://的协议地址注册到zookeeper 上这个方法会调用 FailbackRegistry 类中的 register. 为什么呢?因为ZookeeperRegistry 这个类中并没有 register 这个方法,但是他的父类FailbackRegistry中存在register方法,而这个类又重写了AbstractRegistry类中的 register 方法。所以我们可以直接定位到 FailbackRegistry 这个类中的 register 方法中:

    public void register(URL url) { 
        super.register(url); 
        failedRegistered.remove(url); 
        failedUnregistered.remove(url); 
        try { 
            // 向服务器端发送注册请求 
            doRegister(url); 
        } catch (Exception e) { 
            Throwable t = e; 
 
            // 如果开启了启动时检测,则直接抛出异常 
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) 
                    && url.getParameter(Constants.CHECK_KEY, true) 
                    && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); 
            boolean skipFailback = t instanceof SkipFailbackWrapperException; 
            if (check || skipFailback) { 
                if(skipFailback) { 
                    t = t.getCause(); 
                } 
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); 
            } else { 
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); 
            } 
 
            // 将失败的注册请求记录到失败列表,定时重试 
            failedRegistered.add(url); 
        } 
    } 

  最后看看 ZookeeperRegistry.doRegister:

protected void doRegister(URL url) { 
        try {//调用 zkclient.create 在 zookeeper 中创建一个节点。 
        	zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 
        } catch (Throwable e) { 
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 
        } 
    } 

  RegistryProtocol.export 这个方法中后续的代码就是去对服务提供端去注册一个 zookeeper 监听,当监听发生变化的时候,服务端做相应的处理。

  我们可以发现最核心的2个方法:

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 
Exporter<?> exporter = protocol.export(invoker); 

  跟我们之前手写的RPC框架整合Zookeeper的时候也有相同思想的影子。无非就是获取到需要发布的服务代理,把他发布出去,无非这里是更加复杂,使得系统更加全面。

服务端接收到消息以后处理过程:

  在服务发布的时候有一个 Exchanger 的拓展点相关的,会进入默认拓展点 HeaderExchanger 的 bind 方法:

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 
} 

  这里 new DecodeHandler(new HeaderExchangeHandler(handler)) 是干嘛的呢 ? 这里的 handler 是前面可以看到 ExchangeHandlerAdapter ,然后通过调用NettyTransporter  的bind ,紧接着 new 了一个 Nettyserver:在里面又进行了一层包装

public NettyServer(URL url, ChannelHandler handler) throws RemotingException { 
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); 
    } 

  再看看这个 warp 是做了什么:

 public static ChannelHandler wrap(ChannelHandler handler, URL url) { 
        return ChannelHandlers.getInstance().wrapInternal(handler, url); 
} 
 
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { 
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) 
                .getAdaptiveExtension().dispatch(handler, url))); 
} 

  最后又包装了个 MultiMessageHandler,这里面又获取了一个自适应适配器Dispatcher$Adaptive ,默认 AllDispatcher。

    所以处理链为:MultiMessageHandler: ->HeartbeatHandler:->AllChannelHandler:->DecodeHandler:->HeaderExchangeHandler->ExchangeHandlerAdapter

  Netty最后处理消息的Handler 是 NettyHandler 里面的 messageReceived(ChannelHandlerContext, MessageEvent)方法去处理:

 @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 
        try { 
            handler.received(channel, e.getMessage()); 
        } finally { 
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 
        } 
    } 

  然后进入我们上面提到的处理链:

  MultiMessageHandler: 复合消息处理

  HeartbeatHandler:心跳消息处理,接收心跳并发送心跳响应

  AllChannelHandler:业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任务给线程池处理

  DecodeHandler:业务解码处理器,反序列化

  HeaderExchangeHandler 里面:

public void received(Channel channel, Object message) throws RemotingException { 
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); 
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); 
        try { 
            if (message instanceof Request) {//进入这里 
                // handle request. 
                Request request = (Request) message; 
                if (request.isEvent()) { 
                    handlerEvent(channel, request); 
                } else { 
                    if (request.isTwoWay()) {//双向请求/进入这里 
                        Response response = handleRequest(exchangeChannel, request); 
                        channel.send(response); 
                    } else {// 单向 
                        handler.received(exchangeChannel, request.getData()); 
                    } 
                } 
       。。。。。。。。。 
    } 

  handleRequest(exchangeChannel, request);进行请求的验证处理,最后进入handler.reply(channel, msg);

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { 
        Response res = new Response(req.getId(), req.getVersion()); 
        if (req.isBroken()) { 
            Object data = req.getData(); 
 
            String msg; 
            if (data == null) msg = null; 
            else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); 
            else msg = data.toString(); 
            res.setErrorMessage("Fail to decode request due to: " + msg); 
            res.setStatus(Response.BAD_REQUEST); 
 
            return res; 
        } 
        // find handler by message class. 
        Object msg = req.getData(); 
        try { 
            // handle data. 
            Object result = handler.reply(channel, msg); 
            res.setStatus(Response.OK); 
            res.setResult(result); 
        } catch (Throwable e) { 
            res.setStatus(Response.SERVICE_ERROR); 
            res.setErrorMessage(StringUtils.toString(e)); 
        } 
        return res; 
    } 

  handler.reply(channel, msg); 所调用的handler是我们最最原始传进来的没有包装过的handler:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { 

  就是 ExchangeHandlerAdapter 的reply 方法,最后调用了一个 invoke.invoke(),在发布服务的时候我们说过那个得到的invoke对象是JavassistProxyFactory 里面的getInvoker方法返回的 AbstractProxyInvoker,所以进入该类的invoke方法:

public Result invoke(Invocation invocation) throws RpcException { 
        try { 
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); 
        } catch (InvocationTargetException e) { 
            return new RpcResult(e.getTargetException()); 
        } catch (Throwable e) { 
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); 
        } 
    } 

  然后就是调用服务发布的时候的代理对象的方法,即最终要调用的实现方法。这样子消息就处理结束了。


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!