Dubbo 泛化呼叫在vivo統一配置系統的應用

語言: CN / TW / HK

作者:vivo 網際網路伺服器團隊- Wang Fei、LinYupan

Dubbo泛化呼叫特性可以在不依賴服務介面API包的場景中發起遠端呼叫, 這種特性特別適合框架整合和閘道器類應用開發。

本文結合在實際開發過程中所遇到的需要遠端呼叫多個三方系統的問題,闡述瞭如何利用Dubbo泛化呼叫來簡化開發降低系統耦合性的專案實踐,最後對Dubbo泛化呼叫的原理進行了深度解析。

一、背景

統一配置平臺是一個提供終端裝置各個模組進行檔案配置和檔案下發能力的平臺,模組開發在後臺伺服器進行檔案配置,然後終端裝置可以按照特定規則獲取對應的配置檔案,檔案下發可以按照多種裝置維度進行下發,具體專案框架可以參加下圖:

現有的下發策略,都是由模組開發在統一配置後臺伺服器進行下發維度配置,檔案是否下發到對應終端裝置,由使用者在本平臺所選擇的維度所確定。

但是其他業務方也存在在公司內部的A/B實驗平臺配置下發規則,來藉助統一配置平臺每天輪詢伺服器請求新檔案的能力,但是在統一配置平臺配置的檔案是否能夠下發由A/B實驗平臺來確定,A/B實驗平臺內會配置對應的規則以及配置統一配置平臺對應的檔案id,然後統一配置平臺就需要針對請求呼叫A/B實驗平臺介面來判斷檔案是否可以下發。

隨著公司內部實驗平臺的增加,越來越多這種由三方平臺來決定檔案是否下發的對接需求,如何更好更快的應對這種類似的對接需求,是我們需要去深入思考的問題。

二、方案選型

原有統一配置的下發邏輯是先找到所有可以下發的檔案,然後判斷單個配置檔案是否滿足裝置維度,如果滿足則可以下發。現在在對接A/B實驗平臺以後,檔案是否能下發還需要由外部系統來確定,當時設計時考慮過兩種方案:

方案一:

同樣先找到所有可以下發的檔案,然後針對單個檔案按照①裝置維度判斷是否匹配,然後②呼叫A/B實驗平臺的介面獲取這臺裝置可以下發的檔案Id, 再呼叫③灰度實驗平臺獲取這臺裝置可以下發的檔案id, 最後將前三步獲取到的配置檔案id進行彙總得到可以下發的檔案,如下圖所示。

方案一打破了原來檔案是否能夠下發的判斷邏輯,現在除了原有的判斷邏輯,還需要額外步驟呼叫其他系統來追加另外可以下發的檔案。並且後續不可避免對接其他三方系統,方案一需要不斷增加呼叫三方介面的邏輯來追加可以下發的檔案id。此外常規的dubbo呼叫在provider端需要引入其他實驗系統的二方庫以及模型型別,增加了統一配置系統和其他系統的強耦合性。

方案二: 利用 Dubbo 泛化呼叫高階特性抽象一個下發維度(遠端呼叫),專門用於其他想由三方實驗系統來決定是否下發檔案的場景,如下圖所示:

方案二統一抽象一個遠端呼叫下發維度,可以保持原有的判斷邏輯,也就是先把系統中所有可以下發的檔案先查找出來,然後根據裝置維度進行匹配,如果某一個檔案配置的是遠端呼叫維度,那麼查詢這個遠端呼叫維度所包含的函式名稱、引數型別陣列和引數值物件陣列,然後呼叫三方介面,從而判斷這個檔案是否可以下發到裝置,最終獲取到可以下發的檔案id列表。

此外,利用Dubbo泛化呼叫高階特性,呼叫方並不關心提供者的介面的詳細定義,只需要關注呼叫哪個方法,傳什麼引數以及接收到什麼返回結果即可,這樣避免需要依賴服務提供者的二方庫以及模型類元,這樣可以大大降低consumer端和provider端的耦合性。

綜合上面的分析,我們最終確定了方案二採取利用Dubbo泛化呼叫來抽象一個統一維度的方式,下面來看一下具體的實現。

三、具體實現

  1. GenericService是Dubbo提供的泛化介面,用來進行泛化呼叫。只提供了一個$invoke方法,三個入口引數分別為函式名稱、引數型別陣列和引數值物件陣列。

package com.alibaba.dubbo.rpc.service;

