org.apache.dubbo 服务注册原理源码分析:

  本文主要针对 dubbo-spring-boot-starter   2.7.7版本, 对应的 org.apache.dubbo 2.7.7 版本的源码。

  本文主要从以下几个点来分析:

  1. 前置知识点--Dubbo的SPI机制。
  2. 服务发布注册的入口。
  3. 服务发布源码分析
  4. 服务注册源码分析。

Dubbo的SPI机制:

  没接触过 Dubbo SPI 的小伙伴可以参考我之前写的  关于 Dubbo SPI的相关博文。虽然版本又差异,但是 SPI机制是一样的。这里简单做一下描述。

  关于 Dubbo 中提供的拓展点,可以参考官方文档的说明:IT虾米网

  扩展点的特征:在类级别标准`@SPI(RandomLoadBalance.NAME)`.其中,括号内的数据,表示当前扩展点的默认扩展点。另一个是@Adaptive

  • @SPI 表示当前这个接口是一个扩展点,可以实现自己的扩展实现,默认的扩展点是DubboProtocol。
  • @Adaptive  表示一个自适应扩展点,在方法级别上,会动态生成一个适配器类

  例如:

@SPI(RandomLoadBalance.NAME) 
public interface LoadBalance { 
 
    @Adaptive("loadbalance") 
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; 
 
}

  在 Dubbo 中,拓展点分为以下三类:

  1. 指定名称的扩展点:ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("name")。
  2. 自适应扩展点:ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()。
  3. 激活扩展点:ExtensionLoader.getExtensionLoader(Protocol.class).getActiveExtension。

  自定义负载均衡拓展点 :

  在 Dubbo 中,想要拓展拓展点,只需要以下几个步骤

1.创建拓展点实现类 (以LoadBalance为例):

public class WuzzLoadBalance extends AbstractLoadBalance { 
    @Override 
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { 
        return null; 
    } 
}

2.在指定文件夹下创建以拓展点全路径名(org.apache.dubbo.rpc.cluster.LoadBalance)的文件,Dubbo 中有多个目录都可以配置拓展点,这里用 resource/META-INF/dubbo/

wuzzLoadBalance=com.wuzz.demo.loadbalance.WuzzLoadBalance

3.搞个测试类进行测试:

  可以发现我们已经可以拿到我们自己的实现类了。那么他具体是怎么实现的呢?让我们继续往下看

Dubbo的拓展点源码:

  接下去我们来看看三种拓展点的具体实现:

指定名称的扩展点:以 ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("wuzzLoadBalance") 为例

  先来看前半段 : ExtensionLoader#getExtensionLoader

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { 
  //....省略判断逻辑 
       
  // 从缓存中获取该 loader 
  ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); 
  if (loader == null) { 
    // 如果从缓存中获取不到,则new 一个,并且保存起来 
    EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); 
    // 然后 get 
    loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); 
  } 
  return loader; 
}

  该方法需要一个Class类型的参数,该参数表示希望加载的扩展点类型,该参数必须是接口,且该接口必须被@SPI注解注释,否则拒绝处理。检查通过之后首先会检查ExtensionLoader缓存中是否已经存在该扩展对应的ExtensionLoader,如果有则直接返回,否则创建一个新的ExtensionLoader负责加载该扩展实现,同时将其缓存起来。可以看到对于每一个扩展,dubbo中只会有一个对应的ExtensionLoader实例。进入到构造方法:

private ExtensionLoader(Class<?> type) { 
  this.type = type; 
  // 判断类型,很显然 这里会走后面 
  objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()); 
}

  咱们姑且先当这个ExtensionLoader实例 已经存在缓存中,那么我们直接进入到 getExtension("wuzzLoadBalance")  这段代码流程中

public T getExtension(String name) { 
        if (StringUtils.isEmpty(name)) { 
            throw new IllegalArgumentException("Extension name == null"); 
        } 
        if ("true".equals(name)) {//如果name=true,表示返回一个默认的扩展点 
            return getDefaultExtension(); 
        } 
        final Holder<Object> holder = getOrCreateHolder(name);//缓存一下,如果实例已经加载过了,直接从缓存读取 
        Object instance = holder.get(); 
        if (instance == null) { 
            synchronized (holder) { 
                instance = holder.get(); 
                if (instance == null) { 
                    instance = createExtension(name);//根据名称创建实例 
                    holder.set(instance); 
                } 
            } 
        } 
        return (T) instance; 
}

  createExtension:仍然是根据名称创建扩展,getExtensionClasses() 加载指定路径下的所有文件

private T createExtension(String name) { 
        Class<?> clazz = getExtensionClasses().get(name); 
        if (clazz == null) { 
            throw findException(name); 
        } 
        try { 
            T instance = (T) EXTENSION_INSTANCES.get(clazz); 
            if (instance == null) { 
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); 
                instance = (T) EXTENSION_INSTANCES.get(clazz); 
            } 
            injectExtension(instance); 
            Set<Class<?>> wrapperClasses = cachedWrapperClasses; 
            if (CollectionUtils.isNotEmpty(wrapperClasses)) { 
                for (Class<?> wrapperClass : wrapperClasses) { 
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); 
                } 
            } 
            initExtension(instance); 
            return instance; 
        } catch (Throwable t) { 
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " + 
                    type + ") couldn't be instantiated: " + t.getMessage(), t); 
        } 
}

  这个方法内主要做了以下三件事

  1. 加载指定路径下的文件内容,保存到集合中
  2. 会对存在依赖注入的扩展点进行依赖注入
  3. 会对存在Wrapper类的扩展点,实现扩展点的包装

  先来看文件内容的加载流程:

private Map<String, Class<?>> getExtensionClasses() { 
        Map<String, Class<?>> classes = cachedClasses.get(); 
        if (classes == null) { 
            synchronized (cachedClasses) { 
                classes = cachedClasses.get(); 
                if (classes == null) { 
                    // 真正加载类的方法 
                    classes = loadExtensionClasses(); 
                    cachedClasses.set(classes); 
                } 
            } 
        } 
        return classes; 
}

  ExtensionLoader#loadExtensionClasses:

private Map<String, Class<?>> loadExtensionClasses() { 
        cacheDefaultExtensionName(); 
 
        Map<String, Class<?>> extensionClasses = new HashMap<>(); 
     // 这里循环加载, 
        for (LoadingStrategy strategy : strategies) { 
       // 这里调用两次,可以从下面的参数中得知可能是为了做兼容 
            loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages()); 
            loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages()); 
        } 
 
        return extensionClasses; 
}

  我们可以断点看看这个strategies :

  这里对应的三个实现实质上是分别对应的三个拓展点配置目录:

  1. META-INF/dubbo/internal/
  2. META-INF/dubbo/
  3. META-INF/services/

  接下去具体的加载细节就不去深挖了,我们只要知道,这里通过这 三个路径去把我们的拓展点加载出来并且缓存起来:

  这才使得我们 getExtension("wuzzLoadBalance") 能拿到我们自己的实现.

  我们还需要关注的就是拓展点的包装 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)),我们直接断点看一下:

  2.7.8 源码在此处有些许差别,但是最终也是如此包装。

  这里以 Protocol 为例,发现 cachedWrapperClasses 里面有3个 wrapper类,且返回的 instance 并不是一个 DubboProtocol 这么简单,经过了层层包装。那么为什么呢?我们来看一下 Protocol 拓展点文件:

  这里我们可以得出结论,在加载拓展点指定文件的时候,具有Wrapper 实现的时候,会将Wrapper 缓存到  cachedWrapperClasses 集合中,且会将这些拓展点进行包装。

自适应扩展点:

  什么叫自适应扩展点呢?我们先演示一个例子,在下面这个例子中,我们传入一个Protocol接口,它会返回一个AdaptiveProtocol。这个就叫自适应。

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

  我们可以看到 Protocol这个类的 export方法上面有一个注解@Adaptive。 这个就是一个自适应扩展点的标识。它可以修饰在类上,也可以修饰在方法上面。这两者有什么区别呢? 简单来说,放在类上,说明当前类是一个确定的自适应扩展点的类。如果是放在方法级别,那么需要生成一个动态字节码,来进行转发。 拿Protocol这个接口来说,它里面定义了export和refer两个抽象方法,这两个方法分别带有@Adaptive的标识,标识是一个自适应方法。 我们知道Protocol是一个通信协议的接口,具体有多种实现,那么这个时候选择哪一种呢? 取决于我们在使用dubbo的时候所配置的协议名称。而这里的方法层面的Adaptive就决定了当前这个方法会采用何种协议来发布服务。

  我们直接进入  ExtensionLoader#getAdaptiveExtension 获取自适应拓展点的源码流程:

public T getAdaptiveExtension() { 
     // 又是缓存中获取 
        Object instance = cachedAdaptiveInstance.get(); 
        if (instance == null) { 
            if (createAdaptiveInstanceError != null) { 
                throw new IllegalStateException("Failed to create adaptive instance: " + 
                        createAdaptiveInstanceError.toString(), 
                        createAdaptiveInstanceError); 
            } 
       // 双重检查锁 
            synchronized (cachedAdaptiveInstance) { 
                instance = cachedAdaptiveInstance.get(); 
                if (instance == null) { 
                    try { 
                        instance = createAdaptiveExtension(); 
                        cachedAdaptiveInstance.set(instance); 
                    } catch (Throwable t) { 
                        createAdaptiveInstanceError = t; 
                        throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t); 
                    } 
                } 
            } 
        } 
        return (T) instance; 
}

  这部分逻辑没有特殊的地方,无非就是缓存+双重检查。然后进入创建自适应拓展点的代码 : ExtensionLoader#createAdaptiveExtension,

  创建自适应拓展点:ExtensionLoader#createAdaptiveExtension,这个方法中做两个事情

  1. 获得一个自适应扩展点实例
  2. 实现依赖注入
private T createAdaptiveExtension() { 
        try { 
            return injectExtension((T) getAdaptiveExtensionClass().newInstance()); 
        } catch (Exception e) { 
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e); 
        } 
}

  然后进入 ExtensionLoader#getAdaptiveExtensionClass :

private Class<?> getAdaptiveExtensionClass() { 
        getExtensionClasses(); 
        if (cachedAdaptiveClass != null) { 
            return cachedAdaptiveClass; 
        } 
        return cachedAdaptiveClass = createAdaptiveExtensionClass(); 
}

  getExtensionClasses()这个方法在前面讲过了,会加载当前传入的类型的所有扩展点,保存在一个hashmap中 这里有一个判断逻辑,如果 cachedApdaptiveClas!=null ,直接返回这个cachedAdaptiveClass,这个cachedAdaptiveClass是一个什么?

  cachedAdaptiveClass是在 加载解析/META-INF/dubbo下的扩展点的时候加载进来的。在加载完之后如果这个类有@Adaptive标识,则会赋值赋值而给cachedAdaptiveClass

  createAdaptiveExtensionClass:动态生成字节码,然后进行动态加载。那么这个时候锁返回的class,如果加载的是Protocol.class,应该是Protocol$Adaptive 这个cachedDefaultName实际上就是扩展点接口的@SPI注解对应的名字,如果此时加载的是Protocol.class,那么cachedDefaultName=dubbo

private Class<?> createAdaptiveExtensionClass() { 
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); 
        ClassLoader classLoader = findClassLoader(); 
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); 
        return compiler.compile(code, classLoader); 
}

  例如根据  ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 生成的自适应拓展点就是:

package org.apache.dubbo.rpc; 
import org.apache.dubbo.common.extension.ExtensionLoader; 
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol { 
    public void destroy()  { 
        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); 
    } 
    public int getDefaultPort()  { 
        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); 
    } 
    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException { 
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); 
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null"); 
        org.apache.dubbo.common.URL url = arg0.getUrl(); 
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); 
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])"); 
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName); 
        return extension.export(arg0); 
    } 
    public java.util.List getServers()  { 
        throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); 
    } 
    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException { 
        if (arg1 == null) throw new IllegalArgumentException("url == null"); 
        org.apache.dubbo.common.URL url = arg1; 
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); 
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])"); 
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName); 
        return extension.refer(arg0, arg1); 
    } 
}

关于objectFactory:

  在injectExtension这个方法中,我们发现入口出的代码首先判断了objectFactory这个对象是否为空。这个是在哪里初始化的呢?实际上我们在获得ExtensionLoader的时候,就对objectFactory进行了初始化。

private ExtensionLoader(Class<?> type) { 
  this.type = type; 
  objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()); 
}

  然后通过ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()去获得一个自适应的扩展点,进入ExtensionFactory这个接口中,可以看到它是一个扩展点,并且有一个自己实现的自适应扩展点AdaptiveExtensionFactory; 注意:@Adaptive加载到类上表示这是一个自定义的适配器类,表示我们再调用getAdaptiveExtension方法的时候,不需要走上面这么复杂的过程。会直接加载到AdaptiveExtensionFactory。然后在getAdaptiveExtensionClass()方法处有判断,就是上文提到的 cachedAdaptiveClass。

@Adaptive 
public class AdaptiveExtensionFactory implements ExtensionFactory { 
 
