日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hystrix指标窗口实现原理

發(fā)布時間:2025/5/22 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hystrix指标窗口实现原理 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、引子

Hystrix是一個熔斷中間件,能夠?qū)崿F(xiàn)fast-fail并走備用方案。Hystrix基于滑動窗口判定服務(wù)失敗占比選擇性熔斷。滑動窗口的實現(xiàn)方案有很多種,指標(biāo)計數(shù)也有很多種實現(xiàn)常見的就是AtomicInteger進行原子增減維護計數(shù),具體的方案就不探討了。

Hystrix是基于Rxjava去實現(xiàn)的,那么如何利用RxJava實現(xiàn)指標(biāo)的匯聚和滑動窗口實現(xiàn)呢?當(dāng)然本篇不是作為教程去介紹RxJava的使用姿勢,本篇文章主要解說Hystrix是什么一個思路完成這項功能。

二、指標(biāo)數(shù)據(jù)上傳

看HystrixCommand執(zhí)行的主入口

public Observable<R> toObservable() {final AbstractCommand<R> _cmd = this;final Action0 terminateCommandCleanup = new Action0() {@Overridepublic void call() {if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {handleCommandEnd(false); //user code never ran} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {handleCommandEnd(true); //user code did run}}};//mark the command as CANCELLED and store the latency (in addition to standard cleanup)final Action0 unsubscribeCommandCleanup = new Action0() {@Overridepublic void call() {if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {.......省略干擾代碼...........handleCommandEnd(false); //user code never ran} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {.......省略干擾代碼...........handleCommandEnd(true); //user code did run}}};.......省略干擾代碼...........return Observable.defer(new Func0<Observable<R>>() {.......省略干擾代碼...........return afterCache.doOnTerminate(terminateCommandCleanup) .doOnUnsubscribe(unsubscribeCommandCleanup) .doOnCompleted(fireOnCompletedHook);} });

我們的主入口Observable當(dāng)doOnTerminate 和 doOnUnsubscribe 的時候觸發(fā) handleCommandEnd 方法,從字面意思就是當(dāng)command執(zhí)行結(jié)束處理一些事情。

private void handleCommandEnd(boolean commandExecutionStarted) {........省略干擾代碼..........executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);if (executionResultAtTimeOfCancellation == null) {metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);} else {metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);}........省略干擾代碼.......... }

注意看 metrics.markCommandDone,調(diào)用了HystrixCommandMetrics的markCommandDone方法,把一個executionResult傳入了進來。ExecutionResult 這是個什么鬼呢?
我們截取部分代碼瀏覽下

public class ExecutionResult {private final EventCounts eventCounts;private final Exception failedExecutionException;private final Exception executionException;private final long startTimestamp;private final int executionLatency; //time spent in run() methodprivate final int userThreadLatency; //time elapsed between caller thread submitting request and response being visible to itprivate final boolean executionOccurred;private final boolean isExecutedInThread;private final HystrixCollapserKey collapserKey;private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();private static final int NUM_EVENT_TYPES = ALL_EVENT_TYPES.length;private static final BitSet EXCEPTION_PRODUCING_EVENTS = new BitSet(NUM_EVENT_TYPES);private static final BitSet TERMINAL_EVENTS = new BitSet(NUM_EVENT_TYPES);

以大家聰慧的頭腦應(yīng)該能夠猜測到這個類就是當(dāng)前HystrixCommand的 執(zhí)行結(jié)果記錄,只不過這個結(jié)果不僅僅是結(jié)果,也包含了各種狀態(tài)以及出現(xiàn)的異常。它的身影在Hystrix執(zhí)行原理里講的各Observable里出現(xiàn),跟著HystrixCommand整個生命周期。

回到上面講,當(dāng)時command執(zhí)行完畢后,調(diào)用了HystrixCommandMetrics的markCommandDone方法

void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);if (executionStarted) {concurrentExecutionCount.decrementAndGet();} }

最終調(diào)用量HystrixThreadEventStream. executionDone方法的HystrixThreadEventStream是ThreadLocal方式,和當(dāng)前線程綁定

//HystrixThreadEventStream.threadLocalStreams private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {@Overrideprotected HystrixThreadEventStream initialValue() {return new HystrixThreadEventStream(Thread.currentThread());} };

executionDone代碼如下

public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);writeOnlyCommandCompletionSubject.onNext(event); }

這里根據(jù) executionResult, threadpoolkey,comandKey,生成 了一個HystrixCommandCompletion然后通過writeOnlyCommandCompletionSubject寫入,writeOnlyCommandCompletionSubject整個東西,我們等會再看。現(xiàn)在思考下HystrixCommandCompletion是什么?HystrixCommandCompletion包含了 ExecutionResult和HystrixRequestContext,它是一種HystrixEvent,標(biāo)識著command執(zhí)行完成的一個事件,該事件是當(dāng)前這個點HystrixCommand的請求信息,執(zhí)行結(jié)果,狀態(tài)等數(shù)據(jù)的載體。

從上面類圖可以看到不僅僅HystrixCommandCompletion一種還有其它的Event,這里就不一一介紹了。

當(dāng)writeOnlyCommandCompletionSubject onNext的時候會觸發(fā) writeCommandCompletionsToShardedStreams執(zhí)行里面的call()方法。

private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {@Overridepublic void call(HystrixCommandCompletion commandCompletion) {HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());commandStream.write(commandCompletion);if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());threadPoolStream.write(commandCompletion);}} };

這個方法的意思是,會把HystrixCommandCompletion 通過HystrixCommandCompletionStream 寫入,如果當(dāng)前command使用的是線程池隔離策略的話 會通過 HystrixThreadPoolCompletionStream 再寫一遍。HystrixCommandCompletionStream 和HystrixThreadPoolCompletionStream 他們兩個概念類似,我們拿著前者解釋,這個是個什么東西。
HystrixCommandCompletionStream 以commandKey為key,維護在內(nèi)存中,調(diào)用它的write的方法實則是調(diào)用內(nèi)部屬性 writeOnlySubject的方法,writeOnlySubject是一個Subject(RxJava的東西),通過SerializedSubject保證其寫入的順序性,調(diào)用其share()方法獲得一個Observable也就是readOnlyStream,讓外界能夠讀這個Subject的數(shù)據(jù)。總結(jié)下Subject是連接兩個Observable之間的橋梁,它有兩個泛型元素標(biāo)識著進出數(shù)據(jù)類型,全部都是HystrixCommandCompletion類型

HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {this.commandKey = commandKey;this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());this.readOnlyStream = writeOnlySubject.share();}

我們從源頭開始梳理,明白了這個HystrixCommandCompletion數(shù)據(jù)流是如何寫入的(其它類型的的思路一致,就不一一解釋了),那它是如何被搜集起來呢?

三、指標(biāo)數(shù)據(jù)搜集

追溯至AbstractCommand初始化

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {........省略代碼........this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);........省略代碼........ }

初始化command指標(biāo)

HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {super(null);this.key = key;this.group = commandGroup;this.threadPoolKey = threadPoolKey;this.properties = properties;healthCountsStream = HealthCountsStream.getInstance(key, properties);rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties); }

有很多各種 XXXStream.getInstance(),這些Stream就是針對各類用途進行指標(biāo)搜集,統(tǒng)計的具體實現(xiàn),下面可以看下他們的UML類圖

BucketedCounterStream實現(xiàn)了基本的桶計數(shù)器,BucketedCumulativeCounterStream基于父類實現(xiàn)了累計計數(shù),BucketedRollingCounterStream基于父類實現(xiàn)了滑動窗口計數(shù)。兩者的子類就是對特定指標(biāo)的具體實現(xiàn)。

接下來分兩塊累計計數(shù)和滑動窗口計數(shù),挑選其對應(yīng)的CumulativeCommandEventCounterStream和HealthCountsStream進行詳細(xì)說明。

3.1、BucketedCounterStream 基本桶的實現(xiàn)

protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {this.numBuckets = numBuckets;this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {@Overridepublic Observable<Bucket> call(Observable<Event> eventBucket) {return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);}};final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();for (int i = 0; i < numBuckets; i++) {emptyEventCountsToStart.add(getEmptyBucketSummary());}this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {@Overridepublic Observable<Bucket> call() {return inputEventStream.observe().window(bucketSizeInMs, TimeUnit.MILLISECONDS).flatMap(reduceBucketToSummary) .startWith(emptyEventCountsToStart); }}); }

