日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Ribbon源码解析(一)

發布時間:2024/4/13 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Ribbon源码解析(一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

?

介紹

引入

關鍵組件

Modules模塊

Ribbon和Spring-Cloud-Loadbalancer

Core

IClient

RetryHandler

DefaultLoadBalancerRetryHandler

RequestSpecificRetryHandler

VipAddressResolver

SimpleVipAddressResolver

ribbon-loadbalancer

Server

IPing

AbstractLoadBalancerPing

DummyPing

PingUrl

Ping#isAlive()方法何時調用?有何用?

IPingStrategy

ServerList

AbstractServerList

?ConfigurationBasedServerList

StaticServerList

LoadBalancerStats

LoadBalancerStats#getAvailableZones

AbstractServerPredicate

PredicateKey

成員屬性

?子類實現:

ZoneAffinityPredicate

ZoneAvoidancePredicate

AvailabilityPredicate

CompositePredicate


介紹

Netflix Ribbon是Netflix OSS的一部分,它是一個基于HTTP和TCP客戶端負載均衡器。

Ribbon是一個客戶端負載均衡器,客戶端得到可用的服務器列表然后按照特定的負載均衡策略,分發請求到不同的 服務器 。

  • 客戶端需要知道服務器端的服務列表(可能通過配置、可能讓其自己去注冊中心拉取),需要自行決定請求要發送的目標地址。

  • 客戶端維護負載均衡服務器,控制負載均衡策略和算法。

引入

<dependency><groupId>com.netflix.ribbon</groupId><artifactId>ribbon-core</artifactId><version>2.7.17</version> </dependency>

Ribbon自2.4.0版本起就不強依賴于Archaius這個庫,但是Spring Cloud哪怕到了2020-03-15最新的Hoxton.SR3版本(對應的spring-cloud-starter-netflix-ribbon的2.2.2.RELEASE版本),它依賴Ribbon的依舊是2.3.0(2018.4發布)

關鍵組件

  • ServerList:可以響應客戶端的特定服務的服務器列表
  • ServerListFilter:可以動態獲得的具有所需特征的候選服務器列表的過濾器
  • ServerListUpdater:用于執行動態服務器列表更新
  • IRule:負載均衡策略,用于確定從服務器列表返回哪個服務器
  • IPing:客戶端用于快速檢查服務器當時是否處于活動狀態(心跳檢測)
  • ILoadBalancer:負載均衡器,負責負載均衡調度的管理
    ?

Modules模塊

  • ribbon-core:客戶端配置api和其他共享api。
  • ribbon-loadbalancer:可以獨立使用或與其他模塊一起使用的負載均衡器api
  • ribbon:集成了負載平衡、容錯、緩存/批處理等功能的api。基本不使用了
  • ribbon-eureka:使用Eureka客戶端為云提供動態服務器列表的api(和自己家的eureka天然整合)
  • ribbon-httpclient:REST客戶端構建在Apache HttpClient之上,與負載平衡器集成(不支持并被ribbon模塊取代)?;静皇褂昧?/li>
  • ribbon-transport:使用具有負載平衡功能的RxNetty傳輸支持HTTP、TCP和UDP協議的客戶端?;静皇褂昧?/li>

Ribbon和Spring-Cloud-Loadbalancer

Spring Cloud提供的對負載均衡的支持功能位于spring-cloud-commons這個工程里。

Spring Cloud自己的負載均衡接口是:ReactiveLoadBalancer,總體來說成熟度還不夠,離大規模商用部署仍有一段距離。
而至于抽象的通用接口org.springframework.cloud.client.loadbalancer.LoadBalancerClient,它目前的唯一實現仍然僅有RibbonLoadBalancerClient,也就是基于Ribbon的實現。
?

Core

core核心包里并沒有任何loadbalance負載均衡的概念,并且也沒有任何http的概念在里面,所以說core是一個高度抽象的包:和lb無關,和協議亦無關。

IClient

Ribbon要想負載均衡,那必然需要有發送請求的能力,而該接口就是它最最最最為核心接口嘍,其它的一切組件均圍繞它來設計和打造,包括LB。

該接口表示可以執行單個請求的客戶端:發送請求Request,獲得響應Response,注意并沒有綁定任何協議

public interface IClient<S extends ClientRequest, T extends IResponse> {// 執行請求并返回響應public T execute(S request, IClientConfig requestConfig) throws Exception; }

?

public class ClientRequest implements Cloneable {protected URI uri;protected Object loadBalancerKey = null;protected Boolean isRetriable = null;protected IClientConfig overrideConfig;}

?

public interface IResponse extends Closeable {public Object getPayload() throws ClientException;public boolean hasPayload();public boolean isSuccess();public URI getRequestedURI();public Map<String, ?> getHeaders(); }

RetryHandler

重試,是類似于Ribbon這種組件里特別重要的概念,因此此接口特別的重要。它負責對執行時若發生異常時的一個處理接口:重試or讓異常繼續拋出。

Ribbon把重試機制放在了ribbon-core包下,而非ribbon-loadbalancer下,是因為重試機制并不是負載均衡的內容,而是execute執行時的概念。

public interface RetryHandler {public static final RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler();// 該異常是否可處理(可重試)// sameServer:true表示在同一臺機器上重試。否則去其它機器重試public boolean isRetriableException(Throwable e, boolean sameServer);// 是否是Circuit熔斷類型異常。比如java.net.ConnectException就屬于這種故障// 這種異常類型一般屬于比較嚴重的,發生的次數多了就會把它熔斷(下次不會再找它了)public boolean isCircuitTrippingException(Throwable e);// 要在一臺服務器上執行的最大重試次數public int getMaxRetriesOnSameServer();// 要重試的最大不同服務器數。2表示最多去2臺不同的服務器身上重試public int getMaxRetriesOnNextServer(); }

主要實現類如下:

DefaultLoadBalancerRetryHandler

默認的重試實現。它只能識別java.net里的異常做出判斷。若你有其它異常,你可以繼承子類然后復寫相關方法。

public class DefaultLoadBalancerRetryHandler implements RetryHandler {// 這兩個異常會進行重試。代表連接不上嘛,重試是很合理的private List<Class<? extends Throwable>> retriable = Lists.newArrayList(ConnectException.class, SocketTimeoutException.class);// 和電路circuit相關的異常類型private List<Class<? extends Throwable>> circuitRelated = Lists.newArrayList(SocketException.class, SocketTimeoutException.class);// 不解釋。它哥三個都可以通過IClientConfig配置// `MaxAutoRetries`,默認值是0。也就是說在同一機器上不重試(只會執行一次,失敗就失敗了)protected final int retrySameServer;// `MaxAutoRetriesNextServer`,默認值是1,也就是只會再試下面一臺機器 不行就不行了protected final int retryNextServer;// 重試開關。true:開啟重試 false:不開啟重試// `OkToRetryOnAllOperations`屬性控制其值,默認也是false 也就是說默認并不重試protected final boolean retryEnabled; // 構造器賦值:值可以從IClientConfig里來(常用)// 當然你也可以通過其他構造器傳過來public DefaultLoadBalancerRetryHandler(IClientConfig clientConfig) {this.retrySameServer = clientConfig.get(CommonClientConfigKey.MaxAutoRetries, DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES);this.retryNextServer = clientConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer, DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER);this.retryEnabled = clientConfig.get(CommonClientConfigKey.OkToRetryOnAllOperations, false);}@Overridepublic boolean isCircuitTrippingException(Throwable e) {return Utils.isPresentAsCause(e, getCircuitRelatedExceptions()); }@Overridepublic boolean isRetriableException(Throwable e, boolean sameServer) {if (retryEnabled) {if (sameServer) {return Utils.isPresentAsCause(e, getRetriableExceptions());} else {return true;}}return false;}}

?

RequestSpecificRetryHandler

Specific:特征,細節,特殊的。也就是說它是和Request請求特征相關的重試處理器。

Ribbon會為允許請求的每個請求創建RetryHandler實例,每個請求可以帶有自己的requestConfig,比如每個Client請求都可以有自己的retrySameServer和retryNextServer參數。

相較于默認實現,它主要是針對Request,使得每個Request都能有一份獨自的、自己的重試策略,通過傳入requestConfig來實現,若沒有特別指定那便會使用RetryHandler fallback策略進行兜底。下面是接口方法的實現:

public class RequestSpecificRetryHandler implements RetryHandler {// fallback默認使用的是RetryHandler.DEFAULT// 有點代理的意思private final RetryHandler fallback;private int retrySameServer = -1;private int retryNextServer = -1;// 只有是連接異常,也就是SocketException或者其子類異常才執行重試private final boolean okToRetryOnConnectErrors;// 若是true:只要異常了,任何錯都執行重試private final boolean okToRetryOnAllErrors;protected List<Class<? extends Throwable>> connectionRelated = Lists.newArrayList(SocketException.class); }

VipAddressResolver

VIP地址解析器,“VipAddress”是目標服務器場的邏輯名稱,該處理器幫助解析并獲取得到最終的地址。

public interface VipAddressResolver {public String resolve(String vipAddress, IClientConfig niwsClientConfig); }

有且僅有唯一一個實現類:SimpleVipAddressResolver

SimpleVipAddressResolver

使用正則表達式,替換字符串內的“變量”。

public class SimpleVipAddressResolver implements VipAddressResolver {private static final Pattern VAR_PATTERN = Pattern.compile("\\$\\{(.*?)\\}");@Overridepublic String resolve(String vipAddressMacro, IClientConfig niwsClientConfig) {if (vipAddressMacro == null || vipAddressMacro.length() == 0) {return vipAddressMacro;}return replaceMacrosFromConfig(vipAddressMacro);}}

ribbon-loadbalancer

Ribbon和負載均衡不是完全相等的。Ribbon它的實際定位是更為抽象的:不限定協議的請求轉發。比如它可以集成ribbon-httpclient/transport等模塊來實現請求的控制、轉發。ribbon-loadbalancer是Ribbon的核心

Server

Server的每個屬性設置都沒有synchronization同步控制,是因為它統一依照last win的原則來處理接口,否則效率太低了。

public class Server {// 未知Zone區域,這是每臺Server的默認區域public static final String UNKNOWN_ZONE = "UNKNOWN";// 如192.168.1.1 / www.baidu.comprivate String host;private int port = 80;// 有可能是http/https 也有可能是tcp、udp等private String scheme;// id表示唯一。host + ":" + port -> localhost:8080 // 注意沒有http://前綴 只有host和端口// getInstanceId實例id使用的就是它。因為ip+端口可以唯一確定一個實例private volatile String id;// Server所屬的zone區域private String zone = UNKNOWN_ZONE;// 標記是否這臺機器是否是活著的// =========請注意:它的默認值是false=========private volatile boolean isAliveFlag; // 標記這臺機器是否可以準好可以提供服務了(活著并不代表可以提供服務了)private volatile boolean readyToServe = true; }

重寫了equal方法,只有id相同就認為server相同

@Overridepublic String toString() {return this.getId();}@Overridepublic boolean equals(Object obj) {if (this == obj)return true;if (!(obj instanceof Server))return false;Server svc = (Server) obj;return svc.getId().equals(this.getId());}@Overridepublic int hashCode() {int hash = 7;hash = 31 * hash + (null == this.getId() ? 0 : this.getId().hashCode());return hash;}

IPing

定義如何“ping”服務器以檢查其是否活動的接口,類似于心跳檢測。

public interface IPing {// 檢查給定的Server是否為“活動的”,這為在負載平衡時選出一個可用的候選Serverpublic boolean isAlive(Server server); }

主要實現類?

AbstractLoadBalancerPing

顧名思義,和LoadBalancer有關的一種實現,用于探測服務器節點的適用性。

public abstract class AbstractLoadBalancerPing implements IPing, IClientConfigAware {AbstractLoadBalancer lb;public void setLoadBalancer(AbstractLoadBalancer lb){this.lb = lb;}public AbstractLoadBalancer getLoadBalancer(){return lb;}@Overridepublic boolean isAlive(Server server) {return true;} }

它是使用較多的ping策略的父類,很明顯,請子類復寫isAlive()方法。它要求必須要關聯上一個負載均衡器AbstractLoadBalancer。若你要實現自己的Ping規則,進行心跳檢測,建議通過繼承該類來實現。

DummyPing

Dummy:仿制品,假的,仿真的。它是AbstractLoadBalancerPing的一個空實現~

public class DummyPing extends AbstractLoadBalancerPing {@Overridepublic boolean isAlive(Server server) {return true;}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {} }

它是默認的ping實現,Spring Cloud默認也是使用的它作為默認實現,也就是說根本就沒有心跳的效果。

PingUrl

它位于ribbon-httpclient這個包里面。它使用發送真實的Http請求的方式來做健康檢查,若返回的狀態碼是200就證明能夠ping通,返回true。不建議使用

Ping#isAlive()方法何時調用?有何用?

BaseLoadBalancer里是對此方法的唯一調用處,通過定時器在后臺每隔30秒ping一次。

// 這里是它的PingTask的唯一調用處void setupPingTask() {...lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);...}class PingTask extends TimerTask {public void run() {try {new Pinger(pingStrategy).runPinger();} catch (Exception e) {logger.error("LoadBalancer [{}]: Error pinging", name, e);}}}class Pinger {private final IPingStrategy pingerStrategy;public void runPinger() throws Exception {try {int numCandidates = allServers.length; //調用處,results = pingerStrategy.pingServers(ping, allServers);} finally {pingInProgress.set(false);}}} }

IPingStrategy

定義用于ping所有服務器的策略

private static class SerialPingStrategy implements IPingStrategy {@Overridepublic boolean[] pingServers(IPing ping, Server[] servers) {int numCandidates = servers.length;boolean[] results = new boolean[numCandidates];for (int i = 0; i < numCandidates; i++) {results[i] = false; /* Default answer is DEAD. */try {if (ping != null) {results[i] = ping.isAlive(servers[i]);}} catch (Exception e) {logger.error("Exception while pinging Server: '{}'", servers[i], e);}}return results;}}

ServerList

該接口定義了獲取服務器列表的方法。

public interface ServerList<T extends Server> {// 返回初始狀態的服務器列表。比如初試配置了10臺那它永遠是10個Serverpublic List<T> getInitialListOfServers();// 返回更新后的服務列表。它會周期ping過后,返回活著的public List<T> getUpdatedListOfServers(); }

AbstractServerList

該抽象實現有且提供一個public方法:提供loadBalancer使用的過濾器AbstractServerListFilter。

public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {public AbstractServerListFilter<T> getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{...} }

?ConfigurationBasedServerList

它可以從Configuration配置中加載服務器列表的實用工具實現類。屬性名的定義格式如下:

clientName,如果使用的是Feign,則為service Name。

<clientName>.<nameSpace>.listOfServers=<comma delimited hostname:port strings> account.ribbon.listOfServers=localhost:8080,localhost:8081#需要配置NIWSServerListClassName,不然listOfServers不起作用 account:ribbon:NIWSServerListClassName: com.netflix.loadbalancer.ConfigurationBasedServerListlistOfServers: localhost:9057 public class ConfigurationBasedServerList extends AbstractServerList<Server> {private IClientConfig clientConfig;// 在此處兩個方法的實現效果是一模一樣的。@Overridepublic List<Server> getInitialListOfServers() {return getUpdatedListOfServers();}@Overridepublic List<Server> getUpdatedListOfServers() {String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);//逗號分割的服務return derive(listOfServers);}}

StaticServerList

它是Spring Cloud對ServerList接口的實現,返回固定值。

LoadBalancerStats

Ribbon內部維護著一個服務器列表ServerList,當實例出現問題時候,需要將這部分異常的服務Server從負載均衡列表中T除掉,參考的就是服務器狀態的管理:ServerStats。LB需要依賴這些統計信息做為判斷的策略,負載均衡器的統計類主要是LoadBalancerStats,其內部持有ServerStats對每個Server的運行情況做了相關統計如:平均響應時間、累計失敗數、熔斷(時間)控制等。

?

LoadBalancerStats#getAvailableZones

獲取所有的可使用zone。不考慮server是否可用。

參考:?https://cloud.tencent.com/developer/article/1601502

?

?

AbstractServerPredicate

它是服務器過濾邏輯的基礎組件,可用于rules and server list filters。它傳入的是一個PredicateKey,含有一個Server和loadBalancerKey,由此可以通過服務器和負載均衡器來開發過濾服務器的邏輯。

PredicateKey

public class PredicateKey {private Object loadBalancerKey;private Server server;}

成員屬性

public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {protected IRule rule;private volatile LoadBalancerStats lbStats;// 隨機數。當過濾后還剩多臺Server將從中隨機獲取private final Random random = new Random();private final AtomicInteger nextIndex = new AtomicInteger();//一個特殊的Predicate:只有Server參數并無loadBalancerKey參數的PredicateKeyprivate final Predicate<Server> serverOnlyPredicate = new Predicate<Server>() {@Overridepublic boolean apply(@Nullable Server input) { return AbstractServerPredicate.this.apply(new PredicateKey(input));}};//直接返回true的斷言public static AbstractServerPredicate alwaysTrue() { return new AbstractServerPredicate() { @Overridepublic boolean apply(@Nullable PredicateKey input) {return true;}};}public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {return (PredicateKey input) -> p.apply(input);}public static AbstractServerPredicate ofServerPredicate(final Predicate<Server> p) {return (PredicateKey input) -> p.apply(input.getServer());}}

?子類實現:

ZoneAffinityPredicate

是否同zone。

ZoneAvoidancePredicate

判斷Server的Zone是否在可用Zone中。

@Overridepublic boolean apply(@Nullable PredicateKey input) {// 若開關關閉了,也就是禁用了這個策略。那就永遠true唄if (!ENABLED.get()) {return true;}// 拿到該Server所在的zone,進而完成判斷String serverZone = input.getServer().getZone();if (serverZone == null) {return true;}// 若可用區只剩一個了,那也不要過濾了(有總比沒有強)if (lbStats.getAvailableZones().size() <= 1) {return true;}...// 拿到全部可用的zone后,判斷該Server坐在的Zone是否屬于可用區內Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());return availableZones.contains(serverZone);}

AvailabilityPredicate

對服務的可用性進行過濾(過濾掉不可用的服務器)

CompositePredicate

組合模式。它還具有“回退”到更多(不止一個)不同謂詞之一的功能。如果主的Predicate產生的過濾服務器太少,它將一個接一個地嘗試fallback的Predicate,直到過濾服務器的數量超過一定數量的閾值或百分比閾值

public class CompositePredicate extends AbstractServerPredicate {private AbstractServerPredicate delegate;private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();private int minimalFilteredServers = 1;private float minimalFilteredPercentage = 0;// 接口方法的實現@Overridepublic boolean apply(@Nullable PredicateKey input) {return delegate.apply(input);} }

?

// 從主Predicate獲取**過濾后**的服務器,如果過濾后的服務器的數量還不夠// (應該說還太多),繼續嘗試使用fallback的Predicate繼續過濾@Overridepublic List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {// 1、使用主Predicate完成過濾,留下合格的Server們List<Server> result = super.getEligibleServers(servers, loadBalancerKey);// 2、繼續執行fallback的斷言器Iterator<AbstractServerPredicate> i = fallbacks.iterator();while (i.hasNext())if(result.size() < minimalFilteredServers || result.size() < servers.size() * minimalFilteredPercentage){// 特別注意:這里傳入的是Server,而非在result基礎上過濾// 所以每次執行過濾和上一次的結果沒有半毛錢關系result = i.next().getEligibleServers(servers, loadBalancerKey); }}return result;}

?

?

總結

以上是生活随笔為你收集整理的Ribbon源码解析(一)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。