    private final List<ExtensionFactory> factories; 
 
    public AdaptiveExtensionFactory() { 
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class); 
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>(); 
        for (String name : loader.getSupportedExtensions()) { 
            list.add(loader.getExtension(name)); 
        } 
        factories = Collections.unmodifiableList(list); 
    } 
 
    @Override 
    public <T> T getExtension(Class<T> type, String name) { 
        for (ExtensionFactory factory : factories) { 
            T extension = factory.getExtension(type, name); 
            if (extension != null) { 
                return extension; 
            } 
        } 
        return null; 
    } 
}

  我们可以看到除了自定义的自适应适配器类以外,还有两个实现类,一个是SPI,一个是Spring,AdaptiveExtensionFactory轮询这2个,从一个中获取到就返回。

激活扩展点:

  自动激活扩展点,有点类似 springboot 的时候用到的 conditional,根据条件进行自动激活。但是这里设计的初衷是,对于一个类会加载多个扩展点的实现,这个时候可以通过自动激活扩展点进行动态加载, 从而简化配置我们的配置工作

  @Activate提供了一些配置来允许我们配置加载条件,比如group过滤,比如key过滤。举个例子,我们可以看看org.apache.dubbo.Filter这个类,它有非常多的实现,比如说CacheFilter,这个缓存过滤器,配置信息如下group表示客户端和和服务端都会加载,value表示url中有cache_key的时候

@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY) 
public class CacheFilter implements Filter { 
}

  通过下面这段代码,演示关于Filter的自动激活扩展点的效果。没有添加“红色部分的代码”时,list的结果是10,添加之后list的结果是11. 会自动把cacheFilter加载进来

ExtensionLoader<Filter> loader = ExtensionLoader.getExtensionLoader(Filter.class); 
URL url = new URL("", "", 0); 
url = url.addParameter("cache", "cache"); 
List<Filter> filters = loader.getActivateExtension(url, "cache"); 
System.out.println(filters.size());

服务发布注册的入口:

@DubboComponentScan:

  在我们使用 Dubbo 构建服务的时候,我们通常需要配置一个 Dubbo Service 的扫描路径。那么这个注解应该是比较关键的。我们进入到这个注解的源码来开始揭开Dubbo的神秘面纱。

@Target(ElementType.TYPE) 
@Retention(RetentionPolicy.RUNTIME) 
@Documented 
@Import(DubboComponentScanRegistrar.class) 
public @interface DubboComponentScan { 
    //....... 
}

  我们看到了熟悉的东西:@Import(DubboComponentScanRegistrar.class) ,跟进去我们发现该类 实现了  ImportBeanDefinitionRegistrar 接口,该接口提供了类的注册的回调。也就是说DubboComponentScanRegistrar 最后会调用 registerBeanDefinitions 方法:

@Override 
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { 
  // 获取到元数据中配置的扫描路径,可以是多个,所以这里是集合 
  Set<String> packagesToScan = getPackagesToScan(importingClassMetadata); 
  // 注册指定的bean 
  registerServiceAnnotationBeanPostProcessor(packagesToScan, registry); 
  // 注册通用的bean 
  // @since 2.7.6 Register the common beans 
  registerCommonBeans(registry); 
}

  DubboComponentScanRegistrar#getPackagesToScan 这个方法中就是获取 DubboComponentScan 配置的参数,进行组装返回。

  主要关注 DubboComponentScanRegistrar#registerServiceAnnotationBeanPostProcessor 方法:

private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) { 
     // 构建一个rootBeanDefinition 
        BeanDefinitionBuilder builder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class); 
     // 将前面组装的扫描路径作为一个属性放到 ServiceAnnotationBeanPostProcessor 中 
        builder.addConstructorArgValue(packagesToScan); 
        builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); 
        AbstractBeanDefinition beanDefinition = builder.getBeanDefinition(); 
       //注册该Bean,毋庸置疑,这个Bean 就是 ServiceAnnotationBeanPostProcessor 
        BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry); 
}

  可以看到,ServiceAnnotationBeanPostProcessor 被标记了过时,后续可能会有点变化。我们先来看一下 ServiceAnnotationBeanPostProcessor 的类图  :

  从类图可以看出,在该Bean初始化前后,会调用好几个回调方法,其中 BeanDefinitionRegistryPostProcessor 就是Bean 注册后会调用一个 postProcessBeanDefinitionRegistry 方法,该方法在其父类中:

@Override 
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { 
      // 注册一个监听器,这个是很关键的,等等需要去看这个类 
        // @since 2.7.5 
        registerBeans(registry, DubboBootstrapApplicationListener.class); 
      // 获取到那个扫描路径 
        Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan); 
 
        if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) { 
       // 进行扫描 DubboService 进行注入 
            registerServiceBeans(resolvedPackagesToScan, registry); 
        } else { 
            if (logger.isWarnEnabled()) { 
                logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!"); 
            } 
        } 
}

    然后我们重点看 ServiceClassPostProcessor#registerServiceBeans

private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) { 
     // 注册一个扫描器 
        DubboClassPathBeanDefinitionScanner scanner = 
                new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader); 
     // Bean 名字解析相关 
        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry); 
 
        scanner.setBeanNameGenerator(beanNameGenerator); 
     // 通过注解过滤 
        // refactor @since 2.7.7 
        serviceAnnotationTypes.forEach(annotationType -> { 
            scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType)); 
        }); 
     // 循环遍历我们配置的扫描路径 
        for (String packageToScan : packagesToScan) { 
        // 扫描 
            // Registers @Service Bean first 
            scanner.scan(packageToScan); 
       // 拼装 
            // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not. 
            Set<BeanDefinitionHolder> beanDefinitionHolders = 
                    findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator); 
 
            if (!CollectionUtils.isEmpty(beanDefinitionHolders)) { 
          // 遍历拼装好的 BeanDefinitionHolder 
                for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) { 
            //注册Bean 
                    registerServiceBean(beanDefinitionHolder, registry, scanner); 
                } 
                    //....... 
            } else { 
            // ....... 
            } 
        } 
}

  来看一下注解过滤中的serviceAnnotationTypes ,其实一目了然,DubboService 是新版的修改,避免与 Spring的 Service注解重名,org.apache.dubbo.config.annotation.Service 是兼容老版本,com.alibaba.dubbo.config.annotation.Service 也是为了兼容。

private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList( 
            // @since 2.7.7 Add the @DubboService , the issue : https://github.com/apache/dubbo/issues/6007 
            DubboService.class, 
            // @since 2.7.0 the substitute @com.alibaba.dubbo.config.annotation.Service 
            Service.class, 
            // @since 2.7.3 Add the compatibility for legacy Dubbo's @Service , the issue : https://github.com/apache/dubbo/issues/4330 
            com.alibaba.dubbo.config.annotation.Service.class 
);

  然后我们进入主线逻辑 ServiceClassPostProcessor#registerServiceBean

private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry, 
                                     DubboClassPathBeanDefinitionScanner scanner) { 
     // 获取到需要注册的Dubbo Service 的 bean class 
        Class<?> beanClass = resolveClass(beanDefinitionHolder); 
     // 获取都 Dubbo Service 的 注解元数据 
        Annotation service = findServiceAnnotation(beanClass); 
 
        /** 
         * The {@link AnnotationAttributes} of @Service annotation 
      * 获取到我们注解上面配置的参数信息 
         */ 
        AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false); 
      // 获取该实现的接口 
        Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass); 
     // 获取实现类 类名 
        String annotatedServiceBeanName = beanDefinitionHolder.getBeanName(); 
     // 该方法主要是构建了一个ServiceBean 
        AbstractBeanDefinition serviceBeanDefinition = 
                buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName); 
 
        // ServiceBean Bean name 
    // 获取类名,比如这里是 ServiceBean:com.wuzz.demo.api.HelloService  
        String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass); 
 
        if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean 
       // 然后调用注册方法 
            registry.registerBeanDefinition(beanName, serviceBeanDefinition); 
          // ...... 
        } else { 
          //....... 
        } 
}

  源码跟到这里,我们应该知道,这里注册了一个 ServiceBean ,所以跟进这个类的构造,但是发现什么都没做,但是这个时候我们需要想起来,之前 ServiceClassPostProcessor#postProcessBeanDefinitionRegistry 方法内初始化了一个监听器 DubboBootstrapApplicationListener,我们看一下该监听器监听了什么:

@Override 
public void onApplicationContextEvent(ApplicationContextEvent event) { 
        if (event instanceof ContextRefreshedEvent) { 
            onContextRefreshedEvent((ContextRefreshedEvent) event); 
        } else if (event instanceof ContextClosedEvent) { 
            onContextClosedEvent((ContextClosedEvent) event); 
        } 
}

  从这个代码可以看出,这个监听器必然执行,在 Spring 上下文刷新完毕的时候走 DubboBootstrapApplicationListener#onContextRefreshedEvent

private void onContextRefreshedEvent(ContextRefreshedEvent event) { 
        dubboBootstrap.start(); 
}

  终于看到了曙光,原来 Dubbo 的初始化入口在这里。附上这个流程的流程图:

服务发布源码分析:

  通过上面的分析,我们知道了服务得发布入口在 DubboBootstrap#start:

public DubboBootstrap start() { 
     // 原子操作,避免并发问题 
        if (started.compareAndSet(false, true)) { 
            ready.set(false); 
            initialize();//初始化 
            if (logger.isInfoEnabled()) { 
                logger.info(NAME + " is starting..."); 
            } 
            // 1. export Dubbo Services 
            exportServices(); // 发布服务 
 
            // Not only provider register 
            if (!isOnlyRegisterProvider() || hasExportedServices()) { 
                // 2. export MetadataService 
                exportMetadataService(); // 发布元数据服务 
                //3. Register the local ServiceInstance if required 
                registerServiceInstance(); // 注册服务实例 
            } 
        // 客户端相关的操作 
            referServices(); 
            if (asyncExportingFutures.size() > 0) { 
                new Thread(() -> { 
                    try { 
                        this.awaitFinish(); 
                    } catch (Exception e) { 
                        logger.warn(NAME + " exportAsync occurred an exception."); 
                    } 
                    ready.set(true); 
                    if (logger.isInfoEnabled()) { 
                        logger.info(NAME + " is ready."); 
                    } 
                }).start(); 
            } else { 
                ready.set(true); 
                if (logger.isInfoEnabled()) { 
                    logger.info(NAME + " is ready."); 
                } 
            } 
            if (logger.isInfoEnabled()) { 
                logger.info(NAME + " has started."); 
            } 
        } 
        return this; 
}

  其中 initialize 方法,就是初始化服务发布的相关配置信息:

private void initialize() { 
        if (!initialized.compareAndSet(false, true)) { 
            return; 
        } 
        // 初始化拓展外部化配置 
        ApplicationModel.initFrameworkExts(); 
        // 如果配置了中心配置,如 dubbo-admin,则进行初始化 
        startConfigCenter(); 
        // 如果有必要,注册到中心配置 
        useRegistryAsConfigCenterIfNecessary(); 
        // 加载远程配置 
        loadRemoteConfigs(); 
        // 检查全局配置 
        checkGlobalConfigs(); 
        // 初始化元数据服务 
        initMetadataService(); 
        // 初始化事件监听器 
        initEventListener(); 
 
        if (logger.isInfoEnabled()) { 
            logger.info(NAME + " has been initialized!"); 
        } 
    }

  目前该初始化流程不影响我们继续看服务的发布流程,所以我们这里直接进入 DubboBootstrap#exportServices

private void exportServices() { 
     // 遍历我们需要发布的服务实现类,进行发布 
        configManager.getServices().forEach(sc -> { 
            // TODO, compatible with ServiceConfig.export() 
        // 这里就是之前将我们需要发布的 DubboService 包装成 ServiceBean 
       // 而ServiceBean 是 ServiceConfig 的子类 
            ServiceConfig serviceConfig = (ServiceConfig) sc; 
            serviceConfig.setBootstrap(this); 
       // 异步发布? 
            if (exportAsync) {//调用线程池+Futrue 发布 
                ExecutorService executor = executorRepository.getServiceExporterExecutor(); 
                Future<?> future = executor.submit(() -> { 
                    sc.export(); 
                    exportedServices.add(sc); 
                }); 
                asyncExportingFutures.add(future); 
            } else {// 同步发布 
                sc.export(); 
                exportedServices.add(sc);// 发布完添加到发布服务的集合中 
            } 
        }); 
}

  无论同步/异步 发布,均会走到 ServiceConfig#export 方法中:

public synchronized void export() { 
     // 是否需要发布 
        if (!shouldExport()) { 
            return; 
        } 
     // 检查 bootstrap是否初始化 
        if (bootstrap == null) { 
            bootstrap = DubboBootstrap.getInstance(); 
            bootstrap.init(); 
        } 
     // 检查相关配置 
        checkAndUpdateSubConfigs(); 
     // 初始化元数据 
        //init serviceMetadata 
        serviceMetadata.setVersion(version); 
        serviceMetadata.setGroup(group); 
        serviceMetadata.setDefaultGroup(group); 
        serviceMetadata.setServiceType(getInterfaceClass()); 
        serviceMetadata.setServiceInterfaceName(getInterface()); 
        serviceMetadata.setTarget(getRef()); 
      // 是否延迟发布 
        if (shouldDelay()) {// 构建一个定时任务 
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); 
        } else { 
       // 直接发布 
            doExport(); 
        } 
        exported(); 
}

  然后进入 ServiceConfig#doExport 这里面没有什么特殊逻辑,转到 ServiceConfig#doExportUrls