/**
* Generic service interface
*
* @export
*/
public interface GenericService {

/**
* Generic invocation
*
* @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is
* required, e.g. findPerson(java.lang.String)
* @param parameterTypes Parameter types
* @param args Arguments
* @return invocation return value
* @throws Throwable potential exception thrown from the invocation
*/
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;

2. 建立服務引用配置物件ReferenceConfig。

private ReferenceConfig<GenericService> buildReferenceConfig(RemoteDubboRestrictionConfig config) {
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(applicationConfig);
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(config.getInterfaceName());
referenceConfig.setVersion(config.getVersion());
referenceConfig.setGeneric(Boolean.TRUE.toString());
referenceConfig.setCheck(false);
referenceConfig.setTimeout(DUBBO_INVOKE_TIMEOUT);
referenceConfig.setRetries(DUBBO_INVOKE_RETRIES);
return referenceConfig;
}

3.設定請求引數及服務呼叫, 這裡利用在後臺所配置的完整方法名、引數型別陣列和引數值陣列就可以進行服務呼叫。

public List<Integer> invoke(RemoteDubboRestrictionConfig config, ConfigFileListQuery listQuery) {
//由於ReferenceConfig很重量,裡面封裝了所有與註冊中心及服務提供方連線,所以這裡做了快取
GenericService genericService = prepareGenericService(config);

//構建引數
Map<String, Object> params = buildParams(listQuery);
String method = config.getMethod();
String[] parameterTypeArray = new String[]{Map.class.getName()};
Object[] parameterValueArray = new Object[]{params};

long begin = System.currentTimeMillis();
Assert.notNull(genericService, "cannot find genericService");
//具體呼叫
Object result = genericService.$invoke(method, parameterTypeArray, parameterValueArray);

if (logger.isDebugEnabled()) {
long duration = System.currentTimeMillis() - begin;
logger.debug("Dubbo呼叫結果:{}, 耗時: {}", result, duration);
}

return result == null ? Collections.emptyList() : (List<Integer>) result;
}

那麼為什麼Dubbo泛化呼叫所涉及的呼叫方並不關心提供者的介面的詳細定義,只需要關注呼叫哪個方法,傳什麼引數以及接收到什麼返回結果即可呢?

在講解泛化呼叫的實現原理之前,先簡單講述一下直接呼叫的原理。

四、 Dubbo 直接呼叫相關原理

Dubbo的直接呼叫相關原理涉及到兩個方面:Dubbo服務暴露原理和Dubbo服務消費原理

4.1 Dubbo 服務暴露原理

4.1.1 服務遠端暴露的整體流程

在整體上看,Dubbo框架做服務暴露分為兩大部分,第一步將持有的服務例項通過代理轉換成Invoker,第二步會把Invoker通過具體的協議(比如Dubbo)轉換成Exporter,框架做了這層抽象也大大方便了功能擴充套件。

這裡的Invoker可以簡單理解成一個真實的服務物件例項,是 Dubbo框架實體域,所有模型都會向它靠攏,可向它發起invoke呼叫。它可能是一個本地的實 現,也可能是一個遠端的實現,還可能是一個叢集實現。

原始碼如下:

if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}

// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//向註冊中心註冊服務資訊
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}

首先將實現類ref封裝為Invoker,之後將invoker轉換為exporter,最後將exporter放入快取 exporters中。

4.1.2 服務暴露的細節

4.1.2.1 將實現類ref封裝為Invoker

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

① Dubbo遠端暴露的入口在ServiceBean的export()方法,由於servicebean繼承了serviceconfig類,於是真正執行暴露的邏輯是serviceconfig的doExport()方法。

② Dubbo支援相同服務暴露多個協議,比如同時暴露Dubbo和REST協議,也支援多個註冊中心,比如zookeeper和nacos,框架內部會依次 對使用的協議都做一次服務暴露,每個協議註冊元資料都會寫入多個註冊中心,具體是執行doExportUrlsFor1Protocol。

③ 然後通過動態代理的方式建立Invoker物件,在服務端生成AbstractProxylnvoker例項,所有真實的方法呼叫都會委託給代理,然後代理轉發給服務實現者 ref 呼叫;動態代理一般有:JavassistProxyFactory 和 JdkProxyFactory兩種方式,這裡所選用的JavassistProxyFactory 。

4.1.2.2 將invoker轉換為exporter

Exporter exporter

= protocol.export(wrapperInvoker);

Exporter<?> exporter = protocol.export(wrapperInvoker);

在將服務例項ref轉換成Invoker之後,開始執行服務暴露過程。

這裡會經過一系列的過濾器鏈路,最終會通過RegistryProtocol#export 進行更細粒度的控制,比如先進行服務暴露再註冊服務元資料。註冊中心在做服務暴露時依次 做了以下幾件事情:

  1. 委託具體協議(Dubbo)進行服務暴露,建立NettyServer監聽埠和儲存服務例項。

  2. 建立註冊中心物件,與註冊中心建立TCP連線。

  3. 註冊服務元資料到註冊中心。

  4. 訂閱configurators節點,監聽服務動態屬性變更事件。

  5. 服務銷燬收尾工作,比如關閉埠、反註冊服務資訊等。

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

