Dubbo3原始碼篇1-服務發現(本地和遠端暴露)原始碼分析

語言: CN / TW / HK

歡迎大家關注 github.com/hsfxuebao ,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈

1. Dubbo與Spring整合

image.png 簡單看下DubboBeanDefinitionParser.parse()方法: ```java /* * @param element 當前要解析的標籤 * @param parserContext 解析上下文,其中包含了當前配置檔案中所有其它標籤的解析資訊 * @param beanClass 當前標籤解析出的內容要封裝的類,其中就是存放的從標籤中讀出的屬性。讀到了什麼就記錄下什麼 * @param registered 解析出的標籤是否需要註解到註冊中心 * @return 解析物件。將讀取出的資料最終要構建出一個“邏輯”上的物件。例如,構建出一個“註冊中心”物件、“協議”物件等 / @SuppressWarnings("unchecked") private static RootBeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean registered) { // ----------------------- 1 建立並初始化解析物件 --------------- RootBeanDefinition beanDefinition = new RootBeanDefinition(); beanDefinition.setBeanClass(beanClass); // 指定該bean不進行延遲初始化 beanDefinition.setLazyInit(false);

    // ----------------------- 2 解決id問題: 為空與重複問題 ---------------
    // config id
    // 獲取當前解析標籤的id屬性
    String configId = resolveAttribute(element, "id", parserContext);
    // 若id不空,則寫入到beanDefinition
    if (StringUtils.isNotEmpty(configId)) {
        beanDefinition.getPropertyValues().addPropertyValue("id", configId);
    }

    // get id from name
    // 若id屬性為空,則獲取name屬性賦值給id
    if (StringUtils.isEmpty(configId)) {
        configId = resolveAttribute(element, "name", parserContext);
    }
    // 此時若id不空,則處理{}點位符
    if (StringUtils.isNotEmpty(configId)) {
        configId = resolvePlaceholders(configId, parserContext);
    }

    // bean名稱取id屬性值
    String beanName = configId;
    // 若此時beanName為空,即id屬性仍為空
    if (StringUtils.isEmpty(beanName)) {
        // generate bean name
        // 取beanClass的類名
        String prefix = beanClass.getName();
        int counter = 0;
        // beanName為類名與數字的拼接
        beanName = prefix + "#" + counter;
        // 檢視該beanName是否重複。若重複,則讓數字增一
        while (parserContext.getRegistry().containsBeanDefinition(beanName)) {
            beanName = prefix + "#" + (counter++);
        }
    }
    // 此時beanName一定不空,且不重複
    beanDefinition.setAttribute(BEAN_NAME, beanName);

    // ----------------------- 3 對特殊標籤的處理 ---------------
    // 對<dubbo:protocol/>標籤的處理
    if (ProtocolConfig.class.equals(beanClass)) {

// for (String name : parserContext.getRegistry().getBeanDefinitionNames()) { // BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name); // PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol"); // if (property != null) { // Object value = property.getValue(); // if (value instanceof ProtocolConfig && beanName.equals(((ProtocolConfig) value).getName())) { // definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(beanName)); // } // } // }

    // 對<dubbo:service/>標籤的處理
    } else if (ServiceBean.class.equals(beanClass)) {
        String className = resolveAttribute(element, "class", parserContext);
        if (StringUtils.isNotEmpty(className)) {
            RootBeanDefinition classDefinition = new RootBeanDefinition();
            classDefinition.setBeanClass(ReflectUtils.forName(className));
            classDefinition.setLazyInit(false);
            parseProperties(element.getChildNodes(), classDefinition, parserContext);
            beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, beanName + "Impl"));
        }
    }


    // ----------------------- 4 對普通標籤的普適性處理 ---------------
    Map<String, Class> beanPropTypeMap = beanPropsCache.get(beanClass.getName());
    if (beanPropTypeMap == null) {
        beanPropTypeMap = new HashMap<>();
        beanPropsCache.put(beanClass.getName(), beanPropTypeMap);
        if (ReferenceBean.class.equals(beanClass)) {
            //extract bean props from ReferenceConfig
            getPropertyMap(ReferenceConfig.class, beanPropTypeMap);
        } else {
            getPropertyMap(beanClass, beanPropTypeMap);
        }
    }

    ...

    return beanDefinition;
}

也就是說 dubbo的配置檔案每一個標籤對應一個類,每個標籤可以配置對應類的所有屬性值:java @Override public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class)); ... } ```

2. 核心介面

2.1 Invocation

其封裝了遠端呼叫的具體資訊。 image.png

2.2 Invoker

其是提供者 provider 的代理物件,在程式碼中就代表提供者。特別是在消費者進行遠端調 用時,其通過服務路由、負載均衡、叢集容錯等機制要查詢的就是 Invoker。找到了其需要 的 Invoker 例項就可以進行遠端呼叫了

image.png

2.3 Exporter

服務暴露物件。其包含一個很重要的方法 getInvoker(),用於獲取當前服務暴露例項所 包含的遠端呼叫例項 Invoker,即可以進行的遠端呼叫。

而 unexport()方法會使服務不進行服務暴露。

image.png

2.4 Directory

Directory 中包含一個很重要的方法 list(),其返回結果為一個 List<Invoker>。其實簡單來 說,可以將 Directory 理解為一個動態的 Invoker 列表。

image.png

3. 啟動入口DubboBootstrapApplicationListener

```java // 當Spring容器建立時會觸發該方法的執行 @Override public void onApplicationEvent(ApplicationEvent event) { if (isOriginalEventSource(event)) { if (event instanceof DubboAnnotationInitedEvent) { // This event will be notified at AbstractApplicationContext.registerListeners(), // init dubbo config beans before spring singleton beans applicationContext.getBean(DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);

        // All infrastructure config beans are loaded, initialize dubbo here
        DubboBootstrap.getInstance().initialize();
    } else if (event instanceof ApplicationContextEvent) {
        this.onApplicationContextEvent((ApplicationContextEvent) event);
    }
}

}

private void onApplicationContextEvent(ApplicationContextEvent event) { if (DubboBootstrapStartStopListenerSpringAdapter.applicationContext == null) { DubboBootstrapStartStopListenerSpringAdapter.applicationContext = event.getApplicationContext(); }

// 容器重新整理事件(容器建立、重新整理會引發該事件)
if (event instanceof ContextRefreshedEvent) {
    onContextRefreshedEvent((ContextRefreshedEvent) event);
// 容器關閉事件
} else if (event instanceof ContextClosedEvent) {
    onContextClosedEvent((ContextClosedEvent) event);
}

}

private void onContextClosedEvent(ContextClosedEvent event) { if (dubboBootstrap.getTakeoverMode() == BootstrapTakeoverMode.SPRING) { // will call dubboBootstrap.stop() through shutdown callback. DubboShutdownHook.getDubboShutdownHook().run(); } } 接下來執行DubboBootstrap.start()方法:java public DubboBootstrap start() { if (started.compareAndSet(false, true)) { startup.set(false); shutdown.set(false); awaited.set(false);

    initialize();
    if (logger.isInfoEnabled()) {
        logger.info(NAME + " is starting...");
    }
    // 1. export Dubbo Services
    // 服務暴露
    exportServices();

    // If register consumer instance or has exported services
    if (isRegisterConsumerInstance() || hasExportedServices()) {
        // 2. export MetadataService
        exportMetadataService();
        // 3. Register the local ServiceInstance if required
        registerServiceInstance();
    }

    // 服務引用
    referServices();
    ... 
}
return this;

} ``` 服務暴露和服務引用的入口都是在這裡。

