一.SpringCloud源码剖析-Eureka核心API
二.SpringCloud源码剖析-Eureka Client 初始化过程
三.SpringCloud源码剖析-Eureka服务注册
四.SpringCloud源码剖析-Eureka服务发现
五.SpringCloud源码剖析-Eureka Client服务续约
六.SpringCloud源码剖析-Eureka Client取消注册
七.SpringCloud源码剖析-Eureka Server的自动配置
八.SpringCloud源码剖析-Eureka Server初始化流程
九.SpringCloud源码剖析-Eureka Server服务注册流程
十.SpringCloud源码剖析-Eureka Server服务续约
十一.SpringCloud源码剖析-Eureka Server服务注册表拉取
十二.SpringCloud源码剖析-Eureka Server服务剔除
十三.SpringCloud源码剖析-Eureka Server服务下线
什么是服务续约EureakClient会定时向EureakServer发送续约心跳(默认30s/次) ,来告诉EurekaServer自己的健康状况,默认情况下3次续约失败(90s),EurekaServer考虑剔除续约失败的客户端服务
初始化定时任务在EuerakClient中依然是通过scheduler定时实现,在com.netflix.discovery.DiscoveryClient#initScheduledTasks中进行初始化,源码如下
/** * Initializes all scheduled tasks. */ private void initScheduledTasks() { //如果开启服务注册 if (clientConfig.shouldRegisterWithEureka()) { //续约心跳 30s/次 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer //心跳任务 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() //HeartbeatThread心跳的线程 ), renewalIntervalInSecs, TimeUnit.SECONDS);//心跳时间30s/次 ...省略... /** //心跳任务,本身是一个线程对象 * The heartbeat task that renews the lease in the given intervals. */ private class HeartbeatThread implements Runnable { public void run() { //调用renew()方法续约 if (renew()) { //记录最后续约成功时间 lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }服务续约线程HeartbeatThread
这里我们看到,通过scheduler定时执行HeartbeatThread线程,在HeartbeatThread中又调用了Discovery.renew()方法执行服务续约,源码如下
/** 使用Rest请求像Eureak进行服务续约 * Renew with the eureka service by making the appropriate REST call */ boolean renew() { //Http相应对象 EurekaHttpResponse<InstanceInfo> httpResponse; try { //使用eurekaTransport得到http客户端,发起心跳请求 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); //如果续约返回404,尝试重新发起服务注册,并设置Dirty boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
这里依然使用的是eurekaTransport得到一个EurekaHttpClient实例,然后通过EurekaHttpClientDecorator装饰器执行sendHeartBeat方法,源码如下
public abstract class EurekaHttpClientDecorator implements EurekaHttpClient { ...省略... //发送心跳 @Override public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName, //服务名 final String id,//服务ID final InstanceInfo info,//服务注册对象 final InstanceStatus overriddenStatus) { //服务注册状态 return execute(new RequestExecutor<InstanceInfo>() { @Override public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) { //发送心跳 return delegate.sendHeartBeat(appName, id, info, overriddenStatus); } @Override public RequestType getRequestType() { return RequestType.SendHeartBeat; } }); }
通过EurekaHttpClientDecorator装饰器会先后会执行RetryableEurekaHttpClient(失败重试),RedirectingEurekaHttpClient(重定向到不同的EurekaServer)MetricsCollectingEurekaHttpClient(统计执行情况)针对于各种情况的Http客户端,然后 最终通过AbstractJerseyEurekaHttpClient(JerseyApplicationClient),使用jerseyClient发起心跳请求
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient { @Override public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { //webResource是对Web请求的封装 WebResource webResource = jerseyClient.resource(serviceUrl) .path(urlPath) //服务状态 UP .queryParam("status", info.getStatus().toString()) //最后刷新时间 .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); //发送put请求 response = requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity()) { eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } return eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
这里EureakClient会向EureakServer发送自己的服务状态,以及服务最后续约时间,使用的是PUT发起Rest请求。
总结
- DiscoveryClient.initScheduledTasks初始化续约心跳定时任务,30s/次执行HeartbeatThread线程发送续约请求
- HeartbeatThread调用DiscoveryClient.renew()续约
- renew方法中使用eurekaTransport得到EureakaHttpClient实例EurekaHttpClientDecorator 装饰器执行请求
- EurekaHttpClientDecorator 调用JerseyApplicationClient向EureakServer发送Rest请求,把服务状态和最后的续约时间当做参数
下一章推荐《Eureka Server服务续约》