SpringCloud 源码系列(1)—— 注册中心 Eureka(上)
Eureka 是 Netflix?公司开源的一个服务注册与发现的组件,和其他 Netflix 公司的服务组件(例如负载均衡、熔断器、网关等)一起,被 Spring Cloud 整合为 Spring Cloud Netflix 模块。不过 Eureka 2.0 开始闭源了,但 1.x 还在继续维护中,可以继续使用。这篇文章就来深入学习下 Eureka 注册中心,便于我们更好的使用和调优注册中心。 关于版本:本文章使用的 Spring cloud 版本为?Hoxton.SR8,Spring boot 版本为?2.3.3.RELEASE,依赖的 eureka 版本则为 1.9.25。 一、Eureka 初体验Eureka 分为 Eureka Server 和 Eureka Client,Eureka Server 为 Eureka 注册中心,Eureka Client 为 Eureka 客户端。这节先通过demo把注册中心的架子搭起来,看看注册中心的基础架构。 1、Eureka Server① 创建注册中心服务:sunny-register 首先创建一个 maven 工程,服务名称为 sunny-register,并在 pom.xml 中引入注册中心服务端的依赖。 1 <dependencies> 2 dependency3 groupId>org.springframework.cloud</4 artifactId>spring-cloud-starter-netflix-eureka-server5 6 > ② 添加配置文件 在 resources 下添加 application.yml 配置文件,并添加注册中心相关配置。 1 server: 2 port: 8000 3 spring: 4 application: 5 name: sunny-register 6 7 eureka: 8 instance: 9 hostname: dev.lyyzoo.com 10 client: 11 # 是否向注册中心注册自己 12 register-with-eureka: false 13 # 是否检索服务 14 fetch-registry: 15 service-url: 16 defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/ ③ 添加启动类 添加启动类,并在启动类上加上?@EnableEurekaServer 注解,启用注册中心。 package com.lyyzoo.sunny.register; 2 import org.springframework.boot.SpringApplication; org.springframework.boot.autoconfigure.SpringBootApplication; 5 org.springframework.cloud.netflix.eureka.server.EnableEurekaServer; @EnableEurekaServer @SpringBootApplication public class RegisterApplication { 10 11 static void main(String[] args) { 12 SpringApplication.run(RegisterApplication.,args); } 14 } ④ 启动注册中心 启动注册中心后,访问 http://dev.lyyzoo.com:8000/,就可以看到注册中心的页面了,现在还没有实例注册上来。(dev.lyyzoo.com 在本地 hosts 文件中映射到 127.0.0.1) 2、Eureka Client创建两个 demo 服务,demo-producer 服务作为生产者提供一个接口,demo-consumer 服务作为消费者去调用 demo-producer 的接口。 ① 创建客户端服务:demo-producer 创建maven工程,服务名称为 demo-producer,在 pom.xml 中引入注册中心客户端的依赖,并添加了 web 的依赖。 > >spring-cloud-starter-netflix-eureka-client>org.springframework.boot>spring-boot-starter-web> ② 添加配置文件 在 resouces 下添加 application.yml 配置文件,添加注册中心客户端相关的配置。 2 port: 8010 5 name: demo-producer serviceUrl: 10 defaultZone: ${EUREKA_DEFAULT_ZONE:http:dev.lyyzoo.com:8000/eureka} ③ 添加启动类 添加启动类,并在启动类上加上 @EnableEurekaClient 注解,启用客户端。 @EnableEurekaClient 2 3 ProducerApplication { 4 6 SpringApplication.run(ProducerApplication.7 8 } ④ 添加一个 rest 接口 添加一个接口用于测试调用: @RestController 2 DemoController { 3 4 private final Logger logger = LoggerFactory.getLogger(getClass()); 5 6 @GetMapping("/v1/uuid") 7 public ResponseEntity<String> getUUID() { 8 String uuid = UUID.randomUUID().toString(); 9 logger.info("generate uuid: {}"10 return ResponseEntity.ok(uuid); 12 } ⑤??创建客户端服务:demo-consumer 类似的方式,再创建消费者服务:demo-producer,这个服务中添加一个消费者接口,通过 RestTemplate 负载均衡的方式来调用 demo-producer 的接口。 因此需要先配置一个带有负载均衡的 RestTemplate: ConsumerApplication { 4 @Bean 6 @LoadBalanced public RestTemplate restTemplate() { 8 return new RestTemplate(); 12 SpringApplication.run(ConsumerApplication.14 } 添加消费者接口,注意这里 url 是写的服务名称,并不是具体的 ip 地址或端口,在微服务场景下,服务间调用也不可能写死某个具体的地址。 @Autowired private RestTemplate restTemplate; 8 9 @GetMapping("/v1/id"10 getId() { 11 ResponseEntity<String> result = restTemplate.getForEntity("http://demo-producer/v1/uuid",String.); 12 String uuid = result.getBody(); 13 logger.info("request id: {}"14 15 16 } ⑥ 启动注册中心客户端 以两个不同的端口启动 demo-producer,可以通过环境变量的方式制定端口。然后再启动 demo-consumer。 启动完成之后,就可以在注册中心看到注册上来的两个 demo-producer 实例和一个 demo-consumer 实例,并且状态都为 UP。 ⑦ 测试接口 调用消费者服务的接口,多次访问 http://dev.lyyzoo.com:8020/v1/id 接口,会发现生产者服务 demo-consumer 两个实例的控制台会交替的输出日志信息。这就说明消费者客户端通过服务名称访问到生产者了。 3、Eureka 基础架构通过前面的体验,可以发现,服务间调用只需知道某个服务的名称就可以调用这个服务的api了,而不需要指定具体的ip地址和端口,那这是怎么做到的呢? 不难看出,Eureka 的基础架构包含三种角色:
首先需要一个服务注册中心,客户端则向注册中心注册,将自己的信息(比如服务名、服务的 IP 地址和端口信息等)提交给注册中心。客户端向注册中心获取一份服务注册列表的信息,该列表包含了所有向注册中心注册的服务信息。获取服务注册列表信息之后,客户端服务就可以根据服务名找到服务的所有实例,然后通过负载均衡选择其中一个实例,根据其 IP 地址和端口信息,就可以调用服务的API接口了。 这就是注册中心最基础的架构和功能了,提供服务注册和发现,为各个客户端提供服务注册列表信息。但为了完成这些工作,Eureka 有很多的机制来实现以及保证其高可用,如服务注册、服务续约、获取服务注册列表、服务下线、服务剔除等等。Eureka 也提供了很多参数让我们可以根据实际的场景来优化它的一些功能和配置,比如维持心跳的时间、拉取注册表的间隔时间、自我保护机制等等。下面我们就从 eureka 的源码层面来分析下 eureka 的这些功能以及参数,理解其原理,学习它的一些设计。 二、Eureka 源码准备虽然我们在 pom.xml 中依赖的是?spring-cloud-starter-netflix-eureka-server 和?spring-cloud-starter-netflix-eureka-client,但 spring-cloud-starter-netflix 只是对 eureka 做了封装,使得其可以通过 springboot 的方式来启动和初始化,其底层其实是 netflix 的 eureka-core、eureka-client 等。所以我们先分析 netflix eureka 的源码,最后再看看?spring-cloud-starter-netflix 的源码。 1、源码环境准备① 下载源码 Netflix Eureka:https://github.com/Netflix/eureka Spring Cloud Netflix:https://github.com/spring-cloud/spring-cloud-netflix 克隆 eureka 的源码到本地: $ git clone https:github.com/Netflix/eureka.git
由于我们依赖的是 1.9.25 版本,将代码克隆到本地后,将其切换到 1.9.25: $ git checkout -b 1.9.25 然后到 eureka 根目录下执行构建的命令: $ ./gradlew clean build -x test ② IDEA 打开源码 由于 eureka 使用 gradle 管理依赖,所以本地需要先安装 gradle,之后 IDEA 中也需要安装 gradle 的插件,跟 maven 都是类似的,安装教程可自行百度。 2、Eureka 工程结构Eureka 主要包含如下模块:
三、Eureka Server 启动初始化首先要看的是 eureka-server,注册中心启起来之后,客户端才能来注册服务和发现服务。 1、eureka-server 模块① eureka-server 目录
② web.xml web.xml 的内容: <?xml version="1.0" encoding="UTF-8"?> web-app version="2.5" 3 xmlns="http://java.sun.com/xml/ns/javaee" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" 7 <!-- eureka 启动初始化类 --> 8 listener 9 listener-class>com.netflix.eureka.EurekaBootStrap10 11 12 状态过滤器 13 filter14 filter-name>statusFilter15 filter-class>com.netflix.eureka.StatusFilter16 17 18 认证过滤器 19 20 >requestAuthFilter21 >com.netflix.eureka.ServerRequestAuthFilter22 23 24 限流过滤器 25 26 >rateLimitingFilter27 >com.netflix.eureka.RateLimitingFilter28 29 30 >gzipEncodingEnforcingFilter31 >com.netflix.eureka.GzipEncodingEnforcingFilter32 33 34 jersey 容器 35 36 >jersey37 >com.sun.jersey.spi.container.servlet.ServletContainer38 init-param39 param-name>com.sun.jersey.config.property.WebPageContentRegex40 param-value>/(flex|images|js|css|jsp)/.*41 42 43 >com.sun.jersey.config.property.packages44 >com.sun.jersey;com.netflix45 46 47 GZIP content encoding/decoding 48 49 >com.sun.jersey.spi.container.ContainerRequestFilters50 >com.sun.jersey.api.container.filter.GZIPContentEncodingFilter51 52 53 >com.sun.jersey.spi.container.ContainerResponseFilters54 55 56 57 58 filter-mapping59 60 url-pattern>/*61 62 63 64 65 66 67 68 Uncomment this to enable rate limiter filter. 69 <filter-mapping> 70 <filter-name>rateLimitingFilter</filter-name> 71 <url-pattern>/v2/apps</url-pattern> 72 <url-pattern>/v2/apps/*</url-pattern> 73 </filter-mapping> 74 75 76 77 78 >/v2/apps79 >/v2/apps/*80 81 82 83 84 85 86 87 欢迎页 88 welcome-file-list89 welcome-file>jsp/status.jsp90 91 92 web-app> web.xml 中可以得知如下信息:
③ 单元测试类?EurekaClientServerRestIntegrationTest 首先看 setUp 方法,每个测试用例运行之前都会先运行 setUp 方法来初始化运行环境。 @BeforeClass void setUp() throws Exception { 3 初始化 eureka 配置 injectEurekaConfiguration(); 5 启动 eureka server,会找 build/libs 目录下的 eureka-server.*.war 包来运行 6 这一步启动时,就会加载 web.xm 配置文件,然后进入 EurekaBootStrap 初始化类 startServer(); 8 eureka server 配置 createEurekaServerConfig(); 创建 jersey 客户端,使用 jersey 客户端来调用资源 12 httpClientFactory = JerseyEurekaHttpClientFactory.newBuilder() 13 .withClientName("testEurekaClient"14 .withConnectionTimeout(100015 .withReadTimeout(100016 .withMaxConnectionsPerHost(117 .withMaxTotalConnections(118 .withConnectionIdleTimeout(100019 .build(); 20 21 jerseyEurekaClient = httpClientFactory.newClient( DefaultEndpoint(eurekaServiceUrl)); 22 23 ServerCodecs serverCodecs = DefaultServerCodecs(eurekaServerConfig); 24 jerseyReplicationClient = JerseyReplicationClient.createReplicationClient( 25 eurekaServerConfig,26 serverCodecs,1)">27 eurekaServiceUrl 28 ); 29 } 这个类提供了如下的一些测试用例,我们可以运行这些测试用例来进行调试。 2、EurekaBootStrap 初始化EurekaBootStrap 是监听器的入口,实现了?ServletContextListener 接口,主要完成了 eureka server 的启动初始化。 从 contextInitialized 方法进去,整体上来说,分为 eureka 环境初始化和 eureka server 上下文初始化。 @Override contextInitialized(ServletContextEvent event) { try { 4 eureka 环境初始化 initEurekaEnvironment(); 6 eureka server 上下文初始化 initEurekaServerContext(); 9 ServletContext sc = event.getServletContext(); 10 sc.setAttribute(EurekaServerContext..getName(),serverContext); 11 } catch (Throwable e) { 12 logger.error("Cannot bootstrap eureka server :"13 throw new RuntimeException("Cannot bootstrap eureka server :"14 15 } ① eureka环境初始化 initEurekaEnvironment 方法内主要是设置数据中心和运行环境参数:
② eureka server 上下文初始化 initEurekaServerContext 上下文初始化则包含了很多阶段:
protected void initEurekaServerContext() 2 1、eureka 注册中心配置 3 EurekaServerConfig eurekaServerConfig = DefaultEurekaServerConfig(); For backward compatibility 6 JsonXStream.getInstance().registerConverter( V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH); 7 XmlXStream.getInstance().registerConverter( 9 logger.info("Initializing the eureka client..." logger.info(eurekaServerConfig.getJsonCodecName()); 11 ServerCodecs serverCodecs = 12 13 ApplicationInfoManager applicationInfoManager = null; 14 if (eurekaClient == ) { 16 2、eureka 实例配置 17 EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) 18 ? CloudInstanceConfig() 19 : MyDataCenterInstanceConfig(); 21 3、构造 InstanceInfo 实例信息 22 4、构造 ApplicationInfoManager 应用管理器 23 applicationInfoManager = ApplicationInfoManager( 24 instanceConfig, EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); 25 26 5、eureka 客户端配置 27 EurekaClientConfig eurekaClientConfig = DefaultEurekaClientConfig(); 28 6、构造 EurekaClient,DiscoveryClient 封装了客户端相关的操作 29 eurekaClient = DiscoveryClient(applicationInfoManager,eurekaClientConfig); 30 } else31 applicationInfoManager = eurekaClient.getApplicationInfoManager(); 32 34 PeerAwareInstanceRegistry registry; 35 if (isAws(applicationInfoManager.getInfo())) { 36 registry = AwsInstanceRegistry( 37 eurekaServerConfig,1)">38 eurekaClient.getEurekaClientConfig(),1)">39 serverCodecs,1)">40 eurekaClient 41 ); 42 awsBinder = AwsBinderDelegate(eurekaServerConfig,eurekaClient.getEurekaClientConfig(),registry,applicationInfoManager); 43 awsBinder.start(); 44 } 45 7、构造感知eureka集群的注册表 46 registry = PeerAwareInstanceRegistryImpl( 47 48 49 50 51 52 53 54 8、构造eureka-server集群信息 55 PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes( 56 registry,1)">57 58 eurekaClient.getEurekaClientConfig(),1)">59 60 applicationInfoManager 61 63 9、基于前面构造的对象创建 EurekaServerContext 64 serverContext = DefaultEurekaServerContext( 65 66 67 68 peerEurekaNodes,1)">71 72 将 serverContext 放到 EurekaServerContextHolder 上下文中, 73 这样其它地方都可以通过 EurekaServerContextHolder 拿到 EurekaServerContext 74 EurekaServerContextHolder.initialize(serverContext); 76 10、初始化eureka-server上下文 77 serverContext.initialize(); 78 logger.info("Initialized server context"79 80 11、从相邻的eureka-server同步注册表 81 int registryCount = registry.syncUp(); 82 /12、启动注册表,启动一些定时任务 83 registry.openForTraffic(applicationInfoManager,registryCount); 84 85 /13、注册监控统计 86 EurekaMonitors.registerAllStats(); 87 } 3、面向接口的配置读取初始化中有三个配置接口,EurekaServerConfig、EurekaInstanceConfig、EurekaClientConfig,分别对应了注册中心、eureka实例、eureka客户端的配置获取。 从它们默认实现类的构造方法进去可以看到,EurekaServerConfig 是读取的 eureka-server.properties 配置文件,命名前缀是 eureka.server;EurekaInstanceConfig、EurekaClientConfig 是读取的 eureka-client.properties 配置文件,命名前缀分别是 eureka.instance、eureka.client。 这里可以看到,eureka 在代码中获取配置的方式是通过接口方法的形式来获取的,在其默认的实现类里通过硬编码的方式定义了配置的编码以及默认值。这种基于接口的配置读取方式是可以借鉴的,这种方式读取配置更易于维护,不用维护一堆常量,如果配置编码变了只需更改实现类即可。 例如下面的配置: int getExpectedClientRenewalIntervalSeconds() { final int configured = configInstance.getIntProperty( 4 namespace + "expectedClientRenewalIntervalSeconds" 5 30).get(); return configured > 0 ? configured : 30} double getRenewalPercentThreshold() { configInstance.getDoubleProperty( 12 namespace + "renewalPercentThreshold",0.8516 boolean shouldEnableReplicatedRequestCompression() { 17 configInstance.getBooleanProperty( 18 namespace + "enableReplicatedRequestCompression",1)">false19 } 4、基于建造者模式构造服务实例看?new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get() 这段代码,在 get 方法中完成了服务实例信息的构造。它这里主要用到了建造者设计模式来构建 LeaseInfo 和 InstanceInfo,以 InstanceInfo 为例,它的内部有一个静态的?Builder 类,通过 newBuilder() 方法创建了 InstanceInfo 对象,然后可以调用 Builder 的属性设置方法来设置属性,在设置这些属性的时候,会做一些关联性的校验,在设置完成后,就调用 build() 方法返回对象,也可以在 build 方法中再做一些最终的校验。建造者模式就很适合用于构建这种复杂的对象。 synchronized InstanceInfo get() { if (instanceInfo == 3 续约信息:主要有续约间隔时间(默认30秒)和续约过期时间(默认90秒) 4 LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder() .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds()) .setDurationInSecs(config.getLeaseExpirationDurationInSeconds()); 7 if (vipAddressResolver == 9 vipAddressResolver = Archaius1VipAddressResolver(); } 12 基于建造者模式来创建 InstanceInfo 13 InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver); 15 set the appropriate id for the InstanceInfo,falling back to datacenter Id if applicable,else hostname 16 String instanceId = config.getInstanceId(); 17 if (instanceId == null || instanceId.isEmpty()) { 18 DataCenterInfo dataCenterInfo = config.getDataCenterInfo(); 19 if (dataCenterInfo instanceof UniqueIdentifier) { 20 instanceId = ((UniqueIdentifier) dataCenterInfo).getId(); 21 } 22 instanceId = config.getHostName(23 } 24 String defaultAddress; 27 if (config RefreshableInstanceConfig) { 28 Refresh AWS data center info,and return up to date address 29 defaultAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(30 } 31 defaultAddress = config.getHostName(34 fail safe 35 if (defaultAddress == defaultAddress.isEmpty()) { 36 defaultAddress = config.getIpAddress(); 38 39 设置属性 builder.setNamespace(config.getNamespace()) .setInstanceId(instanceId) 42 .setAppName(config.getAppname()) .setAppGroupName(config.getAppGroupName()) 44 .setDataCenterInfo(config.getDataCenterInfo()) 45 .setIPAddr(config.getIpAddress()) 46 .setHostName(defaultAddress) .setPort(config.getNonSecurePort()) .enablePort(PortType.UNSECURE,config.isNonSecurePortEnabled()) .setSecurePort(config.getSecurePort()) .enablePort(PortType.SECURE,config.getSecurePortEnabled()) .setVIPAddress(config.getVirtualHostName()) .setSecureVIPAddress(config.getSecureVirtualHostName()) 53 .setHomePageUrl(config.getHomePageUrlPath(),config.getHomePageUrl()) 54 .setStatusPageUrl(config.getStatusPageUrlPath(),config.getStatusPageUrl()) 55 .setASGName(config.getASGName()) .setHealthCheckUrls(config.getHealthCheckUrlPath(),1)"> config.getHealthCheckUrl(),config.getSecureHealthCheckUrl()); 58 59 60 Start off with the STARTING state to avoid traffic 61 if (!config.isInstanceEnabledOnit()) { 62 InstanceStatus initialStatus = InstanceStatus.STARTING; 63 LOG.info("Setting initial instance status as: {}"64 builder.setStatus(initialStatus); 65 } 66 LOG.info("Setting initial instance status as: {}. This may be too early for the instance to advertise " 67 + "itself as available. You would instead want to control this via a healthcheck handler." InstanceStatus.UP); 70 71 Add any user-specific metadata information 72 for (Map.Entry<String,String> mapEntry : config.getMetadataMap().entrySet()) { 73 String key = mapEntry.getKey(); 74 String value = mapEntry.getValue(); 75 only add the metadata if the value is present 76 if (value != null && !value.isEmpty()) { builder.add(key,value); 78 79 80 81 调用 build 方法做属性校验并创建 InstanceInfo 实例 82 instanceInfo = builder.build(); instanceInfo.setLeaseInfo(leaseInfoBuilder.build()); 84 instanceInfo; 86 } LeaseInfo 就是续约信息,可以看到主要的两个配置就是续约间隔时间和多久未续约认为实例过期,实例过期就会被剔除。然后就是基于 config 设置 InstanceInfo,就是实例信息,包含了实例ID、主机名称、端口、LeaseInfo 等等。 5、注册中心构造客户端?DiscoveryClient在集群模式下,eureka server 也会作为客户端注册到其它注册中心,此时,它本身就是一个 eureka client。因此会去构建?EurekaClient,其默认实现类是?DiscoveryClient。DiscoveryClient 包含了 eureka 客户端的大部分核心功能,比如服务注册、续约、维持心跳、拉取注册表等。 一步步进入到DiscoveryClient最复杂的那个构造方法,我们先整体分析下做了哪些事情,抓大放小,很多组件的细节等后面分析具体功能的时候再来看。
1 DiscoveryClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config,AbstractDiscoveryClientOptionalArgs args,1)"> 2 Provider<BackupRegistry> backupRegistryProvider,EndpointRandomizer endpointRandomizer) { 3 if (args != 4 this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; 5 this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; 6 this.eventListeners.addAll(args.getEventListeners()); 7 this.preRegistrationHandler = args.preRegistrationHandler; 8 } 9 this.healthCheckCallbackProvider = 10 this.healthCheckHandlerProvider = 11 this.preRegistrationHandler = 12 13 14 将实例信息、配置信息保存到本地 15 this.applicationInfoManager = applicationInfoManager; 16 InstanceInfo myInfo = applicationInfoManager.getInfo(); 17 clientConfig = config; 18 staticClientConfig = clientConfig; 19 transportConfig = config.getTransportConfig(); 20 instanceInfo = myInfo; 21 if (myInfo != 22 appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); 23 } 24 logger.warn("Setting instanceInfo to a passed in null value" 25 26 27 this.backupRegistryProvider = backupRegistryProvider; 28 this.endpointRandomizer = endpointRandomizer; 29 this.urlRandomizer = EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); 30 localRegionApps.set( Applications()); 31 32 33 fetchRegistryGeneration = new AtomicLong(0 34 remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); 35 从远程拉取注册表的地址数组,使用的原子类,在运行中可能会动态更新地址 36 remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); 37 38 如果要获取注册表,就会注册状态监视器 39 (config.shouldFetchRegistry()) { 40 this.registryStalenessMonitor = new ThresholdLevelsMetric(this,METRIC_REGISTRY_PREFIX + "lastUpdateSec_",1)">new long[]{15L,30L,60L,120L,240L,480L}); 41 } 42 this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; 43 44 45 如果要注册到 eureka-server,就会创建心跳状态监视器 46 (config.shouldRegisterWithEureka()) { 47 this.heartbeatStalenessMonitor = 48 } 49 this.heartbeatStalenessMonitor = 50 51 52 logger.info("Initializing Eureka in region {}" 53 54 如果不注册到注册中心,且不拉取注册表,就不创建调度器、线程池等资源了 55 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { 56 logger.info("Client configured to neither register nor query for data." 57 scheduler = 58 heartbeatExecutor = 59 cacheRefreshExecutor = 60 eurekaTransport = 61 instanceRegionChecker = new InstanceRegionChecker( PropertyBasedAzToRegionMapper(config),1)"> 62 63 This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() 64 to work with DI'd DiscoveryClient 65 DiscoveryManager.getInstance().setDiscoveryClient( 66 DiscoveryManager.getInstance().setEurekaClientConfig(config); 67 68 initTimestampMs = System.currentTimeMillis(); 69 initRegistrySize = .getApplications().size(); 70 registrySize = initRegistrySize; 71 logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}" 72 initTimestampMs,initRegistrySize); 73 74 return; no need to setup up an network tasks and we are done 75 76 77 78 创建定时调度器,默认有2个核心线程,主要处理心跳任务和缓存刷新任务 79 scheduler = Executors.newScheduledThreadPool(2 80 ThreadFactoryBuilder() 81 .setNameFormat("DiscoveryClient-%d" 82 .setDaemon(true 83 .build()); 84 85 维持心跳的线程池,一个核心线程,最大线程数默认5。 86 注意其使用的队列是 SynchronousQueue 队列,这个队列只能放一个任务,一个线程将任务取走后,才能放入下一个任务,否则只能阻塞。 87 heartbeatExecutor = ThreadPoolExecutor( 88 1,clientConfig.getHeartbeatExecutorThreadPoolSize(),0 89 new SynchronousQueue<Runnable>(),1)"> 90 91 .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d" 92 .setDaemon( 93 .build() 94 ); use direct handoff 95 96 刷新缓存的线程池,一个核心线程,最大线程数据默认为5 97 cacheRefreshExecutor = 98 1,clientConfig.getCacheRefreshExecutorThreadPoolSize(),1)"> 99 100 101 .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d"102 .setDaemon(103 104 ); 105 106 eureka http 调用客户端,支持 eureka client 与 eureka server 之间的通信 107 eurekaTransport = EurekaTransport(); 108 初始化 eurekaTransport 109 scheduleServerEndpointTask(eurekaTransport,1)">110 111 AzToRegionMapper azToRegionMapper; 112 (clientConfig.shouldUseDnsForFetchingServiceUrls()) { 113 azToRegionMapper = DNSBasedAzToRegionMapper(clientConfig); 114 } 115 azToRegionMapper = PropertyBasedAzToRegionMapper(clientConfig); 116 117 if (null != remoteRegionsToFetch.get()) { 118 azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",1)">119 120 instanceRegionChecker = InstanceRegionChecker(azToRegionMapper,1)">121 } 122 new RuntimeException("Failed to initialize DiscoveryClient!"123 124 125 (clientConfig.shouldFetchRegistry()) { 126 127 拉取注册表:全量抓取和增量抓取 128 boolean primaryFetchRegistryResult = fetchRegistry(129 primaryFetchRegistryResult) { 130 logger.info("Initial registry fetch from primary servers failed"131 132 boolean backupFetchRegistryResult = 133 if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { 134 backupFetchRegistryResult = 135 logger.info("Initial registry fetch from backup servers failed"136 137 if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { 138 new IllegalStateException("Fetch registry error at startup. Initial fetch failed."139 140 } (Throwable th) { 141 logger.error("Fetch registry error at startup: {}"142 IllegalStateException(th); 143 144 145 146 call and execute the pre registration handler before all background tasks (inc registration) is started 147 this.preRegistrationHandler != 148 .preRegistrationHandler.beforeRegistration(); 149 150 151 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { 152 153 register() ) { 154 new IllegalStateException("Registration error at startup. Invalid server response."155 156 } 157 logger.error("Registration error at startup: {}"158 159 160 161 162 初始化一些调度任务:刷新缓存的调度任务、发送心跳的调度任务、实例副本传播器 163 initScheduledTasks(); 164 165 166 Monitors.registerObject(167 } 168 logger.warn("Cannot register timers"169 170 171 172 173 DiscoveryManager.getInstance().setDiscoveryClient(174 DiscoveryManager.getInstance().setEurekaClientConfig(config); 175 176 初始化的时间 177 initTimestampMs =178 initRegistrySize = 179 registrySize =180 logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}"181 initTimestampMs,1)">182 183 184 //////////////////////////////////////////////////////////////////// 185 186 initScheduledTasks() { 187 188 抓取注册表的间隔时间,默认30秒 189 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); 190 刷新缓存调度器延迟时间扩大倍数,在任务超时的时候,将扩大延迟时间 191 这在出现网络抖动、eureka-sever 不可用时,可以避免频繁发起无效的调度 192 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); 193 注册表刷新的定时任务 194 cacheRefreshTask = TimedSupervisorTask( 195 "cacheRefresh"196 scheduler,1)">197 cacheRefreshExecutor,1)">198 registryFetchIntervalSeconds,1)">199 TimeUnit.SECONDS,1)">200 expBackOffBound,1)">201 new CacheRefreshThread() 刷新注册表的任务 202 203 30秒后开始调度刷新注册表的任务 204 scheduler.schedule( 205 cacheRefreshTask,1)">206 207 208 209 (clientConfig.shouldRegisterWithEureka()) { 210 续约间隔时间,默认30秒 211 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); 212 心跳调度器的延迟时间扩大倍数,默认10 213 clientConfig.getHeartbeatExecutorExponentialBackOffBound(); 214 logger.info("Starting heartbeat executor: " + "renew interval is: {}"215 216 心跳的定时任务 217 heartbeatTask = 218 "heartbeat"219 220 heartbeatExecutor,1)">221 renewalIntervalInSecs,1)">222 223 224 HeartbeatThread() 225 226 30秒后开始调度心跳的任务 227 228 heartbeatTask,1)">229 230 231 实例副本传播器,用于定时更新自己状态 232 instanceInfoReplicator = InstanceInfoReplicator( 233 234 instanceInfo,1)">235 clientConfig.getInstanceInfoReplicationIntervalSeconds(),1)">236 2); burstSize 237 238 实例状态变更的监听器 239 statusChangeListener = ApplicationInfoManager.StatusChangeListener() { 240 @Override 241 String getId() { 242 return "statusChangeListener"243 244 245 246 notify(StatusChangeEvent statusChangeEvent) { 247 if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) { 248 logger.error("Saw local status change event {}"249 } 250 logger.info("Saw local status change event {}"251 } 252 instanceInfoReplicator.onDemandUpdate(); 253 254 }; 255 256 向 ApplicationInfoManager 注册监听器 257 (clientConfig.shouldOnDemandUpdateStatusChange()) { 258 applicationInfoManager.registerStatusChangeListener(statusChangeListener); 259 260 261 启动副本传播器,默认延迟时间40秒 262 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); 263 } 264 logger.info("Not registering with Eureka server per configuration"265 266 } 6、定时任务监管器的设计可以看到,eureka client 为了定时发送心跳以及定时抓取注册表,使用了定时任务和调度器,我觉得它这里的定时调度的设计思想是可以参考和借鉴的。 以心跳任务的这段代码为例: 8 cacheRefreshTask = 9 "cacheRefresh" scheduler,1)"> cacheRefreshExecutor,1)">12 registryFetchIntervalSeconds,1)"> TimeUnit.SECONDS,1)"> expBackOffBound,1)">15 18 scheduler.schedule( cacheRefreshTask,1)">20 21 } 上面这段代码其实并不复杂,主要就是创建了一个定时任务,然后使用调度器在一定的延迟之后开始调度。但它这里并不是直接使用调度器调度任务(CacheRefreshThread),也不是以一个固定的频率调度(每隔30秒)。它定义了一个任务的监管器 TimedSupervisorTask,在创建这个监管器的时候,传入了调度器、要执行的任务、以及间隔时间等参数,然后调度器调度?TimedSupervisorTask。 看?TimedSupervisorTask 的构造方法,主要有以下几个点:
TimedSupervisorTask(String name,ScheduledExecutorService scheduler,ThreadPoolExecutor executor,1)"> 2 int timeout,TimeUnit timeUnit,1)"> expBackOffBound,Runnable task) { this.name = name; this.scheduler = scheduler; this.executor = executor; 任务超时时间就等于任务调度的间隔时间 this.timeoutMillis = timeUnit.toMillis(timeout); this.task = task; 延迟时间默认为超时时间 this.delay = AtomicLong(timeoutMillis); 最大延迟时间,默认在超时时间的基础上扩大10倍 12 this.maxDelay = timeoutMillis * expBackOffBound; 13 初始化计数器并注册 15 successCounter = Monitors.newCounter("success"16 timeoutCounter = Monitors.newCounter("timeouts"17 rejectedCounter = Monitors.newCounter("rejectedExecutions"18 throwableCounter = Monitors.newCounter("throwables"19 threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); 20 Monitors.registerObject(name,1)">21 } 再看?TimedSupervisorTask 的 run 方法:
run() { 2 Future<?> future = 提交任务到线程池 5 future = executor.submit(task); 6 threadPoolLevelGauge.set((long) executor.getActiveCount()); 7 阻塞直到任务完成或超时 future.get(timeoutMillis,TimeUnit.MILLISECONDS); 9 任务完成后,重置延迟时间为超时时间,即30秒 delay.set(timeoutMillis); 11 threadPoolLevelGauge.set(( 成功次数+1 successCounter.increment(); 14 } (TimeoutException e) { 15 logger.warn("task supervisor timed out" 超时次数+1 17 timeoutCounter.increment(); 18 19 如果任务超时了,就会增大延迟时间,当前延迟时间*2,然后取一个最大值 20 long currentDelay = delay.get(); long newDelay = Math.min(maxDelay,currentDelay * 2 设置为最大的一个延迟时间 delay.compareAndSet(currentDelay,newDelay); 24 25 } (RejectedExecutionException e) { if (executor.isShutdown() || scheduler.isShutdown()) { 27 logger.warn("task supervisor shutting down,reject the task"28 } 29 logger.warn("task supervisor rejected the task"30 31 rejectedCounter.increment(); 33 } 35 logger.warn("task supervisor shutting down,can't accept the task"36 } 37 logger.warn("task supervisor threw an exception"39 throwableCounter.increment(); 41 } finally42 if (future != 43 future.cancel(45 46 scheduler.isShutdown()) { 47 延迟 delay 时间后,继续调度任务 48 scheduler.schedule(51 } 总结一下这块设计:
7、构造注册表接着构造?PeerAwareInstanceRegistry,从命名来看,这是一个可以感知 eureka 集群的注册表,就是在集群模式下,eureka server 从其它 server 节点拉取注册表。它的默认实现类是?PeerAwareInstanceRegistryImpl,继承自 AbstractInstanceRegistry,就是实例注册表。 ① 构造?PeerAwareInstanceRegistry 进入 PeerAwareInstanceRegistryImpl 的构造方法:
EurekaServerConfig serverConfig,1)"> EurekaClientConfig clientConfig,1)"> ServerCodecs serverCodecs,1)"> EurekaClient eurekaClient super(serverConfig,clientConfig,serverCodecs); this.eurekaClient = eurekaClient; 最近一分钟集群同步的次数计数器 this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1 We first check if the instance is STARTING or DOWN,then we check explicit overrides,1)"> then we check the status of a potentially existing lease. 13 this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule( DownOrStartingRule(),1)">14 new OverrideExistsRule(overriddenInstanceStatusMap),1)"> LeaseExistsRule()); 16 /////////////////////////////////////////////// 19 21 protected AbstractInstanceRegistry(EurekaServerConfig serverConfig,EurekaClientConfig clientConfig,ServerCodecs serverCodecs) { 22 this.serverConfig = serverConfig; 23 this.clientConfig =24 this.serverCodecs = serverCodecs; 25 最近下线的循环队列 this.recentCanceledQueue = new CircularQueue<Pair<Long,String>>(1000 最近注册的循环队列 28 this.recentRegisteredQueue = 29 最近一分钟续约的计数器 this.renewsLastMin = 32 一个定时调度任务,定时剔除最近改变队列中过期的实例 33 .deltaRetentionTimer.schedule(getDeltaRetentionTask(),1)"> serverConfig.getDeltaRetentionTimerIntervalInMs(),1)">35 serverConfig.getDeltaRetentionTimerIntervalInMs()); 36 } 这块的具体细节等后面分析具体功能的时候再来看,我们先知道有这些队列、计数器就行了。 ② 循环队列?CircularQueue 的设计 从构造方法可以看到,它使用了循环队列来保存最近下线和最近注册的实例信息,容量固定为1000,这样就把最近的实例数量控制在1000以内。 CircularQueue 是它自定义的一个循环队列,继承自 AbstractQueue。其内部其实就是代理了?ArrayBlockingQueue,然后重写了入队的 offer 方法,当队列满了,就取出头部的一个元素,然后再放到队列尾部。 class CircularQueue<E> extends AbstractQueue<E>final ArrayBlockingQueue<E> delegate; capacity; public CircularQueue( capacity) { this.capacity =this.delegate = new ArrayBlockingQueue<>(capacity); 9 @Override public Iterator<E> iterator() { delegate.iterator(); 16 size() { delegate.size(); offer(E e) { 如果队列满了,就取出头部的一个元素,然后再放到尾部 23 while (!delegate.offer(e)) { delegate.poll(); 28 29 E poll() { 31 delegate.poll(); E peek() { 36 delegate.peek(); 38 } 8、创建 Eureka Server 上下文并初始化接下来先是创建了?PeerEurekaNodes,应该就是代表 eureka 集群的。然后基于前面创建的一些东西创建 eureka server 上下文?EurekaServerContext,从 DefaultEurekaServerContext 构造方法进去可以看到,只是将前面构造的东西封装起来,便于全局使用。然后将?serverContext 放到?EurekaServerContextHolder 中,这样其它地方就可以通过这个 holder 获取?serverContext 了。 接着就是初始化eureka server上下文:
initialize() { 2 logger.info("Initializing ..." 启动eureka集群 peerEurekaNodes.start(); 注册表初始化 registry.init(peerEurekaNodes); 8 } (Exception e) { RuntimeException(e); 11 logger.info("Initialized"12 } PeerEurekaNodes 的 start 方法: start() { 单个线程的线程池 3 taskExecutor = Executors.newSingleThreadScheduledExecutor( 4 ThreadFactory() { @Override 6 Thread newThread(Runnable r) { 7 Thread thread = new Thread(r,"Eureka-PeerNodesUpdater" 8 thread.setDaemon( 9 thread; 根据集群地址更新 PeerEurekaNode,PeerEurekaNode 就包含了调度其它注册中心的客户端 updatePeerEurekaNodes(resolvePeerUrls()); 16 Runnable peersUpdateTask = Runnable() { 18 19 updatePeerEurekaNodes(resolvePeerUrls()); 21 } 22 logger.error("Cannot update the replica Nodes" 定时跟新集群信息 PeerEurekaNode,如果有eureka-server不可用了,就可以及时下线,或者新上线了eureka-server,可以及时感知到 taskExecutor.scheduleWithFixedDelay( peersUpdateTask,1)"> serverConfig.getPeerEurekaNodesUpdateIntervalMs(),1)">31 TimeUnit.MILLISECONDS 33 34 } IllegalStateException(e); 36 for (PeerEurekaNode node : peerEurekaNodes) { 38 logger.info("Replica node URL: {}"40 } PeerAwareInstanceRegistryImpl 的 init 方法: void init(PeerEurekaNodes peerEurekaNodes) 启动计数器 .numberOfReplicationsLastMin.start(); this.peerEurekaNodes = peerEurekaNodes; 初始化响应缓存,eureka server 构造了一个多级缓存来响应客户端抓取注册表的请求 initializedResponseCache(); 定时调度任务更新续约阀值,主要就是更新 numberOfRenewsPerMinThreshold 这个值,即每分钟续约次数 scheduleRenewalThresholdUpdateTask(); 初始化 RemoteRegionRegistry initRemoteRegionRegistry(); 13 Monitors.registerObject(15 logger.warn("Cannot register the JMX monitor for the InstanceRegistry :"17 } 9、完成 Eureka Server 初始化接下来看最后几步:
syncUp 方法: syncUp() { Copy entire entry from neighboring DS node int count = 0 注册表同步重试次数,默认5次 for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++if (i > 0 8 9 同步重试时间,默认30秒 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); 11 } (InterruptedException e) { 12 logger.warn("Interrupted during registry transfer.."13 break16 Applications apps = eurekaClient.getApplications(); (Application app : apps.getRegisteredApplications()) { (InstanceInfo instance : app.getInstances()) { 20 (isRegisterable(instance)) { 21 注册实例 22 register(instance,instance.getLeaseInfo().getDurationInSecs(),1)">23 count++ } 25 } (Throwable t) { 26 logger.error("During DS init copy" count; 32 } openForTraffic 方法: void openForTraffic(ApplicationInfoManager applicationInfoManager,1)"> count) { 期望的客户端每分钟的续约次数 this.expectedNumberOfClientsSendingRenews = 更新每分钟续约阀值 updateRenewsPerMinThreshold(); 7 logger.info("Got {} instances from neighboring DS node" 8 logger.info("Renew threshold is: {}"this.startupTime =if (count > 011 this.peerInstancesTransferEmptyOnStartup = 13 DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { 16 logger.info("Priming AWS connections for all replicas.." primeAwsReplicas(applicationInfoManager); 19 logger.info("Changing status to UP" 设置实例状态为已启动 applicationInfoManager.setInstanceStatus(InstanceStatus.UP); .postInit(); //////////////////////////////////////26 postInit() { 启动 统计最近一分钟续约次数的计数器 renewsLastMin.start(); if (evictionTaskRef.get() != evictionTaskRef.get().cancel(); 定时剔除任务 34 evictionTaskRef.set( EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(),1)"> serverConfig.getEvictionIntervalTimerInMs(),1)"> serverConfig.getEvictionIntervalTimerInMs()); 38 } 10、Eureka Server 启动流程图下面通过一张图来展示下 eureka server 的启动初始化流程。 四、Eureka Client 启动初始化eureka client 的启动初始化我们看 eureka-examples 模块下的?ExampleEurekaClient 这个类,它的 main 方法中就模拟了作为一个 eureka client 启动初始化,并向注册中心发送请求。 eureka server 的初始化中其实已经包含了客户端的初始化,可以看出,客户端的初始化主要有如下的一些东西:
2 ExampleEurekaClient sampleClient = ExampleEurekaClient(); 基于实例配置和实例信息创建应用实例管理器 5 ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager( MyDataCenterInstanceConfig()); 基于应用实例管理器和客户端配置创建 EurekaClient(DiscoveryClient) 7 EurekaClient client = initializeEurekaClient(applicationInfoManager,1)"> DefaultEurekaClientConfig()); use the client sampleClient.sendRequestToServiceUsingEureka(client); shutdown the client eurekaClient.shutdown(); 14 } ? (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |