SpringCloud Source Code Series (2): The Eureka Registry (Part 2)
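V. Service Registration

1. Initializing the instance info registrar

The code that performs service registration is not easy to find. Look at the method in which DiscoveryClient initializes its scheduled tasks. It creates an instance info replicator, InstanceInfoReplicator, and this replicator is also what registers the instance (it does the registering, yet it is called a Replicator, which feels a bit odd).

① How DiscoveryClient initializes its schedulers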
```java
private void initScheduledTasks() {
    // The scheduled registry-refresh task is omitted here...

    if (clientConfig.shouldRegisterWithEureka()) {
        // The scheduled heartbeat task is omitted here...

        // Instance info replicator: periodically refreshes this instance's state and registers it with the registry
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        // Listener for instance status changes
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
                    logger.error("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        // Register the status change listener with ApplicationInfoManager
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // Start the instance info replicator; the initial delay defaults to 40 seconds
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
```

② The InstanceInfoReplicator constructor
```java
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
    this.discoveryClient = discoveryClient;
    this.instanceInfo = instanceInfo;
    // Single-threaded scheduler
    this.scheduler = Executors.newScheduledThreadPool(1,
            new ThreadFactoryBuilder()
                    .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                    .setDaemon(true)
                    .build());

    this.scheduledPeriodicRef = new AtomicReference<Future>();
    // started is initialized to false
    this.started = new AtomicBoolean(false);
    // Rate limiter whose rate is expressed per minute
    this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
    // Replication interval, 30 seconds by default
    this.replicationIntervalSeconds = replicationIntervalSeconds;
    this.burstSize = burstSize;
    // Allowed on-demand updates per minute: 60 * 2 / 30 = 4
    this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
    logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
}
```

③ Starting InstanceInfoReplicator
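```java
public void start(int initialDelayMs) {
    // Set started to true on the first call only
    if (started.compareAndSet(false, true)) {
        // Mark the instance dirty so that it is synced to the Eureka server on the next run
        instanceInfo.setIsDirty();
        // Schedule this task after the initial delay (40 seconds by default; despite the name, the unit is seconds)
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        // Keep the Future in a local reference
        scheduledPeriodicRef.set(next);
    }
}

///////////////////////////////

public synchronized void setIsDirty() {
    isInstanceInfoDirty = true;
    lastDirtyTimestamp = System.currentTimeMillis();
}
```

2. Client instance registration

① The run method that performs registration

Next, look at InstanceInfoReplicator's run method. This is where registration actually happens.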
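```java
public void run() {
    try {
        // Refresh local instance info; if anything changed, the instance is marked dirty
        discoveryClient.refreshInstanceInfo();

        // Timestamp at which the instance was marked dirty (null if it is not dirty)
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // Register the instance
            discoveryClient.register();
            // Set dirty = false
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // Schedule the next run 30 seconds later
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
```

② Refreshing instance info

Now take a closer look at refreshInstanceInfo, the method that refreshes the instance info: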
```java
void refreshInstanceInfo() {
    // Refresh the data center info if required
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    // Refresh the lease info if required, e.g. after the configuration was changed dynamically;
    // this rebuilds LeaseInfo and marks the instance dirty
    applicationInfoManager.refreshLeaseInfoIfRequired();

    InstanceStatus status;
    try {
        // Check the instance status with the health check handler
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }

    if (null != status) {
        // Set the instance status; a status change triggers the status change listeners
        applicationInfoManager.setInstanceStatus(status);
    }
}

////////////////////////////////

public void refreshLeaseInfoIfRequired() {
    // Current lease info of this instance
    LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
    if (leaseInfo == null) {
        return;
    }
    // Lease settings from the configuration
    int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
    int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
    // If the lease settings changed, build new lease info and mark the instance dirty
    if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {
        LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(currentLeaseRenewal)
                .setDurationInSecs(currentLeaseDuration)
                .build();
        instanceInfo.setLeaseInfo(newLeaseInfo);
        instanceInfo.setIsDirty();
    }
}

////////////////////////////////

public HealthCheckHandler getHealthCheckHandler() {
    HealthCheckHandler healthCheckHandler = this.healthCheckHandlerRef.get();
    if (healthCheckHandler == null) {
        // A custom HealthCheckHandler can be supplied to implement health checks
        if (null != healthCheckHandlerProvider) {
            healthCheckHandler = healthCheckHandlerProvider.get();
        } else if (null != healthCheckCallbackProvider) {
            // A custom HealthCheckCallback also works, but it is deprecated; prefer HealthCheckHandler
            healthCheckHandler = new HealthCheckCallbackToHandlerBridge(healthCheckCallbackProvider.get());
        }

        if (null == healthCheckHandler) {
            // Without a custom handler, fall back to the default bridge
            healthCheckHandler = new HealthCheckCallbackToHandlerBridge(null);
        }
        this.healthCheckHandlerRef.compareAndSet(null, healthCheckHandler);
    }

    return this.healthCheckHandlerRef.get();
}

//////////////////////////////////////

public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }

    // setStatus returns the previous status only if the status actually changed; the listeners are then notified
    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}
```

③ Registering with the Eureka server

The run method calls discoveryClient.register() to register the client instance with the registry. Inside register you can see that it simply uses the EurekaTransport built earlier to make a remote call. Following the calls layer by layer, it is easy to see that it ends up calling the eureka-server endpoint POST /apps/{appName}. Later we will look for this endpoint in eureka-core to find the registry's entry point for service registration.

```java
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        // registrationClient => JerseyReplicationClient
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

////////////////////////////////

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    // Calls the POST apps/{appName} endpoint
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                // POST request
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
```

④ The registry sets the instance status to UP

Recall the registry's initialization flow: at the end, openForTraffic also calls ApplicationInfoManager's setInstanceStatus to set the instance status to UP. This status change is what triggers the client's registration with the registry.

```java
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
```

⑤ Handling instance status changes

The status change listener calls onDemandUpdate to carry out the work that follows a status change.
```java
public boolean onDemandUpdate() {
    // Rate limiting
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");

                    // If the last scheduled task has not finished yet, cancel it, then run the registration task
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }

                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}
```

⑥ The rate limiter

Finally, a brief look at how RateLimiter is designed:
```java
/**
 * Rate limiter implementation is based on token bucket algorithm. There are two parameters:
 * <ul>
 * <li>
 * burst size - maximum number of requests allowed into the system as a burst
 * </li>
 * <li>
 * average rate - expected number of requests per second (RateLimiters using MINUTES is also supported)
 * </li>
 * </ul>
 *
 * @author Tomasz Bak
 */
public class RateLimiter {

    private final long rateToMsConversion;

    private final AtomicInteger consumedTokens = new AtomicInteger();
    private final AtomicLong lastRefillTime = new AtomicLong(0);

    @Deprecated
    public RateLimiter() {
        this(TimeUnit.SECONDS);
    }

    public RateLimiter(TimeUnit averageRateUnit) {
        switch (averageRateUnit) {
            case SECONDS:
                rateToMsConversion = 1000;
                break;
            case MINUTES:
                rateToMsConversion = 60 * 1000;
                break;
            default:
                throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
        }
    }

    public boolean acquire(int burstSize, long averageRate) {
        return acquire(burstSize, averageRate, System.currentTimeMillis());
    }

    public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
        if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
            return true;
        }

        refillToken(burstSize, averageRate, currentTimeMillis);
        return consumeToken(burstSize);
    }

    private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
        // Time of the last refill
        long refillTime = lastRefillTime.get();
        // Time elapsed since then
        long timeDelta = currentTimeMillis - refillTime;

        // Tokens are generated at a fixed rate, here 4 per minute.
        // E.g. if a request arrives exactly 15 s later: 15000 * 4 / 60000 = 1. newTokens is the number of
        // refill periods elapsed; 0 means less than 15 seconds have passed
        long newTokens = timeDelta * averageRate / rateToMsConversion;
        if (newTokens > 0) {
            long newRefillTime = refillTime == 0
                    ? currentTimeMillis
                    // Not set to the current time but advanced by newTokens whole periods: several requests may
                    // arrive within one period, and this keeps the refill period fixed
                    : refillTime + newTokens * rateToMsConversion / averageRate;
            if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
                while (true) {
                    // Adjust the number of consumed tokens
                    int currentLevel = consumedTokens.get();
                    int adjustedLevel = Math.min(currentLevel, burstSize);
                    // currentLevel may be 2; it drops to 0 or 1
                    int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
                    if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
                        return;
                    }
                }
            }
        }
    }

    private boolean consumeToken(int burstSize) {
        while (true) {
            int currentLevel = consumedTokens.get();
            // Burst size is 2, i.e. at most two requests are allowed within 15 seconds
            if (currentLevel >= burstSize) {
                return false;
            }
            if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
                return true;
            }
        }
    }

    public void reset() {
        consumedTokens.set(0);
        lastRefillTime.set(0);
    }
}
```

3. Eureka Server receives the registration request

① Finding the API entry point for instance registration

From the analysis above we know that the server-side registration API is POST /apps/{appName}. Eureka communicates over Jersey, so finding the API entry point takes some effort. It is not as easy as with Spring MVC.

Start with the ApplicationsResource class: the path of its getApplicationResource method matches /apps/{appName}. This method creates an ApplicationResource. Inside that class you find the addInstance method annotated with @POST, which is the registration entry point. It registers the instance by calling the registry's register method.

```java
@Path("/{version}/apps")
@Produces({"application/xml", "application/json"})
public class ApplicationsResource {
    private final EurekaServerConfig serverConfig;
    private final PeerAwareInstanceRegistry registry;
    private final ResponseCache responseCache;

    // Matches the pattern /apps/{appName}
    @Path("{appId}")
    public ApplicationResource getApplicationResource(
            @PathParam("version") String version,
            @PathParam("appId") String appId) {
        CurrentRequestVersion.set(Version.toEnum(version));
        try {
            // The real entry point
            return new ApplicationResource(appId, serverConfig, registry);
        } finally {
            CurrentRequestVersion.remove();
        }
    }
}

////////////////////////////////

@Produces({"application/xml", "application/json"})
public class ApplicationResource {

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // ... (parameter validation omitted)
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
}
```

The addInstance endpoint takes two parameters. The first is info, the InstanceInfo being registered, sent in the request body. The second is isReplication, read from the PeerEurekaNode.HEADER_REPLICATION request header. It is "true" when the request is a replication from another Eureka Server node in the cluster, rather than a registration from a client.
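To see why the isReplication flag matters, here is a minimal toy model. It is a hypothetical sketch, not Eureka's real classes: ToyRegistryNode, its fields and its register method are all invented for illustration. A node stores every registration locally but forwards only client registrations to its peers, and it marks the forwarded copies as replications. As far as we understand Eureka's replicateToPeers, it returns early in the same way when isReplication is true, which prevents a registration from bouncing around the cluster forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy model of a registry server node (not Eureka's API)
class ToyRegistryNode {
    // instanceId -> appName, standing in for the real registry map
    final Map<String, String> instances = new ConcurrentHashMap<>();
    final List<ToyRegistryNode> peers = new ArrayList<>();
    int replicationsSent = 0;

    void register(String appName, String instanceId, boolean isReplication) {
        // Always store the registration locally
        instances.put(instanceId, appName);
        // A request that is already a replication is not forwarded again; otherwise
        // the peers would keep sending the same registration to each other
        if (isReplication) {
            return;
        }
        for (ToyRegistryNode peer : peers) {
            replicationsSent++;
            // Forwarded requests are marked as replications
            peer.register(appName, instanceId, true);
        }
    }
}
```

Take three fully connected nodes. A client registration on one of them produces exactly two forwarded requests, and neither peer forwards the registration any further.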
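② Registering the instance

The registry's register method does two things. First, it calls the parent class's register method to register the instance. Second, it replicates the instance to the other server nodes in the Eureka Server cluster. Cluster replication is covered later; for now it is enough to know that a registration is replicated to the other server nodes.

```java
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // Use the instance's own lease duration if it has one; otherwise keep the default of 90 seconds
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register the instance
    super.register(info, leaseDuration, isReplication);
    // Replicate to the other server nodes in the cluster
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
```

Next, look at the parent class's register method. Its main flow is as follows: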
```java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    read.lock();
    try {
        // registry => ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            // First registration of this app: create its ConcurrentHashMap and store it in the registry under appName
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // Last dirty timestamp of the existing instance
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            // Last dirty timestamp of the instance being registered
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // If the existing instance was updated more recently than the incoming one, keep the existing instance
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // New registration: no lease exists yet
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    // Update the renews-per-minute threshold, which is used in many places later
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // Create a new lease
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        // Add to the recently registered queue
        recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
        // Overridden status
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        // Only changes the status; does not mark the instance dirty
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // Set the action type to ADDED; later code switches on it
        registrant.setActionType(ActionType.ADDED);
        // Add to the recently changed queue
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // Set the last updated timestamp
        registrant.setLastUpdatedTimestamp();
        // Invalidate the cache
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
```

Updating the renews-per-minute threshold:

```java
protected void updateRenewsPerMinThreshold() {
    // Renews-per-minute threshold = expected renewing clients * (60 / renewal interval) * renewal percent threshold
    // E.g. with 10 registered instances, 10 clients are expected to renew. The renewal interval defaults to
    // 30 seconds (each client sends a heartbeat every 30 s) and the percent threshold defaults to 0.85, so
    // threshold = 10 * (60.0 / 30) * 0.85 = 17, i.e. at least 17 renew requests should arrive per minute
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
```

The registry therefore caches service instance info as a two-level map: appName, then instance id, then Lease<InstanceInfo>. Eureka keeps the registry in memory and relies on ConcurrentHashMap for thread safety under concurrent access.

4. The Eureka Server console

The service instances are now registered. Next, let's see how the Eureka Server console page gets this data.

As analyzed earlier, eureka-server's web.xml configures status.jsp as the welcome page, and this is the console page. status.jsp simply obtains the registry from the EurekaServerContext, reads the service instances registered in it, and renders them in a table.

```jsp
<%@ page language="java" import="java.util.*,java.util.Map.Entry,com.netflix.discovery.shared.Pair,
com.netflix.discovery.shared.*,com.netflix.eureka.util.*,com.netflix.appinfo.InstanceInfo.*,
com.netflix.appinfo.DataCenterInfo.*,com.netflix.appinfo.AmazonInfo.MetaDataKey,com.netflix.eureka.resources.*,
com.netflix.eureka.*,com.netflix.appinfo.*,com.netflix.eureka.util.StatusUtil" pageEncoding="UTF-8" %>
<%
String path = request.getContextPath();
String basePath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/";
%>

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">

<html>
  <head>
    <base href="<%=basePath%>">

    <title>Eureka</title>
    <link rel="stylesheet" type="text/css" href="./css/main.css">
    <script type="text/javascript" src="./js/jquery-1.11.1.js" ></script>
    <script type="text/javascript" src="./js/jquery.dataTables.js" ></script>
    <script type="text/javascript" >
       $(document).ready(function() {
           $('table.stripeable tr:odd').addClass('odd');
           $('table.stripeable tr:even').addClass('even');
           $('#instances thead th').each(function () {
               var title = $('#instances thead th').eq($(this).index()).text();
               $(this).html(title + '</br><input type="text" placeholder="Search ' + title + '" />');
           });
           // DataTable
           var table = $('#instances').DataTable({"paging": false});
           // Apply the search
           table.columns().eq(0).each(function (colIdx) {
               $('input', table.column(colIdx).header()).on('keyup change', function () {
                   table.column(colIdx).search(this.value).draw();
               });
           });
       });
    </script>
  </head>

  <body id="one">
    <jsp:include page="header.jsp" />
    <jsp:include page="navbar.jsp" />
    <div id="content">
      <div class="sectionTitle">Instances currently registered with Eureka</div>
      <table id='instances' class="stripeable">
        <thead><tr><th>Application</th><th>AMIs</th><th>Availability Zones</th><th>Status</th></tr></thead>
        <tfoot><tr><th>Application</th><th>AMIs</th><th>Availability Zones</th><th>Status</th></tr></tfoot>
        <tbody>
        <%
        // Get the Eureka server context, EurekaServerContext
        EurekaServerContext serverContext = (EurekaServerContext) pageContext.getServletContext()
                .getAttribute(EurekaServerContext.class.getName());
        // Take the registry from the context and iterate over the registered applications
        for (Application app : serverContext.getRegistry().getSortedApplications()) {
            out.print("<tr><td><b>" + app.getName() + "</b></td>");
            Map<String, Integer> amiCounts = new HashMap<String, Integer>();
            Map<InstanceStatus, List<Pair<String, String>>> instancesByStatus =
                new HashMap<InstanceStatus, List<Pair<String, String>>>();
            Map<String, Integer> zoneCounts = new HashMap<String, Integer>();

            for (InstanceInfo info : app.getInstances()) {
                String id = info.getId();
                String url = info.getStatusPageUrl();
                InstanceStatus status = info.getStatus();
                String ami = "n/a";
                String zone = "";
                if (info.getDataCenterInfo().getName() == Name.Amazon) {
                    AmazonInfo dcInfo = (AmazonInfo) info.getDataCenterInfo();
                    ami = dcInfo.get(MetaDataKey.amiId);
                    zone = dcInfo.get(MetaDataKey.availabilityZone);
                }

                Integer count = amiCounts.get(ami);
                if (count != null) {
                    amiCounts.put(ami, Integer.valueOf(count.intValue() + 1));
                } else {
                    amiCounts.put(ami, Integer.valueOf(1));
                }

                count = zoneCounts.get(zone);
                if (count != null) {
                    zoneCounts.put(zone, Integer.valueOf(count.intValue() + 1));
                } else {
                    zoneCounts.put(zone, Integer.valueOf(1));
                }
                List<Pair<String, String>> list = instancesByStatus.get(status);

                if (list == null) {
                    list = new ArrayList<Pair<String, String>>();
                    instancesByStatus.put(status, list);
                }
                list.add(new Pair<String, String>(id, url));
            }
            StringBuilder buf = new StringBuilder();
            for (Iterator<Entry<String, Integer>> iter =
                    amiCounts.entrySet().iterator(); iter.hasNext();) {
                Entry<String, Integer> entry = iter.next();
                buf.append("<b>").append(entry.getKey()).append("</b> (").append(entry.getValue()).append("), ");
            }
            out.println("<td>" + buf.toString() + "</td>");
            buf = new StringBuilder();
            for (Iterator<Entry<String, Integer>> iter =
                    zoneCounts.entrySet().iterator(); iter.hasNext();) {
                Entry<String, Integer> entry = iter.next();
                buf.append("<b>").append(entry.getKey()).append("</b> (").append(entry.getValue()).append("), ");
            }
            out.println("<td>" + buf.toString() + "</td>");
            buf = new StringBuilder();
            for (Iterator<Entry<InstanceStatus, List<Pair<String, String>>>> iter =
                    instancesByStatus.entrySet().iterator(); iter.hasNext();) {
                Entry<InstanceStatus, List<Pair<String, String>>> entry = iter.next();
                List<Pair<String, String>> value = entry.getValue();
                InstanceStatus status = entry.getKey();
                if (status != InstanceStatus.UP) {
                    buf.append("<font color=red size=+1><b>");
                }
                buf.append("<b>").append(status.name()).append("</b> (").append(value.size()).append(") - ");
                if (status != InstanceStatus.UP) {
                    buf.append("</font></b>");
                }

                for (Pair<String, String> p : value) {
                    String id = p.first();
                    String url = p.second();
                    if (url != null && url.startsWith("http")) {
                        buf.append("<a href=\"").append(url).append("\">");
                    } else {
                        url = null;
                    }
                    buf.append(id);
                    if (url != null) {
                        buf.append("</a>");
                    }
                    buf.append(", ");
                }
            }
            out.println("<td>" + buf.toString() + "</td></tr>");
        }
        %>
        </tbody>
      </table>
    </div>
    <div>
      <div class="sectionTitle">General Info</div>
      <table id='generalInfo' class="stripeable">
        <tr><th>Name</th><th>Value</th></tr>
        <%
        StatusInfo statusInfo = (new StatusUtil(serverContext)).getStatusInfo();
        Map<String, String> genMap = statusInfo.getGeneralStats();
        for (Map.Entry<String, String> entry : genMap.entrySet()) {
            out.print("<tr>");
            out.print("<td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td>");
            out.print("</tr>");
        }
        Map<String, String> appMap = statusInfo.getApplicationStats();
        for (Map.Entry<String, String> entry : appMap.entrySet()) {
            out.print("<tr>");
            out.print("<td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td>");
            out.print("</tr>");
        }
        %>
      </table>
    </div>
    <div>
      <div class="sectionTitle">Instance Info</div>
      <table id='instanceInfo' class="stripeable">
        <tr><th>Name</th><th>Value</th></tr>
        <%
        InstanceInfo instanceInfo = statusInfo.getInstanceInfo();
        Map<String, String> instanceMap = new HashMap<String, String>();
        instanceMap.put("ipAddr", instanceInfo.getIPAddr());
        instanceMap.put("status", instanceInfo.getStatus().toString());
        if (instanceInfo.getDataCenterInfo().getName() == DataCenterInfo.Name.Amazon) {
            AmazonInfo info = (AmazonInfo) instanceInfo.getDataCenterInfo();
            instanceMap.put("availability-zone", info.get(AmazonInfo.MetaDataKey.availabilityZone));
            instanceMap.put("public-ipv4", info.get(AmazonInfo.MetaDataKey.publicIpv4));
            instanceMap.put("instance-id", info.get(AmazonInfo.MetaDataKey.instanceId));
            instanceMap.put("public-hostname", info.get(AmazonInfo.MetaDataKey.publicHostname));
            instanceMap.put("ami-id", info.get(AmazonInfo.MetaDataKey.amiId));
            instanceMap.put("instance-type", info.get(AmazonInfo.MetaDataKey.instanceType));
        }
        for (Map.Entry<String, String> entry : instanceMap.entrySet()) {
            out.print("<tr>");
            out.print("<td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td>");
            out.print("</tr>");
        }
        %>
      </table>
    </div>

  </body>
</html>
```

5. Overall flow of service registration

The figure below shows the whole service-instance registration flow.

VI. Fetching the Registry

1. Eureka Client fetches the full registry at startup

When the client starts and initializes DiscoveryClient, it runs the code below. The call to fetchRegistry here fetches the full registry at startup and caches it locally.

```java
if (clientConfig.shouldFetchRegistry()) {
    try {
        // Fetch the registry: full fetch or delta fetch
        boolean primaryFetchRegistryResult = fetchRegistry(false);
        if (!primaryFetchRegistryResult) {
            logger.info("Initial registry fetch from primary servers failed");
        }
        boolean backupFetchRegistryResult = true;
        if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
            backupFetchRegistryResult = false;
            logger.info("Initial registry fetch from backup servers failed");
        }
        if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
            throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
        }
    } catch (Throwable th) {
        logger.error("Fetch registry error at startup: {}", th.getMessage());
        throw new IllegalStateException(th);
    }
}
```

Inside fetchRegistry, the client first gets its local Applications. If they are empty, it calls getAndStoreFullRegistry to fetch the full registry and cache it locally.

```java
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // Get the locally cached applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) // Client application does not have latest library supporting delta
        {
            // Full fetch of the registry
            getAndStoreFullRegistry();
        } else {
            // Delta update of the registry
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Send the cache refresh notification
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
```

Inside getAndStoreFullRegistry, the client simply calls the GET /apps endpoint to fetch the full registry, so on the server side we will start from this endpoint to follow the full-fetch logic. Once fetched, the registry is stored in the local variable localRegionApps.
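The full-versus-delta decision in fetchRegistry comes down to a single boolean condition. The sketch below restates it as a pure function so the cases can be checked one by one. FetchModeDecision and its parameters are hypothetical stand-ins for the clientConfig and Applications values that the real method reads; they are not part of Eureka's API. A null localAppCount plays the role of applications == null.

```java
// Hypothetical restatement of fetchRegistry's full-vs-delta condition (not Eureka's API)
class FetchModeDecision {
    enum Mode { FULL, DELTA }

    static Mode decide(boolean disableDelta, String singleVipAddress, boolean forceFullFetch,
                       Integer localAppCount, long localVersion) {
        boolean full = disableDelta                                          // delta fetching disabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty()) // refreshing a single VIP only
                || forceFullFetch                                            // caller forces a full fetch
                || localAppCount == null                                     // no local registry yet
                || localAppCount == 0                                        // local registry is empty
                || localVersion == -1;                                       // client library without delta support
        return full ? Mode.FULL : Mode.DELTA;
    }
}
```

At startup the local registry is empty, so decide returns FULL. After the first fetch has populated it, the periodic refresh takes the DELTA path unless one of the other conditions holds.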
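2. Multi-level caching of the registry in Eureka Server

① The endpoint for fetching the full registry

The full registry is fetched through GET /apps. Finding it works the same way as finding the registration endpoint: the getContainers method of ApplicationsResource is the entry point for fetching the full registry.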
```java
@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {
    // Some code omitted...

    // JSON by default
    KeyType keyType = Key.KeyType.JSON;
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }

    // Cache key for the full registry
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        // Return a gzip-compressed payload
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        // Get the full registry from responseCache by cache key
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    }
    CurrentRequestVersion.remove();
    return response;
}
```

② Multi-level cache reads in ResponseCache

ResponseCache is the core component Eureka Server uses to read the registry. Internally it uses a multi-level cache so that it can answer clients' registry fetch requests quickly. Let's look at ResponseCache.

The cache read flow:
```java
public String get(final Key key) {
    return get(key, shouldUseReadOnlyResponseCache);
}

////////////////////////////////

String get(final Key key, boolean useReadOnlyCache) {
    // => getValue
    Value payload = getValue(key, useReadOnlyCache);
    if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
        return null;
    } else {
        return payload.getPayload();
    }
}

////////////////////////////////

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            // With the read-only cache enabled, read from it first
            // readOnlyCacheMap => ConcurrentHashMap<Key, Value>
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                // Not in the read-only cache: read from the read-write cache, then put it into the read-only cache
                // readWriteCacheMap => LoadingCache<Key, Value>
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            // Read-only cache disabled: read from the read-write cache directly
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}
```

③ Initializing ResponseCache

When we analyzed how EurekaBootStrap starts up Eureka Server, its last step initialized the Eureka Server context. That step initializes the registry, and initializing the registry in turn initializes ResponseCache. Here is what that initialization does.
ResponseCacheImpl(EurekaServerConfig serverConfig,ServerCodecs serverCodecs,AbstractInstanceRegistry registry) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; 是否使用只读缓存,默认为 true this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); 保存注册表 7 this.registry = registry; 缓存更新间隔时间,默认30秒 long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); 使用 google guava cache 构造一个读写缓存 this.readWriteCacheMap = 12 初始容量为1000 CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) 14 缓存的数据在写入多久后过期,默认180秒,也就是说 readWriteCacheMap 会定时过期 .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(),TimeUnit.SECONDS) 16 .removalListener(new RemovalListener<Key,Value>() { @Override 18 void onRemoval(RemovalNotification<Key,1)"> notification) { 19 Key removedKey = notification.getKey(); 20 (removedKey.hasRegions()) { 21 Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions,removedKey); } } }) 26 当key对应的元素不存在时,使用定义 CacheLoader 加载元素 27 .build(new CacheLoader<Key,1)">29 public Value load(Key key) Exception { 30 (key.hasRegions()) { 31 Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions,key); 34 获取元素 35 Value value = generatePayload(key); 36 value; }); 39 (shouldUseReadOnlyResponseCache) { 41 如果配置了使用只读缓存,就开启一个定时任务,定期将 readWriteCacheMap 的数据同步到 readOnlyCacheMap 中 42 默认间隔时间是 30 秒 timer.schedule(getCacheUpdateTask(),1)">44 new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) 45 + responseCacheUpdateIntervalMs),1)"> responseCacheUpdateIntervalMs); 48 49 50 Monitors.registerObject(51 } 52 logger.warn("Cannot register the JMX monitor for the InstanceRegistry"53 54 } generatePayload 方法: private Value generatePayload(Key key) { 2 Stopwatch tracer = String payload; (key.getEntityType()) { Application: boolean isRemoteRegionRequested = 
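key.hasRegions();

                // Get all applications
                if (ALL_APPS.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        // Read all service instances from the registry
                        payload = getPayLoad(key, registry.getApplications());
                    }
                }
                // Get the delta (incremental) applications
                else if (ALL_APPS_DELTA.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } else {
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            case VIP:
            case SVIP:
                tracer = serializeViptimer.start();
                payload = getPayLoad(key, getApplicationsForVip(key, registry));
                break;
            default:
                logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                payload = "";
                break;
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}

3. Expiration of the Eureka Server's multi-level registry cache

This section summarizes when entries in the Eureka Server's multi-level registry cache expire. All of these cases were already covered in the earlier analysis.

① Active expiration

As mentioned in the service registration analysis, invalidateCache is called once a registration completes. Inside, it invalidates the readWriteCacheMap entries for that application, for all applications, and for the delta.

Note that when an instance registers, goes offline, or fails, only the read-write cache is invalidated. The change can take up to 30 seconds to reach the read-only cache readOnlyCacheMap, so other clients may not see it for up to 30 seconds.

private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
    // invalidate cache
    responseCache.invalidate(appName, vipAddress, secureVipAddress);
}

Invalidating the cache:

public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
    for (Key.KeyType type : Key.KeyType.values()) {
        for (Version v : Version.values()) {
            invalidate(
                    // Invalidate the cache of this application
                    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                    // Invalidate the cache of all applications
                    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                    // Invalidate the cache of the delta applications
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
            );
            if (null != vipAddress) {
                invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
            }
            if (null != secureVipAddress) {
                invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
            }
        }
    }
}

public void invalidate(Key... keys) {
    for (Key key : keys) {
        logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());

        // Invalidate the read-write cache
        readWriteCacheMap.invalidate(key);
        Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
        if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
            for (Key keysWithRegion : keysWithRegions) {
                logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                        key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                readWriteCacheMap.invalidate(keysWithRegion);
            }
        }
    }
}

② Timed expiration

When readWriteCacheMap is built, it is given an automatic expiration time of 180 seconds by default. Any entry put into readWriteCacheMap therefore expires on its own 180 seconds later. The next read finds the key missing and reloads it into the cache through the CacheLoader. This timed expiration effectively resynchronizes the cache with the registry at regular intervals.

③ Passive expiration

When ResponseCache is initialized with the read-only cache enabled, it creates a scheduled task. The task runs every 30 seconds and syncs readOnlyCacheMap with readWriteCacheMap. From readOnlyCacheMap's point of view, this is passive expiration.

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    // Value in the read-write cache
                    Value cacheValue = readWriteCacheMap.get(key);
                    // Value in the read-only cache
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // If readOnlyCacheMap holds a different value from readWriteCacheMap, overwrite it with the readWriteCacheMap value
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}

4. Eureka Client periodically fetches the delta registry

① The client's registry refresh task

The DiscoveryClient initialization described earlier sets up the scheduled tasks. At that step, if the client is configured to fetch the registry, it creates a scheduler that runs cacheRefreshTask every 30 seconds. cacheRefreshTask wraps CacheRefreshThread, which simply calls refreshRegistry to refresh the registry.

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // Registry fetch interval, 30 seconds by default
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        // Upper bound of the backoff multiplier for the cache-refresh scheduler. When the task times out, the delay grows,
        // which avoids firing useless runs during network jitter or while eureka-server is unavailable
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // Scheduled task that refreshes the registry
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread() // the task that refreshes the registry
        );
        // Start scheduling the registry refresh task after 30 seconds
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // ...
}

The refreshRegistry method:

class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }

        // Fetch the registry
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes);
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}

refreshRegistry in turn calls fetchRegistry to fetch the registry. fetchRegistry was already shown in the analysis of the full registry fetch. After the first full fetch, the local applications are no longer empty, so later calls take the getAndUpdateDelta path and update incrementally.

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        // If the delta is disabled or if it is the first time, get all applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            // Full fetch
            getAndStoreFullRegistry();
        } else {
            // Delta fetch
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Send the cache-refreshed notification
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();
    return true;
}

② Incrementally updating the local registry

Incremental updates are handled by getAndUpdateDelta.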
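The core idea behind getAndUpdateDelta is to merge a delta into the local copy, then compare a reconcile hash with the one the server sent. That idea can be sketched on its own. This is a simplified, hypothetical model, not Eureka's API:

- The local registry is just a map from instance ID to status.
- `Change` stands in for an instance that carries an ActionType.
- The "hash" is assumed to follow the shape of Eureka's `Applications.getReconcileHashCode()`: per-status instance counts in sorted order, for example `DOWN_1_UP_2_`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of delta merge + hash reconciliation (not Eureka's real classes).
class DeltaReconcileSketch {
    enum ActionType { ADDED, MODIFIED, DELETED }

    // One change record in a (hypothetical) delta response.
    static final class Change {
        final String instanceId;
        final String status;
        final ActionType action;

        Change(String instanceId, String status, ActionType action) {
            this.instanceId = instanceId;
            this.status = status;
            this.action = action;
        }
    }

    // Local registry: instanceId -> status.
    final Map<String, String> local = new HashMap<>();

    // Merge the delta: ADDED/MODIFIED overwrite the entry, DELETED removes it.
    void applyDelta(Iterable<Change> delta) {
        for (Change c : delta) {
            switch (c.action) {
                case ADDED:
                case MODIFIED:
                    local.put(c.instanceId, c.status);
                    break;
                case DELETED:
                    local.remove(c.instanceId);
                    break;
            }
        }
    }

    // Reconcile "hash": per-status counts in sorted order, e.g. "DOWN_1_UP_2_".
    static String reconcileHashCode(Map<String, String> registry) {
        TreeMap<String, Integer> counts = new TreeMap<>();
        for (String status : registry.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }

    // True if the merged local view matches the server's hash;
    // false means the caller should fall back to a full fetch.
    boolean applyAndReconcile(Iterable<Change> delta, String serverHashCode) {
        applyDelta(delta);
        return reconcileHashCode(local).equals(serverHashCode);
    }
}
```

A mismatch signals that the delta alone was not enough. In Eureka, that is the case where reconcileAndLogDifference re-fetches the full registry.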
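getAndUpdateDelta updates the local cache with the fetched delta and uses a hash value to keep the server's and the client's data consistent. Comparing hash values to verify that data has been synchronized is an idea worth borrowing in any distributed system.

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    // Call the remote delta endpoint: GET apps/delta
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }

    // If the delta is empty, fall back to a full fetch
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        // Lock while updating the local registry
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // Merge the fetched delta into the local registry
                updateDelta(delta);
                // After the merge, compute a hash from the local applications
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // The delta carries the hash of the server-side registry. If it differs from the locally computed hash,
        // the local registry has diverged from the server's, so the full registry is fetched into the local cache
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

③ Merging the delta into the local registry

Next is updateDelta, the method that merges the delta into the local registry. It iterates over the returned instances and handles each one according to its ActionType. The registration analysis showed ActionType being set to ADDED, and the shutdown analysis later shows it being set to DELETED.

private void updateDelta(Applications delta) {
    int deltaCount = 0;
    // Iterate over the applications in the delta
    for (Application app : delta.getRegisteredApplications()) {
        // Iterate over the instances
        for (InstanceInfo instance : app.getInstances()) {
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }

            ++deltaCount;
            // ADDED: a new instance (service registration)
            if (ActionType.ADDED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            }
            // MODIFIED: a changed instance (e.g. status or info change)
            else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Modified instance {} to the existing apps ", instance.getId());
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            }
            // DELETED: a removed instance (shutdown, failure eviction)
            else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                    existingApp.removeInstance(instance);
                    /*
                     * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                     * if instance list is empty, we remove the application.
                     */
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
    logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

    getApplications().setVersion(delta.getVersion());
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

    for (Applications applications : remoteRegionVsApps.values()) {
        applications.setVersion(delta.getVersion());
        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}

5. Eureka Server returns the delta registry

① Entry point for fetching the delta registry

The earlier analysis showed that the delta-fetch endpoint is GET /apps/delta. From there it is easy to find the entry point: getContainerDifferential in ApplicationsResource.

As with the full fetch, it first builds a cache Key and then reads from the multi-level cache ResponseCache. Here the key is ALL_APPS_DELTA.

@Path("delta")
@GET
public Response getContainerDifferential(
        @PathParam("version") String version,
        @HeaderParam(HEADER_ACCEPT) String acceptHeader,
        @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
        @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
        @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
    // ... (delta/access checks, region parsing, keyType and returnMediaType selection omitted)

    Key cacheKey = new Key(Key.EntityType.Application,
            // Delta applications: ALL_APPS_DELTA
            ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    final Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        // Read the delta registry from the multi-level cache
        response = Response.ok(responseCache.get(cacheKey)).build();
    }

    CurrentRequestVersion.remove();
    return response;
}

Reading the multi-level cache works the same way as in the full fetch. The only difference is the key: ALL_APPS for a full fetch, ALL_APPS_DELTA for a delta fetch. That difference matters when readWriteCacheMap loads data into the cache, because loading then takes a different branch. The generatePayload method below shows this:

private Value generatePayload(Key key) {
    // ... (same structure as shown earlier; only the delta branch is repeated here)
    } else if (ALL_APPS_DELTA.equals(key.getName())) {
        if (isRemoteRegionRequested) {
            // ... remote-region delta
        } else {
            tracer = serializeDeltaAppsTimer.start();
            versionDelta.incrementAndGet();
            versionDeltaLegacy.incrementAndGet();
            // Get the delta registry
            payload = getPayLoad(key, registry.getApplicationDeltas());
        }
    }
    // ... (remaining branches and the Unidentified entity type fallback, as shown earlier)
}

② Design of the delta registry

generatePayload then calls registry.getApplicationDeltas() to obtain the delta registry. Looking inside, the delta registry is simply the contents of recentlyChangedQueue, the recently-changed queue; the method iterates over recentlyChangedQueue to build an Applications object.

Before returning apps, it fetches all local applications, computes a hash value from them, and sets it on apps. This ties back to the previous section:

1. When a client fetches the delta, the server also returns a hash of its full registry.
2. The client merges the delta into its local registry, computes a hash of its local full registry, and compares the two hashes.
3. If they differ, the server and client data are out of sync, and the client fetches the full registry from the server again.

public Applications getApplicationDeltas() {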
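    GET_ALL_CACHE_MISS_DELTA.increment();
    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDelta().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    write.lock();
    try {
        // The recently changed queue recentlyChangedQueue is the delta registry.
        // It only keeps instances that changed within the last 3 minutes: registration, shutdown, failure eviction
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is : {}",
                this.recentlyChangedQueue.size());
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            logger.debug(
                    "The instance id {} is found with status {} and actiontype {}",
                    instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
        }

        boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
        // ... (remote-region handling omitted)

        // Get all application instances
        Applications allApps = getApplications(!disableTransparentFallback);
        // Compute a hash from all application instances and set it on the apps to be returned
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}

Now for how recentlyChangedQueue is designed to hold delta information.

Look again at the registry constructor mentioned earlier. At the end it creates a scheduled task that runs every 30 seconds. The task iterates over recentlyChangedQueue and checks whether each element's last update time is more than 180 seconds old; if so, the element is removed from the queue. Eureka assumes that changes older than 180 seconds have reached every client, since clients fetch the delta every 30 seconds. As a result, a client may receive the same change in several consecutive delta fetches, but merging it into the local registry gives the same result each time.

In short, Eureka uses recentlyChangedQueue to keep instance changes from the last 3 minutes, such as new registrations and shutdowns, and each client delta fetch simply reads this queue.

protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    this.serverConfig = serverConfig;
    this.clientConfig = clientConfig;
    this.serverCodecs = serverCodecs;
    // Circular queue of recently canceled instances
    this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
    // Circular queue of recently registered instances
    this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

    // Counter of renewals in the last minute
    this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

    // A scheduled task that periodically removes expired entries from the recently changed queue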
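    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            // First run after 30 seconds
            serverConfig.getDeltaRetentionTimerIntervalInMs(),
            // Then every 30 seconds by default
            serverConfig.getDeltaRetentionTimerIntervalInMs());
}

////////////////////////////////////////

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {
        @Override
        public void run() {
            // The recently changed queue
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                // An entry last updated more than 180 seconds ago is assumed to have reached every client, so it is removed
                if (it.next().getLastUpdateTime() <
                        // retentionTimeInMSInDeltaQueue: how long entries stay in the delta queue, 180 seconds by default
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }
    };
}

7. Overall flow of fetching the registry

The figure below again summarizes the overall flow of fetching the registry.

Service registration, service shutdown, and instance failure eviction all do two things. They invalidate the corresponding entries in the read-write cache readWriteCacheMap, and they add the instance to the recently changed queue recentlyChangedQueue. The delta-fetch logic is therefore essentially the same in all three cases.

VII. Service Renewal

In a distributed system, service renewal is essential because it lets the central system (the registry) know that a client is still alive. Let's look at how renewal works.

1. Eureka Client sends heartbeats periodically

When DiscoveryClient initializes its scheduled tasks, the following code creates the heartbeat task. It sends a heartbeat every 30 seconds through the endpoint PUT /apps/{appName}/{instanceId}.

private void initScheduledTasks() {
    // Periodic local cache refresh...

    if (clientConfig.shouldRegisterWithEureka()) {
        // Renewal interval, 30 seconds by default
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // Upper bound of the backoff multiplier for the heartbeat scheduler, 10 by default
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat scheduled task
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );
        // Start scheduling the heartbeat task after 30 seconds
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // Service registration...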
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

//////////////////////////////////////////////

private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // Heartbeat endpoint: PUT /apps/{appName}/{instanceId}
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            // The server has no record of this instance, so register again
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        // Renewal succeeded
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

2. Eureka Server receives heartbeat renewals

Following PUT /apps/{appName}/{instanceId}, the server-side entry point for renewals turns out to be InstanceResource.renewLease. It calls the registry's renew method to renew the lease.

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // Call the registry's renew method to renew the lease
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // Not found in the registry, immediately ask for a register
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    // Check if we need to sync based on dirty time stamp, the client instance might have changed some value
    Response response;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}

Inside, the parent class's renew method performs the renewal, and replicateToPeers then handles replication, checking isReplication:

- If the request is itself a replication from another node in the eureka-server cluster, it is not replicated again.
- Otherwise, it is replicated to the other nodes.

The replication code was covered earlier and is not repeated here.

public boolean renew(final String appName, final String id, final boolean isReplication) {
    // Call the parent (AbstractInstanceRegistry) renew method
    if (super.renew(appName, id, isReplication)) {
        // After renewing, replicate to the other nodes in the cluster
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

Next, the parent class's renew method:
public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // Look up the leases of the service by name
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        // Look up the instance lease by instance ID
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                        overriddenInstanceStatus.name(),
                        instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        // Increment the last-minute renewal counter
        renewsLastMin.increment();
        // Renew the lease
        leaseToRenew.renew();
        return true;
    }
}

////////////////////////////////////////////////

public void renew() {
    // Update the last update time: current time plus one duration (90 seconds by default)
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

VIII. Service Shutdown

1. Eureka Client shutdown

When an eureka client service is stopped, DiscoveryClient.shutdown is triggered to close the eureka-client. So we start from the shutdown method to see how the eureka-client goes offline.
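@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        // Remove the status change listener
        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        // Stop the scheduled tasks and release resources:
        // instanceInfoReplicator, heartbeatExecutor, cacheRefreshExecutor,
        // scheduler, cacheRefreshTask, heartbeatTask
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            // Call eureka-server's cancel endpoint to take the instance offline
            unregister();
        }

        // Release the remaining resources
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        Monitors.unregisterObject(this);

        logger.info("Completed shut down of DiscoveryClient");
    }
}

void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            // Cancel the registration: DELETE /apps/{appName}/{instanceId}
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}

2. Eureka Server service shutdown

Following DELETE /apps/{appName}/{instanceId}, the entry point for client shutdown is InstanceResource.cancelLease.

The registry's cancel method looks much like the other endpoints. It first calls the parent's cancel to take the instance offline, then calls replicateToPeers to replicate the change to the other nodes in the cluster. The parent's cancel simply delegates to internalCancel.

@DELETE
public Response cancelLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    try {
        // Take the instance offline
        boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));

        if (isSuccess) {
            logger.debug("Found (Cancel): {} - {}", app.getName(), id);
            return Response.ok().build();
        } else {
            logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
    } catch (Throwable e) {
        logger.error("Error (cancel): {} - {}", app.getName(), id, e);
        return Response.serverError().build();
    }
}

//////////////////////////////////////////

public boolean cancel(final String appName, final String id, final boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

//////////////////////////////////////////

public boolean cancel(String appName, String id, boolean isReplication) {
    return internalCancel(appName, id, isReplication);
}

Now the internalCancel method: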
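protected boolean internalCancel(String appName, String id, boolean isReplication) {
    read.lock();
    try {
        CANCEL.increment(isReplication);
        // Look up the leases of the service by name
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            // Remove the instance lease by instance ID
            leaseToCancel = gMap.remove(id);
        }
        // Add the removed instance ID to the recently canceled queue
        recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            // Cancel the lease: sets the instance's evictionTimestamp to the current timestamp
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                // Set the instance's ActionType to DELETED
                instanceInfo.setActionType(ActionType.DELETED);
                // Add it to the recently changed queue
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // Invalidate the cache
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
        }
    } finally {
        read.unlock();
    }

    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Expected number of renewing clients - 1
            // Since the client wants to cancel it, reduce the number of clients to send renews.
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            // Update the per-minute renewal threshold
            updateRenewsPerMinThreshold();
        }
    }

    return true;
}

IX. Service Failure

When a service stops normally, DiscoveryClient.shutdown closes the eureka-client and notifies the registry that the instance is going offline. If the client crashes or is shut down abnormally, the registry never receives that notification. For this case the registry runs a scheduled task that uses heartbeats to decide whether a client instance has failed, and then evicts the failed instances.

1. Initializing the eviction task

In the last steps of EurekaBootStrap initialization, the registry's openForTraffic is called to perform some final setup. Its last step calls super.postInit, which creates the scheduled task that evicts expired instances.

registry.openForTraffic(applicationInfoManager, registryCount);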
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Expected number of clients sending renewals
    this.expectedNumberOfClientsSendingRenews = count;
    // Update the per-minute renewal threshold
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // Set the instance status to UP
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Parent-class post-initialization
    super.postInit();
}

postInit:
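protected void postInit() {
    // Start the counter of renewals in the last minute
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    // Scheduled eviction task
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            // Runs every 60 seconds by default
            serverConfig.getEvictionIntervalTimerInMs());
}

2. Periodically evicting expired instances

① The eviction task

Each time the eviction task runs, it first computes a compensation time. The actual interval between two EvictionTask runs may exceed the configured 60 seconds, for example because of a GC pause or local clock drift. The task then calls evict to remove expired instances.

This compensation idea is worth borrowing whenever you compute time differences, because measured intervals are not always accurate.

class EvictionTask extends TimerTask {

    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);

    @Override
    public void run() {
        try {
            // Compensation time: the actual interval between two EvictionTask runs may exceed the configured 60 seconds,
            // e.g. because of a GC pause or local clock drift
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }

    long getCompensationTimeMs() {
        long currNanos = getCurrentTimeNano();
        long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0L) {
            return 0L;
        }
        // Actual interval between the two runs
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        // Compensation time = actual interval - configured eviction interval (60 seconds by default)
        long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
        return compensationTime <= 0L ? 0L : compensationTime;
    }

    long getCurrentTimeNano() {  // for testing
        return System.nanoTime();
    }
}

② Evicting instances

The eviction process is as follows: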
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    // Is lease expiration enabled?
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                // Has the instance lease expired?
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    // Add it to the list of expired leases
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // Number of instances currently registered
    int registrySize = (int) getLocalRegistrySize();
    // Registry size that must be kept: registered instances * renewal percent threshold (0.85 by default)
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    // Eviction limit: one run can evict at most 15% of the instances
    int evictionLimit = registrySize - registrySizeThreshold;

    // Number to evict: the smaller of the two
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Randomly pick toEvict expired instances from expiredLeases and evict them
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            // Service name
            String appName = lease.getHolder().getAppName();
            // Instance ID
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // Call the cancel method
            internalCancel(appName, id, false);
        }
    }
}

③ Evicting in batches

Expired instances are not all evicted at once. evict computes a limit toEvict and randomly evicts only toEvict expired instances per run, combining batched eviction with random eviction.

For example, suppose the registry holds 20 instances and 5 of them have expired:

- evictionLimit = 20 - (int)(20 × 0.85) = 3
- toEvict = min(5, 3) = 3

This run therefore randomly evicts only 3 of the 5 expired instances. The other two have to wait for the next run of the eviction task.
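The limit arithmetic and the random selection can be isolated into a small sketch. `EvictionSketch` and its methods are hypothetical names. The logic mirrors the evict code above: limit = registrySize - (int)(registrySize * renewalPercentThreshold), followed by a partial Fisher-Yates shuffle that picks toEvict entries. The sketch works on plain IDs instead of leases.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of Eureka-style batched + random eviction over plain IDs.
class EvictionSketch {
    // Maximum number of instances a single eviction run may remove.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Randomly picks min(expired.size(), limit) distinct entries with a partial Fisher-Yates shuffle.
    static <T> List<T> pickToEvict(List<T> expired, int registrySize,
                                   double renewalPercentThreshold, Random random) {
        List<T> candidates = new ArrayList<>(expired); // copy so the caller's list is not reordered
        int toEvict = Math.min(candidates.size(), evictionLimit(registrySize, renewalPercentThreshold));
        List<T> evicted = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            // Swap a random not-yet-chosen candidate into position i
            int next = i + random.nextInt(candidates.size() - i);
            Collections.swap(candidates, i, next);
            evicted.add(candidates.get(i));
        }
        return evicted;
    }
}
```

With the 20-instance example above, five expired leases still yield only three evictions per run. A registry of 3 instances allows just one eviction per run, because (int)(3 × 0.85) = 2.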
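Batched plus random eviction can leave some expired instances in the registry for a long time. This is especially noticeable in development environments, where services are started and stopped frequently.

3. How an instance is judged expired

As shown above, Eureka calls lease.isExpired(additionalLeaseMs) to decide whether an instance has expired. An instance counts as expired in either of two cases:

- an eviction time has already been set, or
- current time > (last update time + lease duration (90 seconds) + compensation time).

The second case means the instance has gone more than one lease period without renewing. The client is then assumed to have failed and is evicted.

/**
 * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
 *
 * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
 * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
 * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
 * not be fixed.
 *
 * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
 */
public boolean isExpired(long additionalLeaseMs) {
    // Expired if an eviction time has been set, or
    // current time > (last update time + lease duration (90 seconds) + compensation time)
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}

There is another issue to note here. The comment on isExpired admits that this is a bug Eureka does not intend to fix: duration is effectively added twice. Here is how that happens.

First, how lastUpdateTimestamp is updated. When a client renews, lastUpdateTimestamp is set to current time + duration (90 seconds by default):

public void renew() {
    // Update the last update time: current time plus one duration (90 seconds by default)
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

duration is also set at registration, which shows what it means. If the client does not configure durationInSecs, it defaults to 90 seconds.

The comment on getDurationInSecs confirms that duration is how long to wait without a renewal before evicting the client, 90 seconds by default. For example, a client that renews every 30 seconds would be considered failed, and evicted, after missing about 3 renewals.

public void register(final InstanceInfo info, final boolean isReplication) {
    // Use the default of 90 seconds unless the client specifies durationInSecs
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

/**
 * Returns client specified setting for eviction (e.g. how long to wait w/o
 * renewal event)
 *
 * @return time in milliseconds since epoch.
 */
public int getDurationInSecs() {
    return durationInSecs;
}

In practice, though, the instance is not evicted after 90 seconds. isExpired adds another duration on top of lastUpdateTimestamp, which makes 180 seconds. A client instance is therefore considered failed, and evicted, only after it has gone 180 seconds without renewing.

The isExpired comment says as much: renew() computes lastUpdateTimestamp incorrectly, so the real expiry period is 2 * duration. Eureka calls this a minor bug that only affects instances that shut down ungracefully. It will not be fixed because the fix could have a wide-ranging impact on existing usage.

The conclusion: when a client shuts down abnormally, its instance is not removed from the registry after 90 seconds, but only after at least 180 seconds.

X. Self-Preservation

If the network jitters or becomes temporarily unavailable, client renewals cannot reach the server. To preserve availability, eureka server checks whether the number of heartbeats received in the last minute is below a threshold. If it is, the server enters self-preservation mode: it disables lease expiration and stops evicting instances, which protects the registration information.

1. Self-preservation during eviction

The evict method calls isLeaseExpirationEnabled to decide whether self-preservation applies. If it returns false, no instances are evicted.

First, the isLeaseExpirationEnabled method: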
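public boolean isLeaseExpirationEnabled() {
    // First check whether self-preservation is enabled
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    // The per-minute renewal threshold is > 0, and renewals in the last minute > the per-minute renewal threshold
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

public boolean isSelfPreservationModeEnabled() {
    return serverConfig.shouldEnableSelfPreservation();
}

The per-minute renewal threshold numberOfRenewsPerMinThreshold has already appeared in several places. Service registration, service cancellation, openForTraffic, and a scheduled task that runs every 15 minutes all call the following method to update it.

protected void updateRenewsPerMinThreshold() {
    // Per-minute renewal threshold = expected renewing clients * (60 / renewal interval in seconds) * renewal percent threshold
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

expectedNumberOfClientsSendingRenews is the number of clients expected to send renewals. It is incremented by 1 when an instance registers and decremented by 1 when an instance goes offline.

// Instance registration
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // ...
    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to register it, increase the number of clients sending renews
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
            // Update the per-minute renewal threshold, which is used in many places later
            updateRenewsPerMinThreshold();
        }
    }
    // ...
}

///////////////

// Instance going offline (shutdown, failure eviction)
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // ...
    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to cancel it, reduce the number of clients to send renews.
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            updateRenewsPerMinThreshold();
        }
    }
    // ...
}

The last-minute renewal counter renewsLastMin is incremented each time a client renews; the renew method shown earlier ends with renewsLastMin.increment(). renewsLastMin.getCount() returns the total number of renewals in the previous minute.

public long getNumOfRenewsInLastMin() {
    return renewsLastMin.getCount();
}

Using the code above, let's walk through an example of self-preservation when an instance fails.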
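A minimal sketch of the two formulas above uses hypothetical names (`SelfPreservationSketch`, `renewsPerMinThreshold`, `leaseExpirationEnabled`). It plugs in the figures from the development-environment test below: 4 expected clients and 4 renewals in the last minute. It also uses the defaults stated earlier, a 30-second renewal interval and a renewal percent threshold of 0.85.

```java
// Hypothetical sketch of the self-preservation check (names are illustrative, not Eureka's API).
class SelfPreservationSketch {
    // threshold = expectedClients * (60 / renewalIntervalSeconds) * renewalPercentThreshold, truncated to int
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * renewalPercentThreshold);
    }

    // Same shape as isLeaseExpirationEnabled(): eviction is allowed only if self-preservation
    // is off, or if last-minute renewals exceed a positive threshold.
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled,
                                          int threshold, long renewsLastMin) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return threshold > 0 && renewsLastMin > threshold;
    }
}
```

With 4 expected clients the threshold is (int)(4 × 2 × 0.85) = 6. Four renewals per minute, which is consistent with two clients renewing every 30 seconds, fail the 4 > 6 check, so evict returns early. Note also that a threshold of 0 makes the check false whenever self-preservation is enabled, because the check requires threshold > 0.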
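This is eureka-server's self-preservation mechanism. If too many instances (more than 15%) stop sending heartbeats within a short time, the server assumes the cause is a network failure on its own side rather than failed clients. It then enters self-preservation mode to avoid evicting instances by mistake.

2. Instances not removed because of self-preservation

In a development environment, services are restarted frequently. You may notice that some instances already show as DOWN but are never removed. This is caused by eureka-server's self-preservation mechanism, as the following tests show.

① With self-preservation enabled

First, eureka-server is configured as follows, with self-preservation enabled:

eureka:
  server:
    # Whether self-preservation is enabled
    enable-self-preservation: true

Start a few client instances.

Then quickly stop demo-consumer. (A normal shutdown would call cancel to take the instance offline.) demo-consumer now shows as DOWN, but the instance is never removed.

The number of renewals in the last minute is 4, while the expected number of renewals per minute is 6. Since 4 > 6 does not hold, self-preservation kicks in and the instance can never be evicted.

Note that the expected number of renewing clients is 4, while only 3 client instances are actually registered. This is because Spring Cloud sets the initial value to 1 when it calls openForTraffic.

② With self-preservation disabled

Disable self-preservation with the following configuration:

eureka:
  server:
    # Whether self-preservation is enabled
    enable-self-preservation: false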
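The registry console now warns that self-preservation has been turned off.

Repeating the same steps and quickly stopping an instance, the instance is still not evicted. That is because an instance is only considered expired after 180 seconds. After waiting about 3 minutes, the instance goes offline.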
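The 180-second delay can be reproduced with a tiny model of the Lease.renew(), cancel(), and isExpired() arithmetic quoted earlier. `LeaseSketch` is a hypothetical stand-in. It takes the current time as a parameter instead of calling System.currentTimeMillis(), so the timeline can be checked deterministically.

```java
// Hypothetical model of Eureka's lease-expiry arithmetic with an injectable clock.
class LeaseSketch {
    private final long durationMs;   // lease duration (90 000 ms by default in Eureka)
    private long lastUpdateTimestamp;
    private long evictionTimestamp;  // 0 means "not cancelled"

    LeaseSketch(long durationMs) {
        this.durationMs = durationMs;
    }

    // Mirrors renew(): the extra "+ durationMs" is the quirk described in the isExpired comment.
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // Mirrors cancel(): record the eviction time once.
    void cancel(long nowMs) {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = nowMs;
        }
    }

    // Mirrors isExpired(additionalLeaseMs): after a renewal, duration is effectively counted twice.
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return evictionTimestamp > 0 || nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```

The observed delay can be even longer than 180 seconds. The eviction task only runs every 60 seconds by default, and each run is capped at 15% of the registry.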
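③ Stopping several instances quickly

This time, stop 2 instances at the same time. After 2 minutes only one of them has gone offline, because eureka-server evicts at most 15% of the instances per run.

④ Where does DOWN come from?

So where does the DOWN status come from? I started the client instances locally from IDEA. On shutdown, the status change listener fires and triggers a registration call with status DOWN, so the instance immediately shows as DOWN.

If you kill the process directly, the status change listener does not fire, so the instance in the registry does not change to DOWN, even though it is actually gone and unavailable.

⑤ Taking instances offline quickly

The tests above suggest tuning the following parameters to make instances go offline quickly.

eureka-server configuration:

eureka:
  server:
    # Whether self-preservation is enabled
    enable-self-preservation: false
    # Renewal percent threshold (used to compute the per-minute renewal threshold)
    renewal-percent-threshold: 0
    # Interval of the eviction task
    eviction-interval-timer-in-ms: 10000

eureka-client configuration:

eureka:
  instance:
    # How long without a heartbeat before the instance is considered expired
    lease-expiration-duration-in-seconds: 60

3. Design of the last-minute counter

Now let's see how renewsLastMin counts the renewals of the previous minute. Its type is MeasuredRate, another class whose design is worth learning from.

MeasuredRate counts with two buckets:

- one holds the count for the previous interval;
- the other holds the count for the current interval.

A scheduled task moves the current bucket's count into the previous bucket at every interval. Increments go to the current bucket and reads come from the previous bucket, so getCount returns the count for the previous interval.

public class MeasuredRate {
    private static final Logger logger = LoggerFactory.getLogger(MeasuredRate.class);
    // Two buckets: one for the previous minute, one for the current minute
    private final AtomicLong lastBucket = new AtomicLong(0);
    private final AtomicLong currentBucket = new AtomicLong(0);

    private final long sampleInterval;
    private final Timer timer;

    private volatile boolean isActive;

    /**
     * @param sampleInterval in milliseconds
     */
    public MeasuredRate(long sampleInterval) {
        // Interval
        this.sampleInterval = sampleInterval;
        // Timer
        this.timer = new Timer("Eureka-MeasureRateTimer", true);
        this.isActive = false;
    }

    public synchronized void start() {
        if (!isActive) {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        // Runs once per interval (every minute here): move the current count into the previous bucket
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);

            isActive = true;
        }
    }

    public synchronized void stop() {
        if (isActive) {
            timer.cancel();
            isActive = false;
        }
    }

    /**
     * Returns the count in the last sample interval.
     */
    public long getCount() {
        // Reads come from the previous minute's bucket
        return lastBucket.get();
    }

    /**
     * Increments the count in the current sample interval.
     */
    public void increment() {
        // Increments go to the current minute's bucket
        currentBucket.incrementAndGet();
    }
}

4. Diagram of failure eviction and self-preservation

The figure below summarizes failure eviction and the self-preservation mechanism.

(Editor: 李大同)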