日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计

發布時間:2023/12/15 java 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

降級熔斷框架 Hystrix 源碼解析:滑動窗口統計

概述

Hystrix 是一個開源的降級熔斷框架,用于提高服務可靠性,適用于依賴大量外部服務的業務系統。什么是降級熔斷呢?

降級

業務降級,是指犧牲非核心的業務功能,保證核心功能的穩定運行。簡單來說,要實現優雅的業務降級,需要將功能實現拆分到相對獨立的不同代碼單元,分優先級進行隔離。在后臺通過開關控制,降級部分非主流程的業務功能,減輕系統依賴和性能損耗,從而提升集群的整體吞吐率。

降級的重點是:業務之間有優先級之分。降級的典型應用是:電商活動期間關閉非核心服務,保證核心買買買業務的正常運行。

熔斷

老式電閘都安裝了保險絲,一旦有人使用超大功率的設備,保險絲就會燒斷以保護各個電器不被強電流給燒壞。同理我們的接口也需要安裝上“保險絲”,以防止非預期的請求對系統壓力過大而引起的系統癱瘓,當流量過大時,可以采取拒絕或者引流等機制。

同樣在分布式系統中,當被調用的遠程服務無法使用時,如果沒有過載保護,就會導致請求的資源阻塞在遠程服務器上耗盡資源。很多時候,剛開始可能只是出現了局部小規模的故障,然而由于種種原因,故障影響范圍越來越大,最終導致全局性的后果。這種過載保護,就是熔斷器。

在 hystrix 中,熔斷相關的配置有以下幾個:滑動窗口長度,單位毫秒

hystrix.command.HystrixCommandKey.circuitBreaker.sleepWindowInMilliseconds

滑動窗口滾動桶的長度,單位毫秒

hystrix.command.HystrixCommandKey.metrics.rollingPercentile.bucketSize

觸發熔斷的失敗率閾值

hystrix.command.HystrixCommandKey.circuitBreaker.errorThresholdPercentage

觸發熔斷的請求量閾值

hystrix.command.HystrixCommandKey.circuitBreaker.requestVolumeThreshold

從配置信息里可以看出來,熔斷邏輯判斷里使用了滑動窗口來統計服務調用的成功、失敗量。那么這里的滑動窗口是如何實現的呢?下面我們深入源碼來研究一下。

注:使用的源碼版本是 2017-09-13 GitHub 上 master 分支最新代碼。

滑動窗口

在 hystrix 里,大量使用了 RxJava 這個響應式函數編程框架,滑動窗口的實現也是使用了 RxJava 框架。

源碼分析

一個滑動窗口有兩個關鍵要素組成:窗口時長、窗口滾動時間間隔。通常一個窗口會劃分為若干個桶 bucket,每個桶的大小等于窗口滾動時間間隔。也就是說,滑動窗口統計數據時,分兩步:統計一個 bucket 內的數據;

統計一個窗口,即若干個 bucket 的數據。

bucket 統計的代碼位于 BucketedCounterStream 類中,其關鍵的代碼如下所示://?這里的代碼并非全部,只展示了和?bucket?統計相關的關鍵代碼public?abstract?class?BucketedCounterStream?{????protected?final?int?numBuckets;????protected?final?Observable?bucketedStream;????protected?final?AtomicReference?subscription?=?new?AtomicReference(null);????private?final?Func1,?Observable>?reduceBucketToSummary;????protected?BucketedCounterStream(final?HystrixEventStream?inputEventStream,?final?int?numBuckets,?final?int?bucketSizeInMs,????????????????????????????????????final?Func2?appendRawEventToBucket)?{????????this.numBuckets?=?numBuckets;????????this.reduceBucketToSummary?=?new?Func1,?Observable>()?{????????????@Override