這里父類的構(gòu)造方法主要成三個部分分別是
I. reduceBucketToSummary 每個桶如何計算聚合的數(shù)據(jù)

appendRawEventToBucket的實現(xiàn)由其子類決定,不過大同小異,我們自行拔下代碼看下HealthCountsStream, 可以看到他用的是HystrixCommandMetrics.appendEventToBucket

public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {@Overridepublic long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {ExecutionResult.EventCounts eventCounts = execution.getEventCounts();for (HystrixEventType eventType: ALL_EVENT_TYPES) {switch (eventType) {case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work heredefault:initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);break;}}return initialCountArray;}}; }

這個方法就是將一個桶時長內(nèi)的數(shù)據(jù)進行累計計數(shù)相加。initialCountArray可以看出一個桶內(nèi)前面的n個數(shù)據(jù)流的計算結(jié)果,數(shù)組的下標(biāo)就是HystrixEventType 枚舉里事件的下標(biāo)值。

II. emptyEventCountsToStart 第一個桶的定義,裝逼點叫創(chuàng)世桶

III. window窗口的定義,這里第一個參數(shù)就是每個桶的時長,第二個參數(shù)時間的單位。利用RxJava的window幫我們做聚合數(shù)據(jù)。

.window(bucketSizeInMs, TimeUnit.MILLISECONDS)

Bucket 時長如何計算
每個桶的時長如何得出的?這個也是基于我們的配置得出,拿HealthCountsStream舉例子。
metrics.rollingStats.timeInMilliseconds 滑動窗口時長 默認(rèn)10000ms
metrics.healthSnapshot.intervalInMilliseconds 檢測健康狀態(tài)的時間片,默認(rèn)500ms 在這里對應(yīng)一個bucket的時長

滑動窗口內(nèi)桶的個數(shù) = 滑動窗口時長 / bucket時長

而 CumulativeCommandEventCounterStream
metrics.rollingStats.timeInMilliseconds 滑動窗口時長 默認(rèn)10000ms
metrics.rollingStats.numBuckets 滑動窗口要切的桶個數(shù)

bucket時長 = 滑動窗口時長 / 桶個數(shù)

不同職能的 XXXStream對應(yīng)的算法和對應(yīng)的配置也不一樣,不過都一個套路,就不一一去展示了。

inputEventStream
inputEventStream 可以認(rèn)為是窗口采集的數(shù)據(jù)流,這個數(shù)據(jù)流由其子類去傳遞,大致看了下

//HealthCountsStream private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator); }//RollingThreadPoolEventCounterStream private RollingThreadPoolEventCounterStream(HystrixThreadPoolKey threadPoolKey, int numCounterBuckets, int counterBucketSizeInMs,Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,Func2<long[], long[], long[]> reduceBucket) {super(HystrixThreadPoolCompletionStream.getInstance(threadPoolKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket); }

我們發(fā)現(xiàn)這個 inputEventStream,其實就是 HystrixCommandCompletionStream、HystrixThreadPoolCompletionStream或者其它的,我們挑其中HystrixCommandCompletionStream看下,這個就是上面第二部分指標(biāo)數(shù)據(jù)上傳里講的寫數(shù)據(jù)那個stream,inputEventStream.observe()也就是 HystrixCommandCompletionStream的 readOnlyStream,Subject的只讀Observable。(這里如果沒明白可以回到第二點看下結(jié)尾的部分)

3.2、累計計數(shù)器之CumulativeCommandEventCounterStream

先看下累計計數(shù)器的父類BucketedCumulativeCounterStream

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,Func2<Bucket, Event, Bucket> reduceCommandCompletion,Func2<Output, Bucket, Output> reduceBucket) {super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);this.sourceStream = bucketedStream.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets)........省略代碼........}

bucketedStream就是3.1里的數(shù)據(jù)匯聚后的一個一個桶流,這里執(zhí)行了scan方法,scan方法的意思就是會將當(dāng)前窗口內(nèi)已經(jīng)提交的數(shù)據(jù)流進行按照順序進行遍歷并執(zhí)行指定的function邏輯,scan里有兩個參數(shù)第一個參數(shù)表示上一次執(zhí)行function的結(jié)果,第二個參數(shù)就是每次遍歷要執(zhí)行的function,scan完畢后skip numBuckets 個bucket,可以認(rèn)為丟棄掉已經(jīng)計算過的bucket。