4. ServiceConfig類

我們先來看看ServiceConfig類,ServiceConfig可以說是每暴露一個介面就會有一個ServiceConfig物件,比如說我現在有2個介面\ 在這裡插入圖片描述

然後有2個對應的實現類,那麼我們在服務暴露的時候就會有2個ServiceConfig例項,其實我們看它的屬性的時候也能猜到 在這裡插入圖片描述\ 我這裡只是截了3個成員,第一個是介面名,第二個是介面的class物件,第三個就是介面的具體實現類。\ 接下來我們具體看看ServiceConfig的成員: ```java //記錄隨機埠的 private static final Map RANDOM_PORT_MAP = new HashMap(); // 延時暴露 executor private static final ScheduledExecutorService DELAY_EXPORT_EXECUTOR = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true)); // 自適應Protocol ,這個就跟變色龍似的,能夠根據具體的引數值變成不同的實現 private static final Protocol PROTOCOL = ExtensionLoader .getExtensionLoader(Protocol.class) // 獲取SPI介面Protocol的extensionLoader例項 .getAdaptiveExtension(); // 使用extensionLoader例項獲取Protocol的自適應類例項

//代理工廠的自適應 private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); // 是否已經暴露 private transient volatile boolean exported;

//是否需要暴露 private transient volatile boolean unexported;

private DubboBootstrap bootstrap;

private transient volatile AtomicBoolean initialized = new AtomicBoolean(false);

// exporters private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();

//介面class protected Class<?> interfaceClass;

// 具體實現類 protected T ref;

protected String path;

// 關於provider的配置 protected ProviderConfig provider;

protected String providerIds;

// 範化 protected volatile String generic; ```

繼續 DubboBootstrap.exportService(): ```java private void exportServices() { // 遍歷當前配置檔案中的所有標籤 for (ServiceConfigBase sc : configManager.getServices()) { // TODO, compatible with ServiceConfig.export() ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc; serviceConfig.setBootstrap(this); if (!serviceConfig.isRefreshed()) { serviceConfig.refresh(); }

    // 判斷是否是非同步暴露
    if (sc.shouldExportAsync()) {
        ExecutorService executor = executorRepository.getServiceExportExecutor();
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                if (!sc.isExported()) {
                    sc.export();
                    exportedServices.add(sc);
                }
            } catch (Throwable t) {
                logger.error("export async catch error : " + t.getMessage(), t);
            }
        }, executor);

        asyncExportingFutures.add(future);
    } else {  // 處理同步暴露情況
        if (!sc.isExported()) {  // 若尚未暴露
            sc.export();  // 服務暴露
            exportedServices.add(sc);
        }
    }
}

}

public Boolean shouldExportAsync() { // 獲取中的export-async屬性 Boolean shouldExportAsync = getExportAsync(); if (shouldExportAsync == null) { // 獲取標籤中的export-async屬性 shouldExportAsync = provider != null && provider.getExportAsync() != null && provider.getExportAsync(); }

return shouldExportAsync;

} 接下來ServiceConfig.export():java public synchronized void export() { // 若的export屬性為true,且當前服務尚未暴露 if (this.shouldExport() && !this.exported) { this.init();

    // check bootstrap state
    if (!bootstrap.isInitialized()) {
        throw new IllegalStateException("DubboBootstrap is not initialized");
    }

    if (!this.isRefreshed()) {
        this.refresh();
    }

    if (!shouldExport()) {
        return;
    }

    // 判斷是否延遲暴露
    if (shouldDelay()) {
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        // 服務暴露
        doExport();
    }

    if (this.bootstrap.getTakeoverMode() == BootstrapTakeoverMode.AUTO) {
        this.bootstrap.start();
    }
}

} 我們可以看到這個方法主要做的工作是服務暴露延遲的,如果delay不是null && delay>0 然後給ScheduledExecutorService然後 delay ms後再進行服務暴露,我們要想使用延遲暴露功能, - 可以在@Service註解中新增delay 屬性。 @Service(delay =1000 ) ```

  • 也可以在xml中新增 xml <dubbo:provider delay="100"/> <dubbo:service interface="xxx.xxx.xxx" delay="1000"></dubbo:service> 我們再接著往下看不延遲暴露走doExport()方法, ```java protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } if (exported) { return; }

    // 若的path屬性為空,則取interface屬性值 // URL的格式 protocol://ip:port/path?a=b&b=c&... if (StringUtils.isEmpty(path)) { path = interfaceName; } // 假設有3個註冊中心,2個服務暴露協議 // 為每個服務暴露協議在每個註冊中心中進行暴露 doExportUrls(); exported(); }

private void doExportUrls() { ServiceRepository repository = ApplicationModel.getServiceRepository(); ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass()); repository.registerProvider( getUniqueServiceName(), ref, serviceDescriptor, this, serviceMetadata );

// 獲取所有註冊中心的【標準化地址URL】與【相容性地址URL】
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

// 遍歷所有服務暴露協議(<dubbo:protocol/>)
for (ProtocolConfig protocolConfig : protocols) {
    String pathKey = URL.buildKey(getContextPath(protocolConfig)
            .map(p -> p + "/" + path)
            .orElse(path), group, version);
    // In case user specified path, register service one more time to map it to path.
    repository.registerService(pathKey, interfaceClass);
    // 使用當前遍歷的服務暴露協議與所有註冊中心配對進行服務暴露
    doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}

} 我們看一下loadRegistries()的程式碼:java public static List loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) { // check && override if necessary List registryList = new ArrayList(); // 獲取標籤 ApplicationConfig application = interfaceConfig.getApplication(); // 獲取標籤 List registries = interfaceConfig.getRegistries();

if (CollectionUtils.isNotEmpty(registries)) {
    // 遍歷所有<dubbo:registry/>標籤
    for (RegistryConfig config : registries) {
        // 獲取<dubbo:registry/>標籤的address屬性
        String address = config.getAddress();
        if (StringUtils.isEmpty(address)) {
            address = ANYHOST_VALUE;
        }

        // 只要address不是N/A,即不是不可用
        if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
            // 建立並初始化一個map,這個map中的值為<dubbo:registry/>標籤及相當標籤中的屬性值
            Map<String, String> map = new HashMap<String, String>();
            // 將<dubbo:application/>標籤屬性寫入map
            AbstractConfig.appendParameters(map, application);
            // 將<dubbo:registry/>標籤屬性寫入map
            AbstractConfig.appendParameters(map, config);
            map.put(PATH_KEY, RegistryService.class.getName());
            AbstractInterfaceConfig.appendRuntimeParameters(map);
            if (!map.containsKey(PROTOCOL_KEY)) {
                map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
            }

            // 構建註冊中心標準URL
            List<URL> urls = UrlUtils.parseURLs(address, map);

            for (URL url : urls) {
                url = URLBuilder.from(url)
                    // 向URL中新增registry屬性,例如registry=zookeeper
                    .addParameter(REGISTRY_KEY, url.getProtocol())
                    // 將URL的協議修改為registry
                    .setProtocol(extractRegistryType(url))
                    .build();
                if ((provider && url.getParameter(REGISTER_KEY, true))
                    || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
                    registryList.add(url);
                }
            }
        }
    }
}
// 為每個標準URL生成一個相容URL
return genCompatibleRegistries(registryList, provider);

} ``` 我們可以看到先走了loadRegistries(true); 這個方法,獲取到了一個URL集合,其實這裡就是獲取註冊中心列表,一個URL就是一個註冊中心,會生成一個標準化的URL和相容URL,registry開頭的為標準URL: image.png

接著就是迴圈暴露doExportUrlsFor1Protocol(protocolConfig, registryURLs); ```java private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) { // 構建服務暴露URL要使用的map Map map = buildAttributes(protocolConfig);

//init serviceMetadata attachments
// 元資料註冊
serviceMetadata.getAttachments().putAll(map);

// 構建服務暴露URL
URL url = buildUrl(protocolConfig, registryURLs, map);

// 服務暴露
exportUrl(url, registryURLs);

} - 構建服務暴露URL要使用的map:這個方法首先判斷ProtocolConfig的協議,如果沒有預設設定成dubbo,再往下就是設定引數,比如說side=provider,dubbo=2.2.0,timestamp,pid等等,然後把一些config中的配置塞到map中,接著就是遍歷處理MethodConfig。再接著就是判斷是不是範化呼叫,如果是就把範化的資訊扔到map中,設定methods=*,如果不是範化呼叫,就找到你所有的method,然後將methods=你所有method名拼接起來。 - 註冊元資料 - 構建服務暴露URL: dubbo://192.168.124.7:20881/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&bind.ip=192.168.124.7&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&metadata-type=remote&methods=hello&pid=25778&release=&revision=1.0.0&side=provider&timestamp=1656114630120&version=1.0.0 接下里,我們進入服務暴露的方法:java private void exportUrl(URL url, List registryURLs) { // 獲取的scope屬性 String scope = url.getParameter(SCOPE_KEY);

// 若scope的值不等於none,則進行暴露
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

    // 若scope的值不等於remote,則進行本地暴露
    // export to local if the config is not remote (export to remote only when config is remote)
    if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
        // 本地暴露
        exportLocal(url);
    }

    // 若scope的值不等於local,則進行遠端暴露
    // export to remote if the config is not local (export to local only when config is local)
    if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        // 遠端暴露
        url = exportRemote(url, registryURLs);
        MetadataUtils.publishServiceDefinition(url);
    }

}
this.urls.add(url);

} ``` 在這個分為本地暴露和遠端暴露,我們分別都看一下

5. 本地暴露