public?Observable?call(Observable?eventBucket)?{????????????????return?eventBucket.reduce(getEmptyBucketSummary(),?appendRawEventToBucket);

}

};????????final?List?emptyEventCountsToStart?=?new?ArrayList();????????for?(int?i?=?0;?i?

emptyEventCountsToStart.add(getEmptyBucketSummary());

}????????this.bucketedStream?=?Observable.defer(new?Func0>()?{????????????@Override

public?Observable?call()?{????????????????return?inputEventStream

.observe()

.window(bucketSizeInMs,?TimeUnit.MILLISECONDS)?//bucket?it?by?the?counter?window?so?we?can?emit?to?the?next?operator?in?time?chunks,?not?on?every?OnNext

.flatMap(reduceBucketToSummary)????????????????//for?a?given?bucket,?turn?it?into?a?long?array?containing?counts?of?event?types

.startWith(emptyEventCountsToStart);???????????//start?it?with?empty?arrays?to?make?consumer?logic?as?generic?as?possible?(windows?are?always?full)

}

});

}????abstract?Bucket?getEmptyBucketSummary();

}

首先我們看這幾行代碼,這幾行代碼功能是:將服務調用級別的輸入數據流 inputEventStream 以 bucketSizeInMs 毫秒為一個桶進行了匯總,匯總的結果輸入到桶級別數據流 bucketedStream。this.bucketedStream?=?Observable.defer(new?Func0>()?{????????????@Override

public?Observable?call()?{????????????????return?inputEventStream

.observe()

.window(bucketSizeInMs,?TimeUnit.MILLISECONDS)?//?window?窗函數匯聚?bucketSizeInMs?毫秒內的數據后,每隔?bucketSizeInMs?毫秒批量發送出去

.flatMap(reduceBucketToSummary)????????????????//?flatMap?方法接收到?window?窗函數發來的數據,使用?reduceBucketToSummary?函數進行匯總統計

.startWith(emptyEventCountsToStart);???????????//?給?bucketedStream?發布源設定一個起始值

}

});

RxJava 基于觀察者模式,又叫“發布-訂閱”模式。inputEventStream 是 HystrixEventStream 對象,其 observe() 方法返回的是一個被觀察者 Observable 對象,也可以說是一個發布源 Publisher。public?interface?HystrixEventStream?{????Observable?observe();

}

在 Hystrix 中有多種數據發布源,與服務調用的熔斷相關的是 HystrixCommandCompletionStream:每一次服務調用結束,調用 write 方法記錄成功、失敗等信息;

write 方法調用了 writeOnlySubject.onNext,writeOnlySubject 是一個線程安全的發布源 PublishSubject,用于發布 HystrixCommandCompletion 類型的數據,onNext 功能是發布一個事件或數據;

observe 方法返回的可訂閱數據源 readOnlyStream 是 writeOnlySubject 的只讀版本。public?class?HystrixCommandCompletionStream?implements?HystrixEventStream?{????private?final?HystrixCommandKey?commandKey;?//?服務調用標記?key

private?final?Subject?writeOnlySubject;????private?final?Observable?readOnlyStream;

HystrixCommandCompletionStream(final?HystrixCommandKey?commandKey)?{????????this.commandKey?=?commandKey;????????this.writeOnlySubject?=?new?SerializedSubject(PublishSubject.create());????????this.readOnlyStream?=?writeOnlySubject.share();

}????public?void?write(HystrixCommandCompletion?event)?{

writeOnlySubject.onNext(event);

}????@Override

public?Observable?observe()?{????????return?readOnlyStream;

}

}

上面分析了 bucket 統計和事件發布源相關的代碼,下面我們再看一下 window 統計的代碼。滑動窗口統計的代碼在 BucketedRollingCounterStream 類中,window 統計和 bucket 統計原理是一樣的,只是維度不同:bucket 統計的維度是時間,比如 bucketSizeInMs 毫秒;

window 統計的維度是若干數據,在這里是 numBuckets 個 bucket。

