我们知道,在Eureka中,可以使用如下方法使Eureka主动下线,那么本篇文章就来分析一下子这个下线的流程

public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
} cancelScheduledTasks(); // If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
} if (eurekaTransport != null) {
eurekaTransport.shutdown();
} heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient");
}
}

主要做了这么几件事:

  1. 解除状态监听器
  2. 取消心跳、刷新线程
private void cancelScheduledTasks() {
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
  1. 向服务端发起下线通知
 void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
} @Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder.delete(ClientResponse.class);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
  1. 停止各个监听器
服务端接受下线消息

下线消息的处理在InstanceResource类中

@DELETE
public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
try {
boolean isSuccess = registry.cancel(app.getName(), id,
"true".equals(isReplication)); if (isSuccess) {
logger.debug("Found (Cancel): {} - {}", app.getName(), id);
return Response.ok().build();
} else {
logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
} catch (Throwable e) {
logger.error("Error (cancel): {} - {}", app.getName(), id, e);
return Response.serverError().build();
} }
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
if (super.cancel(appName, id, isReplication)) {
//往集群同步下线信息
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
return true;
}
return false;
}

先看具体的下线逻辑,与租约过期清除的处理逻辑是一致的

   protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
//删除租约信息
leaseToCancel = gMap.remove(id);
}
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
//删除客户端状态信息
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}

其中invalidateCache则是删除当前服务中与该实例相关的缓存

集群的同步下线信息则跟集群信息注册的逻辑差不多

05-12 04:17