scan里的function是如何實現(xiàn)呢?它也是實現(xiàn)累計計數(shù)的關(guān)鍵,由子類實現(xiàn),本小節(jié)也就是CumulativeCommandEventCounterStream來實現(xiàn)

CumulativeCommandEventCounterStream newStream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);

發(fā)現(xiàn)調(diào)用的是 HystrixCommandMetrics.bucketAggregator,我們看下其函數(shù)體

public static final Func2<long[], long[], long[]> bucketAggregator = new Func2<long[], long[], long[]>() {@Overridepublic long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {for (HystrixEventType eventType: ALL_EVENT_TYPES) {switch (eventType) {case EXCEPTION_THROWN:for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()];}break;default:cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()];break;}}return cumulativeEvents;} };

call() 方法有兩個參數(shù)第一個參數(shù)指的之前的計算結(jié)果,第二個參數(shù)指的當(dāng)前桶內(nèi)的計數(shù),方法體不難理解,就是對各個時間的count計數(shù)累加。

如此,一個command的計數(shù)就實現(xiàn)了,其它累計計數(shù)也雷同。

3.3、滑動窗口之HealthCountsStream

直接父類代碼

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,final Func2<Bucket, Event, Bucket> appendRawEventToBucket,final Func2<Output, Bucket, Output> reduceBucket) {super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {@Overridepublic Observable<Output> call(Observable<Bucket> window) {return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);}};this.sourceStream = bucketedStream .window(numBuckets, 1) .flatMap(reduceWindowToSummary) ........省略代碼........ }

依然像累計計數(shù)器一樣對父級的桶流數(shù)據(jù)進行操作,這里用的是window(),第一個參數(shù)表示桶的個數(shù),第二個參數(shù)表示一次移動的個數(shù)。這里numBuckets就是我們的滑動窗口桶個數(shù)

第一排我們可以認(rèn)為是移動前的滑動窗口的數(shù)據(jù),在執(zhí)行完 flatMap里的function之后,滑動窗口向前移動一個桶位,那么 23 5 2 0 這個桶就被丟棄了,然后新進了最新的桶 45 6 2 0。
那么每次滑動窗口內(nèi)的數(shù)據(jù)是如何被處理呢?就是flatMap里的function做的,reduceWindowToSummary 最終被具體的子類stream實現(xiàn),我們就研究下HealthCountsStream

private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {@Overridepublic HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {return healthCounts.plus(bucketEventCounts);} };//HystrixCommandMetrics.HealthCounts#plus public HealthCounts plus(long[] eventTypeCounts) {long updatedTotalCount = totalCount;long updatedErrorCount = errorCount;long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);return new HealthCounts(updatedTotalCount, updatedErrorCount); }

方法的實現(xiàn)也顯而易見,統(tǒng)計了當(dāng)前滑動窗口內(nèi)成功數(shù)、失敗數(shù)、線程拒絕數(shù),超時數(shù).....

該stream的職責(zé)就是探測服務(wù)的可用性,也是Hystrix熔斷器是否生效依賴的數(shù)據(jù)源。

四、回顧

Hystrix的滑動窗口設(shè)計相對于其它可能稍微偏難理解些,其主要原因還是因為我們對RxJava的了解不夠,不過這不重要,只要耐心的多看幾遍就沒有什么問題。

本篇主要從指標(biāo)數(shù)據(jù)上報到指標(biāo)數(shù)據(jù)收集來逐步解開Hystrix指標(biāo)搜集的神秘面紗。最后借用一大牛的圖匯總下本篇的內(nèi)容

參考文檔
官方文檔-How it works
官方文檔-configuration
Hystrix 1.5 滑動窗口實現(xiàn)原理總結(jié)


系列文章推薦
Hystrix常用功能介紹
Hystrix執(zhí)行原理
Hystrix熔斷器執(zhí)行機制
Hystrix超時實現(xiàn)機制

總結(jié)

以上是生活随笔為你收集整理的Hystrix指标窗口实现原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。