注意:numBuckets 的值等于 hystrix.command.HystrixCommandKey.circuitBreaker.sleepWindowInMilliseconds 除以 hystrix.command.HystrixCommandKey.metrics.rollingPercentile.bucketSize,numBuckets 是整數,所以 sleepWindowInMilliseconds 必須是 bucketSize 的整數倍,否則 Hystrix 就會拋出異常。public?abstract?class?BucketedRollingCounterStream?extends?BucketedCounterStream?{????private?Observable?sourceStream;????private?final?AtomicBoolean?isSourceCurrentlySubscribed?=?new?AtomicBoolean(false);????protected?BucketedRollingCounterStream(HystrixEventStream?stream,?final?int?numBuckets,?int?bucketSizeInMs,???????????????????????????????????????????final?Func2?appendRawEventToBucket,???????????????????????????????????????????final?Func2?reduceBucket)?{????????super(stream,?numBuckets,?bucketSizeInMs,?appendRawEventToBucket);

Func1,?Observable>?reduceWindowToSummary?=?new?Func1,?Observable>()?{????????????@Override

public?Observable?call(Observable?window)?{????????????????return?window.scan(getEmptyOutputValue(),?reduceBucket).skip(numBuckets);

}

};????????this.sourceStream?=?bucketedStream??????//stream?broken?up?into?buckets

.window(numBuckets,?1)??????????//emit?overlapping?windows?of?buckets

.flatMap(reduceWindowToSummary)?//convert?a?window?of?bucket-summaries?into?a?single?summary

.doOnSubscribe(new?Action0()?{????????????????????@Override

public?void?call()?{

isSourceCurrentlySubscribed.set(true);

}

})

.doOnUnsubscribe(new?Action0()?{????????????????????@Override

public?void?call()?{

isSourceCurrentlySubscribed.set(false);

}

})

.share()????????????????????????//?multiple?subscribers?should?get?same?data

.onBackpressureDrop();??????????//?如果消費者處理數據太慢導致數據堆積,就丟棄部分數據

}????@Override

public?Observable?observe()?{????????return?sourceStream;

}

}

接下來我們介紹一下 BucketedRollingCounterStream 構造函數的主要參數:HystrixEventStream stream:數據發布源;

int numBuckets:每個窗口內部 bucket 個數;

int bucketSizeInMs:bucket 時長,也是窗口滾動時間間隔;

appendRawEventToBucket:bucket 內部統計函數,其功能是起始值 Bucket 加上 Event 后,輸出 Bucket 類型值,對對個數據的處理具有累積的效果;

reduceBucket:和 appendRawEventToBucket 類似,用于 window 統計。

BucketedRollingCounterStream 提供了完整的滑動窗口統計的服務,想要使用滑動窗口來統計數據的繼承實現 BucketedRollingCounterStream 即可。 接下來我們看一下用于滑動統計服務調用成功、失敗次數的 RollingCommandEventCounterStream 類:public?class?RollingCommandEventCounterStream?extends?BucketedRollingCounterStream?{????private?static?final?ConcurrentMap?streams?=?new?ConcurrentHashMap();????private?static?final?int?NUM_EVENT_TYPES?=?HystrixEventType.values().length;????public?static?RollingCommandEventCounterStream?getInstance(HystrixCommandKey?commandKey,?int?numBuckets,?int?bucketSizeInMs)?{

RollingCommandEventCounterStream?initialStream?=?streams.get(commandKey.name());????????if?(initialStream?!=?null)?{????????????return?initialStream;

}?else?{????????????synchronized?(RollingCommandEventCounterStream.class)?{

RollingCommandEventCounterStream?existingStream?=?streams.get(commandKey.name());????????????????if?(existingStream?==?null)?{

RollingCommandEventCounterStream?newStream?=?new?RollingCommandEventCounterStream(commandKey,?numBuckets,?bucketSizeInMs,

HystrixCommandMetrics.appendEventToBucket,?HystrixCommandMetrics.bucketAggregator);

streams.putIfAbsent(commandKey.name(),?newStream);????????????????????return?newStream;

}?else?{????????????????????return?existingStream;

}

}

}

}????private?RollingCommandEventCounterStream(HystrixCommandKey?commandKey,?int?numCounterBuckets,?int?counterBucketSizeInMs,

Func2?reduceCommandCompletion,

Func2?reduceBucket)?{????????super(HystrixCommandCompletionStream.getInstance(commandKey),?numCounterBuckets,?counterBucketSizeInMs,?reduceCommandCompletion,?reduceBucket);

}

}