private void doExportUrls() { 
     // 获取服务仓库,其实就是一个缓存 
        ServiceRepository repository = ApplicationModel.getServiceRepository(); 
     // 添加 
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass()); 
     // 缓存 provider 
        repository.registerProvider( 
                getUniqueServiceName(), 
                ref, 
                serviceDescriptor, 
                this, 
                serviceMetadata 
        ); 
     // 获取配置的注册中心列表 
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true); 
     // 遍历协议 
        for (ProtocolConfig protocolConfig : protocols) { 
            String pathKey = URL.buildKey(getContextPath(protocolConfig) 
                    .map(p -> p + "/" + path) 
                    .orElse(path), group, version); 
            // In case user specified path, register service one more time to map it to path. 
            repository.registerService(pathKey, interfaceClass); 
            // TODO, uncomment this line once service key is unified 
            serviceMetadata.setServiceKey(pathKey); 
       // 通过注册中心发布服务 
            doExportUrlsFor1Protocol(protocolConfig, registryURLs); 
        } 
}

  进入 ServiceConfig#doExportUrlsFor1Protocol ,这里代码很长,不过我们要是知道他主要做了什么看起来就轻松了,本质上做了以下几件事

  • 生成url
  • 根据url中配置的协议类型,调用指定协议进行服务的发布
  • 启动服务
  • 注册服务
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { 
        String name = protocolConfig.getName(); // 获取协议名称 
        if (StringUtils.isEmpty(name)) { 
            name = DUBBO; //默认为dubbo 
        } 
     //准备MAP。用域拼接URL 
        Map<String, String> map = new HashMap<String, String>(); 
        map.put(SIDE_KEY, PROVIDER_SIDE); 
 
        ServiceConfig.appendRuntimeParameters(map); 
        AbstractConfig.appendParameters(map, getMetrics()); 
        AbstractConfig.appendParameters(map, getApplication()); 
        AbstractConfig.appendParameters(map, getModule()); 
        // remove 'default.' prefix for configs from ProviderConfig 
        // appendParameters(map, provider, Constants.DEFAULT_KEY); 
        AbstractConfig.appendParameters(map, provider); 
        AbstractConfig.appendParameters(map, protocolConfig); 
        AbstractConfig.appendParameters(map, this); 
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig(); 
        if (metadataReportConfig != null && metadataReportConfig.isValid()) { 
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE); 
        } 
        if (CollectionUtils.isNotEmpty(getMethods())) { 
            for (MethodConfig method : getMethods()) { 
                AbstractConfig.appendParameters(map, method, method.getName()); 
                String retryKey = method.getName() + ".retry"; 
                if (map.containsKey(retryKey)) { 
                    String retryValue = map.remove(retryKey); 
                    if ("false".equals(retryValue)) { 
                        map.put(method.getName() + ".retries", "0"); 
                    } 
                } 
                List<ArgumentConfig> arguments = method.getArguments(); 
                if (CollectionUtils.isNotEmpty(arguments)) { 
                    for (ArgumentConfig argument : arguments) { 
                        // convert argument type 
                        if (argument.getType() != null && argument.getType().length() > 0) { 
                            Method[] methods = interfaceClass.getMethods(); 
                            // visit all methods 
                            if (methods.length > 0) { 
                                for (int i = 0; i < methods.length; i++) { 
                                    String methodName = methods[i].getName(); 
                                    // target the method, and get its signature 
                                    if (methodName.equals(method.getName())) { 
                                        Class<?>[] argtypes = methods[i].getParameterTypes(); 
                                        // one callback in the method 
                                        if (argument.getIndex() != -1) { 
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { 
                                                AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex()); 
                                            } else { 
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); 
                                            } 
                                        } else { 
                                            // multiple callbacks in the method 
                                            for (int j = 0; j < argtypes.length; j++) { 
                                                Class<?> argclazz = argtypes[j]; 
                                                if (argclazz.getName().equals(argument.getType())) { 
                                                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + j); 
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) { 
                                                        throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); 
                                                    } 
                                                } 
                                            } 
                                        } 
                                    } 
                                } 
                            } 
                        } else if (argument.getIndex() != -1) { 
                            AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex()); 
                        } else { 
                            throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); 
                        } 
 
                    } 
                } 
            } // end of methods for 
        } 
     // 以上代码都是为了组装 URL 
      // 是否泛化接口 
 
        if (ProtocolUtils.isGeneric(generic)) { 
            map.put(GENERIC_KEY, generic); 
            map.put(METHODS_KEY, ANY_VALUE); 
        } else { 
            String revision = Version.getVersion(interfaceClass, version); 
            if (revision != null && revision.length() > 0) { 
                map.put(REVISION_KEY, revision); 
            } 
 
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); 
            if (methods.length == 0) { 
                logger.warn("No method found in service interface " + interfaceClass.getName()); 
                map.put(METHODS_KEY, ANY_VALUE); 
            } else { 
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); 
            } 
        } 
 
        /** 
         * Here the token value configured by the provider is used to assign the value to ServiceConfig#token 
         */ 
     // token 校验 
 
        if(ConfigUtils.isEmpty(token) && provider != null) { 
            token = provider.getToken(); 
        } 
 
        if (!ConfigUtils.isEmpty(token)) { 
            if (ConfigUtils.isDefault(token)) { 
                map.put(TOKEN_KEY, UUID.randomUUID().toString()); 
            } else { 
                map.put(TOKEN_KEY, token); 
            } 
        } 
        //init serviceMetadata attachments 
        serviceMetadata.getAttachments().putAll(map); 
     // 主机绑定 
        // export service 
        String host = findConfigedHosts(protocolConfig, registryURLs, map); 
        Integer port = findConfigedPorts(protocolConfig, name, map); // 获取端口。默认20880 
     // 组装URL 
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); 
     // 获取拓展点 
        // You can customize Configurator to append extra parameters 
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) 
                .hasExtension(url.getProtocol())) { 
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) 
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url); 
        } 
 
        String scope = url.getParameter(SCOPE_KEY); 
        // don't export when none is configured 
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) { 
        // 如果scope!=remote, 则先本地暴露服务 
            // export to local if the config is not remote (export to remote only when config is remote) 
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { 
                exportLocal(url); 
            } 
       // 如果scope!=remote, 则先本地暴露服务 
            // export to remote if the config is not local (export to local only when config is local) 
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { 
                if (CollectionUtils.isNotEmpty(registryURLs)) { 
                    for (URL registryURL : registryURLs) { 
                        //if protocol is only injvm ,not register 
               // //如果设置的protocol是injvm,跳过 
 
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { 
                            continue; 
                        } 
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); 
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL); 
                        if (monitorUrl != null) { 
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); 
                        } 
                        if (logger.isInfoEnabled()) { 
                            if (url.getParameter(REGISTER_KEY, true)) { 
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); 
                            } else { 
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); 
                            } 
                        } 
               // // 是否采用自定义的动态代理机制,默认是javassist 
                        // For providers, this is used to enable custom proxy to generate invoker 
                        String proxy = url.getParameter(PROXY_KEY); 
                        if (StringUtils.isNotEmpty(proxy)) { 
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy); 
                        } 
               //获得一个自适应扩展点,这个时候返回的Invoker是一个动态代理类。 
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); 
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 
 
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); 
                        exporters.add(exporter); 
                    } 
                } else { 
                    if (logger.isInfoEnabled()) { 
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); 
                    } 
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); 
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 
 
                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); 
                    exporters.add(exporter); 
                } 
                /** 
                 * @since 2.7.0 
                 * ServiceData Store 
                 */ 
                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE)); 
                if (metadataService != null) { 
                    metadataService.publishServiceDefinition(url); 
                } 
            } 
        } 
        this.urls.add(url); 
}

  对于上述代码中的 getMethods 里面的一阵循环是什么意思呢?请看下面代码:

@DubboService(loadbalance = "random", // 负载均衡 
        timeout = 50000, //超时 
        cluster = "failsafe", // 服务容错 
        protocol = {"dubbo", "rest"}, //多协议支持 
        registry = {"hangzhou", "wenzhou"}, //多注册中心 
        methods = { 
                @Method(name = "sayHello", timeout = -1), 
                @Method(name = "sayHello", timeout = -1, 
                        arguments = { 
                            @Argument(), 
                            @Argument() 
                        }) 
        } 
)

  其实本质上就是解析 @DubboService 的注解配置元数据,然后来到了 主机绑定,也就是 IP的查找方法上 ServiceConfig#findConfigedHosts:

private String findConfigedHosts(ProtocolConfig protocolConfig, 
                                     List<URL> registryURLs, 
                                     Map<String, String> map) { 
        boolean anyhost = false; 
     // 查找环境变量中是否存在启动参数 [DUBBO_IP_TO_BIND] =服务注册的ip 
        String hostToBind = getValueFromConfig(protocolConfig, DUBBO_IP_TO_BIND); 
        if (hostToBind != null && hostToBind.length() > 0 && isInvalidLocalHost(hostToBind)) { 
            throw new IllegalArgumentException("Specified invalid bind ip from property:" + DUBBO_IP_TO_BIND + ", value:" + hostToBind); 
        } 
 
        // if bind ip is not found in environment, keep looking up 
        if (StringUtils.isEmpty(hostToBind)) { 
       //读取配置文件, dubbo.protocols.dubbo.host= 服务注册的ip 
            hostToBind = protocolConfig.getHost(); 
            if (provider != null && StringUtils.isEmpty(hostToBind)) { 
                hostToBind = provider.getHost(); 
            } 
            if (isInvalidLocalHost(hostToBind)) { 
                anyhost = true; 
                try { 
                    logger.info("No valid ip found from environment, try to find valid host from DNS."); 
              // 获得本机ip地址 
                    hostToBind = InetAddress.getLocalHost().getHostAddress(); 
                } catch (UnknownHostException e) { 
                    logger.warn(e.getMessage(), e); 
                } 
                if (isInvalidLocalHost(hostToBind)) { 
                    if (CollectionUtils.isNotEmpty(registryURLs)) { 
                        for (URL registryURL : registryURLs) { 
                            if (MULTICAST.equalsIgnoreCase(registryURL.getParameter("registry"))) { 
                                // skip multicast registry since we cannot connect to it via Socket 
                                continue; 
                            } 
                            try (Socket socket = new Socket()) { 
                                SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort()); 
                                socket.connect(addr, 1000); 
                   //通过Socket去连接注册中心,从而获取本机IP 
                                hostToBind = socket.getLocalAddress().getHostAddress(); 
                                break; 
                            } catch (Exception e) { 
                                logger.warn(e.getMessage(), e); 
                            } 
                        } 
                    } 
                    if (isInvalidLocalHost(hostToBind)) { 
               //会轮询本机的网卡,直到找到合适的IP地址 
                        hostToBind = getLocalHost(); 
                    } 
                } 
            } 
        } 
 
        map.put(BIND_IP_KEY, hostToBind); 
     //上面获取到的ip地址是bindip,如果需要作为服务注册中心的ip, DUBBO_IP_TO_REGISTRY -dDUBBO_IP_TO_REGISTRY=ip 
        // registry ip is not used for bind ip by default 
        String hostToRegistry = getValueFromConfig(protocolConfig, DUBBO_IP_TO_REGISTRY); 
        if (hostToRegistry != null && hostToRegistry.length() > 0 && isInvalidLocalHost(hostToRegistry)) { 
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry); 
        } else if (StringUtils.isEmpty(hostToRegistry)) { 
            // bind ip is used as registry ip by default 
            hostToRegistry = hostToBind; 
        } 
 
        map.put(ANYHOST_KEY, String.valueOf(anyhost)); 
 
        return hostToRegistry; 
}

  总之就是直到找到一个合法的主机地址为止。然后获取到端口。将map 配置信息集合、IP、Port 传入,构造一个 URL

dubbo://192.168.1.1:20880/com.wuzz.demo.api.HelloService?accepts=0&anyhost=true&application=springboot-dubbo&bind.ip=192.168.1.1

&bind.port=20880&cluster=failsafe&connections=0&deprecated=false&dubbo=2.0.2&dynamic=true&executes=0&generic=false

&interface=com.wuzz.demo.api.HelloService&iothreads=5&methods=sayHello&pid=13496&qos.enable=false&queues=0&release=2.7.7&serialization=kryo&side=provider&threadpool=fixed&threads=201&timeout=50000&timestamp=1601354940987

  ServiceConfig#doExportUrlsFor1Protocol 还有很多细节的处理,这里有必要解释以下的就是这个 invoker 对象了:

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); 
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

  其中 PROXY_FACTORY 定义如下:

private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

  对应的接口拓展点默认实现为  javassist ,但是会有一个 StubProxyFactoryWrapper 进行包装,但是这里不影响,所以进入 JavassistProxyFactory#getInvoker

@Override 
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { 
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' 
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); 
        return new AbstractProxyInvoker<T>(proxy, type, url) { 
            @Override 
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable { 
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 
            } 
        }; 
}

  通过 javassist 生成一个代理类,这里持有了对应我们需要发布的服务类的所有信息。然后将该类进行传递,一直到本地服务的发布及服务的注册。而后消费端通过这里的 wrapper.invokeMethod 进行调用。

  我们可以看一下在我这个环境测试的服务下生成的代理方法的代码,需要进入 Wrapper.getWrapper 方法断点获取:

  我们将 c3 拷贝出来:

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { 
        com.wuzz.demo.api.HelloService w; 
        try { 
            w = ((com.wuzz.demo.api.HelloService) $1); 
        } catch (Throwable e) { 
            throw new IllegalArgumentException(e); 
        } 
        try { 
            if ("sayHello".equals($2) && $3.length == 0) { 
                return ($w) w.sayHello(); 
            } 
        } catch (Throwable e) { 
            throw new java.lang.reflect.InvocationTargetException(e); 
        } 
        throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.wuzz.demo.api.HelloService."); 
}

  构建好了代理类之后,返回一个AbstractproxyInvoker,并且它实现了doInvoke方法,这个地方似乎看到了dubbo消费者调用过来的时候触发的影子,因为wrapper.invokeMethod本质上就是触发上面动态代理类的方法invokeMethod。

  接下来我们来看看服务的远程发布 

private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);

  这个 PROTOCOL 的实例化,跟我们上面分析SPI之自适应拓展点一摸一样,所以这里得到的对象是  ProtocolFilterWrapper(QosProtocolWrapper(ProtocolListenerWrapper(DubboProtocol)))。但是需要明白的是,Dubbo 基于URL 驱动,那么这个时候我们需要知道的是URL中携带的协议是什么,这样我们才能够找到对应的拓展点

  我们发现这里已经被替换成了 registry 协议,那么此刻应该走到 Protocol$Adaptive 的动态适配器类中,而其中最为关键的代码如下:

String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

  然后通过这个 extName ,通过获取指定名称的拓展点,找到对应的实现,那么这里的 registry 对应的就是 org.apache.dubbo.registry.integration.RegistryProtocol,但是Protocol 有包装类,那么最后的对象应该是  ProtocolFilterWrapper(QosProtocolWrapper(ProtocolListenerWrapper(RegistryProtocol)))

  这里的三个包装类都会判断URL是不是 registry 协议,如果是直接进入下个调用链,当前场景正是 registry 。最终调用 RegistryProtocol#export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 
     // 通过URL里面的 registry 属性对应的值获取的注册地址,配置了zookeeper 则这里就是 zookeeper://192.168.1.101:2181/..... 
        URL registryUrl = getRegistryUrl(originInvoker); 
        // url to export locally 
     // 发布的服务地址,当前情况下是dubbo协议 则这里就是 
     // dubbo://192.168.1.1:20880/....... 
        URL providerUrl = getProviderUrl(originInvoker); 
 
        // Subscribe the override data 
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call 
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the 
        //  subscription information to cover. 
      // 修改URL ,这里设置成 provider://192.168.1.1:20880/....... 
 
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); 
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 
     // 结合配置相关重写 URL 
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); 
        //export invoker 
     // 启动 Netty 并且发布本地服务。 
 
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); 
 
        // url to registry 
     // 获取注册实例,这里如果配置了zookeeper ,则返回 ZookeeperRegistry 
 
        final Registry registry = getRegistry(originInvoker); 
       // dubbo://..... 
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); 
 
        // decide if we need to delay publish 
        boolean register = providerUrl.getParameter(REGISTER_KEY, true); 
        if (register) { 
        // 注册服务, 
            register(registryUrl, registeredProviderUrl); 
        } 
 
        // register stated url on provider model 
        registerStatedUrl(registryUrl, registeredProviderUrl, register); 
 
        // Deprecated! Subscribe to override rules in 2.6.x or before.
     // //设置注册中心的订阅
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); notifyExport(exporter); //Ensure that a new exporter instance is returned every time export
     // //保证每次export都返回一个新的exporter实例
return new DestroyableExporter<>(exporter); }

  然后走服务的发布 RegistryProtocol#doLocalExport

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { 
        String key = getCacheKey(originInvoker); 
 
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { 
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); 
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); 
        }); 
}

  其中 providerUrl 是dubbo:// 协议开头的地址URL,正如之前所说,Dubbo基于URL驱动,那么此刻  protocol  是 Protocol$Adaptive,所以此刻 protocol.export(invokerDelegate) 会走 DubboProtocol#export ,需要注意的是,这里会进行包装 ProtocolFilterWrapper(QosProtocolWrapper(ProtocolListenerWrapper(DubboProtocol)))

  我们直接进入 DubboProtocol#export 

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 
        URL url = invoker.getUrl(); 
 
        // export service. 
        String key = serviceKey(url); 
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); 
        exporterMap.put(key, exporter); 
 
        //export an stub service for dispatching event 
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); 
     //是否是本地存根事件 
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); 
     //是否配置了参数回调机制 
        if (isStubSupportEvent && !isCallbackservice) { 
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); 
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) { 
                if (logger.isWarnEnabled()) { 
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + 
                            "], has set stubproxy support event ,but no stub methods founded.")); 
                } 
 
            } 
        } 
 
        openServer(url); 
        optimizeSerialization(url); 
 
        return exporter; 
}

  openServer: 往下看这个过程,进入到openServer(),从名字来看它是用来开启一个服务。去开启一个服务,并且放入到缓存中(在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例)

private void openServer(URL url) { 
  // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例 
  String key = url.getAddress(); 
  ////client 也可以暴露一个只有server可以调用的服务 
  boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); 
  if (isServer) { 
    //是否在serverMap中缓存了 
    ExchangeServer server = serverMap.get(key); 
    if (server == null) { 
      synchronized (this) { 
        server = serverMap.get(key); 
        if (server == null) { 
          // 创建服务器实例 
          serverMap.put(key, createServer(url)); 
       } 
     } 
   } else { 
      // 服务器已创建,则根据 url 中的配置重置服务器 
      server.reset(url); 
   } 
 } 
}

  创建服务:createServer