URL registryUrl = getRegistryUrl(originInvoker);

//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);

ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

if (register) {
//TODO 註冊服務元資料
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}

// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

這裡我們重點講解委託具體協議進行服務暴露的過程doLocalExport(final InvokeroriginInvoker)。

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}

(Exporter) protocol.export(invokerDelegete)方法又會經過一系列的攔截器進行處理,最終呼叫DubboProtocol的export方法。

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);

//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}

openServer(url);
optimizeSerialization(url);
return exporter;
}

這裡很重要的一點就是將exporter放到了快取,這裡的key是

serviceGroup/serviceName:serviceVersion:port這樣形式,這裡最後獲取到的是com.alibaba.dubbo.demo.DemoService:20880,之後建立DubboExporter。這裡的記憶體快取exporterMap是很重要的一個屬性,在後續消費者呼叫服務提供者時會被被再次使用到。

至此,伺服器提供者的遠端暴露流程就基本介紹完畢。

4.2 Dubbo服務消費的實現原理

4.2.1 服務消費的整體流程

在整體上看,Dubbo框架做服務消費也分為兩大部分,第一步通過持有遠端服務例項生成 Invoker,這個Invoker在客戶端是核心的遠端代理物件。第二步會把Invoker通過動態代理轉換 成實現使用者介面的動態代理引用。這裡的Invoker承載了網路連線、服務呼叫和重試等功能,在 客戶端,它可能是一個遠端的實現,也可能是一個叢集實現。

原始碼如下:

public Object getObject() throws Exception {
return get();
}

public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}

private void init() {
...
ref = createProxy(map);
}

private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
...
// 建立服務代理
return (T) proxyFactory.getProxy(invoker);
}

4.2.2 服務消費的細節

4.2.2.1 使用Protocol將interfaceClass轉化為Invoker

invoker = refprotocol.refer(interfaceClass, url);

① 服務引用的入口點在 ReferenceBean#getObject,

由於Referencebean'繼承了serviceconfig類,接著會呼叫Reference的get方法。

② 然後根據引用的介面型別將持有遠端服務例項生成 Invoker。

③ 通過一系列的過濾器鏈,最後呼叫RegistryProtocol的doRefer方法。

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));

Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}

這段邏輯主要完成了註冊中心例項的建立,元資料註冊到註冊中心及訂閱的功能。

具體遠端Invoker是在哪裡建立的呢?客戶端呼叫攔截器又是在哪裡構造的呢?

當在directory.subscrib()中 第一次發起訂閱時會進行一次資料拉取操作,同時觸發RegistryDirectory#notify方法,這裡 的通知資料是某一個類別的全量資料,比如providers和routers類別資料。當通知providers數 據時,在RegistryDirectory#toInvokers方法內完成Invoker轉換。

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
......
URL url = mergeUrl(providerUrl);

String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
.........
if (enabled) {
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}

核心程式碼

invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);

這裡會經過一系列的過濾器鏈,然後最終呼叫DubboProtocol的refer方法,來建立具體的invoker。

@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}

這裡返回的invoker會用來更新RegistryDirectory的methodInvokerMap 屬性,最終在實際呼叫消費端方法時,會根據method找到對應的invoker列表。

private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

4.2.2.2 使用ProxyFactory建立代理

(T) proxyFactory.getProxy(invoker)

上述的proxyFactory是ProxyFactory$Adaptive例項,其getProxy內部最終得到是一個被StubProxyFactoryWrapper包裝後的JavassistProxyFactory。直接來看JavassistProxyFactory.getProxy方法。

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
  • 【invoker】:MockClusterInvoker例項

  • 【interfaces】:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]

我們最終返回的代理物件其實是一個proxy0物件,當我們呼叫其sayHello方法時,其呼叫內部的handler.invoke方法。

package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.demo.DemoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class proxy0 implements DemoService {
public static Method[] methods;
private InvocationHandler handler;

public String sayHello(String paramString) {
Object[] arrayOfObject = new Object[1];
arrayOfObject[0] = paramString;
Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
return (String) localObject;
}

public proxy0() {
}

public proxy0(InvocationHandler paramInvocationHandler) {
this.handler = paramInvocationHandler;
}
}

Dubbo泛化呼叫一般是伺服器提供者都採用直接暴露的形式,消費者端採用服務泛化呼叫的形式,所以這裡重點討論Dubbo泛化呼叫與直接呼叫在消費者端的服務引用和發起消費的區別與聯絡。

五、Dubbo泛化呼叫與直接呼叫的區別與聯絡

5.1 通過持有遠端服務例項生成 Invoker

private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
...
// 建立服務代理
return (T) proxyFactory.getProxy(invoker);
}

這裡的interfaceClass的來源不一樣,createProxy(Mapmap) 是在ReferenceConfig的init()方法中呼叫的,具體的interfaceClass根據是否是返回呼叫會有所區別,具體看如下程式碼:

private void init() {
...
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
...
ref = createProxy(map);
}

直接呼叫:

interfaceClass→com.alibaba.dubbo.demo.DemoService

泛化呼叫:

interfaceClass→com.alibaba.dubbo.rpc.service.GenericService

最終獲取的invoker也不一樣

直接呼叫:

interface com.alibaba.dubbo.demo.DemoService -> dubbo://xx.xx.xx.xx:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&bean.name=com.alibaba.dubbo.demo.DemoService&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=24932&qos.port=33333®ister.ip=xx.xx.xx.xx&remote.timestamp=1640744945905&side=consumer×tamp=1640745033688

泛化呼叫:

interface com.alibaba.dubbo.rpc.service.GenericService -> dubbo://xx.xx.xx.xx:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=test&bean.name=com.alibaba.dubbo.demo.DemoService&check=false&dubbo=2.0.2&generic=true&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=27952&qos.port=33333®ister.ip=xx.xx.xx.xx&remote.timestamp=1640748337173&side=consumer×tamp=1640748368427

5.2 服務發起消費流程

在4.2.2  服務消費者發起請求細節第①步是將請求引數(方法名,方法引數型別,方法引數值,服務名,附加引數)封裝成一個Invocation。

直接呼叫的RpcInvoaction如下:

RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={}]

泛化呼叫的RpcInvoaction如下:

RpcInvocation [methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHello, [Ljava.lang.String;@22c1bff0, [Ljava.lang.Object;@30ae230f], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]

我們可以發現這裡生成的RpcInvocation物件是有區別的,但是服務提供者暴露的服務是不會發生變化的,所以這裡必然有一個轉換過程,這裡的引數轉換關鍵就在於服務提供者端的GenericImplFilter類。

@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {

protected final Logger logger = LoggerFactory.getLogger(getClass());

@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
logger.info("----------------GenericFilter-------------------------");
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
try {
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}
String generic = inv.getAttachment(Constants.GENERIC_KEY);

if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}

if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
...
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
...
}
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
RpcResult rpcResult;
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
...
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
...
} else {
rpcResult = new RpcResult(PojoUtils.generalize(result.getValue()));
}
rpcResult.setAttachments(result.getAttachments());
return rpcResult;
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
return invoker.invoke(inv);
}

}

核心流程:

①  是否是泛化呼叫判斷

if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {

② 引數的提取

String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];

③  引數的序列化,再構造新的RpcInvocation物件

Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
}
...

Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));

序列化前RpcInvocation物件:

RpcInvocation [methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHello, [Ljava.lang.String;@22c1bff0, [Ljava.lang.Object;@30ae230f], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]

序列化後RpcInvocation物件:

RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, input=296, dubbo=2.0.2, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0, generic=true}]

後面的呼叫邏輯就和直接呼叫的是一致的了,比如從本地快取從本地快取中Map<string, list<invoker>> methodInvokerMap中獲取key為sayHello(指定方法名)的List<invoker>,接著進行後續的呼叫。

那麼什麼時候觸發GenericFilter的invoke方法呢,這裡其實就和過濾器的呼叫鏈建立有關係了,從GenericFilter類上的註解,我們可以看到@Activate(group = Constants.PROVIDER, order = -20000),說明是在服務提供者端生效的。

另外,服務提供者端是如何知道呼叫是直接呼叫還是泛化呼叫的,這裡就涉及到與服務提供者端GenericFilter對應的消費者端的GenericImplFilter類,程式碼如下:

/**
* GenericImplInvokerFilter
*/
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {

private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);

private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class};

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
if (ProtocolUtils.isGeneric(generic)
&& !Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation instanceof RpcInvocation) {
...
}

if (invocation.getMethodName().equals(Constants.$INVOKE)
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic)) {

Object[] args = (Object[]) invocation.getArguments()[2];
if (ProtocolUtils.isJavaGenericSerialization(generic)) {

for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(generic, byte[].class.getName(), arg.getClass().getName());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}

((RpcInvocation) invocation).setAttachment(
Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}
return invoker.invoke(invocation);
}

5.3 泛化呼叫的整體流程圖

六、總結

高內聚低耦合是我們架構設計的一個重要目標,而Dubbo的泛化呼叫特性僅僅只需知道服務的完整介面路徑、請求引數型別和請求引數值就可以直接進行呼叫獲取請求結果,能夠避免依賴特定三方jar包,從而降低了系統的耦合性。在日常學習和開發的過程中,我們除了需要關注一門技術的常規使用方法以外,還需要關注一些高階特性,從而做出更合適的架構設計。

參考資料:

  1. 《深入理解Apache Dubbo與實戰》 詣極,林琳

  2.  Dubbo原始碼分析-泛化呼叫 

  3. Dubbo泛化呼叫使用及原理解析