RollingCommandEventCounterStream 構造函數是私有的,需要通過 getInstance 方法來獲取實例,這么做是為了確保每個依賴服務 HystrixCommandKey 只生成一個 RollingCommandEventCounterStream 實例。我們看一下構造 BucketedRollingCounterStream 的時候傳入的參數,appendRawEventToBucket、reduceBucket 的實現分別是 HystrixCommandMetrics.appendEventToBucket、HystrixCommandMetrics.bucketAggregator,其主要功能就是一個對各種 HystrixEventType 事件的累加求和。public?class?HystrixCommandMetrics?extends?HystrixMetrics?{????private?static?final?HystrixEventType[]?ALL_EVENT_TYPES?=?HystrixEventType.values();????public?static?final?Func2?appendEventToBucket?=?new?Func2()?{????????@Override

public?long[]?call(long[]?initialCountArray,?HystrixCommandCompletion?execution)?{

ExecutionResult.EventCounts?eventCounts?=?execution.getEventCounts();????????????for?(HystrixEventType?eventType:?ALL_EVENT_TYPES)?{????????????????switch?(eventType)?{????????????????????case?EXCEPTION_THROWN:?break;?//this?is?just?a?sum?of?other?anyway?-?don't?do?the?work?here

default:

initialCountArray[eventType.ordinal()]?+=?eventCounts.getCount(eventType);????????????????????????break;

}

}????????????return?initialCountArray;

}

};????public?static?final?Func2?bucketAggregator?=?new?Func2()?{????????@Override

public?long[]?call(long[]?cumulativeEvents,?long[]?bucketEventCounts)?{????????????for?(HystrixEventType?eventType:?ALL_EVENT_TYPES)?{????????????????switch?(eventType)?{????????????????????case?EXCEPTION_THROWN:????????????????????????for?(HystrixEventType?exceptionEventType:?HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES)?{

cumulativeEvents[eventType.ordinal()]?+=?bucketEventCounts[exceptionEventType.ordinal()];

}????????????????????????break;????????????????????default:

cumulativeEvents[eventType.ordinal()]?+=?bucketEventCounts[eventType.ordinal()];????????????????????????break;

}

}????????????return?cumulativeEvents;

}

};

}

這個滑動窗口是在 Hystrix 哪里使用的呢?必然是熔斷邏輯里啊。熔斷邏輯位于 HystrixCircuitBreaker 類中,其使用滑動窗口的關鍵代碼如下。主要是調用了 BucketedRollingCounterStream 的 observe 方法,對統計數據的發布源進行了訂閱,收到統計數據后,對熔斷器狀態 circuitOpened 進行更新。/*?package?*/class?HystrixCircuitBreakerImpl?implements?HystrixCircuitBreaker?{????????private?final?HystrixCommandProperties?properties;????????private?final?HystrixCommandMetrics?metrics;????????enum?Status?{

CLOSED,?OPEN,?HALF_OPEN;

}????????private?final?AtomicReference?status?=?new?AtomicReference(Status.CLOSED);????????private?final?AtomicLong?circuitOpened?=?new?AtomicLong(-1);????????private?final?AtomicReference?activeSubscription?=?new?AtomicReference(null);????????protected?HystrixCircuitBreakerImpl(HystrixCommandKey?key,?HystrixCommandGroupKey?commandGroup,?final?HystrixCommandProperties?properties,?HystrixCommandMetrics?metrics)?{????????????this.properties?=?properties;????????????this.metrics?=?metrics;????????????//On?a?timer,?this?will?set?the?circuit?between?OPEN/CLOSED?as?command?executions?occur

Subscription?s?=?subscribeToStream();

activeSubscription.set(s);

}????????private?Subscription?subscribeToStream()?{????????????return?metrics.getHealthCountsStream()

.observe()

.subscribe(new?Subscriber()?{????????????????????????@Override

public?void?onCompleted()?{

}????????????????????????@Override

public?void?onError(Throwable?e)?{

}????????????????????????@Override

public?void?onNext(HealthCounts?hc)?{????????????????????????????//?判斷請求次數,是否達到閾值。畢竟請求量太小,熔斷的意義也就不大了

if?(hc.getTotalRequests()?

}?else?{????????????????????????????????//?判斷失敗率是否達到閾值

if?(hc.getErrorPercentage()?

}?else?{????????????????????????????????????//?失敗率達到閾值,則修改熔斷狀態為?OPEN

if?(status.compareAndSet(Status.CLOSED,?Status.OPEN))?{

circuitOpened.set(System.currentTimeMillis());

}

}

}

}

});

}

}