看一下exportLocal()方法: java private void exportLocal(URL url) { URL local = URLBuilder.from(url) .setProtocol(LOCAL_PROTOCOL) // 將URL的protocol設定為injvm .setHost(LOCALHOST_VALUE) .setPort(0) // URL沒有埠號 .build(); // 本地暴露 doExportUrl(local, false); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local); } 首先把URL中的protocol變成injvm,host是127.0.0.1,port是0 。其實這裡是新生成了一個URL,把之前URL裡面的配置搬過來了。接下來: java private void doExportUrl(URL url, boolean withMetaData) { // 構建出invoker Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); if (withMetaData) { invoker = new DelegateProviderMetaDataInvoker(invoker, this); } // 遠端暴露 Exporter<?> exporter = PROTOCOL.export(invoker); exporters.add(exporter); } 我們先來看下proxyFactory.getInvoker(ref, (Class) interfaceClass, local)這個方法。proxyFactory是我們一個類成員: ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); 獲取了一個自適應的擴充套件實現類。我們看下這個自適應是根據哪個key來找實現類的。 ```java @SPI("javassist") public interface ProxyFactory {

/**
 * create proxy.
 *
 * @param invoker
 * @return proxy
 */
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;

/**
 * create proxy.
 *
 * @param invoker
 * @return proxy
 */
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

/**
 * create invoker.
 *
 * @param <T>
 * @param proxy
 * @param type
 * @param url
 * @return invoker
 */
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

} 咱們用的getInvoker方法,然後會根據proxy這個屬性去咱們的URL中找對應的值,我們現在沒有刻意設定這個proxy屬性的話,就會走預設,也就是@SPI(“javassist”)中的javassist實現類。這塊知識資料dubbo spi裡面的。 我們來看看javassist實現類,也就是JavassistProxyFactory這個類。java public class JavassistProxyFactory extends AbstractProxyFactory {

@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    // getProxy() 建立代理類proxy的class
    // newInstance() 建立這個class的例項,即代理物件
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

//生成一個invoker的包裝類
/**
 * @param proxy  介面實現類
 * @param type  介面型別class
 * @param url  URL
 * @param <T>  介面型別
 * @return  Invoker
 */
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' 不能解析類名中帶$的
    // 這裡是如果,介面實現類中有$符號,就是用介面型別,沒有$符號,就用實現類的型別
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        /**
         * 進行呼叫
         * @param proxy 實現類
         * @param methodName 方法名
         * @param parameterTypes  引數型別
         * @param arguments  引數
         * @return
         * @throws Throwable
         */
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

} 首先第一行,Wrapper.getWrapper,這個會幫你生成一個Wrapper,其實這個Wrapper會根據你提供的這個型別生成一個獲取你這個類中成員變數的方法,設定成員變數的方法,執行你這個類中方法的方法。\ 我們可以來看下生成的啥樣子:java public class Wrapper$1 {

public static String[] pns;// 欄位名
public static Map pts;//<欄位名,欄位型別>
public static String[] mns;//方法名
public static String[] dmns;//自己方法的名字

public static Class[] mts;//方法引數型別

public String[] getPropertyNames(){ return pns; }
public boolean hasProperty(String n){ return pts.containsKey(n); }


public Class getPropertyType(String n){ return (Class)pts.get(n); }

public String[] getMethodNames(){ return mns; }
public String[] getDeclaredMethodNames(){ return dmns; }




public void setPropertyValue(Object o, String n, Object v){

    com.xuzhaocai.dubbo.provider.IHelloProviderService w;
    try{
        w = (( com.xuzhaocai.dubbo.provider.IHelloProviderService)$1);
    }catch(Throwable e) {
        throw new IllegalArgumentException(e);
    }
    if( $2.equals("欄位名")){
        w."欄位名"= $3;
        return ;
    }
}


public Object getPropertyValue(Object o, String n){
    com.xuzhaocai.dubbo.provider.IHelloProviderService w;
    try{
        w = (( com.xuzhaocai.dubbo.provider.IHelloProviderService)$1);
    }catch(Throwable e){
        throw new IllegalArgumentException(e);
    }
    if( $2.equals("欄位名")){
        return ($w) w."欄位名";

    }

    return null;

}

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws InvocationTargetException{

    com.xuzhaocai.dubbo.provider.IHelloProviderService w;
    try{
         w = (( com.xuzhaocai.dubbo.provider.IHelloProviderService)$1);
    }catch(Throwable e){
        throw new IllegalArgumentException(e);
    }
    try{
        if("方法名".equals($2)  && 方法引數個數 == $3.length  &&  $3[1].getName().equals("方法第幾個引數的name")){
            w.方法名(引數);
        }

        if("方法名".equals($2)  && 方法引數個數 == $3.length  &&  $3[1].getName().equals("方法第幾個引數的name")){
            w.方法名(引數);
        }
    } catch(Throwable e) {
        throw new java.lang.reflect.InvocationTargetException(e);
    }

    throw new NoSuchMethodException("Not found method "+$2+" in class 你傳進來那個實現類");

}

} `` 就是針對你這個類生成了3個方法: - setPropertyValue(Object o, String n, Object v) 往o中設定屬性 - Object getPropertyValue(Object o, String n) 從o中取屬性值 -Object invokeMethod(Object o, String n, Class[] p, Object[] v)`執行o的某個方法。

我們接著往下看: java return new AbstractProxyInvoker<T>(proxy, type, url) { /** * 進行呼叫 * @param proxy 實現類 * @param methodName 方法名 * @param parameterTypes 引數型別們 * @param arguments 引數 * @return * @throws Throwable */ @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } 我們來看下這個AbstractProxyInvoker這個抽象類。 ```java public abstract class AbstractProxyInvoker implements Invoker { Logger logger = LoggerFactory.getLogger(AbstractProxyInvoker.class);

private final T proxy;

private final Class<T> type;

private final URL url;

...
/**
 * 呼叫
 * @param invocation 呼叫實體
 * @return 結果實體
 * @throws RpcException
 */
@Override
public Result invoke(Invocation invocation) throws RpcException {
    try {
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        CompletableFuture<Object> future = wrapWithFuture(value);
        CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
            AppResponse result = new AppResponse(invocation);
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            return result;
        });
        return new AsyncRpcResult(appResponseFuture, invocation);
    } catch (InvocationTargetException e) {
        if (RpcContext.getServiceContext().isAsyncStarted() && !RpcContext.getServiceContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
...
//實際呼叫子類實現
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;

@Override
public String toString() {
    return getInterface() + " -> " + (getUrl() == null ? " " : getUrl().toString());
}

} ``` 可以看出AbstractProxyInvoker這個抽象類非常簡單,構造中對T proxy 介面實現類,具體提供服務, Class type 介面型別, URL url ,這三個引數進行檢驗,然後儲存。

實現介面Invoker的invoke(Invocation invocation)方法,實際上還是子類實現doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments)方法。

現在再看JavassistProxyFactory類就清楚了,最終走的是wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

上面這部分是proxyFactory.getInvoker(ref, (Class) interfaceClass, local),接著我們看下 protocol.export(): Exporter<?> exporter = PROTOCOL.export(invoker); 這裡這個protocol也是ServiceConfig的類成員,獲取自適應實現類。我們看下Protocol介面: java @SPI("dubbo") public interface Protocol { int getDefaultPort(); @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; void destroy(); } 因為我們URL這裡protocol=injvm,所以回去找對應的實現,也就是InjvmProtocol這個類。 ```java /* * InjvmProtocol / public class InjvmProtocol extends AbstractProtocol implements Protocol {

public static final String NAME = Constants.LOCAL_PROTOCOL;

public static final int DEFAULT_PORT = 0;
private static InjvmProtocol INSTANCE;

public InjvmProtocol() {
    INSTANCE = this;
}

public static InjvmProtocol getInjvmProtocol() {
    if (INSTANCE == null) {
        ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(InjvmProtocol.NAME); // load
    }
    return INSTANCE;
}
static Exporter<?> getExporter(Map<String, Exporter<?>> map, URL key) {
    Exporter<?> result = null;

    if (!key.getServiceKey().contains("*")) {
        result = map.get(key.getServiceKey());
    } else {
        if (map != null && !map.isEmpty()) {
            for (Exporter<?> exporter : map.values()) {
                if (UrlUtils.isServiceKeyMatch(key, exporter.getInvoker().getUrl())) {
                    result = exporter;
                    break;
                }
            }
        }
    }

    if (result == null) {
        return null;
    } else if (ProtocolUtils.isGeneric(
            result.getInvoker().getUrl().getParameter(Constants.GENERIC_KEY))) {
        return null;
    } else {
        return result;
    }
}
@Override
public int getDefaultPort() {
    return DEFAULT_PORT;
}
/**
 * 服務暴露
 * @param invoker Service invoker
 * @param <T>
 * @return
 * @throws RpcException
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
...

我把這次無關的方法先去掉了,看下 export方法,然後new InjvmExporter類java /* * InjvmExporter / class InjvmExporter extends AbstractExporter { //service key private final String key; private final Map<String, Exporter<?>> exporterMap; InjvmExporter(Invoker invoker, String key, Map<String, Exporter<?>> exporterMap) { super(invoker); this.key = key; this.exporterMap = exporterMap; exporterMap.put(key, this); }

@Override
public void unexport() {
    super.unexport();
    exporterMap.remove(key);
}

} ``` 最終是將key=介面全類名,value=this(也就是InjvmExporter物件)put到exporterMap中了。

再回到ServiceConfig的exportLocal方法中。還有最後一句exporters.add(exporter);這裡是將上面生成的InjvmExporter物件快取了起來。

到這裡我們這個服務本地暴露(injvm)就解析完成了。

6. 遠端暴露

我們知道服務暴露分為本地(injvm)與遠端(remote)兩種方式,接下來本篇將解析dubbo的遠端暴露。 // 遠端暴露 url = exportRemote(url, registryURLs); ```java private URL exportRemote(URL url, List registryURLs) {

// 處理有註冊中心的情況
if (CollectionUtils.isNotEmpty(registryURLs)) {
    // 遍歷所有註冊中心URL
    for (URL registryURL : registryURLs) {
        if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
            url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
        }

        //if protocol is only injvm ,not register
        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            continue;
        }

        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
        if (monitorUrl != null) {
            url = url.putAttribute(MONITOR_KEY, monitorUrl);
        }

        // For providers, this is used to enable custom proxy to generate invoker
        String proxy = url.getParameter(PROXY_KEY);
        if (StringUtils.isNotEmpty(proxy)) {
            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
        }

        if (logger.isInfoEnabled()) {
            if (url.getParameter(REGISTER_KEY, true)) {
                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url.getServiceKey() + " to registry " + registryURL.getAddress());
            } else {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url.getServiceKey());
            }
        }

        // 遠端暴露(僅跟蹤registryURL地址為 registry://... 的地址)
        doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
    }

} else {  // 處理沒有註冊中心的情況

    if (MetadataService.class.getName().equals(url.getServiceInterface())) {
        MetadataUtils.saveMetadataURL(url);
    }

    if (logger.isInfoEnabled()) {
        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
    }

    // 遠端暴露(與上面的暴露相比,僅沒有向註冊中心註冊)
    doExportUrl(url, true);
}
return url;

} 如果有註冊中心,然後遍歷註冊中心,首先是獲取監控中心,將監控中心新增到url中,然後就是將proxy_key屬性放到registryUrl中,其實這個proxy_key 的值是可以設定的,就是告訴dubbo我用什麼來進行生成代理,這個對應的就是dubbo spi 自適應特性。doExportUrl(url, true):java private void doExportUrl(URL url, boolean withMetaData) { // 構建出invoker Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); if (withMetaData) { invoker = new DelegateProviderMetaDataInvoker(invoker, this); } // 遠端暴露 Exporter<?> exporter = PROTOCOL.export(invoker); exporters.add(exporter); } ``` 這個跟本地呼叫是同一個方法,只是引數不同,遠端暴露第二個引數為true。

DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);,這行就是包裝了一下這個invoker,主要是把原始的配置資訊跟invoker綁在一塊了。可以看下DelegateProviderMetaDataInvoker 這個類。 java public class DelegateProviderMetaDataInvoker<T> implements Invoker { protected final Invoker<T> invoker; private ServiceConfig metadata; public DelegateProviderMetaDataInvoker(Invoker<T> invoker,ServiceConfig metadata) { this.invoker = invoker; this.metadata = metadata; } ... public ServiceConfig getMetadata() { return metadata; } } 我們可以看下這個protocol,這個是自適應擴充套件類,我們從url中獲取protocol的值,咱們invoker裡面包的url是registryURL,然後對應的protocol也就是註冊中心的protocol,RegistryProtocol這個類。但是在建立擴充套件實現類的時候dubbo會給我們setter注入與wrapper包裝,所以我們拿到的RegistryProtocol最終樣子是這樣的:

image.png

Qos —> Filter —> Listener ----> RegistryProtocol.

我自己debug,麼有找到QosProtocolWrapper這個包裝類,疑問?哪位大佬可以解答一下? 接下來我們先看下QosProtocolWrapper,這裡我們只挑與本篇有關的內容 java @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //判斷是registry if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { // 啟動Qos伺服器 startQosServer(invoker.getUrl()); return protocol.export(invoker); } return protocol.export(invoker); }

判斷是registry協議的話,就啟動Qos伺服器,然後呼叫下一個export 方法,也就是ProtocolFilterWrapper的export方法。 java @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { /// registry protocol 就到下一個wapper 包裝物件中就可以了 return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }

如果是registry協議的話就可以進入下一個了,然後不是registry協議就需要繫結一堆filter了,這個我們在說dubbo協議的時候會講到,這裡我們直接進入下一個類ProtocolListenerWrapper的export方法。 ```java @Override public Exporter export(Invoker invoker) throws RpcException { // registry 是否是註冊中心 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // dubbo export dubbo --->暴露服務 生成exporter return new ListenerExporterWrapper(protocol.export(invoker),

            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));//  exporter listener
}

```

這裡也是判斷了一下是否是registry協議,如果是的話就直接進入下一個RegistryProtocol。 ```java @Override public Exporter export(final Invoker originInvoker) throws RpcException { // 獲取註冊中心URL URL registryUrl = getRegistryUrl(originInvoker);

// 獲取服務暴露URL
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);

// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
//  the same service. Because the subscribed is cached key with the name of the service, it causes the
//  subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

// 服務暴露,即將服務的exporter寫入到快取map
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

// url to registry  獲取註冊中心例項
final Registry registry = getRegistry(registryUrl);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
    // 註冊到註冊中心
    register(registry, registeredProviderUrl);
}

// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);


exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);

// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);

} 首先獲取註冊中心和服務暴露URL,我們來看下doLocalExport方法:java private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker);

return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
    Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
    return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});

} - 首先獲取一個cacheKey,這個cacheKey其實就是我們在服務暴露之前塞進去的一個export屬性值(這裡是把dynamic,enabled這兩個屬性移除掉了):java private String getCacheKey(final Invoker<?> originInvoker) { URL providerUrl = getProviderUrl(originInvoker); String key = providerUrl.removeParameters("dynamic", "enabled").toFullString(); return key; } private URL getProviderUrl(final Invoker<?> originInvoker) { // 獲取URL中的export屬性值,即要暴露的服務的URL Object providerURL = originInvoker.getUrl().getAttribute(EXPORT_KEY); if (!(providerURL instanceof URL)) { throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl().getAddress()); } return (URL) providerURL; } - 接下來就是根據cacheKey去bounds裡面找,如果沒有找到,就說明之前沒有暴露過,我們可以看下bounds這個成員: private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<>(); - 如果沒有暴露過的話,就將invoker封裝起來,我們可以看下InvokerDelegete這個靜態類與它的父類InvokerWrapper類java // InvokerDelegete 委託類 將provider 的url 進行快取 public static class InvokerDelegate extends InvokerWrapper { private final Invoker invoker;

    /**
     * @param invoker
     * @param url     invoker.getUrl return this value
     */
    public InvokerDelegate(Invoker<T> invoker, URL url) {
        super(invoker, url);
        this.invoker = invoker;
    }
    // 獲取invoker
    public Invoker<T> getInvoker() {
        // 如果是 委託類的話,就獲取委託類裡面的那個
        if (invoker instanceof InvokerDelegate) {
            return ((InvokerDelegate<T>) invoker).getInvoker();
        } else {
            return invoker;
        }
    }
}
public class InvokerWrapper<T> implements Invoker<T> {
    private final Invoker<T> invoker;
    private final URL url;
        public InvokerWrapper(Invoker<T> invoker, URL url) {this.invoker = invoker;this.url = url;}
    @Override
    public Class<T> getInterface() {return invoker.getInterface();}
    @Override
    public URL getUrl() {return url;}
    @Override
    public boolean isAvailable() {return invoker.isAvailable();}
    @Override
    public Result invoke(Invocation invocation) throws RpcException {return invoker.invoke(invocation);}
    @Override
    public void destroy() {
        invoker.destroy();
    }
}
```
  • 可以看出來這個委託類的作用就是把儲存了url。接下來就是將protocol.export返回的exporter包裝起來快取到bounds中。我們看下ExporterChangeableWrapper這個成員內部包裝類: ```java private class ExporterChangeableWrapper implements Exporter {
    private final Invoker<T> originInvoker;
    private Exporter<T> exporter;
    private URL subscribeUrl;
    private URL registerUrl;
    
    public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
        this.exporter = exporter;
        this.originInvoker = originInvoker;
    }
    
    public Invoker<T> getOriginInvoker() {
        return originInvoker;
    }
    
    @Override
    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }
    
    public void setExporter(Exporter<T> exporter) {
        this.exporter = exporter;
    }
    
    @Override
    public void unexport() {
        String key = getCacheKey(this.originInvoker);
        bounds.remove(key);
    
        Registry registry = RegistryProtocol.this.getRegistry(getRegistryUrl(originInvoker));
        ...
    }
    

    ``` - 這個作用就是將原始的invoker與exporter綁在了一起。下面我們就看看這個exporter是怎麼得到的。

6.1 伺服器啟動protocol.export方法

我們講到exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);/// dubbo protocol,我們來看一下RegistryProtocol類的private Protocol protocol 成員,這個成員的值是由dubbo spi 擴充套件技術在例項完成後setter注入進來的,實際上這裡protocol注入的是ExtensionLoader.getExtensionLoader(Protocol).getAdaptiveExtension();是自適應的實現類。我們可以看下自適應的實現類的export方法:

java public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); }

然後invokerDelegete 這個委託類getUrl得到的是protocol=dubbo的url,所以 Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo"); 最後獲取的extension是經過dubbo spi setter注入與wrapper包裝後的。

image.png Filter -> Listener----->Qos-----> Dubbo

我這裡沒有debug到Qos包裝類,這塊有後面在看看?

這裡的包裝其實是有順序的,因為在包裝的時候會按照@Active對應的order順序進行逆序排序,可以看下包裝這塊程式碼: image.png

6.1.1 ProtocolFilterWrapper

接下來我們看下ProtocolFilterWrapper類的export方法: java @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {/// registry protocol 就到下一個wapper 包裝物件中就可以了 return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }

這裡不是registry protocol 然後會走下面,我們看下buildInvokerChain 方法,這裡buildInvokerChain的三個引數分別是invoker, service.filter,provider:

``` /* * build consumer/provider filter chain * 在構建呼叫鏈時方法先獲取Filter列表,然後建立與Fitler數量一樣多Invoker * 結點,接著將這些結點串聯在一起,構成一個連結串列,最後將這個鏈的首結點返回,隨後的呼叫中,將從首結點開始,依次呼叫各個結點, * 完成呼叫後沿呼叫鏈返回。這裡各個Invoker結點的串聯是通過與其關聯的invoke 方法來完成的。 / @Override public Invoker buildInvokerChain(final Invoker originalInvoker, String key, String group) { Invoker last = originalInvoker; List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);

if (!filters.isEmpty()) {
    for (int i = filters.size() - 1; i >= 0; i--) {
        final Filter filter = filters.get(i);
        // 呼叫buildInvokerChain時會傳入invoker引數。
        final Invoker<T> next = last;
        // 通過迴圈遍歷獲取到的Filter,同時建立Invoker結點,每個結點對應一個Filter。此時迴圈內部定義了next指標。
        last = new FilterChainNode<>(originalInvoker, next, filter);
    }
}

return last;

} ``` 這個方法實際上是根據dubbo spi 擴充套件技術自動啟用的特性獲取到對應的filter們,然後一層一層的包裝這個invoker,生成一個過濾呼叫鏈,最後到真實的invoker上面。這個Filter我們後期會單獨拿出來解析,這裡只需要知道它一層層包裝了invoker就行。

6.1.2 ProtocolListenerWrapper

java @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {// registry 是否是註冊中心 return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker),// dubbo export dubbo --->暴露服務 生成exporter Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));// exporter listener } 這裡不是REGISTRY_PROTOCOL,然後走了下面使用ListenerExporterWrapper 將服務暴露返回的exporter與自動啟用的listener們綁在了一起。我們可以看下這個ListenerExporterWrapper類, java public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) { if (exporter == null) { throw new IllegalArgumentException("exporter == null"); } this.exporter = exporter; this.listeners = listeners; if (CollectionUtils.isNotEmpty(listeners)) { RuntimeException exception = null; // 遍歷通知 for (ExporterListener listener : listeners) { if (listener != null) { try { // 通知 listener.exported(this); } catch (RuntimeException t) { logger.error(t.getMessage(), t); exception = t; } } } if (exception != null) { throw exception; } } } 我們可以看到就是遍歷通知listener,告訴他們當前服務暴露完了。這些listener們我們後面單獨拿出來解析。

6.1.3 QosProtocolWrapper

java public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //判斷是registry if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { // 啟動Qos伺服器? startQosServer(invoker.getUrl()); return protocol.export(invoker); } return protocol.export(invoker); } 這裡同理不是registry protocol ,直接到了下一層.

6.1.4 DubboProtocol

```java @Override public Exporter export(Invoker invoker) throws RpcException { URL url = invoker.getUrl();

// export service.
String key = serviceKey(url);
// 將invoker封裝為了DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 將exporter寫入到快取map
exporterMap.put(key, exporter);

//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
    String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
    if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
        if (logger.isWarnEnabled()) {
            logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                    "], has set stubproxy support event ,but no stub methods founded."));
        }

    }
}
// 建立並啟動Netty Server
openServer(url);
optimizeSerialization(url);

return exporter;

} 這裡首先是根據url生成一個服務的key,建立一個DubboExporter 把invoker,key與快取exporter的map 綁在一起。將 建立的這個exporter快取到exporterMap裡面。我們可以exporterMap的定義: protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>(); ``` 其中key就是生成的那個serviceKey,然後value是建立的那個exporter。

接著就是呼叫openServer(url),來開啟伺服器,我們看下原始碼: ```java private void openServer(URL url) { // find server. // key格式 ip:暴露協議埠 例如,192.168.59.1:20881 String key = url.getAddress();

//client can export a service which's only for server to invoke
// 表示當前應用是否是provider
boolean isServer = url.getParameter(IS_SERVER_KEY, true);

if (isServer) {
    // 從快取map中獲取當前key對應的同異步轉換物件。
    // 從key的值可知,一個應用中每個服務暴露協議都會對應一個同異步轉換物件,
    // 而一個同異步轉換物件會對應一個Netty Server,即一個主機中每個服務暴露
    // 協議會對應建立一個Netty Server(面試題)
    // DCL
    ProtocolServer server = serverMap.get(key);
    if (server == null) {
        synchronized (this) {
            server = serverMap.get(key);
            if (server == null) {
                // 建立同異步轉換物件
                serverMap.put(key, createServer(url));
            }else {
                server.reset(url);
            }
        }
    } else {
        // server supports reset, use together with override
        // 將當前服務的url新增到server中
        server.reset(url);
    }
}

} 首先是獲取地址,這個地址形式是ip:port,獲取isserver引數,預設是true,然後從serverMap這個快取中查詢對應的server,如果之前沒有建立過,就呼叫createServer(url) 來建立server,之後把建立的server快取在serverMap中,我們先看下serverMap這個成員變數 protected final Map serverMap = new ConcurrentHashMap<>(); 其中key是伺服器地址,也就是上面url.getAddress();獲得的,value就是對應的ProtocolServer。 我們再來看看是怎麼建立server的,看下createServer(url)這個方法的原始碼:java private ProtocolServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
    throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}

ExchangeServer server;
try {
    // todo
    server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}

str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
    Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
    if (!supportedTypes.contains(str)) {
        throw new RpcException("Unsupported client type: " + str);
    }
}

return new DubboProtocolServer(server);

} ``` 這個方法前面部分就是設定一些引數,channel.readonly.sent =true,就是伺服器關閉的時候指傳送只讀屬性,heartbeat=60*1000 設定預設的心跳時間,獲取server,預設的情況下使用netty,設定Codec 的型別為dubbo。

然後Exchangers.bind(url, requestHandler),其實這個Exchangers 是個門面類,封裝了bind與connect兩個方法的呼叫

6.1.5 Exchanger

我們可以看看Exchanger的方法:

image.png 我們接著,看下當前呼叫的bind方法: java public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //驗證 if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } // 如果沒有codec 的話 就設定為exchange url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // exchanger.bind() return getExchanger(url).bind(url, handler); } 前面的都是引數校驗,我們看下getExchanger(url) 方法: java public static Exchanger getExchanger(URL url) { // 獲取exchanger 預設header String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); }

這裡可以看到從url中獲取exchanger ,預設是header,然後使用dubbo spi 獲取到HeaderExchanger,我們看下HeaderExchanger原始碼:

6.1.6 HeaderExchanger

```java public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {

    // 建立一個通訊client
    //DecodeHandler => HeaderExchangeHandler => ExchangeHandler( handler ) 。
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {

    // 建立一個通訊server  DecodeHandler  << HeaderExchangeHandler  << handler
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

} ``` 這裡有三個點,一是將這handler又包裝了兩層,二是使用Transporters 這個門面類進行bind,再就是建立HeaderExchangeServer 將server進行增強,其實HeaderExchangeServer 這個是專門傳送心跳的。 我們先看下Transporters這個類,

6.1.7 Transporters

這個Transporters也是門面類,對外統一了bind 與connect。我們只看下與這次有關的部分 java public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { //驗證 if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { // 多個channal 對 Channel分發 ChannelHandlerDispatcher 迴圈 handler = new ChannelHandlerDispatcher(handlers); } // 真正伺服器 進行bind return getTransporter().bind(url, handler); } public static Transporter getTransporter() { // 獲取transporter return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); } 首先是引數校驗,接著判斷handler的個數,如果一個話還好說,多個話就要使用ChannelHandlerDispatcher類來包裝了,其實裡面就是對多個handler迴圈呼叫,接著呼叫getTransporter獲取Transporter擴充套件點的自適應類。 我們可以稍微看下Transporter 這個擴充套件點的程式碼, ```java @SPI("netty") // 預設是netty4的 public interface Transporter {

@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;

} ``` 可以看到bind自適應實現類是根據server 與transporter 這兩個引數決定的,server這個引數咱們上面的DubboProtocol類裡面createServer 方法中就設定好了,預設使用netty,我們通過看dubbo spi 配置檔案:

image.png netty對應的也就是NettyTransporter這個實現類。

6.1.8 NettyTransporter

```java / * 實現 Transporter 介面,基於 Netty4 的網路傳輸實現類 */ public class NettyTransporter implements Transporter { / * 副檔名 */ public static final String NAME = "netty";

@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}

} ``` 我們可以看到new了一個NettyServer物件。我們來看下程式碼

6.1.9 NettyServer

在看NettyServer前先看下它的繼承結構

image.png 看下NettyServer的構造方法: public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } 呼叫的父類的構造 先看下AbstractServer程式碼: ```java public abstract class AbstractServer extends AbstractEndpoint implements Server { protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler"; private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class); // 執行緒池 ExecutorService executor; // 服務地址 private InetSocketAddress localAddress; //繫結地址 private InetSocketAddress bindAddress; // 伺服器最大可接受連線數 private int accepts; // 空閒超時時間 private int idleTimeout = 600; public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());//ip int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); //port if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); // 伺服器最大可接受連線數 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { doOpen(); // 子類實現, 真正開啟伺服器 if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method // 獲取執行緒池 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }

protected abstract void doOpen() throws Throwable;

} 我們可以看到AbstractServer的構造前面都是些引數的獲取,之後呼叫doOpen()方法,具體是由子類實現的我們在看看NettyServer的doOpen方法java protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap();

bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
//iothreads  預設是cpu核心數+1  與 32 進行比較,取小的那個  也就是最大不超過32
workerGroup = NettyEventLoopFactory.eventLoopGroup(
        getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
        "NettyServerWorker");

final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();

boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
/**
 *  ChannelOption.SO_REUSEADDR  這個引數表示允許重複使用本地地址和埠,
 *
 * 比如,某個伺服器程序佔用了TCP的80埠進行監聽,此時再次監聽該埠就會返回錯誤,
 * 使用該引數就可以解決問題,該引數允許共用該埠,這個在伺服器程式中比較常使用,
 * 比如某個程序非正常退出,該程式佔用的埠可能要被佔用一段時間才能允許其他程序使用,
 * 而且程式死掉以後,核心一需要一定的時間才能夠釋放此埠,不設定SO_REUSEADDR
 * 就無法正常使用該埠。
 */
// 設定執行緒組
bootstrap.group(bossGroup, workerGroup)
        .channel(NettyEventLoopFactory.serverSocketChannelClass())
        .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
        .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
        .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // FIXME: should we use getTimeout()?
                int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    ch.pipeline().addLast("negotiation",
                            SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                }
                ch.pipeline()
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                        .addLast("handler", nettyServerHandler);
            }
        });
// bind  啟動Netty Server
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();

} ``` 我們可以看到就是啟動netty伺服器,然後boss執行緒是一個,然後work執行緒是是cpu數+1然後與32做比較,取最小的那個,也就是最大不超過32個執行緒。\ 到這裡我們的服務就算啟動完成了。\ 我們再回過頭來看看HeaderExchangeServer這個維護心跳的

6.1.10 HeaderExchangeServer

```java public HeaderExchangeServer(RemotingServer server) { Assert.notNull(server, "server == null"); this.server = server; // 開啟空閒檢測任務 startIdleCheckTask(getUrl()); } private void startIdleCheckTask(URL url) { if (!server.canHandleIdle()) {

    AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
    int idleTimeout = getIdleTimeout(url);
    long idleTimeoutTick = calculateLeastDuration(idleTimeout);
    CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
    this.closeTimerTask = closeTimerTask;

    // init task and start timer.
    // 初始化任務並開啟定時器
    IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
}

} public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); }