private ProtocolServer createServer(URL url) { 
//组装url,在url中添加心跳时间、编解码参数 
    url = URLBuilder.from(url) 
        // 当服务关闭以后,发送一个只读的事件,默认是开启状态 
     .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, 
     Boolean.TRUE.toString()) 
        // 启动心跳配置 
       .addParameterIfAbsent(Constants.HEARTBEAT_KEY, 
     String.valueOf(Constants.DEFAULT_HEARTBEAT)) 
       .addParameter(Constants.CODEC_KEY, DubboCodec.NAME) 
       .build(); 
  String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); 
  //通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常 
  if (str != null && str.length() > 0 && 
    !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { 
        throw new RpcException("Unsupported server type: " + str + ", url: " + 
        url); 
 } 
//创建ExchangeServer. 
  ExchangeServer server; 
  try { 
    server = Exchangers.bind(url, requestHandler); 
 } catch (RemotingException e) { 
    throw new RpcException("Fail to start server(url: " + url + ") " + 
    e.getMessage(), e); 
 } 
  str = url.getParameter(CLIENT_KEY); 
  if (str != null && str.length() > 0) { 
    Set<String> supportedTypes = 
    ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); 
    if (!supportedTypes.contains(str)) { 
      throw new RpcException("Unsupported client type: " + str); 
   } 
 } 
  return new DubboProtocolServer(server); 
}

  Exchangers.bind :

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws 
RemotingException { 
  if (url == null) { 
    throw new IllegalArgumentException("url == null"); 
 } 
  if (handler == null) { 
    throw new IllegalArgumentException("handler == null"); 
 } 
 //获取 Exchanger,默认为 HeaderExchanger。 
  //调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例 
  url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); 
  return getExchanger(url).bind(url, handler); 
} 
public static Exchanger getExchanger(URL url) { 
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); 
    return getExchanger(type); 
} 
// 拓展点,默认为 header 
public static Exchanger getExchanger(String type) { 
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); 
}

  然后根据拓展点进入 HeaderExchanger#bind

  • new DecodeHandler(new HeaderExchangeHandler(handler))
  • Transporters.bind :发布服务
  • new HeaderExchangeServer:服务端消费的调用链

  目前我们只需要关心transporters.bind方法即可

@Override 
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 
}

  进入 Transporters#bind 发布远程服务

public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException { 
        if (url == null) { 
            throw new IllegalArgumentException("url == null"); 
        } 
        if (handlers == null || handlers.length == 0) { 
            throw new IllegalArgumentException("handlers == null"); 
        } 
        ChannelHandler handler; 
        if (handlers.length == 1) { 
            handler = handlers[0]; 
        } else { 
            handler = new ChannelHandlerDispatcher(handlers); 
        } 
        return getTransporter().bind(url, handler); 
} 
// @SPI("netty") 默认为最新的 netty4 实现 
public static Transporter getTransporter() { 
  return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); 
}

  走到是最新的netty4版本的 netty进行服务发布:

  进入到 org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind

@Override 
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException { 
    return new NettyServer(url, handler); 
}

  然后创建了一个 NettyServer 实例, 里面有个 doOpen 方法用域开启服务。接下去就是启动Netty服务了。想进一步了解Netty 机制的小伙伴可以参考:IT虾米网

  值得注意的是,这里构造了一个请求处理链,Netty接受到客户端请求的时候会走这个处理链:MultiMessageHandler ->HeartbeatHandle ->AllChannelHandler ->DecodeHandler ->HeaderExchangeHandler->ExchangeHandlerAdapter

服务注册源码分析: 

  服务在本地发布完成,那么接下去要进入服务的注册阶段,相关代码在 org.apache.dubbo.registry.integration.RegistryProtocol#export 类中:

// url to registry 
final Registry registry = getRegistry(originInvoker); 
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); 
 
// decide if we need to delay publish 
boolean register = providerUrl.getParameter(REGISTER_KEY, true); 
if (register) { 
    register(registryUrl, registeredProviderUrl); 
}

  其中 getRegistry 主要是获取到一个注册器的实现,代码如下:

protected Registry getRegistry(final Invoker<?> originInvoker) { 
    // 这个时候 Url为  zookeeper://开头 
    URL registryUrl = getRegistryUrl(originInvoker); 
    // 所以这里 RegistryFactory$Adapter 获取到的应该为 zookeeper的实现 
    return registryFactory.getRegistry(registryUrl); 
}

  然后这里应该进入 ZookeeperRegistryFactory#getRegistry ,但是 RegistryFactory 拓展点存在包装类 RegistryFactoryWrapper ,所以这里先走 RegistryFactoryWrapper#getRegistry ,然后走  ZookeeperRegistryFactory#getRegistry 。由于本类未实现,则进入父类 AbstractRegistryFactory#getRegistry ,然后调用 ZookeeperRegistryFactory#createRegistry,返回一个 ListenerRegistryWrapper(ZookeeperRegistry)

  然后进入服务注册  RegistryProtocol#register

private void register(URL registryUrl, URL registeredProviderUrl) { 
    //zookeeper://192.168.1.101:2181/........ 
    Registry registry = registryFactory.getRegistry(registryUrl); 
    registry.register(registeredProviderUrl); 
}

  这里跟上面一样的逻辑,然后一定要走 ZookeeperRegistry#register ,但是本类中也未实现 ,走父类 FailbackRegistry#register

public void register(URL url) { 
        if (!acceptable(url)) { 
            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type."); 
            return; 
        } 
     // 调用父类注册,缓存添加 
        super.register(url); 
        removeFailedRegistered(url); 
        removeFailedUnregistered(url); 
        try { 
            // Sending a registration request to the server side 
          // 注册 
            doRegister(url); 
        } catch (Exception e) { 
            Throwable t = e; 
 
            // If the startup detection is opened, the Exception is thrown directly. 
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) 
                    && url.getParameter(Constants.CHECK_KEY, true) 
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol()); 
            boolean skipFailback = t instanceof SkipFailbackWrapperException; 
         // 是否启动检查 
            if (check || skipFailback) { 
                if (skipFailback) { 
                    t = t.getCause(); 
                } 
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); 
            } else { 
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); 
            } 
        // 失败重试 
            // Record a failed registration request to a failed list, retry regularly 
            addFailedRegistered(url); 
        } 
 }

  然后进入 ZookeeperRegistry#doRegister

@Override 
public void doRegister(URL url) { 
        try { 
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); 
        } catch (Throwable e) { 
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 
        } 
}

  有经验的开发人员看到这个就不用解释了。服务到此注册完毕,ZK 服务端即出现服务注册相关的信息。最后附上服务发布、注册的主要流程图:


发布评论
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

org.apache.dubbo 2.7.x 再聚首知识解答
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。