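How to Implement Eureka Registration in Spring Cloud
This article walks through, in some detail, how Eureka registration is implemented in Spring Cloud. Interested readers can use it as a reference; hopefully it is helpful.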
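I. The Eureka server
The core class on the Eureka server side is EurekaBootStrap, which implements the ServletContextListener interface. From this we can conclude that the Eureka server is built on top of a servlet container. The key code is as follows: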
public class EurekaBootStrap implements ServletContextListener {

    // ... (related code omitted)

    /**
     * Initializes Eureka, including syncing up with other Eureka peers and publishing the registry.
     *
     * @see
     * javax.servlet.ServletContextListener#contextInitialized(javax.servlet.ServletContextEvent)
     */
    @Override
    public void contextInitialized(ServletContextEvent event) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();

            ServletContext sc = event.getServletContext();
            sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
        } catch (Throwable e) {
            logger.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

    // ... (related code omitted)
}
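As we can see, once the ServletContext has been initialized, the listener first initializes the Eureka environment and then the EurekaServerContext. Next, let's look at the initEurekaServerContext method: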
/**
 * init hook for server context. Override for custom logic.
 */
protected void initEurekaServerContext() throws Exception {
    // .....

    ApplicationInfoManager applicationInfoManager = null;

    if (eurekaClient == null) {
        EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
                ? new CloudInstanceConfig()
                : new MyDataCenterInstanceConfig();

        applicationInfoManager = new ApplicationInfoManager(
                instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

        EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
        eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
    } else {
        applicationInfoManager = eurekaClient.getApplicationInfoManager();
    }

    PeerAwareInstanceRegistry registry;
    if (isAws(applicationInfoManager.getInfo())) {
        registry = new AwsInstanceRegistry(
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                eurekaClient
        );
        awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
        awsBinder.start();
    } else {
        registry = new PeerAwareInstanceRegistryImpl(
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                eurekaClient
        );
    }

    // ... (part of the code omitted)
}
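This method creates many objects related to the Eureka server. Two core ones are picked out here: eurekaClient and PeerAwareInstanceRegistry. We will come back to the client part later; for now, let's see what PeerAwareInstanceRegistry is for. Here is the class diagram for this class:
From the class diagram it is clear that the top-level interfaces of PeerAwareInstanceRegistry are LeaseManager and LookupService. LookupService defines the basic behavior for discovering instances, while LeaseManager defines how client registration, renewal, and cancellation are handled. This article focuses on the implementation of the LeaseManager interface. Coming back to PeerAwareInstanceRegistry: this class replicates state across multiple nodes. For example, when an instance registers, renews, or goes offline on one node, this class replicates (notifies) that action to the other nodes. Let's see how it handles client registration: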
/**
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 *
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
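The Javadoc of this register method says that a replication event coming from another node is not replicated again. As a minimal sketch of that guard (not the Eureka implementation), the following uses a hypothetical PeerNodeSketch class with an in-memory map; every class and field name here is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for one server node; not the real PeerAwareInstanceRegistryImpl. */
class PeerNodeSketch {
    final String name;
    final Map<String, String> registry = new ConcurrentHashMap<>(); // instanceId -> appName
    final List<PeerNodeSketch> peers = new ArrayList<>();
    int replicationsSent = 0;

    PeerNodeSketch(String name) {
        this.name = name;
    }

    /** Registers locally, then forwards to peers unless this call is itself a replication. */
    void register(String appName, String instanceId, boolean isReplication) {
        registry.put(instanceId, appName); // local registration
        if (isReplication) {
            return; // a replicated event stops here, so nodes do not echo it back and forth
        }
        for (PeerNodeSketch peer : peers) {
            replicationsSent++;
            peer.register(appName, instanceId, true); // flag the call as a replication
        }
    }
}

class ReplicationSketchDemo {
    public static void main(String[] args) {
        PeerNodeSketch a = new PeerNodeSketch("a");
        PeerNodeSketch b = new PeerNodeSketch("b");
        a.peers.add(b);
        b.peers.add(a);

        a.register("ORDER-SERVICE", "order-1", false);
        System.out.println("b knows order-1: " + b.registry.containsKey("order-1"));
        System.out.println("replications sent by b: " + b.replicationsSent);
    }
}
```

Without the isReplication flag, node b would forward the registration back to node a and the two nodes would call each other indefinitely.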
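In PeerAwareInstanceRegistryImpl.register, the parent class's register method is called first, and then replicateToPeers replicates the action to the other nodes. How replication works is not discussed here; the focus is the registration method itself. The register() method in the parent class looks like this: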
/**
 * Registers a new instance with a given duration.
 *
 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        // ... (part of the code omitted)
    } finally {
        read.unlock();
    }
}
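Based on the source code, let's briefly go through the flow:
1) First, the registry looks up the map of service instances by appName. If the result is null, a new map is created and stored in the registry under this application name. A Lease object appears here; this class describes the time-related attributes of the generic type T, such as the registration time, the service-up time, and the last update time. Its implementation is worth a look: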
/*
 * Copyright 2012 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.netflix.eureka.lease;

import com.netflix.eureka.registry.AbstractInstanceRegistry;

/**
 * Describes a time-based availability of a {@link T}. Purpose is to avoid
 * accumulation of instances in {@link AbstractInstanceRegistry} as result of ungraceful
 * shutdowns that is not uncommon in AWS environments.
 *
 * If a lease elapses without renewals, it will eventually expire consequently
 * marking the associated {@link T} for immediate eviction - this is similar to
 * an explicit cancellation except that there is no communication between the
 * {@link T} and {@link LeaseManager}.
 *
 * @author Karthik Ranganathan, Greg Kim
 */
public class Lease<T> {

    enum Action {
        Register, Cancel, Renew
    };

    public static final int DEFAULT_DURATION_IN_SECS = 90;

    private T holder;
    private long evictionTimestamp;
    private long registrationTimestamp;
    private long serviceUpTimestamp;
    // Make it volatile so that the expiration task would see this quicker
    private volatile long lastUpdateTimestamp;
    private long duration;

    public Lease(T r, int durationInSecs) {
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        duration = (durationInSecs * 1000);
    }

    /**
     * Renew the lease, use renewal duration if it was specified by the
     * associated {@link T} during registration, otherwise default duration is
     * {@link #DEFAULT_DURATION_IN_SECS}.
     */
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }

    /**
     * Cancels the lease by updating the eviction time.
     */
    public void cancel() {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = System.currentTimeMillis();
        }
    }

    /**
     * Mark the service as up. This will only take affect the first time called,
     * subsequent calls will be ignored.
     */
    public void serviceUp() {
        if (serviceUpTimestamp == 0) {
            serviceUpTimestamp = System.currentTimeMillis();
        }
    }

    /**
     * Set the leases service UP timestamp.
     */
    public void setServiceUpTimestamp(long serviceUpTimestamp) {
        this.serviceUpTimestamp = serviceUpTimestamp;
    }

    /**
     * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
     */
    public boolean isExpired() {
        return isExpired(0l);
    }

    /**
     * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
     *
     * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
     * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
     * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
     * not be fixed.
     *
     * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
     */
    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

    /**
     * Gets the milliseconds since epoch when the lease was registered.
     *
     * @return the milliseconds since epoch when the lease was registered.
     */
    public long getRegistrationTimestamp() {
        return registrationTimestamp;
    }

    /**
     * Gets the milliseconds since epoch when the lease was last renewed.
     * Note that the value returned here is actually not the last lease renewal time but the renewal + duration.
     *
     * @return the milliseconds since epoch when the lease was last renewed.
     */
    public long getLastRenewalTimestamp() {
        return lastUpdateTimestamp;
    }

    /**
     * Gets the milliseconds since epoch when the lease was evicted.
     *
     * @return the milliseconds since epoch when the lease was evicted.
     */
    public long getEvictionTimestamp() {
        return evictionTimestamp;
    }

    /**
     * Gets the milliseconds since epoch when the service for the lease was marked as up.
     *
     * @return the milliseconds since epoch when the service for the lease was marked as up.
     */
    public long getServiceUpTimestamp() {
        return serviceUpTimestamp;
    }

    /**
     * Returns the holder of the lease.
     */
    public T getHolder() {
        return holder;
    }
}
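2) Next, the existing lease is looked up in that map by the ID of the registering instance:
2.1) If a lease already exists, the last dirty timestamp of the existing instance is compared with that of the registering instance. If the former is later than the latter, the existing instance is used in place of the one being registered.
2.2) If no lease exists, this is a new registration, so the expected number of renewals per minute is increased by 2 and the threshold is recalculated from it.
3) Finally, the new lease for the registering instance is stored in the map. At this point the registration process is essentially complete.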
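The steps above can be condensed into a small, self-contained sketch. It is a simplification and not the Eureka code: InstanceSketch and RegistrySketch are hypothetical names, the lease wrapper and the read lock are left out, and the renewal percent threshold is simply passed in by the caller as an assumed value.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, trimmed-down instance record; the real class is com.netflix.appinfo.InstanceInfo. */
class InstanceSketch {
    final String appName;
    final String id;
    final long lastDirtyTimestamp;

    InstanceSketch(String appName, String id, long lastDirtyTimestamp) {
        this.appName = appName;
        this.id = id;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
    }
}

/** Hypothetical registry that mirrors steps 1) to 3) of the flow described above. */
class RegistrySketch {
    // appName -> (instanceId -> registered instance)
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, InstanceSketch>> registry =
            new ConcurrentHashMap<>();
    private final Object lock = new Object();
    private final double renewalPercentThreshold; // assumed value supplied by the caller
    int expectedNumberOfRenewsPerMin;
    int numberOfRenewsPerMinThreshold;

    RegistrySketch(int initialExpectedRenews, double renewalPercentThreshold) {
        this.expectedNumberOfRenewsPerMin = initialExpectedRenews;
        this.renewalPercentThreshold = renewalPercentThreshold;
        this.numberOfRenewsPerMinThreshold = (int) (initialExpectedRenews * renewalPercentThreshold);
    }

    void register(InstanceSketch registrant) {
        // Step 1: fetch the per-application map, creating it if it does not exist yet.
        ConcurrentHashMap<String, InstanceSketch> gMap =
                registry.computeIfAbsent(registrant.appName, key -> new ConcurrentHashMap<>());

        // Step 2: look for an existing entry with the same instance ID.
        InstanceSketch existing = gMap.get(registrant.id);
        if (existing != null) {
            // Step 2.1: a strictly newer existing timestamp wins over the incoming one.
            if (existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
                registrant = existing;
            }
        } else {
            // Step 2.2: new registration, so two more renewals per minute are expected.
            synchronized (lock) {
                if (expectedNumberOfRenewsPerMin > 0) {
                    expectedNumberOfRenewsPerMin += 2;
                    numberOfRenewsPerMinThreshold =
                            (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);
                }
            }
        }

        // Step 3: store the (possibly substituted) instance.
        gMap.put(registrant.id, registrant);
    }

    InstanceSketch lookup(String appName, String id) {
        ConcurrentHashMap<String, InstanceSketch> gMap = registry.get(appName);
        return gMap == null ? null : gMap.get(id);
    }
}
```

The increment of 2 matches the "2 for a minute" remark in the quoted source comment, which assumes one heartbeat every 30 seconds.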
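II. The Eureka client
When the server's ServletContext finishes initializing, a DiscoveryClient is created. Anyone familiar with Eureka will recognize these two properties: fetchRegistry and registerWithEureka. When Eureka is integrated in Spring Cloud and run in standalone mode, startup reports errors unless both values are set to false. Why? The answer lies in the DiscoveryClient constructor: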
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {

    // ... (part of the code omitted)

    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());

        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        // ... (part of the code omitted, including the end of this try block)

    initScheduledTasks();

    // ...
}
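From the source code we can draw the following conclusions:
1) If shouldRegisterWithEureka and shouldFetchRegistry are both false, the constructor returns right away without setting up a scheduler or any network tasks.
2) Otherwise, it creates the thread pools used for sending heartbeats and for refreshing the cache.
3) It then initializes the scheduled tasks.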
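The first two conclusions can be illustrated with JDK classes only. This is a sketch under assumptions rather than the client's actual code: ClientExecutorsSketch and its helper methods are hypothetical, the maximum pool size is an arbitrary caller-supplied value, and a hand-written ThreadFactory replaces the Guava ThreadFactoryBuilder that the quoted constructor uses.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder for the executors that the quoted constructor sets up. */
class ClientExecutorsSketch {
    ScheduledExecutorService scheduler;
    ThreadPoolExecutor heartbeatExecutor;
    ThreadPoolExecutor cacheRefreshExecutor;

    ClientExecutorsSketch(boolean registerWithEureka, boolean fetchRegistry, int maxThreads) {
        if (!registerWithEureka && !fetchRegistry) {
            // Neither registering nor fetching: nothing to schedule, so no pools are created.
            return;
        }
        // Two scheduler threads: one for the heartbeat task, one for the cache refresh task.
        scheduler = Executors.newScheduledThreadPool(2, daemonFactory("DiscoveryClient-%d"));
        heartbeatExecutor = directHandoffPool(maxThreads, "DiscoveryClient-HeartbeatExecutor-%d");
        cacheRefreshExecutor = directHandoffPool(maxThreads, "DiscoveryClient-CacheRefreshExecutor-%d");
    }

    /** Builds daemon threads whose names follow the given format, numbered from 0. */
    static ThreadFactory daemonFactory(String nameFormat) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, String.format(nameFormat, counter.getAndIncrement()));
            thread.setDaemon(true);
            return thread;
        };
    }

    /** A SynchronousQueue holds no tasks: each task is handed directly to a worker thread. */
    static ThreadPoolExecutor directHandoffPool(int maxThreads, String nameFormat) {
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), daemonFactory(nameFormat));
    }

    void shutdown() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdownNow();
        }
        if (cacheRefreshExecutor != null) {
            cacheRefreshExecutor.shutdownNow();
        }
    }
}
```

Daemon threads are used so that these background pools do not keep the JVM alive when the application exits.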
Next, the initScheduledTasks() method contains the following code:
// Heartbeat timer
scheduler.schedule(
        new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        ),
        renewalIntervalInSecs, TimeUnit.SECONDS);
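This schedules a task that runs periodically: measured in seconds, it sends a heartbeat at the interval given by renewalIntervalInSecs. The HeartbeatThread it runs looks like this: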
/**
 * The heartbeat task that renews the lease in the given intervals.
 */
private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
The run method is simple: it calls renew and, if that succeeds, records the time. The renew method:
/**
 * Renew with the eureka service by making the appropriate REST call
 */
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}
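Here, if the heartbeat returns 404, the client performs a registration. Note that the httpResponse return value suggests that all of these operations are based on HTTP requests. Is that really the case? Let's continue with the register method: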
/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}
This in turn calls a method on the registrationClient held by eurekaTransport:
private static final class EurekaTransport {
    private ClosableResolver bootstrapResolver;
    private TransportClientFactory transportClientFactory;

    private EurekaHttpClient registrationClient;
    private EurekaHttpClientFactory registrationClientFactory;

    private EurekaHttpClient queryClient;
    private EurekaHttpClientFactory queryClientFactory;

    void shutdown() {
        if (registrationClientFactory != null) {
            registrationClientFactory.shutdown();
        }

        if (queryClientFactory != null) {
            queryClientFactory.shutdown();
        }

        if (registrationClient != null) {
            registrationClient.shutdown();
        }

        if (queryClient != null) {
            queryClient.shutdown();
        }

        if (transportClientFactory != null) {
            transportClientFactory.shutdown();
        }

        if (bootstrapResolver != null) {
            bootstrapResolver.shutdown();
        }
    }
}
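Here we can see that the Eureka client registers services through HTTP requests: registrationClient is an EurekaHttpClient. In other words, once a DiscoveryClient has been created, it registers the instance with the server.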
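To make the interplay between heartbeat and registration concrete, here is a minimal sketch of the "404 means register again" rule seen in renew(). No real HTTP is involved: FakeEurekaServer is a hypothetical in-memory stand-in that only returns status codes, and HeartbeatClientSketch is likewise an illustrative name, not part of Eureka.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for the server endpoints; it only returns status codes. */
class FakeEurekaServer {
    private final Set<String> leases = ConcurrentHashMap.newKeySet();

    /** 200 if the instance is known, 404 if the server has no lease for it. */
    int sendHeartBeat(String instanceId) {
        return leases.contains(instanceId) ? 200 : 404;
    }

    /** Stores the lease and answers 204, the status the quoted register() method checks for. */
    int register(String instanceId) {
        leases.add(instanceId);
        return 204;
    }
}

/** Hypothetical client that mirrors the control flow of renew() and HeartbeatThread.run(). */
class HeartbeatClientSketch {
    private final FakeEurekaServer server;
    private final String instanceId;
    int reRegisterCount = 0;
    long lastSuccessfulHeartbeatTimestamp = -1;

    HeartbeatClientSketch(FakeEurekaServer server, String instanceId) {
        this.server = server;
        this.instanceId = instanceId;
    }

    boolean renew() {
        int status = server.sendHeartBeat(instanceId);
        if (status == 404) {
            // The server does not know this instance, so fall back to registering it.
            reRegisterCount++;
            return server.register(instanceId) == 204;
        }
        return status == 200;
    }

    /** One heartbeat tick: renew, and record the time only when it succeeded. */
    void heartbeatOnce() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
```

In this sketch the first heartbeat of a fresh client gets a 404 and triggers a registration, and later heartbeats get a 200, so the instance is registered exactly once.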
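III. The REST service provided by the server
We have already seen the server-side code that handles client registration. Since the client registers over HTTP, the server must expose an address that handles this HTTP request. The Eureka server exposes its services as REST endpoints following the JAX-RS standard. Let's look at the addInstance method of the ApplicationResource class: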
/**
 * Registers information about a particular instance for an
 * {@link com.netflix.discovery.shared.Application}.
 *
 * @param info
 *            {@link InstanceInfo} information of the instance.
 * @param isReplication
 *            a header parameter containing information whether this is
 *            replicated from other nodes.
 */
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}
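The first half of addInstance is a chain of field checks that answers 400 with a message on the first failure and 204 once registration goes through. As a rough sketch of that pattern without any JAX-RS dependency, the following uses a hypothetical RegistrationSketch payload and plain status codes; the class names, the flattened dataCenterName field, and the helper methods are assumptions for illustration only.

```java
/** Hypothetical, trimmed-down payload; the real type is com.netflix.appinfo.InstanceInfo. */
class RegistrationSketch {
    final String id;
    final String hostName;
    final String ipAddr;
    final String appName;
    final String dataCenterName; // null stands for a missing data center name

    RegistrationSketch(String id, String hostName, String ipAddr, String appName, String dataCenterName) {
        this.id = id;
        this.hostName = hostName;
        this.ipAddr = ipAddr;
        this.appName = appName;
        this.dataCenterName = dataCenterName;
    }
}

/** Hypothetical validator that mirrors the order of the required-field checks. */
class AddInstanceSketch {

    static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /** Returns the message of the first failed check, or null when every check passes. */
    static String firstError(String pathAppName, RegistrationSketch info) {
        if (isBlank(info.id)) {
            return "Missing instanceId";
        } else if (isBlank(info.hostName)) {
            return "Missing hostname";
        } else if (isBlank(info.ipAddr)) {
            return "Missing ip address";
        } else if (isBlank(info.appName)) {
            return "Missing appName";
        } else if (!pathAppName.equals(info.appName)) {
            // The application name in the request path must match the one in the payload.
            return "Mismatched appName, expecting " + pathAppName + " but was " + info.appName;
        } else if (info.dataCenterName == null) {
            return "Missing dataCenterInfo Name";
        }
        return null;
    }

    /** 400 for an invalid payload, 204 when the payload would be passed on to the registry. */
    static int statusFor(String pathAppName, RegistrationSketch info) {
        return firstError(pathAppName, info) == null ? 204 : 400;
    }
}
```

A client that receives 204 here is the same client whose register() method, shown earlier, treats 204 as success.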
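That concludes this look at how Eureka registration is implemented in Spring Cloud. Hopefully the content above is helpful and you have learned something from it. If you found the article useful, feel free to share it so that more people can see it.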
Article URL: http://scyanting.com/article/pcpjie.html