【Eureka Technical Guide】「SpringCloud」Understanding Eureka's Workflow and Operating Mechanism from the Source Code (Part 2)
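This article continues from Part 1 of the Eureka series; see [【SpringCloud Technical Topics】「Eureka Source Code Analysis」Understanding Eureka's Workflow and Operating Mechanism from the Source Code (Part 1)].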
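Principles Recap
- Eureka Server provides the service registration service. Each node registers with Eureka Server after it starts, so the server's registry holds information about every available service node. This information can be viewed directly in the web UI.
- Eureka Client is a Java client that simplifies interaction with Eureka Server. It also ships with a built-in load balancer that uses a round-robin algorithm.
- After an application starts, it sends heartbeats to Eureka Server (every 30 seconds by default). If Eureka Server receives no heartbeat from a node for several heartbeat periods (by default 3 periods, i.e. 90 seconds), it removes that node from the registry.
- In a high-availability setup, Eureka Servers keep their data in sync with one another through replication.
- Eureka Client caches registry information. Even if every Eureka Server goes down, the client can still use the cached information to call other services' APIs.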
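The sketch below makes the 30-second heartbeat and 90-second eviction rule from the recap concrete by modelling the lease bookkeeping. `SimpleLease` is a hypothetical class, not Eureka's own `Lease`. It assumes the server only needs the time of the last renewal and compares it against the lease duration.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified lease model (not Eureka's Lease class):
 * a client renews every 30 s, and the server treats the lease as expired
 * once no renewal has arrived for longer than the lease duration (90 s by default).
 */
class SimpleLease {
    static final long RENEWAL_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
    static final long DEFAULT_DURATION_MS = TimeUnit.SECONDS.toMillis(90); // 3 missed heartbeats

    private final long durationMs;
    private volatile long lastRenewalMs;

    SimpleLease(long nowMs, long durationMs) {
        this.durationMs = durationMs;
        this.lastRenewalMs = nowMs; // registering counts as the first renewal
    }

    /** Called whenever a heartbeat arrives from the client. */
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    /** True once the silence since the last heartbeat exceeds the lease duration. */
    boolean isExpired(long nowMs) {
        return nowMs - lastRenewalMs > durationMs;
    }
}
```

Timestamps are passed in explicitly rather than read from the clock. This keeps the eviction rule easy to reason about and to test.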
EurekaServer Startup Flow Analysis
How EurekaServer Handles Service Registration and Cluster Data Replication
How Does EurekaClient Register with EurekaServer?
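Earlier we set a breakpoint on every method of org.springframework.cloud.netflix.eureka.server.InstanceRegistry, and EurekaServer is now running in Debug mode. Let's pick any microservice annotated with @EnableEurekaClient and simply Run it.
- Once it starts, it is bound to call the register method. Let's keep going and see.
Instance Registration Mechanism
The breakpoint in InstanceRegistry.register(final InstanceInfo info, final boolean isReplication) is hit.
- Walking up the stack from InstanceRegistry.register, we find that ApplicationResource.addInstance is the caller. Let's analyze addInstance.
The ApplicationResource class
It mainly handles incoming HTTP service requests.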
```java
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}
```
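The validation chain at the top of addInstance can be reduced to a pure function, sketched below. It is hypothetical: `Registration` is a trimmed-down stand-in for `InstanceInfo`, and the two dataCenterInfo checks are merged into one null check on a data-center name. The function returns the status code addInstance would answer with and checks the fields in the same order.

```java
/** Hypothetical stand-in for InstanceInfo, holding only the fields validated here. */
final class Registration {
    final String id;
    final String hostName;
    final String appName;
    final String dataCenterName; // null models "missing dataCenterInfo / name"

    Registration(String id, String hostName, String appName, String dataCenterName) {
        this.id = id;
        this.hostName = hostName;
        this.appName = appName;
        this.dataCenterName = dataCenterName;
    }
}

class RegistrationValidator {
    /** Returns 400 for the first failed check (same order as addInstance), otherwise 204. */
    static int validate(String pathAppName, Registration r) {
        if (isBlank(r.id)) return 400;                  // "Missing instanceId"
        if (isBlank(r.hostName)) return 400;            // "Missing hostname"
        if (isBlank(r.appName)) return 400;             // "Missing appName"
        if (!pathAppName.equals(r.appName)) return 400; // "Mismatched appName"
        if (r.dataCenterName == null) return 400;       // "Missing dataCenterInfo (Name)"
        return 204;                                     // registered, no response body
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```

Because the checks short-circuit, a request with several problems only reports the first one. The same is true of the original if/else-if chain.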
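- This looks a little different from the RESTful Controller style we are used to. On closer inspection it is the Jersey RESTful framework (the JAX-RS reference implementation), a production-grade framework for building RESTful services and clients. Like Struts, it can be integrated with Hibernate and Spring.
- Note the call registry.register(info, "true".equals(isReplication)). After EurekaClient starts, it sends an HTTP(S) request that lands directly in ApplicationResource.addInstance, and every registration-related request goes through this method.
- Next, let's dig into registry.register(info, "true".equals(isReplication)):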
```java
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
    super.register(info, isReplication);
}
```
- The handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication) method:
```java
private void handleRegistration(InstanceInfo info, int leaseDuration,
        boolean isReplication) {
    log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
            + ", leaseDuration " + leaseDuration + ", isReplication "
            + isReplication);
    publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
            isReplication));
}
```
- It then publishes an EurekaInstanceRegisteredEvent (the instance-registered event) through the ApplicationContext. You can add a listener for EurekaInstanceRegisteredEvent to run your own business logic at this point.
- Next, look at super.register(info, isReplication). This method belongs to PeerAwareInstanceRegistryImpl, the parent class of InstanceRegistry.
Service Registration Mechanism
Step into the register(final InstanceInfo info, final boolean isReplication) method of PeerAwareInstanceRegistryImpl:
```java
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // Lease duration: defaults to the constant 90 seconds
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // It can also come from the registering client's lease info, so the value is configurable
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Write the registrant into EurekaServer's registry (parent class: AbstractInstanceRegistry)
    super.register(info, leaseDuration, isReplication);
    // Sync data between EurekaServer nodes: replicate to the other peers
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
```
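The lease-duration rule in this method is small enough to isolate. The hypothetical `LeaseDurations.resolve` below assumes the client's `durationInSecs` arrives as a nullable `Integer`, where null means no lease info was sent. In Spring Cloud, the client usually sets this value through `eureka.instance.lease-expiration-duration-in-seconds`.

```java
/** Hypothetical helper mirroring the lease-duration rule in PeerAwareInstanceRegistryImpl.register. */
class LeaseDurations {
    static final int DEFAULT_DURATION_IN_SECS = 90; // same default as Lease.DEFAULT_DURATION_IN_SECS

    /**
     * @param requestedSecs the client's durationInSecs, or null when no lease info was sent
     * @return the client's value when it is positive, otherwise the 90-second default
     */
    static int resolve(Integer requestedSecs) {
        return (requestedSecs != null && requestedSecs > 0) ? requestedSecs : DEFAULT_DURATION_IN_SECS;
    }
}
```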
To see how the registry is written, step into super.register(info, leaseDuration, isReplication), which is AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication):
```java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // `registry` is the so-called registry; it is kept in memory
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        // ... (part of this listing was lost in extraction, including the code that
        //      looks up existingLease and creates the new lease used below)
        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
```
- The method is fairly long. Skimming it, it does more than write the registry entry and its timestamps. It also records the change in recentlyChangedQueue and invalidates the response cache via invalidateCache. Interested readers can study it in depth.
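The `registry` field is a two-level in-memory map: application name, then instance id, then lease. The sketch below shows that shape. It assumes a plain "last seen" timestamp in place of `Lease<InstanceInfo>`. `InMemoryRegistry` is hypothetical, and using `computeIfAbsent` to create the per-application map atomically is this sketch's choice, not necessarily Eureka's.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry shaped like Eureka's: appName -> (instanceId -> last-seen timestamp). */
class InMemoryRegistry {
    private final ConcurrentHashMap<String, Map<String, Long>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, long nowMs) {
        // Create the per-application map on first registration, atomically.
        Map<String, Long> instances = registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>());
        instances.put(instanceId, nowMs); // re-registering the same id overwrites the old entry
    }

    int instanceCount(String appName) {
        Map<String, Long> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }

    Long lastSeen(String appName, String instanceId) {
        Map<String, Long> instances = registry.get(appName);
        return instances == null ? null : instances.get(instanceId);
    }
}
```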
Replication Between Cluster Nodes
Now look at replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication):
```java
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // i.e. a request that is itself a replication is not forwarded again
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Iterate over all nodes in the Eureka Server cluster and replicate to each
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Not yet replicated: apply the action (cancel, register, heartbeat, status update, ...) to this peer
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
```
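The `isReplication` check is what keeps replication from looping (the "poison replication" the comment warns about). The hypothetical `PeerNode` below models the rule synchronously, without Eureka's batching. A node applies the write locally and forwards it, flagged as a replication, to every peer except itself. A node that receives a flagged write does not forward it again.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical peer model: stores registrations and forwards only non-replicated writes. */
class PeerNode {
    final String url;
    final List<PeerNode> peers = new ArrayList<>();     // may include this node itself
    final List<String> registered = new ArrayList<>();  // stand-in for the local registry

    PeerNode(String url) {
        this.url = url;
    }

    void register(String instanceId, boolean isReplication) {
        registered.add(instanceId);          // 1. update the local registry first
        if (isReplication) {
            return;                          // 2. never re-forward a replicated write
        }
        for (PeerNode peer : peers) {
            if (peer.url.equals(url)) {
                continue;                    // 3. do not replicate to yourself
            }
            peer.register(instanceId, true); // 4. forward, flagged as a replication
        }
    }
}
```

With three fully connected nodes, a client registration on one node reaches each node exactly once. Without step 2, the writes would bounce between peers indefinitely.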
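- On every registration request, EurekaServer first updates its own registry and then synchronizes the information to the other EurekaServer nodes.
- Next, let's see how a node performs the replication by stepping into replicateInstanceActionsToPeers: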
```java
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
```
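- Every kind of replication action between nodes is handled right here. Take the Register case, node.register(info), and step into it to see how the node actually synchronizes the information.
Peer-to-Peer Replication Mechanism
Let's look at PeerEurekaNode.register(final InstanceInfo info) to see exactly how the data is synchronized: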
```java
public void register(final InstanceInfo info) throws Exception {
    // Expiry time handed to the task dispatcher: by default the current time plus 30 seconds
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}
```
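- This brings in Eureka's task batching. Synchronizing peers normally takes many calls, and with many EurekaServer nodes that would mean a large number of HTTP requests, so batching was the natural answer. The trade-off is some extra delay in propagating registrations and cancellations. That delay is usually within a tolerable range.
- Within the expiryTime window, batching merges tasks into a list and sends the whole list in a single request. One batch may therefore contain a mix of cancel, register, heartbeat, and status-update tasks.
- Back to the source. After the call to batchingDispatcher.process, let's go straight to TaskDispatchers.createBatchingTaskDispatcher: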
```java
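public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(/* ... */) {
    // ... (part of this listing was lost in extraction; it creates acceptorExecutor and
    //      taskExecutor, then returns an anonymous TaskDispatcher)
    return new TaskDispatcher<ID, T>() {
        // ... (the process(...) override was lost in extraction)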
@Override
public void shutdown() {
acceptorExecutor.shutdown();
taskExecutor.shutdown();
}
};
}
```
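The batching idea described above can be sketched on its own: drop stale work, then pack the rest into lists that each travel as one request. `PendingTask` and `Batcher.toBatches` are hypothetical. The rules that expired tasks are simply skipped and that batches are capped at a fixed size are simplifying assumptions, not AcceptorExecutor's actual code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replication task: an action name plus the time after which it is stale. */
final class PendingTask {
    final String action;
    final long expiryTimeMs;

    PendingTask(String action, long expiryTimeMs) {
        this.action = action;
        this.expiryTimeMs = expiryTimeMs;
    }
}

class Batcher {
    /**
     * Skips tasks whose expiry time has passed and packs the rest, in order,
     * into batches of at most maxBatchSize; each batch would be sent as one HTTP request.
     */
    static List<List<PendingTask>> toBatches(List<PendingTask> queue, long nowMs, int maxBatchSize) {
        List<List<PendingTask>> batches = new ArrayList<>();
        List<PendingTask> current = new ArrayList<>();
        for (PendingTask t : queue) {
            if (t.expiryTimeMs < nowMs) {
                continue; // expired: no longer worth replicating
            }
            current.add(t);
            if (current.size() == maxBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // final, partially filled batch
        }
        return batches;
    }
}
```

A batch can mix Register, Cancel, Heartbeat, and StatusUpdate tasks, which matches the description above.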
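- The process method here adds tasks to a queue, and whatever is enqueued must eventually be dequeued. I won't go through how tasks are taken off the queue one by one; instead, let's look at how they are finally executed. Step into TaskExecutors.batchExecutors, which is called from the line final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor):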
```java
static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name,
                                                   int workerCount,
                                                   final TaskProcessor<T> processor,
                                                   final AcceptorExecutor<ID, T> acceptorExecutor) {
    final AtomicBoolean isShutdown = new AtomicBoolean();
    final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
    return new TaskExecutors<>(new WorkerRunnableFactory<ID, T>() {
        @Override
        public WorkerRunnable<ID, T> create(int idx) {
            return new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor);
        }
    }, workerCount, isShutdown);
}
```
- The static batchExecutors method of TaskExecutors builds its workers from BatchWorkerRunnable, so let's look inside that class. Since it is a Runnable, it must have a run method:
```java
@Override
public void run() {
    try {
        while (!isShutdown.get()) {
            // Blocks until a batch is ready (signalled by batchWorkRequests.release()), then returns it
            List<TaskHolder<ID, T>> holders = getWork();
            List<T> tasks = getTasksOf(holders);
            // Send the batched tasks to the peer node in one request
            ProcessingResult result = processor.process(tasks);
            switch (result) {
                case Success:
                    break;
                case Congestion:
                case TransientError:
                    taskDispatcher.reprocess(holders, result);
                    break;
                case PermanentError:
                    logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
            }
            metrics.registerTaskResult(result, tasks.size());
        }
    } catch (InterruptedException e) {
        // Ignore
    } catch (Throwable e) {
        // Safe-guard, so we never exit this loop in an uncontrolled way.
        logger.warn("Discovery WorkerThread error", e);
    }
}
```
- This is the run method of BatchWorkerRunnable. It first waits for the semaphore to be released before it can obtain a task list. Once it has the tasks, it calls processor.process(tasks) to send them to the peer node. Next, let's look at ReplicationTaskProcessor.process:
```java
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // The JerseyReplicationClient sends the whole list in one request
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}
```
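The error handling in process and in BatchWorkerRunnable.run forms a small decision table. A 503 means congestion and any other non-success status is a permanent error. Among exceptions, only network failures count as transient. Congestion and transient errors are reprocessed, and permanent errors are dropped. The sketch below encodes that table. `Outcome` and `BatchOutcomes` are hypothetical names. Treating any 2xx as success, and any `IOException` in the cause chain as a network failure, are assumptions; Eureka's own `isSuccess` and `isNetworkConnectException` may use different tests.

```java
import java.io.IOException;

/** Hypothetical mirror of the ProcessingResult values used in the listings above. */
enum Outcome { SUCCESS, CONGESTION, TRANSIENT_ERROR, PERMANENT_ERROR }

class BatchOutcomes {
    /** Classifies an HTTP status the way ReplicationTaskProcessor.process does (2xx assumed success). */
    static Outcome classify(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) return Outcome.SUCCESS;
        if (statusCode == 503) return Outcome.CONGESTION; // peer busy: retry after a delay
        return Outcome.PERMANENT_ERROR;                   // unexpected status: discard the batch
    }

    /** Assumption: an IOException anywhere in the cause chain marks a retryable network error. */
    static Outcome classify(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof IOException) return Outcome.TRANSIENT_ERROR;
        }
        return Outcome.PERMANENT_ERROR;
    }

    /** What the worker loop does next: reprocess on congestion or transient error, else move on. */
    static boolean shouldReprocess(Outcome outcome) {
        return outcome == Outcome.CONGESTION || outcome == Outcome.TRANSIENT_ERROR;
    }
}
```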
- We're close to the answer now. Let's step into JerseyReplicationClient.submitBatchUpdates(ReplicationList replicationList):
```java
@Override
public EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {
    ClientResponse response = null;
    try {
        response = jerseyApacheClient.resource(serviceUrl)
                // This is the key part: the relative request path, peerreplication/batch/
                .path(PeerEurekaNode.BATCH_URL_PATH)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .type(MediaType.APPLICATION_JSON_TYPE)
                .post(ClientResponse.class, replicationList);
        if (!isSuccess(response.getStatus())) {
            return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
        }
        ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse.class);
        return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
}
```
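- Now that we have the relative path, search for the string "batch" to find the method that receives it, i.e. the one annotated with @Path. In eureka-core-1.4.12.jar we do find @Path("batch"). It belongs to the batchReplication method of the PeerReplicationResource class. Let's step into it: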
```java
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        // Process the received tasks one by one; the core logic lives in dispatch
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error(instanceInfo.getAction() + " request processing failed for batch item "
                        + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}
```
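Note that batchReplication isolates failures per item. One bad entry becomes a 500 in its own response slot instead of failing the whole batch. Below is a hypothetical, generic sketch of that pattern; `BatchEndpoint.handleBatch` and the handler function are illustrative names, not Eureka APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class BatchEndpoint {
    /**
     * Applies the handler to every item and collects one status code per item.
     * An exception in one item is turned into a 500 for that slot only.
     */
    static <T> List<Integer> handleBatch(List<T> items, Function<T, Integer> handler) {
        List<Integer> statuses = new ArrayList<>();
        for (T item : items) {
            try {
                statuses.add(handler.apply(item));
            } catch (Exception e) {
                statuses.add(500); // INTERNAL_SERVER_ERROR for this item, the rest continue
            }
        }
        return statuses;
    }
}
```

The response list stays aligned with the request list, so the sender can match each result to its task. That is what handleBatchResponse in ReplicationTaskProcessor relies on when it receives the list back.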
- The tasks are processed one at a time in a loop, and the finish line is in sight. Let's step into PeerReplicationResource.dispatch:
```java
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);

    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    String instanceStatus = toString(instanceInfo.getStatus());

    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register:
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat:
            singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel:
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate:
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride:
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}
```
- Any action type would do, so again take Register and step into PeerReplicationResource.handleRegister:
```java
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
    // REPLICATION is a constant (private static final String REPLICATION = "true");
    // the call goes back into ApplicationResource.addInstance
    applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
    return new Builder().setStatusCode(Status.OK.getStatusCode());
}
```
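- The peer-synchronization journey ends here, back in ApplicationResource.addInstance. This is the same method EurekaClient calls when it registers after startup, and peer synchronization simply reuses it. The only difference is the isReplication flag: true marks the call as a replication between nodes, false marks it as a client registration. The rest of the ApplicationResource.addInstance flow was covered earlier. You should now understand how a registration flows through the server, including how batched tasks synchronize information between EurekaServer nodes.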
EurekaClient Startup Flow Analysis
Switching the Run Modes
Run the discovery-eureka service normally and the provider-user service in Debug mode, then look at the logs first:
```
2017-10-23 19:43:07.688 INFO 1488 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2017-10-23 19:43:07.694 INFO 1488 --- [ main] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING
2017-10-23 19:43:07.874 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using JSON encoding codec LegacyJacksonJson
2017-10-23 19:43:07.874 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using JSON decoding codec LegacyJacksonJson
2017-10-23 19:43:07.971 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using XML encoding codec XStreamXml
2017-10-23 19:43:07.971 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using XML decoding codec XStreamXml
2017-10-23 19:43:08.134 INFO 1488 --- [ main] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Disable delta property : false
2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Single vip registry refresh property : null
2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Force full registry fetch : false
2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Application is null : false
2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Registered Applications size is zero : true
2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Application version is -1: true
2017-10-23 19:43:08.345 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Getting all instance registry info from the eureka server
2017-10-23 19:43:08.630 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : The response status is 200
2017-10-23 19:43:08.631 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Starting heartbeat executor: renew interval is: 30
2017-10-23 19:43:08.634 INFO 1488 --- [ main] c.n.discovery.InstanceInfoReplicator : InstanceInfoReplicator onDemand update allowed rate per min is 4
2017-10-23 19:43:08.637 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Discovery Client initialized at timestamp 1508758988637 with initial instances count: 0
2017-10-23 19:43:08.657 INFO 1488 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application springms-provider-user with eureka with status UP
2017-10-23 19:43:08.658 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Saw local status change event StatusChangeEvent [timestamp=1508758988658, current=UP, previous=STARTING]
2017-10-23 19:43:08.659 INFO 1488 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient : DiscoveryClient_SPRINGMS-PROVIDER-USER/springms-provider-user:192.168.3.101:7900: registering service...
2017-10-23 19:43:08.768 INFO 1488 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 7900 (http)
2017-10-23 19:43:08.768 INFO 1488 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 7900
2017-10-23 19:43:08.773 INFO 1488 --- [ main] c.s.cloud.MsProviderUserApplication : Started ProviderApplication in 882.1 seconds (JVM running for 10.398)
```
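Main Loading Flow of the Service Provider
- 【1】: Looking closely at the logs, DefaultLifecycleProcessor first processes some beans. It will then call the start method of the beans that implement SmartLifecycle.
- 【2】: Next, the EurekaClient status is initialized to STARTING, and the codecs are set up, i.e. which formats use JSON and which use XML.
- 【3】: The client then logs that a forced full registry fetch is false and that the registered-applications size is zero. It starts the heartbeat executor with a renewal interval of 30 seconds and finally logs that the client has been initialized.
The EnableEurekaClient annotation: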
```java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableDiscoveryClient
public @interface EnableEurekaClient {}
```
@EnableEurekaClient is itself annotated with @EnableDiscoveryClient, so that annotation is worth a look:
```java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {}
```
@EnableDiscoveryClient carries a notable annotation, @Import. This suggests that most of the logic lives in the EnableDiscoveryClientImportSelector class.
EnableDiscoveryClientImportSelector
```java
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
        extends SpringFactoryImportSelector<EnableDiscoveryClient> {

    @Override
    protected boolean isEnabled() {
        return new RelaxedPropertyResolver(getEnvironment()).getProperty(
                "spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE);
    }

    @Override
    protected boolean hasDefaultFactory() {
        return true;
    }
}
```
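EnableDiscoveryClientImportSelector extends SpringFactoryImportSelector and overrides isEnabled(), which returns true by default. The reason is that isEnabled() reads spring.cloud.discovery.enabled and falls back to Boolean.TRUE when the property is not set. It also overrides hasDefaultFactory() to return true. isEnabled() is consulted in the parent class's selectImports: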
```java
/**
 * Select and return the names of which class(es) should be imported based on
 * the {@link AnnotationMetadata} of the importing @{@link Configuration} class.
 */
@Override
public String[] selectImports(AnnotationMetadata metadata) {
    if (!isEnabled()) {
        return new String[0];
    }
    AnnotationAttributes attributes = AnnotationAttributes.fromMap(
            metadata.getAnnotationAttributes(this.annotationClass.getName(), true));

    Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
            + metadata.getClassName() + " annotated with @" + getSimpleName() + "?");

    // Find all possible auto configuration classes, filtering duplicates
    List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
            .loadFactoryNames(this.annotationClass, this.beanClassLoader)));

    if (factories.isEmpty() && !hasDefaultFactory()) {
        throw new IllegalStateException("Annotation @" + getSimpleName()
                + " found, but there are no implementations. Did you forget to include a starter?");
    }

    if (factories.size() > 1) {
        // there should only ever be one DiscoveryClient, but there might be more than
        // one factory
        log.warn("More than one implementation " + "of @" + getSimpleName()
                + " (now relying on @Conditionals to pick one): " + factories);
    }

    return factories.toArray(new String[factories.size()]);
}
```
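Stripped of Spring types, the selectImports flow makes two decisions. First, is discovery enabled at all (spring.cloud.discovery.enabled, defaulting to true)? Second, which class names should be imported once duplicates are removed, keeping their first-seen order (the `new ArrayList<>(new LinkedHashSet<>(...))` idiom)? The hypothetical `ImportSelection` class below sketches just that. It assumes the property arrives as a raw string that is null when unset.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

class ImportSelection {
    /** Assumption: a missing property means enabled, mirroring the Boolean.TRUE default. */
    static boolean isEnabled(String discoveryEnabledProperty) {
        return discoveryEnabledProperty == null || Boolean.parseBoolean(discoveryEnabledProperty);
    }

    /** Returns nothing when disabled; otherwise de-duplicated names in first-seen order. */
    static String[] select(String discoveryEnabledProperty, List<String> factoryNames) {
        if (!isEnabled(discoveryEnabledProperty)) {
            return new String[0];
        }
        List<String> factories = new ArrayList<>(new LinkedHashSet<>(factoryNames));
        return factories.toArray(new String[0]);
    }
}
```

Duplicates can arise because loadFactoryNames concatenates entries from every spring.factories file on the classpath. A LinkedHashSet removes repeats without disturbing the original order.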
EnableDiscoveryClientImportSelector.selectImports first reads the annotation's attributes and then loads a list of class names. Let's step into loadFactoryNames:
```java
public static List<String> loadFactoryNames(Class<?> factoryClass, ClassLoader classLoader) {
    String factoryClassName = factoryClass.getName();
    try {
        // FACTORIES_RESOURCE_LOCATION is defined as: public static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories";
        // i.e. a configuration file inside each jar on the classpath
        Enumeration<URL> urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :
                ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));
        List<String> result = new ArrayList<String>();
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            Properties properties = PropertiesLoaderUtils.loadProperties(new UrlResource(url));
            String factoryClassNames = properties.getProperty(factoryClassName);
            result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames)));
        }
        return result;
    }
    catch (IOException ex) {
        throw new IllegalArgumentException("Unable to load [" + factoryClass.getName() +
                "] factories from location [" + FACTORIES_RESOURCE_LOCATION + "]", ex);
    }
}
```
It loads a configuration file. What's in it? Open the spring.factories file in the jar that contains SpringFactoryImportSelector:
```
# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.CommonsClientAutoConfiguration,\
org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration,\
org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration,\
org.springframework.cloud.commons.util.UtilAutoConfiguration

# Environment Post Processors
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.client.HostInfoEnvironmentPostProcessor
```
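These are all class names ending in Configuration, i.e. a set of configuration classes. For @EnableDiscoveryClient, however, loadFactoryNames looks up the key equal to the annotation's fully qualified class name (factoryClass.getName()). The factories list it returns contains just one class: org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration.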
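How does a key in spring.factories map to class names? The file is a standard Java properties file, so `java.util.Properties` can parse it, including `#` comments and backslash line continuations. The sketch below mimics the per-file lookup in loadFactoryNames for a single in-memory file. `FactoriesFile` is hypothetical, and unlike Spring's splitter it also trims whitespace around each name. The real method repeats this lookup for every spring.factories file found on the classpath and concatenates the results.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class FactoriesFile {
    /** Looks up one key (a fully qualified class name) and splits its comma-separated value. */
    static List<String> factoryNames(String factoriesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(factoriesText)); // handles '#' comments and '\' continuations
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot really happen for a StringReader
        }
        List<String> result = new ArrayList<>();
        String value = props.getProperty(key);
        if (value == null) {
            return result; // key absent from this file: contributes nothing
        }
        for (String name : value.split(",")) {
            if (!name.trim().isEmpty()) {
                result.add(name.trim());
            }
        }
        return result;
    }
}
```

For example, a file that maps `org.springframework.cloud.client.discovery.EnableDiscoveryClient` to `org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration` yields a one-element list for that key. A key that is not present yields an empty list.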
EurekaDiscoveryClientConfiguration
```java
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@CommonsLog
public class EurekaDiscoveryClientConfiguration implements SmartLifecycle, Ordered {

    @Override
    public void start() {
        // only set the port if the nonSecurePort is 0 and this.port != 0
        if (this.port.get() != 0 && this.instanceConfig.getNonSecurePort() == 0) {
            this.instanceConfig.setNonSecurePort(this.port.get());
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get() && this.instanceConfig.getNonSecurePort() > 0) {
            maybeInitializeClient();

            if (log.isInfoEnabled()) {
                log.info("Registering application " + this.instanceConfig.getAppname()
                        + " with eureka with status "
                        + this.instanceConfig.getInitialStatus());
            }

            this.applicationInfoManager
                    .setInstanceStatus(this.instanceConfig.getInitialStatus());

            if (this.healthCheckHandler != null) {
                this.eurekaClient.registerHealthCheck(this.healthCheckHandler);
            }
            this.context.publishEvent(
                    new InstanceRegisteredEvent<>(this, this.instanceConfig));
            this.running.set(true);
        }
    }
}
```
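- The class implements the SmartLifecycle interface, so it must implement start. This start method is what DefaultLifecycleProcessor invokes during startup, which matches the "Starting beans in phase 0" line at the top of the log.
The line this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) triggers an observer-pattern callback: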
```java
// ApplicationInfoManager.setInstanceStatus
public synchronized void setInstanceStatus(InstanceStatus status) { // set a breakpoint here
    InstanceStatus prev = instanceInfo.setStatus(status);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, status));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}
```
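- When the status changes, this method calls back every StatusChangeListener, but only listeners that were previously registered in listeners.
- So for a callback to fire, something must register the listener first, and that registration must happen before start runs. We therefore need to find the ApplicationInfoManager method that adds entries to listeners: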
```java
public void registerStatusChangeListener(StatusChangeListener listener) { // set a breakpoint here
    listeners.put(listener.getId(), listener);
}
```
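The register-before-notify ordering argued above can be demonstrated with a stripped-down listener registry. `StatusManager` is hypothetical: statuses are plain strings, and listeners are `BiConsumer<previous, current>`. As with the `prev != null` check in setInstanceStatus, listeners fire only when the status actually changes. A listener registered after a change never sees it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical, stripped-down version of the ApplicationInfoManager listener pattern. */
class StatusManager {
    private final Map<String, BiConsumer<String, String>> listeners = new ConcurrentHashMap<>();
    private String status = "STARTING";

    /** Keyed by id, so registering the same id twice replaces the earlier listener. */
    void registerStatusChangeListener(String id, BiConsumer<String, String> listener) {
        listeners.put(id, listener);
    }

    /** Notifies listeners with (previous, current) only when the status really changes. */
    synchronized void setInstanceStatus(String next) {
        String prev = status;
        if (prev.equals(next)) {
            return; // no change: the original returns a null prev and skips notification
        }
        status = next;
        for (BiConsumer<String, String> listener : listeners.values()) {
            try {
                listener.accept(prev, next);
            } catch (RuntimeException e) {
                // one failing listener must not stop the others (the original logs a warning)
            }
        }
    }
}
```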
- So let's search backwards for the callers of registerStatusChangeListener.
- It turns out there is only one caller: DiscoveryClient.initScheduledTasks, which is itself called from DiscoveryClient's constructor. We also set breakpoints on initScheduledTasks and on the constructor that calls it.
Sure enough, EurekaDiscoveryClientConfiguration.start is called, then this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) hits its breakpoint. Stepping further, we land in the notify callback defined in DiscoveryClient.initScheduledTasks.
- The breakpoints fire in exactly the order analyzed above, which also matches the order of the log output. So it's worth examining what DiscoveryClient.initScheduledTasks actually does. On second thought, let's first look at the constructor that calls initScheduledTasks.
DiscoveryClient's constructor, annotated with @Inject:
```java
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
try {
// Prepare the scheduler for the periodic tasks
scheduler = Executors.newScheduledThreadPool(3,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// Create the thread pool for the heartbeat task
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// Create the thread pool for the cache-refresh task
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// Initialize the scheduled tasks
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
```
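The heartbeat pool and the cache-refresh pool above are both built on a `SynchronousQueue`, which is what the `// use direct handoff` comment refers to. A `SynchronousQueue` stores no tasks. Once every worker is busy and the pool has reached its maximum size, a new submission is rejected rather than queued. The sketch below demonstrates this behavior with the JDK alone. The class name `DirectHandoffDemo` is hypothetical, and the pool is capped at one thread for the demo. In the real code, the maximum comes from `getHeartbeatExecutorThreadPoolSize()` / `getCacheRefreshExecutorThreadPoolSize()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: how a direct-handoff pool behaves when it is saturated.
class DirectHandoffDemo {

    // Returns true if a second task is rejected while the only worker is still busy.
    static boolean secondTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>()); // direct handoff: the queue holds nothing
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task occupies the single worker, like a slow heartbeat HTTP call.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                pool.execute(() -> { }); // no idle worker, no queue slot, no room to grow
                return false;
            } catch (RejectedExecutionException e) {
                return true; // the default AbortPolicy rejects the task
            }
        } finally {
            release.countDown(); // let the first task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected());
    }
}
```

The practical effect, presumably intended, is that a slow heartbeat or registry fetch cannot build up a backlog of stale tasks. The submitter has to deal with the rejection instead.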
- Reading further down, `initScheduledTasks` does what its name says: it initializes the scheduled tasks. This is where the real work happens, so let's step into it.
```java
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        // Note: how often to fetch the service registry, 30 seconds by default
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // Note: scheduled task that fetches the service registry every 30 seconds
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        // Note: how often to send a heartbeat renewal, 30 seconds by default
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        // Note: scheduled task that sends a heartbeat renewal to EurekaServer every 30 seconds
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        // Note: instance info replicator; periodically refreshes dataCenterInfo (data center info), every 30 seconds by default
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        // Note: create the status change listener
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                // Note: called back whenever the status changes
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        // Note: register the status change listener
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
```
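Both timers hand their work to `TimedSupervisorTask`, and that class's source is not shown here. Its constructor arguments are the interval, a time unit and an `expBackOffBound`. From these, it appears to supervise each run with a timeout, reschedule itself afterwards, and back off exponentially when a run times out. The following is a simplified sketch of that idea, not Eureka's class. `SupervisedTask`, `Outcome` and `nextDelayMs` are hypothetical names. The backoff rules are assumptions: reset the delay on success, double it up to `interval * expBackOffBound` on timeout, and keep it unchanged on other failures.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of a self-rescheduling timer with exponential backoff.
class SupervisedTask implements Runnable {
    enum Outcome { SUCCESS, TIMEOUT, FAILURE }

    private final ScheduledExecutorService scheduler;
    private final ExecutorService worker;
    private final Runnable task;
    private final long intervalMs;
    private final long maxDelayMs;
    private final AtomicLong delayMs;

    SupervisedTask(ScheduledExecutorService scheduler, ExecutorService worker,
                   Runnable task, long intervalMs, int expBackOffBound) {
        this.scheduler = scheduler;
        this.worker = worker;
        this.task = task;
        this.intervalMs = intervalMs;
        this.maxDelayMs = intervalMs * expBackOffBound;
        this.delayMs = new AtomicLong(intervalMs);
    }

    // Reset on success, double (capped at maxDelay) on timeout, keep the delay otherwise.
    static long nextDelayMs(long current, long interval, long maxDelay, Outcome outcome) {
        if (outcome == Outcome.SUCCESS) {
            return interval;
        } else if (outcome == Outcome.TIMEOUT) {
            return Math.min(maxDelay, current * 2);
        }
        return current;
    }

    @Override
    public void run() {
        Future<?> future = null;
        Outcome outcome = Outcome.FAILURE;
        try {
            future = worker.submit(task);
            future.get(intervalMs, TimeUnit.MILLISECONDS); // the interval doubles as the timeout
            outcome = Outcome.SUCCESS;
        } catch (TimeoutException e) {
            outcome = Outcome.TIMEOUT;
        } catch (RejectedExecutionException | ExecutionException e) {
            outcome = Outcome.FAILURE; // worker pool saturated, or the task threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // stop supervising when interrupted (e.g. shutdownNow)
        } finally {
            if (future != null) {
                future.cancel(true); // no effect if the task already finished
            }
        }
        delayMs.set(nextDelayMs(delayMs.get(), intervalMs, maxDelayMs, outcome));
        try {
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delayMs.get(), TimeUnit.MILLISECONDS); // re-arm itself
            }
        } catch (RejectedExecutionException e) {
            // the scheduler was shut down concurrently; nothing left to do
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        SupervisedTask heartbeat = new SupervisedTask(
                scheduler, worker, () -> System.out.println("heartbeat"), 100, 10);
        scheduler.schedule(heartbeat, 100, TimeUnit.MILLISECONDS); // first run after one interval
        Thread.sleep(450);
        scheduler.shutdownNow();
        worker.shutdownNow();
    }
}
```

Assume a 30-second interval and an `expBackOffBound` of 10. Repeated timeouts would stretch the delay from 30 to 60, 120, 240 and then 300 seconds. The first successful run brings it back to 30.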
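- Annotating this method from top to bottom shows that it covers most of what we want to know about EurekaClient. It schedules a task that fetches registry information and refreshes the local cache, a heartbeat renewal, and a sync of the instance's data center information. It also installs a status-change listener callback. What it does not seem to do is register. So where does registration happen?
- The clue is `instanceInfoReplicator.onDemandUpdate()`, which the status-change listener calls whenever the local status changes: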
```java
public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        scheduler.submit(new Runnable() {
            @Override
            public void run() {
                logger.debug("Executing on-demand update of local InstanceInfo");

                Future latestPeriodic = scheduledPeriodicRef.get();
                if (latestPeriodic != null && !latestPeriodic.isDone()) {
                    logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                    latestPeriodic.cancel(false);
                }
                // Note: the instance info is refreshed and registered here
                InstanceInfoReplicator.this.run();
            }
        });
        return true;
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}
```
- Apart from the rate-limit check and cancelling the pending periodic update, the only call in `onDemandUpdate` that really matters is `InstanceInfoReplicator.this.run()`. Since it is a `run` method, does `InstanceInfoReplicator` implement `Runnable`? Checking the class confirms that it does.
- This `run` method should be where the registration we are looking for happens.
```java
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        // Register only if the local instance info changed since the last registration
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // Always schedule the next periodic run
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
```
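Taken together, `onDemandUpdate()` and `run()` form a small scheduling pattern. `run()` always re-arms itself in `finally`. An on-demand update cancels the pending periodic run and runs immediately. Registration only happens while the instance info is dirty. Note that `unsetIsDirty(dirtyTimestamp)` receives the timestamp that was read before registering. Assuming it only clears the flag when nothing newer was recorded, a change that arrives during the HTTP call is not lost; it triggers another registration on the next run. The sketch below models this. `MiniReplicator`, `registrations` and the `duringRegister` hook are hypothetical. The HTTP call is replaced by a counter, a version counter stands in for the timestamp, and the rate limiter is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of InstanceInfoReplicator's scheduling and dirty-flag handling.
class MiniReplicator implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "MiniReplicator");
        t.setDaemon(true); // like the DiscoveryClient pools, do not block JVM exit
        return t;
    });
    private final AtomicReference<Future<?>> scheduledPeriodicRef = new AtomicReference<>();
    private final long intervalMs;

    final AtomicInteger registrations = new AtomicInteger(); // stands in for discoveryClient.register()
    volatile Runnable duringRegister = () -> { };            // hook that runs "during" the HTTP call

    private boolean dirty;
    private long dirtyVersion;   // Eureka records a timestamp; a counter avoids ties here
    private long versionCounter;

    MiniReplicator(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    synchronized void markDirty() {
        dirty = true;
        dirtyVersion = ++versionCounter;
    }

    synchronized Long dirtyVersionOrNull() {
        return dirty ? dirtyVersion : null;
    }

    // Clear the flag only if nothing changed after the version we registered.
    synchronized void unsetDirty(long registeredVersion) {
        if (dirtyVersion <= registeredVersion) {
            dirty = false;
        }
    }

    @Override
    public void run() {
        try {
            Long version = dirtyVersionOrNull();
            if (version != null) {
                duringRegister.run();
                registrations.incrementAndGet();
                unsetDirty(version);
            }
        } finally {
            // Always re-arm the next periodic run, as InstanceInfoReplicator.run() does.
            if (!scheduler.isShutdown()) {
                scheduledPeriodicRef.set(scheduler.schedule(this, intervalMs, TimeUnit.MILLISECONDS));
            }
        }
    }

    // Cancel the pending periodic run, then replicate immediately (rate limiting omitted).
    void onDemandUpdate() {
        scheduler.submit(() -> {
            Future<?> latest = scheduledPeriodicRef.get();
            if (latest != null && !latest.isDone()) {
                latest.cancel(false);
            }
            run();
        });
    }

    void stop() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        MiniReplicator replicator = new MiniReplicator(30_000);
        replicator.markDirty();
        replicator.run(); // dirty: registers
        replicator.run(); // unchanged: skips registration
        System.out.println("registrations = " + replicator.registrations.get()); // registrations = 1
        replicator.stop();
    }
}
```

`main` calls `run()` directly so the outcome is deterministic. In normal operation, only the scheduler and `onDemandUpdate()` would invoke it.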
- `discoveryClient.register()`: this `register` method is the actual registration.
```java
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    // Only a 204 (No Content) response counts as a successful registration
    return httpResponse.getStatusCode() == 204;
}
```
- So registration goes through the client request object wrapped by `EurekaHttpClient`. Digging further into `registrationClient.register` leads us to `AbstractJerseyEurekaHttpClient.register`.
```java
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                // Note: send all of the current application's info (info) as the request body
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
```
- The request is sent through the Jersey RESTful framework. On the EurekaServer side, `ApplicationResource.addInstance` receives the client's registration request. That completes the picture of how EurekaClient registers itself.
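To look at the exchange without Jersey, the sketch below sends the same kind of request: a POST to `{serviceUrl}apps/{appName}` with a JSON body. Like `register()` above, it treats only `204` as success. Everything else is an assumption for the demo:

- It uses the JDK's `java.net.http.HttpClient` (Java 11+) instead of Jersey.
- It sends a trimmed, illustrative JSON body instead of a full `InstanceInfo`.
- A local `com.sun.net.httpserver.HttpServer` stands in for EurekaServer.
- `RegisterDemo`, the `/eureka/` base path and the `DEMO-SERVICE` name are hypothetical.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: a registration POST against a local stand-in for EurekaServer.
class RegisterDemo {

    // Mirrors register(): POST {serviceUrl}apps/{appName}; only 204 counts as success.
    static boolean register(HttpClient client, String serviceUrl, String appName, String json)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(serviceUrl + "apps/" + appName))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode() == 204;
    }

    // Starts a fake server that records "METHOD path" and answers 204, then registers once.
    static String registerAgainstFakeServer() {
        HttpServer server;
        try {
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        AtomicReference<String> seen = new AtomicReference<>();
        server.createContext("/eureka/apps/", exchange -> {
            seen.set(exchange.getRequestMethod() + " " + exchange.getRequestURI().getPath());
            exchange.getRequestBody().readAllBytes(); // drain the JSON body
            exchange.sendResponseHeaders(204, -1);    // 204 No Content, no response body
            exchange.close();
        });
        server.start();
        try {
            String serviceUrl = "http://127.0.0.1:" + server.getAddress().getPort() + "/eureka/";
            String json = "{\"instance\":{\"app\":\"DEMO-SERVICE\",\"hostName\":\"localhost\"}}";
            HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
            return register(client, serviceUrl, "DEMO-SERVICE", json) ? seen.get() : "rejected";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerAgainstFakeServer()); // POST /eureka/apps/DEMO-SERVICE
    }
}
```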