手動寫一個示例

前面解析了 Hystrix 中滑動窗口的實現,由于考慮了各種細節其實現非常復雜,所以我們寫了一個簡易版本的滑動窗口統計,方便觀察學習。import?org.slf4j.Logger;import?org.slf4j.LoggerFactory;import?rx.Observable;import?rx.functions.Func1;import?rx.functions.Func2;import?rx.subjects.PublishSubject;import?rx.subjects.SerializedSubject;import?java.util.concurrent.TimeUnit;/**

*?模擬滑動窗口計數

*?Created?by?albon?on?17/6/24.

*/public?class?RollingWindowTest?{????private?static?final?Logger?logger?=?LoggerFactory.getLogger(WindowTest.class);????public?static?final?Func2?INTEGER_SUM?=

(integer,?integer2)?->?integer?+?integer2;????public?static?final?Func1,?Observable>?WINDOW_SUM?=

window?->?window.scan(0,?INTEGER_SUM).skip(3);????public?static?final?Func1,?Observable>?INNER_BUCKET_SUM?=

integerObservable?->?integerObservable.reduce(0,?INTEGER_SUM);????public?static?void?main(String[]?args)?throws?InterruptedException?{

PublishSubject?publishSubject?=?PublishSubject.create();

SerializedSubject?serializedSubject?=?publishSubject.toSerialized();

serializedSubject

.window(5,?TimeUnit.SECONDS)?//?5秒作為一個基本塊

.flatMap(INNER_BUCKET_SUM)???????????//?基本塊內數據求和

.window(3,?1)??????????????//?3個塊作為一個窗口,滾動布數為1

.flatMap(WINDOW_SUM)?????????????????//?窗口數據求和

.subscribe((Integer?integer)?->

logger.info("[{}]?call?......?{}",?//?輸出統計數據到日志

Thread.currentThread().getName(),?integer));????????//?緩慢發送數據,觀察效果

for?(int?i=0;?i<100;?++i)?{????????????if?(i?

serializedSubject.onNext(1);

}?else?{

serializedSubject.onNext(2);

}

Thread.sleep(1000);

}

}

}

總結

一個滑動窗口統計主要分為兩步:bucket 統計,bucket 的大小決定了滑動窗口滾動時間間隔;

window 統計,window 的時長決定了包含的 bucket 的數目。

Hystrix 實現滑動窗口利用了 RxJava 這個響應式函數編程框架,主要是其中的幾個函數:window:根據指定時間或指定數量對數據流進行聚集,相當于 1 對 N 的轉換;

flatMap:將輸入數據流,轉換成另一種格式的數據流,在滑動窗口統計中起到了數據求和的功能(當然其功能并不限于求和)。

Hystrix 最核心的基礎組件,當屬提供觀察者模式(發布-訂閱模式)的 RxJava。

參考文獻

作者:albon

鏈接:https://www.jianshu.com/p/c1b6497889b4

總結

以上是生活随笔為你收集整理的Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。