Ribbon源码解析(一)
目錄
?
介紹
引入
關鍵組件
Modules模塊
Ribbon和Spring-Cloud-Loadbalancer
Core
IClient
RetryHandler
DefaultLoadBalancerRetryHandler
RequestSpecificRetryHandler
VipAddressResolver
SimpleVipAddressResolver
ribbon-loadbalancer
Server
IPing
AbstractLoadBalancerPing
DummyPing
PingUrl
Ping#isAlive()方法何時調用?有何用?
IPingStrategy
ServerList
AbstractServerList
?ConfigurationBasedServerList
StaticServerList
LoadBalancerStats
LoadBalancerStats#getAvailableZones
AbstractServerPredicate
PredicateKey
成員屬性
?子類實現:
ZoneAffinityPredicate
ZoneAvoidancePredicate
AvailabilityPredicate
CompositePredicate
介紹
Netflix Ribbon是Netflix OSS的一部分,它是一個基于HTTP和TCP客戶端負載均衡器。
Ribbon是一個客戶端負載均衡器,客戶端得到可用的服務器列表然后按照特定的負載均衡策略,分發請求到不同的 服務器 。
-
客戶端需要知道服務器端的服務列表(可能通過配置、可能讓其自己去注冊中心拉取),需要自行決定請求要發送的目標地址。
-
客戶端維護負載均衡服務器,控制負載均衡策略和算法。
引入
<dependency><groupId>com.netflix.ribbon</groupId><artifactId>ribbon-core</artifactId><version>2.7.17</version> </dependency>Ribbon自2.4.0版本起就不強依賴于Archaius這個庫,但是Spring Cloud哪怕到了2020-03-15最新的Hoxton.SR3版本(對應的spring-cloud-starter-netflix-ribbon的2.2.2.RELEASE版本),它依賴Ribbon的依舊是2.3.0(2018.4發布)
關鍵組件
- ServerList:可以響應客戶端的特定服務的服務器列表
- ServerListFilter:可以動態獲得的具有所需特征的候選服務器列表的過濾器
- ServerListUpdater:用于執行動態服務器列表更新
- IRule:負載均衡策略,用于確定從服務器列表返回哪個服務器
- IPing:客戶端用于快速檢查服務器當時是否處于活動狀態(心跳檢測)
- ILoadBalancer:負載均衡器,負責負載均衡調度的管理
?
Modules模塊
- ribbon-core:客戶端配置api和其他共享api。
- ribbon-loadbalancer:可以獨立使用或與其他模塊一起使用的負載均衡器api
- ribbon:集成了負載平衡、容錯、緩存/批處理等功能的api。基本不使用了
- ribbon-eureka:使用Eureka客戶端為云提供動態服務器列表的api(和自己家的eureka天然整合)
- ribbon-httpclient:REST客戶端構建在Apache HttpClient之上,與負載平衡器集成(不支持并被ribbon模塊取代)?;静皇褂昧?/li>
- ribbon-transport:使用具有負載平衡功能的RxNetty傳輸支持HTTP、TCP和UDP協議的客戶端?;静皇褂昧?/li>
Ribbon和Spring-Cloud-Loadbalancer
Spring Cloud提供的對負載均衡的支持功能位于spring-cloud-commons這個工程里。
Spring Cloud自己的負載均衡接口是:ReactiveLoadBalancer,總體來說成熟度還不夠,離大規模商用部署仍有一段距離。
而至于抽象的通用接口org.springframework.cloud.client.loadbalancer.LoadBalancerClient,它目前的唯一實現仍然僅有RibbonLoadBalancerClient,也就是基于Ribbon的實現。
?
Core
core核心包里并沒有任何loadbalance負載均衡的概念,并且也沒有任何http的概念在里面,所以說core是一個高度抽象的包:和lb無關,和協議亦無關。
IClient
Ribbon要想負載均衡,那必然需要有發送請求的能力,而該接口就是它最最最最為核心接口嘍,其它的一切組件均圍繞它來設計和打造,包括LB。
該接口表示可以執行單個請求的客戶端:發送請求Request,獲得響應Response,注意并沒有綁定任何協議
public interface IClient<S extends ClientRequest, T extends IResponse> {// 執行請求并返回響應public T execute(S request, IClientConfig requestConfig) throws Exception; }?
public class ClientRequest implements Cloneable {protected URI uri;protected Object loadBalancerKey = null;protected Boolean isRetriable = null;protected IClientConfig overrideConfig;}?
public interface IResponse extends Closeable {public Object getPayload() throws ClientException;public boolean hasPayload();public boolean isSuccess();public URI getRequestedURI();public Map<String, ?> getHeaders(); }RetryHandler
重試,是類似于Ribbon這種組件里特別重要的概念,因此此接口特別的重要。它負責對執行時若發生異常時的一個處理接口:重試or讓異常繼續拋出。
Ribbon把重試機制放在了ribbon-core包下,而非ribbon-loadbalancer下,是因為重試機制并不是負載均衡的內容,而是execute執行時的概念。
public interface RetryHandler {public static final RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler();// 該異常是否可處理(可重試)// sameServer:true表示在同一臺機器上重試。否則去其它機器重試public boolean isRetriableException(Throwable e, boolean sameServer);// 是否是Circuit熔斷類型異常。比如java.net.ConnectException就屬于這種故障// 這種異常類型一般屬于比較嚴重的,發生的次數多了就會把它熔斷(下次不會再找它了)public boolean isCircuitTrippingException(Throwable e);// 要在一臺服務器上執行的最大重試次數public int getMaxRetriesOnSameServer();// 要重試的最大不同服務器數。2表示最多去2臺不同的服務器身上重試public int getMaxRetriesOnNextServer(); }主要實現類如下:
DefaultLoadBalancerRetryHandler
默認的重試實現。它只能識別java.net里的異常做出判斷。若你有其它異常,你可以繼承子類然后復寫相關方法。
public class DefaultLoadBalancerRetryHandler implements RetryHandler {// 這兩個異常會進行重試。代表連接不上嘛,重試是很合理的private List<Class<? extends Throwable>> retriable = Lists.newArrayList(ConnectException.class, SocketTimeoutException.class);// 和電路circuit相關的異常類型private List<Class<? extends Throwable>> circuitRelated = Lists.newArrayList(SocketException.class, SocketTimeoutException.class);// 不解釋。它哥三個都可以通過IClientConfig配置// `MaxAutoRetries`,默認值是0。也就是說在同一機器上不重試(只會執行一次,失敗就失敗了)protected final int retrySameServer;// `MaxAutoRetriesNextServer`,默認值是1,也就是只會再試下面一臺機器 不行就不行了protected final int retryNextServer;// 重試開關。true:開啟重試 false:不開啟重試// `OkToRetryOnAllOperations`屬性控制其值,默認也是false 也就是說默認并不重試protected final boolean retryEnabled; // 構造器賦值:值可以從IClientConfig里來(常用)// 當然你也可以通過其他構造器傳過來public DefaultLoadBalancerRetryHandler(IClientConfig clientConfig) {this.retrySameServer = clientConfig.get(CommonClientConfigKey.MaxAutoRetries, DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES);this.retryNextServer = clientConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer, DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER);this.retryEnabled = clientConfig.get(CommonClientConfigKey.OkToRetryOnAllOperations, false);}@Overridepublic boolean isCircuitTrippingException(Throwable e) {return Utils.isPresentAsCause(e, getCircuitRelatedExceptions()); }@Overridepublic boolean isRetriableException(Throwable e, boolean sameServer) {if (retryEnabled) {if (sameServer) {return Utils.isPresentAsCause(e, getRetriableExceptions());} else {return true;}}return false;}}?
RequestSpecificRetryHandler
Specific:特征,細節,特殊的。也就是說它是和Request請求特征相關的重試處理器。
Ribbon會為允許請求的每個請求創建RetryHandler實例,每個請求可以帶有自己的requestConfig,比如每個Client請求都可以有自己的retrySameServer和retryNextServer參數。
相較于默認實現,它主要是針對Request,使得每個Request都能有一份獨自的、自己的重試策略,通過傳入requestConfig來實現,若沒有特別指定那便會使用RetryHandler fallback策略進行兜底。下面是接口方法的實現:
public class RequestSpecificRetryHandler implements RetryHandler {// fallback默認使用的是RetryHandler.DEFAULT// 有點代理的意思private final RetryHandler fallback;private int retrySameServer = -1;private int retryNextServer = -1;// 只有是連接異常,也就是SocketException或者其子類異常才執行重試private final boolean okToRetryOnConnectErrors;// 若是true:只要異常了,任何錯都執行重試private final boolean okToRetryOnAllErrors;protected List<Class<? extends Throwable>> connectionRelated = Lists.newArrayList(SocketException.class); }VipAddressResolver
VIP地址解析器,“VipAddress”是目標服務器場的邏輯名稱,該處理器幫助解析并獲取得到最終的地址。
public interface VipAddressResolver {public String resolve(String vipAddress, IClientConfig niwsClientConfig); }有且僅有唯一一個實現類:SimpleVipAddressResolver
SimpleVipAddressResolver
使用正則表達式,替換字符串內的“變量”。
public class SimpleVipAddressResolver implements VipAddressResolver {private static final Pattern VAR_PATTERN = Pattern.compile("\\$\\{(.*?)\\}");@Overridepublic String resolve(String vipAddressMacro, IClientConfig niwsClientConfig) {if (vipAddressMacro == null || vipAddressMacro.length() == 0) {return vipAddressMacro;}return replaceMacrosFromConfig(vipAddressMacro);}}ribbon-loadbalancer
Ribbon和負載均衡不是完全相等的。Ribbon它的實際定位是更為抽象的:不限定協議的請求轉發。比如它可以集成ribbon-httpclient/transport等模塊來實現請求的控制、轉發。ribbon-loadbalancer是Ribbon的核心
Server
Server的每個屬性設置都沒有synchronization同步控制,是因為它統一依照last win的原則來處理接口,否則效率太低了。
public class Server {// 未知Zone區域,這是每臺Server的默認區域public static final String UNKNOWN_ZONE = "UNKNOWN";// 如192.168.1.1 / www.baidu.comprivate String host;private int port = 80;// 有可能是http/https 也有可能是tcp、udp等private String scheme;// id表示唯一。host + ":" + port -> localhost:8080 // 注意沒有http://前綴 只有host和端口// getInstanceId實例id使用的就是它。因為ip+端口可以唯一確定一個實例private volatile String id;// Server所屬的zone區域private String zone = UNKNOWN_ZONE;// 標記是否這臺機器是否是活著的// =========請注意:它的默認值是false=========private volatile boolean isAliveFlag; // 標記這臺機器是否可以準好可以提供服務了(活著并不代表可以提供服務了)private volatile boolean readyToServe = true; }重寫了equal方法,只有id相同就認為server相同
@Overridepublic String toString() {return this.getId();}@Overridepublic boolean equals(Object obj) {if (this == obj)return true;if (!(obj instanceof Server))return false;Server svc = (Server) obj;return svc.getId().equals(this.getId());}@Overridepublic int hashCode() {int hash = 7;hash = 31 * hash + (null == this.getId() ? 0 : this.getId().hashCode());return hash;}IPing
定義如何“ping”服務器以檢查其是否活動的接口,類似于心跳檢測。
public interface IPing {// 檢查給定的Server是否為“活動的”,這為在負載平衡時選出一個可用的候選Serverpublic boolean isAlive(Server server); }主要實現類?
AbstractLoadBalancerPing
顧名思義,和LoadBalancer有關的一種實現,用于探測服務器節點的適用性。
public abstract class AbstractLoadBalancerPing implements IPing, IClientConfigAware {AbstractLoadBalancer lb;public void setLoadBalancer(AbstractLoadBalancer lb){this.lb = lb;}public AbstractLoadBalancer getLoadBalancer(){return lb;}@Overridepublic boolean isAlive(Server server) {return true;} }它是使用較多的ping策略的父類,很明顯,請子類復寫isAlive()方法。它要求必須要關聯上一個負載均衡器AbstractLoadBalancer。若你要實現自己的Ping規則,進行心跳檢測,建議通過繼承該類來實現。
DummyPing
Dummy:仿制品,假的,仿真的。它是AbstractLoadBalancerPing的一個空實現~
public class DummyPing extends AbstractLoadBalancerPing {@Overridepublic boolean isAlive(Server server) {return true;}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {} }它是默認的ping實現,Spring Cloud默認也是使用的它作為默認實現,也就是說根本就沒有心跳的效果。
PingUrl
它位于ribbon-httpclient這個包里面。它使用發送真實的Http請求的方式來做健康檢查,若返回的狀態碼是200就證明能夠ping通,返回true。不建議使用
Ping#isAlive()方法何時調用?有何用?
BaseLoadBalancer里是對此方法的唯一調用處,通過定時器在后臺每隔30秒ping一次。
// 這里是它的PingTask的唯一調用處void setupPingTask() {...lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);...}class PingTask extends TimerTask {public void run() {try {new Pinger(pingStrategy).runPinger();} catch (Exception e) {logger.error("LoadBalancer [{}]: Error pinging", name, e);}}}class Pinger {private final IPingStrategy pingerStrategy;public void runPinger() throws Exception {try {int numCandidates = allServers.length; //調用處,results = pingerStrategy.pingServers(ping, allServers);} finally {pingInProgress.set(false);}}} }IPingStrategy
定義用于ping所有服務器的策略
private static class SerialPingStrategy implements IPingStrategy {@Overridepublic boolean[] pingServers(IPing ping, Server[] servers) {int numCandidates = servers.length;boolean[] results = new boolean[numCandidates];for (int i = 0; i < numCandidates; i++) {results[i] = false; /* Default answer is DEAD. */try {if (ping != null) {results[i] = ping.isAlive(servers[i]);}} catch (Exception e) {logger.error("Exception while pinging Server: '{}'", servers[i], e);}}return results;}}ServerList
該接口定義了獲取服務器列表的方法。
public interface ServerList<T extends Server> {// 返回初始狀態的服務器列表。比如初試配置了10臺那它永遠是10個Serverpublic List<T> getInitialListOfServers();// 返回更新后的服務列表。它會周期ping過后,返回活著的public List<T> getUpdatedListOfServers(); }AbstractServerList
該抽象實現有且提供一個public方法:提供loadBalancer使用的過濾器AbstractServerListFilter。
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {public AbstractServerListFilter<T> getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{...} }?ConfigurationBasedServerList
它可以從Configuration配置中加載服務器列表的實用工具實現類。屬性名的定義格式如下:
clientName,如果使用的是Feign,則為service Name。
<clientName>.<nameSpace>.listOfServers=<comma delimited hostname:port strings> account.ribbon.listOfServers=localhost:8080,localhost:8081#需要配置NIWSServerListClassName,不然listOfServers不起作用 account:ribbon:NIWSServerListClassName: com.netflix.loadbalancer.ConfigurationBasedServerListlistOfServers: localhost:9057 public class ConfigurationBasedServerList extends AbstractServerList<Server> {private IClientConfig clientConfig;// 在此處兩個方法的實現效果是一模一樣的。@Overridepublic List<Server> getInitialListOfServers() {return getUpdatedListOfServers();}@Overridepublic List<Server> getUpdatedListOfServers() {String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);//逗號分割的服務return derive(listOfServers);}}StaticServerList
它是Spring Cloud對ServerList接口的實現,返回固定值。
LoadBalancerStats
Ribbon內部維護著一個服務器列表ServerList,當實例出現問題時候,需要將這部分異常的服務Server從負載均衡列表中T除掉,參考的就是服務器狀態的管理:ServerStats。LB需要依賴這些統計信息做為判斷的策略,負載均衡器的統計類主要是LoadBalancerStats,其內部持有ServerStats對每個Server的運行情況做了相關統計如:平均響應時間、累計失敗數、熔斷(時間)控制等。
?
LoadBalancerStats#getAvailableZones
獲取所有的可使用zone。不考慮server是否可用。
參考:?https://cloud.tencent.com/developer/article/1601502
?
?
AbstractServerPredicate
它是服務器過濾邏輯的基礎組件,可用于rules and server list filters。它傳入的是一個PredicateKey,含有一個Server和loadBalancerKey,由此可以通過服務器和負載均衡器來開發過濾服務器的邏輯。
PredicateKey
public class PredicateKey {private Object loadBalancerKey;private Server server;}成員屬性
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {protected IRule rule;private volatile LoadBalancerStats lbStats;// 隨機數。當過濾后還剩多臺Server將從中隨機獲取private final Random random = new Random();private final AtomicInteger nextIndex = new AtomicInteger();//一個特殊的Predicate:只有Server參數并無loadBalancerKey參數的PredicateKeyprivate final Predicate<Server> serverOnlyPredicate = new Predicate<Server>() {@Overridepublic boolean apply(@Nullable Server input) { return AbstractServerPredicate.this.apply(new PredicateKey(input));}};//直接返回true的斷言public static AbstractServerPredicate alwaysTrue() { return new AbstractServerPredicate() { @Overridepublic boolean apply(@Nullable PredicateKey input) {return true;}};}public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {return (PredicateKey input) -> p.apply(input);}public static AbstractServerPredicate ofServerPredicate(final Predicate<Server> p) {return (PredicateKey input) -> p.apply(input.getServer());}}?子類實現:
ZoneAffinityPredicate
是否同zone。
ZoneAvoidancePredicate
判斷Server的Zone是否在可用Zone中。
@Overridepublic boolean apply(@Nullable PredicateKey input) {// 若開關關閉了,也就是禁用了這個策略。那就永遠true唄if (!ENABLED.get()) {return true;}// 拿到該Server所在的zone,進而完成判斷String serverZone = input.getServer().getZone();if (serverZone == null) {return true;}// 若可用區只剩一個了,那也不要過濾了(有總比沒有強)if (lbStats.getAvailableZones().size() <= 1) {return true;}...// 拿到全部可用的zone后,判斷該Server坐在的Zone是否屬于可用區內Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());return availableZones.contains(serverZone);}AvailabilityPredicate
對服務的可用性進行過濾(過濾掉不可用的服務器)
CompositePredicate
組合模式。它還具有“回退”到更多(不止一個)不同謂詞之一的功能。如果主的Predicate產生的過濾服務器太少,它將一個接一個地嘗試fallback的Predicate,直到過濾服務器的數量超過一定數量的閾值或百分比閾值。
public class CompositePredicate extends AbstractServerPredicate {private AbstractServerPredicate delegate;private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();private int minimalFilteredServers = 1;private float minimalFilteredPercentage = 0;// 接口方法的實現@Overridepublic boolean apply(@Nullable PredicateKey input) {return delegate.apply(input);} }?
// 從主Predicate獲取**過濾后**的服務器,如果過濾后的服務器的數量還不夠// (應該說還太多),繼續嘗試使用fallback的Predicate繼續過濾@Overridepublic List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {// 1、使用主Predicate完成過濾,留下合格的Server們List<Server> result = super.getEligibleServers(servers, loadBalancerKey);// 2、繼續執行fallback的斷言器Iterator<AbstractServerPredicate> i = fallbacks.iterator();while (i.hasNext())if(result.size() < minimalFilteredServers || result.size() < servers.size() * minimalFilteredPercentage){// 特別注意:這里傳入的是Server,而非在result基礎上過濾// 所以每次執行過濾和上一次的結果沒有半毛錢關系result = i.next().getEligibleServers(servers, loadBalancerKey); }}return result;}?
?
總結
以上是生活随笔為你收集整理的Ribbon源码解析(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Boot配置文件有提示
- 下一篇: Ribbon源码解析(二)