long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
    pendingTimeouts.decrementAndGet();
    throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
}

start();

// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// Guard against overflow.
if (delay > 0 && deadline < 0) {
    deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;

} ``` 好了,到這裡我們遠端服務暴露中啟動部分就算結束了,這裡我們並沒有說太多的細節,我只需要把服務暴露伺服器啟動這個過程縷順就可以了,具體的細節方面放到後面分析。

6.2 服務註冊

6.1 小結中講了DubboProtocol一步步到伺服器啟動的。但是這些只是我們在服務遠端暴露的RegistryProtocol#export()方法的一個方法,只是介紹了伺服器啟動,我們知道服務暴露其實是伺服器啟動+服務註冊,今天我們就接著看看RegistryProtocol#export()方法後面的部分。

6.2.1 RegistryProtocol#export

```java

// 獲取註冊中心URL URL registryUrl = getRegistryUrl(originInvoker);

// 獲取服務暴露URL // url to export locally URL providerUrl = getProviderUrl(originInvoker);

// 服務暴露,即將服務的exporter寫入到快取map //export invoker doLocalExport表示本地啟動服務不包括去註冊中心註冊 final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl);

// url to registry 獲取註冊中心例項 final Registry registry = getRegistry(registryUrl); final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

// decide if we need to delay publish boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { // 註冊到註冊中心 register(registry, registeredProviderUrl); } ... 然後接著就是從originInvoker裡面獲取registryUrl,接著就是獲取Registry的物件。我們看下這個getRegistry()方法: protected Registry getRegistry(final URL registryUrl) { return registryFactory.getRegistry(registryUrl); } ``` 通過registryFactory 獲取Registry,其實這個registryFactory 是RegistryProtocol的一個成員,然後是dubbo spi 自動setter注入進來的。我們來看一下registryFactory介面原始碼:

image.png 我們可以看到@Adaptive的值是protocol,咱們上面的url中的protocol值正好是zookeeper,就可以找到ZookeeperRegistryFactory,我們看下ZookeeperRegistryFactory繼承圖:

image.png

繼承了一個抽象類AbstractRegistryFactory,然後getRegistry()方法就是在這個抽象類中,我們可以看下: java // Registry Collection Map<RegistryAddress, Registry> private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>(); @Override public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) // interface .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) // export refer .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { // 從快取中獲取, 有就返回, 沒有就建立 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // 建立方法由子類實現 模板方法設計模式 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } } // 由子類實現的 建立方法, 模板方法設計模式 protected abstract Registry createRegistry(URL url); 前面就是設定引數,移除引數,然後將url轉換成一個serviceKey的字串,之後就是根據這個serviceKey去這個成員map中獲取value,如果存在就返回,不存在就呼叫子類的createRegistry方法來建立Registry,然後再塞進去這個map中快取。我們看下子類的createRegistry方法實現,這裡就是ZookeeperRegistryFactory的實現: ```java public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

private ZookeeperTransporter zookeeperTransporter;

public ZookeeperRegistryFactory() {
    this.zookeeperTransporter = ZookeeperTransporter.getExtension();
}

@DisableInject
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
    this.zookeeperTransporter = zookeeperTransporter;
}

@Override
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

} ``` 這裡就是new了個ZookeeperRegistry物件然後返回了,這裡zookeeperTransporter 成員是dubbo spi 建立完成這個物件自動注入的,這個咱們後面再解析。

我們接著RegistryProtocol#export 方法的解析,final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);這行程式碼就是獲取服務提供者的url,接著下面就是獲取一個register的引數,預設是true,

我們再回到RegistryProtocol#export方法中,判斷register,這裡是true,然後走了register(registryUrl, registeredProviderUrl);,我們看下register方法, java private void register(Registry registry, URL registeredProviderUrl) { registry.register(registeredProviderUrl); } 可以看出來,直接使用registry物件,這其實還是那個ZookeeperRegistry,然後就是呼叫ZookeeperRegistry#register 方法,這個咱們講到註冊中心部分再說。

7. 整體流程圖

image.png image.png

服務暴露的核心程式碼在這裡就結束了,敬請期待下一篇文章哈

參考文章

Dubbo3.0原始碼註釋github地址
dubbo原始碼系列\ dubbo原始碼分析專欄