日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【源码系列】Eureka源码分析

發布時間:2023/12/31 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【源码系列】Eureka源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

??對于服務注冊中心、服務提供者、服務消費者這個三個主要元素來說,服務提供者和服務消費者(即Eureka客戶端)在整個運行機制中是大部分通信行為的主動發起者(服務注冊、續約、下線等),而注冊中心主要是處理請求的接收者。所以,我們從Eureka的客戶端為入口分析它是如何完成這些主動通信的。
??一般情況下,我們將一個SpringBoot應用注冊到 Eureka Server 或者從 Eureka Server 獲取服務器列表時,就做了兩件事:

  • 在應用啟動類添加注解 @EnableDiscoveryClient
  • 在 application.properties 文件上用 eureka.client.service-url.defaultZone 參數指定注冊中心的地址
  • 我們先看看 @EnableDiscoveryClient 這個注解的源碼,如下:

    /*** Annotation to enable a DiscoveryClient implementation.* @author Spencer Gibb*/ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient {/*** If true, the ServiceRegistry will automatically register the local server.*/boolean autoRegister() default true; }

    通過注釋可以知道,該注解可以開啟 DiscoveryClient 實例,然后我們搜索 DiscoveryClient 會發現一個類和一個接口,它們的關系如圖。

    ?


    enter description here
    右邊的org.springframework.cloud.client.discovery.DiscoveryClient 是SpringCloud的接口,體現了面向接口編程的思想,定義了用來發現服務的常用抽象方法。org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是該接口的實現,是對Eureka發現服務的封裝,內部依賴了一個EurekaClient接口,所以真正實現發現服務的是com.netflix.discovery.DiscoveryClient類。
    查看類注釋的內容:

    ?

    /*** The class that is instrumental for interactions with <tt>Eureka Server</tt>.** <p>* <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the* instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with* <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from* <tt>Eureka Server</tt> during shutdown* <p>* d) <em>Querying</em> the list of services/instances registered with* <tt>Eureka Server</tt>* <p>** <p>* <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>* {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips* which do not change. All of the functions defined above fail-over to other* {@link java.net.URL}s specified in the list in the case of failure.* </p>** @author Karthik Ranganathan, Greg Kim* @author Spencer Gibb**/ @Singleton public class DiscoveryClient implements EurekaClient {... }

    這個類用于幫助與 Eureka Server 相互協作
    Eureka Client客戶端負責以下內容:

  • 向Eureka Server 注冊服務實例
  • 向 Eureka Server 服務續約
  • 服務關閉時取消租約
  • 查詢注冊在 Eureka Server 上的服務或實例列表
    Eureka Client 還需要配置一個 Eureka Server 的服務列表。
  • 哪里對Eureka Server的URL列表配置?

    根據我們配置的屬性名eureka.client.serviceUrl.defaultZone,通過serviceUrl可以找到該屬性相關的加載屬性,就是DiscoveryClient里有個getEurekaServiceUrlsFromConfig()方法但是棄用了,改用EndpointUtils這個工具類,代碼如下:

  • /**?
  • * Get the list of all eureka service urls from properties file for the eureka client to talk to.?
  • *?
  • * @param clientConfig the clientConfig to use?
  • * @param instanceZone The zone in which the client resides?
  • * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise?
  • * @return an (ordered) map of zone -> list of urls mappings, with the preferred zone first in iteration order?
  • */?
  • public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {?
  • Map<String, List<String>> orderedUrls = new LinkedHashMap<>();?
  • String region = getRegion(clientConfig);?
  • String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());?
  • if (availZones == null || availZones.length == 0) {?
  • availZones = new String[1];?
  • availZones[0] = DEFAULT_ZONE;?
  • }?
  • logger.debug("The availability zone for the given region {} are {}", region, availZones);?
  • int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);?
  • ?
  • String zone = availZones[myZoneOffset];?
  • List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);?
  • if (serviceUrls != null) {?
  • orderedUrls.put(zone, serviceUrls);?
  • }?
  • int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);?
  • while (currentOffset != myZoneOffset) {?
  • zone = availZones[currentOffset];?
  • serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);?
  • if (serviceUrls != null) {?
  • orderedUrls.put(zone, serviceUrls);?
  • }?
  • if (currentOffset == (availZones.length - 1)) {?
  • currentOffset = 0;?
  • } else {?
  • currentOffset++;?
  • }?
  • }?
  • ?
  • if (orderedUrls.size() < 1) {?
  • throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");?
  • }?
  • return orderedUrls;?
  • }?
  • ?
  • Region,Zone

    getRegion()方法可以看出一個微服務應用只可以屬于一個Region,如果沒配置則為default,可以通過eureka.client.region屬性來定義。

    public static String getRegion(EurekaClientConfig clientConfig) {String region = clientConfig.getRegion();if (region == null) {region = DEFAULT_REGION;}region = region.trim().toLowerCase();return region;}

    getAvailabilityZones()方法可以看出Region與Zone的關系,一個Region可以有多個Zone,設置時可以用逗號來分隔。默認采用defaultZone。

    public String[] getAvailabilityZones(String region) {String value = (String)this.availabilityZones.get(region);if(value == null) {value = "defaultZone";}return value.split(",");}

    在獲取Region和Zone的信息后,根據傳入的參數按一定的算法確定加載位于哪一個Zone的serviceUrls。

    ?


    enter description here
    getEurekaServerServiceUrls方法是EurekaClientConfigBean的實現類,該方法用來獲取一個Zone下配置的所以serviceUrl,通過標注出來的地方可以知道,eureka.client.serviceUrl.defaultZone屬性可以配置多個,用逗號來分隔。

    ?

    ?


    enter description here
    注意: Ribbon具有區域親和特性,Ribbon的默認策略會優先訪問同客戶端處于同一個Zone中的實例。所以通過Zone屬性的定義,配置實際部署的物理結構,我們就可以有效地設計出對區域性故障的容錯集群。

    ?

    服務注冊

    前面說了多個服務注冊中心信息的加載,這里再看看 DiscoveryClient 類是如何實現服務注冊的。通過查看該類的構造函數,發現它調用了以下方法。

    /*** Initializes all scheduled tasks.*/private void initScheduledTasks() {...if (clientConfig.shouldRegisterWithEureka()) {...// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}

    這里先根據配置判斷是不是要注冊到 Eureka,然后創建心跳檢測任務,獲取 instanceInfoReplicator。InstanceInfoReplicator類實現 Runnable接口,instanceInfoReplicator實例會執行一個定時任務,這個定時任務的內容可以查看該類的run()方法。

    ?


    enter description here
    這里定時刷新實例信息,discoveryClient.register()這里觸發了服務注冊,register()的內容如下:

    ?

    ?


    服務注冊的方法
    通過注釋也能看出來,這里是通過發送REST請求的方式進行的,com.netflix.appinfo.InstanceInfo就是注冊時客戶端給服務端的元數據。

    ?

    服務獲取與服務續約

    上面說到的 initScheduledTasks() 方法還有兩個定時任務,分別是服務獲取和服務續約。

    private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}

    clientConfig.shouldFetchRegistry()這里其實是通過eureka.client.fetch-registry參數來判斷的,默認為true,它可以定期更新客戶端的服務清單,從而客戶端能訪問到健康的服務實例。
    服務續約也是發送REST請求實現的。

    boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}}

    服務獲取的過程省略。

    服務下線

    服務端根據實例Id和appName執行remove操作。

    void unregister() {// It can be null if shouldRegisterWithEureka == falseif(eurekaTransport != null && eurekaTransport.registrationClient != null) {try {logger.info("Unregistering ...");EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());} catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);}}}

    注冊中心處理

    前面的分析都是從客戶端出發的,現在看看 Eureka Server是如何處理各種Rest請求的。這種請求的定義都在com.netflix.eureka.resources包下。
    以服務注冊為例:
    調用 ApplicationResource 類下的 addInstance()方法。

    @POST@Consumes({"application/json", "application/xml"})public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);if(this.isBlank(info.getId())) {return Response.status(400).entity("Missing instanceId").build();} else if(this.isBlank(info.getHostName())) {return Response.status(400).entity("Missing hostname").build();} else if(this.isBlank(info.getIPAddr())) {return Response.status(400).entity("Missing ip address").build();} else if(this.isBlank(info.getAppName())) {return Response.status(400).entity("Missing appName").build();} else if(!this.appName.equals(info.getAppName())) {return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();} else if(info.getDataCenterInfo() == null) {return Response.status(400).entity("Missing dataCenterInfo").build();} else if(info.getDataCenterInfo().getName() == null) {return Response.status(400).entity("Missing dataCenterInfo Name").build();} else {DataCenterInfo dataCenterInfo = info.getDataCenterInfo();if(dataCenterInfo instanceof UniqueIdentifier) {String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();if(this.isBlank(dataCenterInfoId)) {boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if(experimental) {String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";return Response.status(400).entity(entity).build();}if(dataCenterInfo instanceof AmazonInfo) {AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;String effectiveId = amazonInfo.get(MetaDataKey.instanceId);if(effectiveId == null) {amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());}} else {logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}this.registry.register(info, "true".equals(isReplication));return Response.status(204).build();}}

    在對注冊信息進行校驗后,會調用org.springframework.cloud.netflix.eureka.server.InstanceRegistry的register(InstanceInfo info, int leaseDuration, boolean isReplication)方法。

    ?


    enter description here

    ?

    ?


    enter description here
    首先會把新服務注冊事件傳播出去,然后調用父類com.netflix.eureka.registry.AbstractInstanceRegistry中的實現。

    ?

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {try {this.read.lock();Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());...} finally {this.read.unlock();}}

    ?


    保存實例信息的雙層Map
    InstanceInfo的元數據信息保存在一個ConcurrentHashMap中,它是一個雙層的Map結構,第一層的key是服務名(即InstanceInfo的appName屬性),第二層的key是實例名(即InstanceInfo的InstanceId屬性)。
    ApplicationResource中的其他方法可以自行研究。

    ?

    轉載于:https://www.cnblogs.com/2YSP/p/11072255.html

    總結

    以上是生活随笔為你收集整理的【源码系列】Eureka源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。